Friday, February 24, 2012

Asynchronous Shard Queries in PHP using mysqlnd: loading the feed 10 times faster

A few years ago I wrote about Asynchronous Shard Queries versus Synchronous Shard Queries, and in that post I talked about having to write a server in Java to handle this for me. Now I do it in PHP, and I got great results, which are posted below.

Building the feed took anywhere from 100ms up to 40 seconds on the initial load when the feed was out of cache. This is not acceptable to me as an engineer or to the users I serve. Although the 40 seconds was rare, it is still wrong.

The problem is that as I added more shards to handle our data growth, the feed got proportionally slower. To build the feed for large users I would have to hit each server, or at the very least a large percentage of them, one after another. Adding capacity made things slower in my synchronous world.

My options to fix this issue were to do what I did in the past, which is boring, or to try something new. In the past I wrote a server in Java with a Jetty core that took a request and sent back a merged response, to get around PHP not being able to run things in parallel in a web context. It worked flawlessly, but if that server went down users would not be able to see their info. Really it's just another moving piece, another thing for me to monitor, and at my company there are just 4 engineers to handle 3 million (and growing) DAU.
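The scatter-gather idea behind that kind of server can be sketched in a few lines of Java. This is only an illustration, not the server I ran: the class name, `queryAll`, and the `queryShard` callback are hypothetical stand-ins for a real per-shard database call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class ScatterGather {

    // Send the same request to every shard at once, then merge the answers.
    // queryShard is a hypothetical stand-in for a real per-shard database call.
    static List<String> queryAll(List<Integer> shardIds,
                                 Function<Integer, List<String>> queryShard) {
        List<CompletableFuture<List<String>>> pending = new ArrayList<>();
        for (Integer shardId : shardIds) {
            // Each call is started without waiting for the previous one to finish.
            pending.add(CompletableFuture.supplyAsync(() -> queryShard.apply(shardId)));
        }

        List<String> merged = new ArrayList<>();
        for (CompletableFuture<List<String>> future : pending) {
            // join() blocks until this shard has answered; the other shards keep working meanwhile.
            merged.addAll(future.join());
        }
        return merged;
    }
}
```

Because every shard call is started before any result is awaited, the total wait tends toward the slowest shard rather than the sum of all shards, which is the property the synchronous version lacks.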

The new option was to use a mysqlnd option that enables asynchronous queries. Since I am running PHP 5.3.10, it's available as a shared module, and it works with PDO except for this feature.

PDO is like Perl's DBI (but better): it's an abstraction layer that lets you switch SQL database servers without having to change your method calls. The problem is that PDO doesn't have the concept of asynchronous queries, but this is really not an issue. I suggest you always write wrappers around the APIs you use so you can switch the underlying (vendor) API without having to change multiple places in your code. I did exactly that with PDO: we call it Shard, a wrapper around the PDO object. Then, through polymorphism, I am able to change the query, connect, and fetch equivalents to support this mysqlnd feature.

Below is the code:

ShardDB::getAsyncShardInstance($pool)->setSide($userId)->query($query)->fetchAll();



ShardDB is a static class that has a method, getAsyncShardInstance, which takes a shard pool. The design pattern is really a singleton that returns an async shard object that extends Shard. (I love objects; things just work and the code is clean.)

setSide is optional; it says to keep the query on the side that the calling user uses, unless that side is down.

query connects to the servers if not connected and executes the sql.

fetchAll polls for the responses and is equivalent to PDO's fetchAll.

Here is the query code:


public function query($query, $args = array()) {

        foreach($this->connectionList as $shardId => $connection) {
            Debugger::timer('async_query');
            $connection->query($query, MYSQLI_ASYNC);
            Debugger::log("ASYNC_QUERY", $query, 'async_query');
        }
        return $this;
}


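Then fetchAll:
public function fetchAll() {
        $connectionCount = count($this->connectionList);
        Debugger::timer('async_poll');
        $results = array();
        $processed = 0;
        $spinned = 0; // number of polls that timed out with nothing ready
        do {

            // mysqli_poll() overwrites its array arguments, so rebuild them on every pass
            $links = $errors = $reject = array();
            foreach ($this->connectionList as $i => $link) {
                $links[] = $errors[] = $reject[] = $link;
            }

            if (!mysqli_poll($links, $errors, $reject, $this->defaultTimeout)) {
                $spinned++;
                continue;
            }
            
            // $links now holds only the connections that have something to reap
            foreach ($links as $link) {
                if ($result = $link->reap_async_query()) {
                    while ($row = $result->fetch_object()){
                        $results[] = $row;
                    }
                    $result->free();
                }
                // count every connection that came back, whether or not it produced a result
                $processed++;
            }
                
            // compare only: incrementing $processed here as well would end the loop
            // before every shard was reaped; give up after as many empty polls as shards
        } while($processed < $connectionCount && $spinned < $connectionCount);
        Debugger::log("ASYNC_QUERY", "Poll is done - Spinned $spinned times", 'async_poll');
        return $results;
} 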


The effect of this code is illustrated below.




The blue line shows calls that take longer than 2 seconds, which is bad; the line goes away with the change above. There is still some work needed to totally get rid of the greater-than-2-second calls, but what I got from this one change was worth the effort.

Thanks to mySQL and the internet super hero for building this feature. It was like it was built just for me!

Monday, February 06, 2012

SpyMemcached Transcoder with PHP PDO Objects using ZLIB

My technology stack services more than 2 million daily active users. It's very basic: PHP talks to mySQL, Memcache, RabbitMQ, Gearman and Facebook. Now that we have more Java, specifically to support our SmartFox Server and other services, blurring the lines between what data is set in PHP and what data is read in Java is very necessary.

Java J-Connect makes reading mySQL Data as simple IMHO as PHP's PDO. What is hard is reading PHP's serialized format from PHP's Memcache library.

In PHP there are two main C-backed libraries. There is Memcache, the original PHP library, which I happen to use, and Memcached, which is the library I wanted to use but didn't deploy because the EC2 package system conflicted and caused issues (I fixed them, but too late to deploy). Memcache stores data in PHP's serialized format and compresses it via ZLIB, while Memcached can store data as PHP's serialized format, JSON, Binary Serialized (which is rather awesome), or JSON Array Notation, and has a multitude of compression formats, none of which are pure ZLIB that I noticed.

Here is the problem. Spymemcached is a library for talking to memcache, but it can't unserialize PHP's serialized format (or read it natively and return a string), and it cannot decompress ZLIB, although it can decompress GZIP. A great speed-up would be to read the serialized data that PHP sets and share memcache resources between PHP and Java, just like what is done for the mySQL resources.
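To see why the stock GZIP handling does not help, here is a small self-contained sketch using only java.util.zip. The class and method names are made up for illustration, and the zlib bytes are produced in Java as a stand-in for what the PHP side writes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.InflaterInputStream;

class ZlibVersusGzip {

    // Produce a zlib stream; this stands in for the compressed value PHP stores.
    static byte[] zlibCompress(byte[] raw) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DeflaterOutputStream dos = new DeflaterOutputStream(bos)) {
            dos.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // InflaterInputStream understands the zlib wrapper by default.
    static byte[] zlibDecompress(byte[] compressed) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (InflaterInputStream iis =
                 new InflaterInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buf = new byte[2048];
            int n;
            while ((n = iis.read(buf)) > 0) {
                bos.write(buf, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // True if a GZIP reader accepts the bytes, false if it rejects them.
    static boolean readableAsGzip(byte[] compressed) {
        try (GZIPInputStream gis =
                 new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            while (gis.read() != -1) {
                // drain the stream
            }
            return true;
        } catch (IOException e) {
            return false; // zlib data does not start with the GZIP magic bytes
        }
    }
}
```

Both formats carry the same deflate data but use different headers, so a GZIP reader rejects a zlib stream before it decompresses anything.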

What needs to be done? Well, build your own Transcoder for Spymemcached. Fortunately, Spymemcached documents an interface to do just that.

What is needed: implement the spymemcached interface defined here, use org.lorecraft.phparser (defined here) to unserialize the PHP data, and return the Object.





 Below is the code.




package com.schoolfeed.spymemcached;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.InflaterInputStream;

import net.spy.memcached.CachedData;
import net.spy.memcached.compat.CloseUtil;
import net.spy.memcached.transcoders.BaseSerializingTranscoder;
import net.spy.memcached.transcoders.Transcoder;
import org.lorecraft.phparser.*;


public class PHPSerializedTranscoder extends BaseSerializingTranscoder implements Transcoder <Object> {
 
 static final int COMPRESSED=2;
 /**
  * Get a serializing transcoder with the default max data size.
  */
 public PHPSerializedTranscoder() {
  this(CachedData.MAX_SIZE);
 }

 /**
  * Get a serializing transcoder that specifies the max data size.
  */
 public PHPSerializedTranscoder(int max) {
  super(max);
 }
 
 /**
  * Decode the byte data from Memcache, decompress it if necessary, and return the Object.
  * @param d the cached data whose bytes are turned into an object
  * @return the parsed PHP value, or the raw string if it cannot be parsed
  */
 public Object decode(CachedData d){
  byte[] data=d.getData();
  
  Object rv=null;
  String ds="N;";
  
  if((d.getFlags() & COMPRESSED) != 0) {
   getLogger().debug("Looks like d is compressed");
   data=decompress(d.getData());
  }
  
  ds=decodeString(data);
   
  getLogger().debug("DECODED: [" + ds + "] about to SerializedPhpParser");
  
  SerializedPhpParser sp = new SerializedPhpParser(ds);
  
  try {
   rv = sp.parse();
   getLogger().debug("Parse was cool!!");
  } catch(Exception e){
   getLogger().debug("Not a PHP Object? : " +  ds);
   rv = ds;
  }
  
  return rv;
 }
 
 /**
  * PHP Memcache stores compress data in ZLIB format override the base class decompress method to handle ZLIB
  * 
  * @param byte array - raw data from Memcache
  * @returns byte array
  */
 protected byte[] decompress(byte[] in) {
  ByteArrayOutputStream bos=null;
  final int BUFFER = 2048;
  if(in != null) {
   ByteArrayInputStream bis=new ByteArrayInputStream(in);
   bos=new ByteArrayOutputStream();
   InflaterInputStream iis = null;
   try {
    iis = new InflaterInputStream(bis);

    byte[] buf=new byte[BUFFER];
    int r=-1;
    while((r=iis.read(buf, 0, BUFFER)) > 0) {
     bos.write(buf, 0, r);
    }
   } catch (IOException e) {
    getLogger().warn("Failed to decompress data", e);
    bos = null;
   } finally {
    CloseUtil.close(iis);
    CloseUtil.close(bis);
    CloseUtil.close(bos);
   }
  }
  
  return bos == null ? null : bos.toByteArray();
 }
 
 /**
  * encode -- not implemented yet
  *
  */
 public CachedData encode(Object o){
  int flags = 0;
  byte[] b=null;
  return new CachedData(flags, b, getMaxSize());
 }
 
 /**
  * no need to async Decode let's do it realtime
  */
 public boolean asyncDecode(CachedData d) {
  return false;
 }

}


This is a stop-gap solution until we make the transition to Memcached with JSON encoding. Then I can use Jackson-JSON - which is a fast JSON encoder/decoder for Java enabling a great portable message protocol between the two stacks and nearly any other language we might add to the system (like Python).

Wednesday, February 01, 2012

Basic ETL with Gearman and MySQL in a few lines of PHP code

Gearman is awesome. If you do not know what it is, it's a queue and load-balancing system for an arbitrary number of workers, which enables distributed computing across many nodes. Some of the same guys who worked on the mySQL source worked on Gearman.

Feel free to search my blog on other gearman uses.

The Problem:

We store a lot of stats, make a lot of changes, and want to see the result of the stats in realtime. Our stat system is pretty slick. For each tag increment, the application increments a count and groups said tag by minute, hour, and month, with a numeric hash representation of the tag text for compact writes. This means 1 tag write produces 4 SQL statements. We track over 239211 distinct tags at around 10K writes per second on a single mySQL instance on EC2 in a RAID-10 EBS extra-large config (although, because EBS mirrors internally, I could just use RAID-0, but I was too scared).
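One way to picture the grouping is the sketch below, which turns a tag and a timestamp into a numeric id plus the start of its minute, hour, and month. It is only an illustration: the class and method names are hypothetical, CRC32 is an assumption (the hash our system actually uses is not shown here), and UTC is assumed for the bucket boundaries.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.zip.CRC32;

class TagBuckets {

    // Compact numeric id for a tag's text. CRC32 is an assumption for illustration only.
    static long tagId(String tag) {
        CRC32 crc = new CRC32();
        crc.update(tag.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    // Epoch seconds for the start of the minute, hour, and month (UTC) containing the event.
    static long[] bucketStarts(long epochSeconds) {
        ZonedDateTime t = Instant.ofEpochSecond(epochSeconds).atZone(ZoneOffset.UTC);
        long minute = t.truncatedTo(ChronoUnit.MINUTES).toEpochSecond();
        long hour = t.truncatedTo(ChronoUnit.HOURS).toEpochSecond();
        long month = t.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1).toEpochSecond();
        return new long[] { minute, hour, month };
    }
}
```

Every event that lands in the same bucket increments the same row, which is why a numeric id keeps the writes compact.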

Once the mySQL instance hits disk (EBS) our throughput becomes very unstable, possibly slowing down the site. The solution was to defer these writes but how can I do it without building a logging system, aggregator, loader and having a bunch of moving parts? Really I want to only spend 10 mins on this problem and use existing monitoring code.  So my 10 min solution:

 3 mins to write the code
 6 mins testing
 55 seconds of patting myself on back
 5 seconds to deploy

Solution Detail:

Gearman workers connect to the GearmanD server's job queue and loop for more jobs. This means the program stays in memory for the length of the process (until the worker restarts), so I can transform the data in application memory. Since the program is persistently connected to the DB, I can periodically load the data in chunks.

Instead of having 100s of possible concurrent connections doing writes, I can control the writes based on the number of workers. InnoDB is very fast and consistent at low levels of concurrency (less than 50).

Since I can drain the queue from GearmanD and transform the data locally, I do not really need to worry about running out of memory on the queue server. The consumer is faster than the producer. I can combine 1000s of writes into a single write.

Let's look at some code:
<?php
    require_once("includes/config.php");
    require_once("includes/DB/EventTrackerDB.php");


    class EventTrackerETL {
       
        //
        // keep track of distinct tags
        //
        public static $eventTable  = array();

        //
        // the next flush
        //
        public static $nextWrite   = 0;

        //
        // keep a stat of total writes
        //
        public static $totalEvents = 0;

        //
        // maximum number of distinct tags to buffer before forcing a flush
        //
        const MAX_NUMBER_OF_EVENTS = 30000;

        //
        // number of seconds between time-based flushes
        //
        const FLUSH_INTERVAL = 20; // seconds

        /*
         * transform all the tags into a sum of the counts entered
         *  @param string $event     - the tag being incremented
         *  @param int $count        - the supplied count, many times it's just 1
         *  @param int $timeEntered  - epoch timestamp (not used yet)
         *  @return void
         */
        public static function transform($event, $count, $timeEntered) {

            //
            // initialize or increment a tag
            //
            if (isset(self::$eventTable[$event])){
                self::$eventTable[$event] += $count;
            } else {
                self::$eventTable[$event] = $count;
            }
            
            //
            // flush if we hit the max number of events
            //
            if (sizeof(self::$eventTable) > self::MAX_NUMBER_OF_EVENTS){
                return self::load();
            }

            //
            // flush if its our time
            //
            if (self::$nextWrite < time()){
                return self::load();
            }
            return;
        }

        /*
         * flush the stored tags to the database
         *  @returns void
         */
        protected static function load() {

            //
            // write transformed events to the db
            //
            $thisRun = 0;
            foreach(self::$eventTable as $event => $sum) {
                EventTrackerDB::singleton()->updateEvent($event, $sum);
                $thisRun++;
                self::$totalEvents++;
            }

            $msg = "EventTracker write complete $thisRun events this run and a total of " . self::$totalEvents . " events written so far";
            Debugger::log("OT", $msg);

            //
            // re-init
            //
            self::$nextWrite = time() + self::FLUSH_INTERVAL;
            self::$eventTable = array();
        }
    }
?>

In summary, with Gearman I am able to process 250K events in seconds. The queue never builds up, and there is special code to handle kills (not SIGKILL).