Friday, March 23, 2012

MySQL Shards, Gearman, RabbitMQ, NodeJS, JSONP Push for a Fault-Tolerant Real-Time Notification Feed

Over the past few weeks, I have been tweaking my feed system to be truly real-time. The hypothesis: if content is pushed to end users without forcing them to refresh the page, they will interact with it, which in turn turns a consumer into a producer of content and creates virality within subnetworks of the Graph. The Graph is all the connections on the site, while subnetworks are personal connections, i.e. your friends, their friends, etc. Note also that there are many more consumers than producers.

My hypothesis was wrong. Or was it?

I couldn't accept the conclusions drawn from the usage data. It just makes sense that making something easier for a user should increase the chance of interaction. So I went digging and found a problem with my RabbitMQ integration. Before I go into the problem, let's talk about the setup.

A user connects to the www servers through a load balancer. The www servers serve dynamic content to the end user. When a user produces content, that content is committed to a sharded database based on their entityId. EntityIds are globally unique, and an in-memory lookup table says which shard (shard X) an entityId belongs to and whether shard X is available for this user. After the data is committed, a message is sent to Gearman, and Gearman workers pump messages into RabbitMQ.
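To make the routing step concrete, here is a minimal bash sketch of an in-memory entityId-to-shard lookup with a per-shard availability flag. The entity ids, shard names, and the `shard_for_entity` function are hypothetical; the real table lives in the PHP tier and its layout isn't described here. Associative arrays need bash 4 or later.

```bash
#!/bin/bash
# Hypothetical in-memory routing table: entityId -> shard, plus a flag per
# shard saying whether it is currently available. All values are made up.
declare -A ENTITY_SHARD=( [1001]=shard1 [1002]=shard2 [1003]=shard1 )
declare -A SHARD_UP=( [shard1]=1 [shard2]=0 )

# shard_for_entity ENTITY_ID
# Prints the shard to write to and returns 0 when the entity is known and
# its shard is up; returns 1 for an unknown entity, 2 if the shard is down.
shard_for_entity() {
  local id=$1 shard
  [[ -n $id ]] || return 1
  shard=${ENTITY_SHARD[$id]:-}
  [[ -n $shard ]] || return 1
  [[ ${SHARD_UP[$shard]:-0} == 1 ]] || return 2
  printf '%s\n' "$shard"
}
```

A caller would commit to the printed shard on success and treat a non-zero return (unknown entity, or shard marked down) as a reason not to write.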

Now you may be asking yourself: why in the hell are you writing to a queue in order to write to another queue? (I don't like this either.) When too many messages are produced, RabbitMQ will sometimes block the producer. If the producer were the commit process (the WWW tier), the end user would have to wait until RabbitMQ unblocks to see the response, and/or all the threads would block, crashing the system and causing downtime. Putting Gearman in between removes this problem, because the WWW tier only hands the message off to Gearman and returns.
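The point of the extra hop is that the commit path only hands work off and returns, while something else absorbs RabbitMQ's blocking. The sketch below shows that shape with a local spool directory standing in for Gearman. It is a swapped-in illustration, not how Gearman works, and `enqueue_job`, `drain_jobs`, and `SPOOL_DIR` are hypothetical names.

```bash
#!/bin/bash
# Stand-in for Gearman: a spool directory. The web tier calls enqueue_job
# and returns immediately; a separate worker calls drain_jobs and does the
# (possibly blocking) broker publish.
SPOOL_DIR=${SPOOL_DIR:-$(mktemp -d)}

# enqueue_job PAYLOAD
# Write under a hidden temp name, then rename so a worker never sees a
# half-written job (rename is atomic within one filesystem).
enqueue_job() {
  local tmp
  tmp=$(mktemp "$SPOOL_DIR/.tmp.XXXXXX")
  printf '%s\n' "$1" > "$tmp"
  mv "$tmp" "$SPOOL_DIR/job.${tmp##*/.tmp.}"
}

# drain_jobs HANDLER
# Pass each ready job's payload to HANDLER (e.g. a publish function) and
# delete the job only if HANDLER succeeds, so failures stay for a retry.
drain_jobs() {
  local handler=$1 job
  for job in "$SPOOL_DIR"/job.*; do
    [[ -e $job ]] || continue   # glob matched nothing
    if "$handler" "$(cat "$job")"; then
      rm -f "$job"
    fi
  done
}
```

Deleting a job only after the handler succeeds is the design choice that matters here: a publish that fails or blocks leaves the job in the spool instead of losing it.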

The same user is also long polling a Node server on a different subdomain. Because the browser's same-origin policy treats a different subdomain as a different origin, a plain XHR long poll won't work there, which is why this requires JSONP long polling. This server sits in front of RabbitMQ. NodeJS holds a persistent connection to RabbitMQ and keeps a single queue subscription per online user; messages missed while the user is disconnected are resent if the user reconnects to Node. If the user does not reconnect, Node removes the user from its list and destroys their queue. PHP only sends messages to online users, which it determines by asking Node whether the user is online.
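JSONP works by loading the long-poll URL through a `<script>` tag, so the response body has to be a JavaScript call to a callback the page names. Here is a minimal sketch of that response shape, written in bash only to match the other examples (the real endpoint is the Node server). `jsonp_wrap` and the identifier pattern are assumptions; the pattern is deliberately conservative so the endpoint cannot be tricked into echoing arbitrary script.

```bash
#!/bin/bash
# jsonp_wrap CALLBACK JSON  (hypothetical helper)
# Prints "CALLBACK(JSON);" as a JSONP response body, or returns 1 without
# printing anything if CALLBACK is not a plain JS identifier, optionally
# dotted (e.g. "feed.onMessage").
jsonp_wrap() {
  local callback=$1 json=$2
  local re='^[A-Za-z_$][A-Za-z0-9_$]*(\.[A-Za-z_$][A-Za-z0-9_$]*)*$'
  [[ $callback =~ $re ]] || return 1
  printf '%s(%s);\n' "$callback" "$json"
}
```

For example, `jsonp_wrap feed.cb '{"n":1}'` prints `feed.cb({"n":1});`, while a callback such as `alert(1)//` is rejected.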

The end result for a user: if someone who is online comments on, likes, or shares their content, the feed should update for everyone who can see the event and is also online, the content owner should be notified, and the update should fade in nicely.

In dev this works great, but in production I noticed that messages would occasionally be missed, forcing me to refresh the page. This was the problem: I was missing messages. So what was going on? To understand the issue I use a process of elimination: rule out the known, and whatever is left is the unknown. Most of the time, assumptions are the cause of the problem.

Tracing the flow, DB commits were 100% correct, and the publish return codes indicated 100% success, yet RabbitMQ's rabbitmqadmin.py tool did not show what the return codes indicated.

The flow is as follows:

#!/bin/bash

# declare the exchange as in-memory only (durable=false) and do not auto-delete it
./rabbitmqadmin.py declare exchange name=test_exchange type=direct durable=false auto_delete=false

# list exchanges
./rabbitmqadmin.py list exchanges

# declare a queue with the same settings as the exchange
./rabbitmqadmin.py declare queue name=test_queue auto_delete=false durable=false

# list the queue
./rabbitmqadmin.py list queues

# establish a route from the exchange to the queue by binding them
./rabbitmqadmin.py declare binding source=test_exchange destination_type=queue destination=test_queue routing_key=myroute

# list the route
./rabbitmqadmin.py list bindings

# publish three messages; all three should be waiting for consumption
./rabbitmqadmin.py publish exchange=test_exchange routing_key=myroute payload="this is a message"
./rabbitmqadmin.py publish exchange=test_exchange routing_key=myroute payload="this is a message"
./rabbitmqadmin.py publish exchange=test_exchange routing_key=myroute payload="this is a message"
sleep 5

# show the number of messages in each queue
./rabbitmqadmin.py list queues
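To turn the control case into a pass/fail check instead of eyeballing the table, the queue depth can be pulled out of rabbitmqadmin's output. This assumes a rabbitmqadmin version that supports `-f tsv` and column selection after `list queues` (current versions do; older ones may not), and `queue_depth` is a hypothetical helper.

```bash
#!/bin/bash
# queue_depth NAME
# Reads tab-separated "name<TAB>messages" rows on stdin (a header row is
# harmless because it never matches NAME) and prints the message count for
# queue NAME. Exits 1 if NAME does not appear.
queue_depth() {
  awk -F '\t' -v q="$1" '$1 == q { print $2; found = 1 } END { exit !found }'
}

# Usage against a live broker (assumed flags, see above):
#   depth=$(./rabbitmqadmin.py -f tsv list queues name messages | queue_depth test_queue)
#   [ "$depth" = 3 ] || echo "expected 3 queued messages, got '$depth'" >&2
```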



This is basically the flow that my PHP layer uses. This shell script is my control case: it works and does exactly what I want. If a consumer is not present, the message should queue. I found that PHP does not do this, and at times does not even send the message.

 $ret = $this->channel->publish(json_encode($data), $this->routingKey, AMQP_IMMEDIATE, array('Content-type' => 'text/json', 'delivery_mode' => 1, 'expiration' => self::DEFAULT_MESSAGE_TIMEOUT));



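The AMQP PECL library is far from mature. First, in my tests, expiration does not work. Second, AMQP_IMMEDIATE causes memory leaks and inconsistent results. I found this out by turning off all the switches until the PHP code duplicated my control case, then searching Google for AMQP_IMMEDIATE problems. The immediate flag is also the wrong semantics for this flow: per the AMQP 0-9-1 spec, it tells the broker to return a message it cannot hand to a consumer right away instead of queuing it, which is the opposite of what my control case does.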

The graph below shows the effect of this fix. The stat I am monitoring indicates people interacting: notifications are now shown to the end user automatically if the user is online.




Still, something was not right. Good thing I have Ganglia. Take a look at these stats and notice the unacknowledged (unack) message count: it's huge. To me this means messages are being missed and not sent to users.



NodeJS is an event-based system. Things happen asynchronously, so state bugs are plentiful under load. I found one such state bug, which caused an "object" to be initialized before the connection was ready. Once it was fixed, note the smooth growth in the first image (Node Fix).
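One common way to avoid this class of bug is to gate every use of a resource on its ready event and buffer anything that arrives earlier. The sketch below shows that pattern in bash to stay consistent with the other examples; it is a generic illustration with hypothetical names (`send_or_buffer`, `on_connection_ready`), not the actual Node fix.

```bash
#!/bin/bash
# Generic "wait for ready" guard. SENT stands in for a real publish.
CONN_READY=0
PENDING=()
SENT=()

# send_or_buffer MSG: publish now if the connection is ready, else hold it.
send_or_buffer() {
  if (( CONN_READY )); then
    SENT+=("$1")
  else
    PENDING+=("$1")
  fi
}

# on_connection_ready: mark ready, then flush held messages in arrival order.
on_connection_ready() {
  local msg
  CONN_READY=1
  for msg in "${PENDING[@]}"; do
    SENT+=("$msg")
  done
  PENDING=()
}
```

Because messages only leave `PENDING` in arrival order and only after the ready event, nothing touches the connection before it exists, and nothing sent early is dropped.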


Now for some stats. A single Linux AMI Amazon EC2 (west) c1.xlarge instance runs both RabbitMQ and NodeJS. At peak, 9,876 users were connected to Node, with thousands of messages a second flowing through, at a 5-minute load average of 1.62; Rabbit uses the most CPU and memory, while Node uses very little of either. All in all, the shards are handling tens of thousands of writes a second, Gearman is handling tens of thousands of job insertions a second, Rabbit is handling thousands of messages a second, and within 5-30 seconds all of an end user's friends see their activity.

In conclusion, is my hypothesis correct? Time will tell, especially now that everything is working smoothly.

PS: Things I'd like to fix next are message ordering and getting rid of Gearman as a layer.
