Tuesday, June 14, 2011

Building a Realtime Feed with RabbitMQ, Node.js, AMQP, and MySQL

As a backend programmer, I rarely used to code in JavaScript. Today I write a lot of cross-browser JavaScript with the fantastic jQuery library and its various extensions. Front-end programming is hard to get right since IE sucks, but that's another blog post and out of scope here. I certainly have more appreciation for the people who focus on this area. Since I work the entire stack, I can finally say I am well rounded across CSS, JavaScript, server tuning, DBA work, PHP, C, and so on. Now for the setup and disclaimer: this post is not meant to be a how-to but to explain the approach. If you would like a how-to, let me know on what.


With Node.js, the gap between front-end and backend programming is narrowing. To code in Node you need to know JavaScript. Node is great at handing off data and pumping it to the requester in an event-based model that is fantastically fast and small. I love Node, especially since it has access to the browser cookie.

My use of node is very simple.

The browser makes a long-polling JSONP connection to Node.js. (JSONP is JSON with padding: basically, the server's response can call code already loaded in your browser.)

Node.js sends back any data that is waiting for the client, or that arrives for the client during the lifetime of the connection, which recycles every 50 seconds. Node's source of data is RabbitMQ.

RabbitMQ is a message queue server written in Erlang that speaks AMQP, a protocol shared by many message queue servers, and it can be made into a distributed, fault-tolerant system. It hands the message payloads to Node.js, which sends them to the users who are online and subscribed to an exchange. RabbitMQ's source of data is the application layer (PHP under Apache), which publishes after the database commit.

MySQL is the persistent store that handles page reload or init requests, while the previous two components enable the real-time feed.

Memcache holds a person's friend list.
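
The long-polling step described above can be sketched roughly as follows. This is a minimal sketch, not the code behind the feed: Mailbox, jsonpBody, and the respond callback are hypothetical names, and the 50-second recycle is simply passed in as a parameter. The idea is to buffer messages while the client is away and to answer an open poll as soon as a message arrives or the recycle timer expires.

```javascript
// Hypothetical helper: wrap a payload in a JSONP call, e.g. cb([...]);
function jsonpBody(callbackName, messages) {
  // Only allow identifier-like callback names to avoid script injection.
  if (!/^[A-Za-z_$][\w$]*$/.test(callbackName)) {
    throw new Error('invalid JSONP callback name');
  }
  return callbackName + '(' + JSON.stringify(messages) + ');';
}

// Hypothetical per-user mailbox: buffers messages between polls and
// answers a waiting poll as soon as something arrives.
class Mailbox {
  constructor(recycleMs) {
    this.recycleMs = recycleMs; // e.g. 50000 for the 50-second recycle
    this.pending = [];          // messages waiting for the next poll
    this.waiting = null;        // { callbackName, respond, timer } for the open poll
  }

  // Called when the browser opens a long-poll request.
  poll(callbackName, respond) {
    if (this.waiting) {
      // A newer poll replaces an older one that is still open.
      clearTimeout(this.waiting.timer);
      this.waiting.respond(jsonpBody(this.waiting.callbackName, []));
      this.waiting = null;
    }
    if (this.pending.length > 0) {
      respond(jsonpBody(callbackName, this.flush()));
      return;
    }
    const timer = setTimeout(() => {
      this.waiting = null;
      respond(jsonpBody(callbackName, [])); // empty reply; the browser reconnects
    }, this.recycleMs);
    this.waiting = { callbackName, respond, timer };
  }

  // Called when a message for this user arrives from the queue.
  deliver(message) {
    this.pending.push(message);
    if (this.waiting) {
      const { callbackName, respond, timer } = this.waiting;
      clearTimeout(timer);
      this.waiting = null;
      respond(jsonpBody(callbackName, this.flush()));
    }
  }

  // Hand back everything buffered so far and start a fresh buffer.
  flush() {
    const out = this.pending;
    this.pending = [];
    return out;
  }
}
```

In a real server, respond would write the string to the HTTP response with a JavaScript content type and end the request.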


Notice that Flash is not used, since I wanted a pure JavaScript version and wanted to reduce the amount of download needed to service the request. Additionally, web sockets are not finalized yet and do not work across all browsers, while long polling does. Socket.io is cool but requires Flash as well.


This ASCII diagram shows the flow:

Browser --> Apache+PHP --> DB
if DB commit ok
Apache+PHP-->RabbitMQ-->Node.js-->Friends of Browser
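
The ordering in the diagram, publish only once the database commit succeeds, can be sketched like this. The application in this post does this step in PHP; the JavaScript below is only an illustration, and postStatus, db.saveStatus, and broker.publishToFeed are hypothetical stand-ins for the real database and RabbitMQ calls.

```javascript
// Hypothetical flow: persist first, publish to the feed only on success.
// `db.saveStatus` and `broker.publishToFeed` are stand-ins supplied by the caller.
function postStatus(db, broker, userId, text, done) {
  db.saveStatus(userId, text, function (err, row) {
    if (err) {
      // Commit failed: nothing is published, so friends never see
      // an update that a page reload could not reproduce.
      return done(err);
    }
    // Commit ok: hand the payload to the user's own exchange.
    broker.publishToFeed(userId + '_feedExchange', { message: row.text });
    done(null, row);
  });
}
```

Publishing after the commit keeps the real-time feed consistent with what MySQL returns on a page reload.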

Since Node is a gateway, let's focus more on RabbitMQ, which handles the logic. I'm using RabbitMQ in a pub-sub model. To understand Rabbit, it is important to know its vocabulary.

Producer: Produces the message.
Exchange: The bridge between the producer and the queue(s). Note that the producer has no idea what the queues are; all it needs to know is that the exchange will take its message and send it to the correct queue(s).
Queue: Holds the messages from producers, on disk or in memory (distributed), depending on durability settings.
Consumer: Node.js. It reads all events from the queues it subscribed to and hands them off to the connected user.
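
To make the vocabulary concrete, here is a toy in-memory model of a fanout exchange. It is not RabbitMQ and not the amqp module; ToyExchange and ToyQueue are hypothetical names used only to show that the producer talks to the exchange and never to the queues.

```javascript
// Toy model only: real queues live inside the RabbitMQ broker.
class ToyQueue {
  constructor(name) {
    this.name = name;
    this.messages = [];   // messages not yet consumed
    this.consumer = null; // callback registered by subscribe()
  }

  subscribe(consumer) {
    this.consumer = consumer;
    // Drain anything that arrived before the consumer attached.
    while (this.messages.length > 0) consumer(this.messages.shift());
  }

  push(message) {
    if (this.consumer) this.consumer(message);
    else this.messages.push(message);
  }
}

class ToyExchange {
  constructor(name) {
    this.name = name;
    this.queues = [];
  }

  bind(queue) {
    this.queues.push(queue);
  }

  // Fanout: the routing key is accepted but ignored, and every bound
  // queue gets the message.
  publish(routingKey, message) {
    this.queues.forEach((q) => q.push(message));
  }
}
```

A producer would only ever call publish on the exchange; which queues are bound, and who consumes from them, is invisible to it.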


I wrote a benchmark in Node to test the capability of this setup, and it's quite impressive. Let's take a quick look:

require("./amqp.connection");

connection.addListener('ready', function() {
  puts("connected to " + connection.serverProperties.product);
  var e = connection.exchange('[userid]_feedExchange', {type: 'fanout'});
  e.on('exchangeDeclareOk', function(data){
    e.publish('routingKeyWhichIgnoredBecauseOfFanout', {message: "This is the message payload"});

    setTimeout(function () {
      // wait one second to receive the message, then quit
      connection.end();
    }, 1000);
  });
});

The require (line 1) sets up the connection: global.connection = amqp.createConnection(options);
Line 3 sets up a listener that fires when the connection is established.
Line 5 declares the exchange: if it doesn't exist, it is created; otherwise the code keeps going. Notice that the exchange name is built dynamically from the userId, so queues can bind against this exchange.
Line 6 listens for another event, exchangeDeclareOk; once it fires, line 7 sends the message.
Line 9 sets a timeout so the connection closes cleanly after the publish; otherwise you will get:

net.js:392
    throw new Error('Socket is not writable');
          ^
Error: Socket is not writable
    at Connection._writeOut (net.js:392:11)
    at Connection.write (net.js:378:17)
    at Connection._sendMethod (/usr/local/node/lib/node_modules/amqp/amqp.js:1011:8)
    at Object.cb (/usr/local/node/lib/node_modules/amqp/amqp.js:1799:21)
    at Exchange._tasksFlush (/usr/local/node/lib/node_modules/amqp/amqp.js:1306:10)
    at Exchange._onMethod (/usr/local/node/lib/node_modules/amqp/amqp.js:1772:8)
    at Exchange._onChannelMethod (/usr/local/node/lib/node_modules/amqp/amqp.js:1338:14)
    at Connection._onMethod (/usr/local/node/lib/node_modules/amqp/amqp.js:900:28)
    at AMQPParser.onMethod (/usr/local/node/lib/node_modules/amqp/amqp.js:807:12)
    at AMQPParser._parseMethodFrame (/usr/local/node/lib/node_modules/amqp/amqp.js:454:10)

shell returned 1


Since this is a fanout approach, RabbitMQ is sending the message to each "User" that subscribed to the exchange.

Here is an example of a consumer in Node:
#!/bin/env node

require("./amqp.connection");
require("./utilities");


connection.addListener('ready', function () {
        puts("connected to " + connection.serverProperties.product);
        var e = {};
        var q = {};
        var qNames = {};
        e = connection.exchange('[userid]_feedExchange', {type: 'fanout'});
        for(var i = 0; i < 1000; i++){
            var queueName = "_" + 100223 + i;
            q[queueName] = connection.queue(queueName, {autoDelete: true});
            var f = function(){
                var k=i; // var is function-scoped, not block-scoped, so copy i into k here to give this callback its own value
                q[queueName].on('queueDeclareOk', function (args) {
                    q[args.queue].bind(e, "*");
                    puts(k+': queue opened: ' + args.queue + ' Message Count:'+ args.messageCount + " ConsumerCount: " + args.consumerCount);
                    q[args.queue].subscribe(function(json) {
                        console.log(k + ": " + json.message);
                    });
                });
            }();
        }
});

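This consumer declares 1000 queues and binds each one to the [userid]_feedExchange from the producer section. Because "_" + 100223 + i is string concatenation, the queue names run from _1002230 to _100223999.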

Line 15 stores each queue in a hashmap so it can be looked up later, when the queueDeclareOk event on line 18 fires.
Line 19 binds each queue to the exchange.
Line 21 subscribes each queue to any message sent from the exchange (with routing we could listen for only certain types of messages, but that is another post).
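
The var k=i copy inside the immediately invoked function matters because var is function-scoped: without it, every callback created in the loop would share one i. Here is a small sketch of the difference, with hypothetical function names and deferred callbacks standing in for the queueDeclareOk handlers:

```javascript
// Callbacks are stored and run after the loop, like event handlers
// that fire once the broker answers.
function withoutCopy(n) {
  var callbacks = [];
  for (var i = 0; i < n; i++) {
    callbacks.push(function () { return i; }); // all share the same i
  }
  return callbacks.map(function (cb) { return cb(); });
}

function withCopy(n) {
  var callbacks = [];
  for (var i = 0; i < n; i++) {
    (function () {
      var k = i; // k is local to this invocation, so each callback keeps its own value
      callbacks.push(function () { return k; });
    })();
  }
  return callbacks.map(function (cb) { return cb(); });
}

console.log(withoutCopy(3)); // [ 3, 3, 3 ]
console.log(withCopy(3));    // [ 0, 1, 2 ]
```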

It is very simple, it is fast, and it works. On a single processor, sending a message to 1000 consumers takes

real 0m0.301s
user 0m0.220s
sys 0m0.028s

using very little memory

==16049==
==16049== HEAP SUMMARY:
==16049== in use at exit: 1,275,122 bytes in 140 blocks
==16049== total heap usage: 3,798 allocs, 3,658 frees, 11,036,111 bytes allocated
==16049==
==16049== Searching for pointers to 140 not-freed blocks
==16049== Checked 505,492 bytes
==16049==
==16049== LEAK SUMMARY:
==16049== definitely lost: 1,600 bytes in 74 blocks
==16049== indirectly lost: 1,048,736 bytes in 31 blocks
==16049== possibly lost: 172,080 bytes in 3 blocks
==16049== still reachable: 52,706 bytes in 32 blocks
==16049== suppressed: 0 bytes in 0 blocks
==16049== Rerun with --leak-check=full to see details of leaked memory
==16049==
==16049== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 33 from 8)
--16049--
--16049-- used_suppression: 33 dl-hack3-cond-1
==16049==
==16049== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 33 from 8)
