Node.js + RabbitMQ in Practice

2 months ago

This article presents a distributed crawler framework based on Node.js and RabbitMQ. It is suited to crawlers that fetch small amounts of data per request but need high concurrency.

The requirement is as follows: a user sends a request to our Api Server, but the Api Server does not crawl the data itself. Instead, it puts a task into a queue, and a dedicated Crawler Server takes the task and crawls the data. The result is then returned to the Api Server, which uses it to answer the user's request. The architecture diagram looks like this:

1. Connect to the RabbitMQ service

scheduler/rabbit.js

/**
 * author: Shawn
 * time  : 2017/9/6 14:58
 * desc  :
 * update: Shawn 2017/9/6 14:58
 */


let amqp = require('amqplib/callback_api');


let mqConn;


/**
 * Create RabbitMQ connection
 *
 * Opens one connection that the whole process shares and stores it so that
 * getMqConnection() can hand it out. amqp.connect() is asynchronous, so the
 * stored connection stays undefined until the connect callback has run.
 *
 * @param {string} [url='amqp://localhost'] broker URL (defaults to the
 *        previously hard-coded local broker)
 * @param {function(?Error, Object=)} [callback] optional; called with the error
 *        on failure, or with (null, connection) once connected
 */
function createMqConnection(url = 'amqp://localhost', callback) {
    amqp.connect(url, function (err, conn) {
        if (err) {
            console.log('error --> ', err);
            if (callback) callback(err);
            return;
        }

        mqConn = conn;
        console.log('RabbitMQ connected');

        // An EventEmitter 'error' event with no listener is thrown and would
        // crash the Api server; log it the same way as a failed connect.
        conn.on('error', function (connErr) {
            console.log('error --> ', connErr);
        });
        // Forget the handle once the connection is gone so callers do not keep
        // opening channels on a dead connection.
        conn.on('close', function () {
            if (mqConn === conn) mqConn = undefined;
        });

        if (callback) callback(null, conn);
    });
}


/**
 * Accessor for the shared RabbitMQ connection.
 *
 * @returns {*} whatever createMqConnection() last stored (undefined before the
 *          first successful connect)
 */
const getMqConnection = () => mqConn;


module.exports = {
    createMqConnection: createMqConnection,
    getMqConnection: getMqConnection,
};

2. Add tasks to the queue

Because the crawled data must eventually be returned to the user who made the request, we use RabbitMQ's RPC pattern (put simply, the sender submits a task and receives that task's result in return). Here is the code:

/**
 * Add an Rpc task
 *
 * It is not recommended to process the data passed in by Api here, nor the data returned by Worker.
 * The data is best processed at both ends of the architecture, which is handed to Api
 * And Feature processing, which guarantees the versatility of RabbitMQ, the data is only related to Api and Feature
 *
 * Each call opens a short-lived channel with a server-named reply queue. Both
 * are released as soon as the matching reply arrives. Otherwise a long-running
 * Api server accumulates one channel and one queue per request on the shared
 * connection.
 *
 * @param startingData  payload for the worker (sent as String(startingData))
 * @param uuid          correlation id used to match the reply to this request
 * @param callback      invoked once with the reply body as a string
 * @throws {Error} when the RabbitMQ connection has not been established (yet)
 */
function newRpcTask(startingData, uuid, callback) {
    const conn = rabbit.getMqConnection();
    if (!conn) {
        // The connection is opened asynchronously at startup; fail with a clear
        // message instead of a TypeError on undefined.
        throw new Error('RabbitMQ connection is not established yet');
    }

    conn.createChannel(function (err, ch) {
        if (err) return handleError(err);

        // `exclusive` alone only removes the queue when the *connection* closes,
        // and that connection is shared for the life of the process. `autoDelete`
        // removes it once our consumer goes away, i.e. when the channel is closed.
        ch.assertQueue('', {exclusive: true, autoDelete: true}, function (err, q) {
            if (err) {
                ch.close();
                return handleError(err);
            }

            const corr = uuid;
            const payload = String(startingData);
            console.log('Starting data: %s', payload);

            ch.consume(q.queue, function (msg) {
                // msg is null when the broker cancels this consumer.
                if (msg === null || msg.properties.correlationId !== corr) return;

                const body = msg.content.toString();
                console.log('Return data: %s', body);
                // The reply has arrived: release the per-request channel (and with it the reply queue).
                ch.close();
                // The data returned by Feature should not be processed and returned to Api for processing.
                callback(body);
            }, {noAck: true});

            ch.sendToQueue(FIBONACCI_QUEUE, Buffer.from(payload), {correlationId: corr, replyTo: q.queue});
        });
    });
}

