Wednesday, February 01, 2012

Basic ETL with Gearman and MySQL in a few lines of PHP code

Gearman is awesome. If you do not know what it is, it's a queue and load-balancing system for an arbitrary number of workers, which enables distributed computing across many nodes. Some of the same people who worked on the MySQL source also worked on Gearman.

Feel free to search my blog for other Gearman uses.

The Problem:

We store a lot of stats, make a lot of changes, and we want to see the resulting stats in realtime. Our stat system is pretty slick. For each tag increment, the application increments a count and groups the tag by minute, hour, and month, using a numeric hash of the tag text for compact writes. This means one tag write produces four SQL statements. We track over 239,211 distinct tags at around 10K writes per second on a single MySQL instance on EC2, an extra-large box with a RAID-10 EBS config (since EBS mirrors internally I could just run RAID-0, but I was too scared).
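
To make the fan-out concrete, here is a minimal sketch of what one increment might look like. The stats_* table names, the crc32() hash, and the fourth all-time bucket are my assumptions, not the production schema:

<?php
    //
    // sketch of one tag increment fanning out to four UPSERTs
    // (the stats_* tables and crc32() are illustrative assumptions)
    //
    function trackTag(mysqli $db, $tag, $count, $ts) {
        $hash = crc32($tag); // compact numeric representation of the tag text

        //
        // one bucket per granularity, plus an assumed all-time total
        //
        $buckets = array(
            "minute" => $ts - ($ts % 60),
            "hour"   => $ts - ($ts % 3600),
            "month"  => mktime(0, 0, 0, (int)date("n", $ts), 1, (int)date("Y", $ts)),
            "total"  => 0,
        );

        foreach ($buckets as $granularity => $bucket) {
            $db->query("INSERT INTO stats_$granularity (tag_hash, bucket, cnt)
                        VALUES ($hash, $bucket, $count)
                        ON DUPLICATE KEY UPDATE cnt = cnt + $count");
        }
    }
?>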

Once the MySQL instance hits disk (EBS), our throughput becomes very unstable, possibly slowing down the site. The solution was to defer these writes, but how can I do that without building a logging system, an aggregator, a loader, and a bunch of other moving parts? Really, I only want to spend 10 minutes on this problem and reuse existing monitoring code. So, my 10-minute solution:

 3 mins to write the code
 6 mins testing
 55 seconds of patting myself on the back
 5 seconds to deploy

Solution Detail:

Gearman workers connect to the GearmanD server's job queue and loop waiting for more jobs, so the program stays in memory for the length of the process (until the worker restarts). This means I can transform the data in application memory. And since the program is persistently connected to the DB, I can periodically load the data in chunks.
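
Here is a minimal sketch of that worker loop using the Gearman PHP extension; the function name "track_event" and the pipe-delimited payload are my inventions, not the real job format:

<?php
    //
    // minimal resident worker: static state in EventTrackerETL survives
    // between jobs because the process never exits between them
    //
    $worker = new GearmanWorker();
    $worker->addServer("127.0.0.1", 4730);

    $worker->addFunction("track_event", function (GearmanJob $job) {
        // assumed payload: "event|count|timeEntered"
        list($event, $count, $timeEntered) = explode("|", $job->workload());
        EventTrackerETL::transform($event, (int)$count, (int)$timeEntered);
    });

    while ($worker->work()); // loop for more jobs
?>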

Instead of having hundreds of possible concurrent connections doing writes, I can control the write concurrency with the number of workers. InnoDB is very fast and consistent at low levels of concurrency (fewer than 50).
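
One way to pin that concurrency down is to fork a fixed pool of workers. This is a sketch of the idea, not our deployment; NUM_WORKERS is an arbitrary choice and runWorkerLoop() is a hypothetical wrapper around the worker loop above (requires the pcntl extension):

<?php
    //
    // fork a small, fixed pool so write concurrency never exceeds
    // NUM_WORKERS DB connections
    //
    define("NUM_WORKERS", 8);

    for ($i = 0; $i < NUM_WORKERS; $i++) {
        $pid = pcntl_fork();
        if ($pid == 0) {
            runWorkerLoop(); // hypothetical: the GearmanWorker loop above
            exit(0);
        }
    }

    while (pcntl_wait($status) > 0); // parent reaps its children
?>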

Since I can drain the queue from GearmanD and transform the data locally, I do not really need to worry about running out of memory on the queue server: the consumer is faster than the producer, and I can combine thousands of writes into a single write.

Let's look at some code:
<?php
    require_once("includes/config.php");
    require_once("includes/DB/EventTrackerDB.php");


    class EventTrackerETL {
       
        //
        // keep track of distinct tags
        //
        public static $eventTable  = array();

        //
        // epoch time of the next scheduled flush
        //
        public static $nextWrite   = 0;

        //
        // keep a stat of total writes
        //
        public static $totalEvents = 0;

        //
        // max distinct events to buffer before forcing a flush
        //
        const MAX_NUMBER_OF_EVENTS = 30000;

        //
        // seconds between flushes
        //
        const FLUSH_INTERVAL = 20; // seconds

        /*
         * transform all the tags into a sum of the counts entered
         *  @param string $event       - the tag being incremented
         *  @param int    $count       - the supplied count (often just 1)
         *  @param int    $timeEntered - epoch timestamp
         *  @return void
         */
        public static function transform($event, $count, $timeEntered) {

            //
            // initialize or increment a tag
            //
            if (isset(self::$eventTable[$event])){
                self::$eventTable[$event] += $count;
            } else {
                self::$eventTable[$event] = $count;
            }
            
            //
            // flush if we hit the max number of events
            //
            if (count(self::$eventTable) > self::MAX_NUMBER_OF_EVENTS){
                return self::load();
            }

            //
            // flush if it's time for the periodic write
            //
            if (self::$nextWrite < time()){
                return self::load();
            }
            return;
        }

        /*
         * flush the stored tags to the database
         *  @return void
         */
        protected static function load() {

            //
            // write transformed events to the db
            //
            $thisRun = 0;
            foreach(self::$eventTable as $event => $sum) {
                EventTrackerDB::singleton()->updateEvent($event, $sum);
                $thisRun++;
                self::$totalEvents++;
            }

            $msg = "EventTracker write complete: $thisRun events this run, " . self::$totalEvents . " events written in total so far";
            Debugger::log("OT", $msg);

            //
            // re-init
            //
            self::$nextWrite = time() + self::FLUSH_INTERVAL;
            self::$eventTable = array();
        }
    }
?>

In summary, with Gearman I am able to process 250K events in seconds. The queue never builds up, and there is special code to handle kills (not SIGKILL, which cannot be caught).
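
The kill handling looks roughly like the following. This is a sketch of the idea rather than our exact code, and flush() is a hypothetical public wrapper around load():

<?php
    //
    // catch the catchable signals and flush buffered events before exit;
    // SIGKILL can never be caught, which is why it is excluded above
    //
    declare(ticks = 1); // needed for signal delivery in this era of PHP

    function gracefulShutdown($signo) {
        EventTrackerETL::flush(); // hypothetical public wrapper around load()
        exit(0);
    }

    pcntl_signal(SIGTERM, "gracefulShutdown");
    pcntl_signal(SIGINT,  "gracefulShutdown");
?>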