3. The Worker takes a task from the queue, executes it, and returns the result

Here I wrap the common logic in a base class, BaseWorker. A subclass only needs to override doFeature() to implement its business logic and then call startRpcConsumer(), which takes tasks from the queue and calls doFeature() to process them. For a concrete example, see fibonacciWorker.js in the project, which implements a Fibonacci calculation.


let amqp = require('amqplib/callback_api');

/**
 * Base class for queue workers.
 *
 * A subclass assigns `this.queueName`, overrides doFeature() with its business
 * logic, and then calls startRpcConsumer() (or startConsumer()) to begin pulling
 * tasks from that queue.
 */
class BaseWorker {
    constructor() {
        // Name of the queue to consume; expected to be set by the subclass.
        this.queueName = '';
    }


    /**
     * Consumer ordinary event
     *
     * Consumes this.queueName (declared durable) one message at a time. The
     * work is simulated: each message is acknowledged after one second per
     * character of its body.
     */
    startConsumer() {
        amqp.connect('amqp://localhost', (err, conn) => {
            if (err) return BaseWorker.handleError(err);
            conn.createChannel((err, ch) => {
                if (err) return BaseWorker.handleError(err);

                ch.assertQueue(this.queueName, { durable: true });
                ch.prefetch(1); // The scheduler receives only one task at a time, and receives a new task after the oldTask ack

                console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", this.queueName);
                ch.consume(this.queueName, (msg) => {
                    // The broker delivers null when it cancels the consumer.
                    if (msg === null) {
                        console.log('msg is null.');
                        return;
                    }

                    const body = msg.content.toString();
                    const secs = body.length;

                    console.log(" [x] Received %s", body, 'Start processing...');
                    setTimeout(() => {
                        console.log(" [x] Done");
                        // Tell RabbitMQ to delete the message after completing the task.
                        // If the scheduler hangs, the message is automatically assigned to the other scheduler
                        ch.ack(msg);
                    }, secs * 1000);
                }, { noAck: false });
            });
        });
    }


    /**
     * Spend Rpc events and return results
     *
     * It is not recommended to process the data passed in by Api here, nor the data returned by Worker.
     * The data is best processed at both ends of the architecture, which is handed to Api
     * And Feature processing, which guarantees the versatility of RabbitMQ, the data is only related to Api and Feature
     *
     * Messages are delivered one at a time (prefetch 1) and handed to doFeature().
     */
    startRpcConsumer() {
        amqp.connect('amqp://localhost', (err, conn) => {
            if (err) return BaseWorker.handleError(err);

            conn.createChannel((err, ch) => {
                if (err) return BaseWorker.handleError(err);

                ch.assertQueue(this.queueName, { durable: false });
                ch.prefetch(1);
                console.log(' [x] Awaiting RPC requests');
                ch.consume(this.queueName, (msg) => {
                    if (msg === null) {
                        console.log('msg is null.');
                        return;
                    }

                    try {
                        this.doFeature(msg, ch);
                    } catch (featureErr) {
                        // With prefetch(1) an unacknowledged message blocks this worker
                        // forever, so reject it. requeue=false avoids redelivering a
                        // message that fails every time in an endless loop.
                        BaseWorker.handleError(featureErr);
                        ch.nack(msg, false, false);
                    }
                });
            });
        });
    }


    /**
     * Delete message after completing business
     *
     * For RPC requests (messages that carry a replyTo queue), first sends
     * `result` back with the request's correlationId; then acknowledges `msg`.
     *
     * @param msg     the delivered message
     * @param ch      the channel the message was delivered on
     * @param result  reply payload; null/undefined is sent as an empty body
     */
    ackMsg(msg, ch, result) {
        const { replyTo, correlationId } = msg.properties;

        // Plain (non-RPC) messages have nowhere to reply to; just acknowledge them.
        if (replyTo) {
            const body = result == null ? '' : String(result);
            ch.sendToQueue(replyTo, Buffer.from(body), { correlationId });
        }

        ch.ack(msg);
    }


    /**
     * Error handling: logs the error to the console.
     *
     * @param err
     */
    static handleError(err) {
        console.log('Error --> ', err);
    }


    /**
     * Business processing interface, the subclass can implement the interface
     *
     * An override must eventually acknowledge `msg` (e.g. via ackMsg). The
     * default does nothing, so the message stays unacked and, with prefetch(1),
     * the worker receives no further tasks.
     *
     * @param msg
     * @param ch
     */
    doFeature(msg, ch) {
    }
}


// Export the base class itself; concrete workers (e.g. fibonacciWorker.js) build on it.
module.exports = BaseWorker;

4. Server

When the server starts, it must first connect to RabbitMQ. After that, the Api handlers do not run the business logic themselves; each request is sent to the RabbitMQ queue as a task. For example, task.newRpcTask() turns the user's request into a task, adds it to the queue, and receives the final result in a callback, which is then returned to the user.

let crypto = require('crypto');
let express = require('express');
let app = express();
let rabbit = require('../scheduler/rabbit');
let task = require('../scheduler/task');

/**
 * Create a RabbitMQ connection
 *
 * The connection is opened asynchronously: rabbit.getMqConnection() returns
 * undefined until the connect callback has run.
 */
rabbit.createMqConnection();


/**
 * Ordinary mission Api
 *
 * Answers immediately, then hands `num` to task.newTask(); the caller does
 * not wait for the task to run.
 */
app.get('/newTask', function (req, res) {
    let queryData = req.query;
    console.log('queryData = ', queryData);
    res.send('push new task');
    task.newTask(queryData.num);
});


/**
 * Rpc task Api
 *
 * Queues `num` as an RPC task and responds with the worker's reply.
 * Responds 400 when `num` is missing, empty or repeated, so no task with an
 * unusable payload is ever queued.
 */
app.get('/newRpcTask', function (req, res) {
    let queryData = req.query;
    console.log('queryData = ', queryData);

    const num = queryData.num;
    // Validate at the Api boundary; the RabbitMQ layer passes data through unchanged.
    // A repeated parameter (?num=1&num=2) arrives as an array and is rejected too.
    if (typeof num !== 'string' || num.trim() === '') {
        res.status(400).send('missing or invalid query parameter: num');
        return;
    }

    // When passing data to the RabbitMQ layer, consider the format of the delivery.
    // The RabbitMQ layer should not process the data and finally process it when it is handed over to the Feature.
    task.newRpcTask(num, generateUuid(), function (result) {
        res.send('fibonacci = ' + result);
    });
});


/**
 * Start the Api server on $PORT, falling back to 3002 when PORT is unset or
 * not a usable number.
 */
const port = Number(process.env.PORT) || 3002;
app.listen(port, function () {
    console.log('Api server listening on port %d', port);
});


/**
 * Generate a correlation id for an RPC request.
 *
 * Uses the crypto module's CSPRNG. Concatenated Math.random() strings are
 * predictable, and their uniqueness is not guaranteed.
 *
 * @returns {string} 32 lowercase hex characters (128 random bits)
 */
function generateUuid() {
    return crypto.randomBytes(16).toString('hex');
}

Finally, running the demonstration

RabbitMQ only needs to be installed on the RabbitMQ server machine; other machines can connect to it over the AMQP protocol.

  1. Start the RabbitMQ server (I run mine on Windows; the setup details are omitted here).
  2. Start the Api Server: `node server.js` (with PM2: `pm2 start server.js`)
  3. Start the worker on every Worker Server: `node fibonacciWorker.js` (with PM2: `pm2 start fibonacciWorker.js`)

test

http://localhost:3002/newRpcTask?num=8