Wednesday, August 05, 2015

San Francisco mySQL Meetup August 26 2015

Shots Architecture and how we handle extreme load spikes

I invite you to come out and join me for a talk on the topic above. I will walk through which technology is used, where, why and how. The event information is located here. I'll also touch on how cost is reduced, how we handle the load when celebrities promote, and what's next to make the system even more automatic and solid.

Thanks to the organizers for all the work they do and for allowing me to give a talk.

Monday, June 01, 2015

Reporting Across Shards

If you have chosen to split your data across boxes, and architected your app not to query across boxes, there is still a case where you will need to. Data mining, reports and data health checks require hitting all servers at some point. The case I am going over is sessions: figuring out the average session length without taking averages of averages, which is wrong.

Let's assume you have a sessions table with the following structure:

mysql> describe sessions;
| Field    | Type                | Null | Key | Default | Extra |
| user_id  | bigint(20) unsigned | NO   | PRI | 0       |       |
| added_ms | bigint(20) unsigned | NO   | PRI | 0       |       |
| appVer   | varchar(8)          | YES  |     | NULL    |       |
| device   | bigint(20) unsigned | YES  | MUL | NULL    |       |
| start    | int(10) unsigned    | NO   | MUL | NULL    |       |
| stop     | int(10) unsigned    | NO   |     | NULL    |       |

The data is federated (distributed) by user_id. This table exists across 1000s of servers. How do you get the average session length for the month of May?

  • The question already scopes the process to hit every single server
  • We can't just take AVG(stop-start) on each shard, then sum those averages and divide by the number of shards: that is an average of averages, which weights every shard equally no matter how many sessions each one holds
  • We can't pull all the data in memory
  • We don't want to have to pull the data and upload it to BigQuery or Amazon RedShift
  • We want a daily report at some point
SELECT SUM((stop-start)) as sess_diff, count(*) as sess_sample FROM sessions WHERE start BETWEEN $start AND $stop AND stop>start
The above SQL statement asks a single server for the sum of the session deltas and the count of the corresponding rows in the set. The SUM of SUMs (sum of sess_diff) is the numerator, and the sum of sess_sample is the denominator.
Now run this across all servers, and finally write some client code to combine the results (one row per shard, so fewer than 1000 rows) and report the number.

$total = 0;
$sessions_diff = 0;

foreach ($rows as $shard_id => $result) {
    // Accumulate numerator and denominator with bcmath to avoid overflow
    $sessions_diff = \bcadd($sessions_diff, $result[0]['sess_diff']);
    $total = \bcadd($total, $result[0]['sess_sample']);
}

Now the session_avg = sessions_diff / total (e.g. \bcdiv($sessions_diff, $total, 2)).

Tada, a query that can take hours if done on a traditional mining server is done in milliseconds.
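The fan-out-and-combine pattern can be sketched in Python (illustrative names; assume each shard query returns a (sess_diff, sess_sample) pair):

```python
def weighted_session_avg(shard_results):
    """Combine per-shard (sum_of_deltas, row_count) pairs into one average.

    Summing numerators and denominators separately avoids the
    average-of-averages mistake.
    """
    total_diff = 0
    total_rows = 0
    for sess_diff, sess_sample in shard_results:
        total_diff += sess_diff
        total_rows += sess_sample
    return total_diff / total_rows if total_rows else 0.0

# Two shards: shard A has 2 sessions averaging 10s, shard B has 8 averaging 20s.
# Average of averages would say 15s; the true average is 18s.
print(weighted_session_avg([(20, 2), (160, 8)]))  # -> 18.0
```

Summing numerators and denominators separately is what keeps the result exact no matter how unevenly sessions are spread across shards.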

Wednesday, April 01, 2015

Federating THE friends table in a Sharded mySQL environment without downtime or users noticing

A friends table is the cornerstone of social applications. Its purpose is to define relationships and help answer the question: what are my friends doing?

Here is an example friends table:

 CREATE TABLE `friends` (
  `user_id` bigint(20) unsigned NOT NULL,
  `friend_id` bigint(20) unsigned NOT NULL,
  `auto_ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`user_id`,`friend_id`),
  KEY `user_id-auto_ts` (`user_id`,`auto_ts`),
  KEY `friend_id-auto_ts` (`friend_id`,`auto_ts`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci

With the table above we can get a list of user_ids a user follows (following), a list of people who follow said user (followers), or a list of mutual follows. This is a very simple table structure, yet very powerful.

The problem is this table doesn't scale on a single server. When you have millions of users, each user has many friends, and all users are semi to deeply connected, the table becomes a problem. Mix this with a huge request rate and lots of concurrency, and a single server just doesn't scale.

One can replicate the friends table, but replication starts to lag when many users add or remove friends at once. So, how can we distribute this table across many servers, each holding a small % of the friend graph?

Let's look at the friends table.  It defines whom a user follows and who follows the user ordered by insertion time.

Let's create two tables:

CREATE TABLE `following` (
  `user_id` bigint(20) unsigned NOT NULL DEFAULT '0',
  `friend_id` bigint(20) unsigned NOT NULL DEFAULT '0',
  `mutual` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT 'Flag to denote mutual connections',
  `auto_ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`user_id`,`friend_id`),
  KEY `user_id-auto_ts` (`user_id`,`auto_ts`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci

CREATE TABLE `followers` (
  `user_id` bigint(20) unsigned NOT NULL DEFAULT '0',
  `friend_id` bigint(20) unsigned NOT NULL DEFAULT '0',
  `mutual` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT 'Flag to denote mutual connections',
  `auto_ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`user_id`,`friend_id`),
  KEY `friend_id-auto_ts` (`friend_id`,`auto_ts`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci

The 'following' table defines whom a given user follows. It is federated by user_id, so each row lives on the user_id's shard.

The 'followers' table defines who is following the given user. On every follow, instead of writing one row, we now write two rows: one on the following user's shard, and one on the followed user's shard. Thus the followers table is federated by friend_id.

This can be best described by an example on reads:

How many people am I (user_id 3306) following?

Connect to my Shard-x, execute the query

SELECT COUNT(*) FROM following WHERE user_id = 3306

How many people are following me (user_id 3306)?

Connect to my Shard-x, execute the following query

SELECT COUNT(*) FROM followers WHERE user_id = 3306

Now let's look at a write: me (user_id 3306) following friend_id 11211.

3306 is on Shard-x
11211 is on Shard-y

So, first we write the fact that 3306 is following 11211. We connect to Shard-x and execute, inside a transaction:

INSERT INTO following (user_id, friend_id, mutual, auto_ts) VALUES(3306, 11211, 0, NOW());

Now connect to Shard-y to write the followers row. If the connection fails, roll back the transaction on 3306's Shard-x; otherwise:

INSERT INTO followers (user_id, friend_id, mutual, auto_ts) VALUES(3306, 11211, 0, NOW());
if affected rows == 1 (no error)
COMMIT on Shard-x
COMMIT on Shard-y
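The two-shard write can be sketched in Python (a toy stand-in for real MySQL connections; the class and function names are illustrative, not the production code):

```python
class Shard:
    """Toy stand-in for a MySQL connection with transaction support."""
    def __init__(self, name):
        self.name = name
        self.rows = []        # committed rows
        self.pending = []     # rows inside the open transaction

    def insert(self, table, row):
        self.pending.append((table, row))
        return 1              # affected rows

    def commit(self):
        self.rows.extend(self.pending)
        self.pending = []

    def rollback(self):
        self.pending = []


def follow(shard_x, shard_y, user_id, friend_id):
    """Write both halves of the relationship, or neither."""
    shard_x.insert('following', (user_id, friend_id))
    try:
        affected = shard_y.insert('followers', (user_id, friend_id))
    except Exception:
        shard_x.rollback()    # Shard-y unreachable: undo the first write
        raise
    if affected == 1:
        shard_x.commit()
        shard_y.commit()

a, b = Shard('Shard-x'), Shard('Shard-y')
follow(a, b, 3306, 11211)
print(len(a.rows), len(b.rows))  # -> 1 1
```

The point of holding both transactions open is that either both rows become visible or neither does.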

Now we can answer the main questions.

But what about something like: give me my friends' photos sorted by last upload time, 10 at a time?

Well, here is the magic sauce. We are going to do a FANOUT on reads and hit all the shards my friends are on. For my environment this is much better than a FANOUT on writes, since we like to customize the feed in real time, and duplicating the data 10,000s of times becomes very expensive quickly as servers start turning cold. We can go into this topic a bit more in another post.

Now I execute the query across my friends' shards:

SELECT p.* FROM photos p JOIN followers f ON (f.friend_id=p.user_id) WHERE f.user_id = 3306 ORDER BY p.id DESC LIMIT 10;

If I have 1000 friends spread over 100 shards, and each friend has 10 photos, I am going to get back 1000 rows (up to 10 per shard).

But that order is not what I am going to display, because I want the latest 10 photos overall. Thus I will need to sort in memory on the application server and take a slice of the results.
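The merge-and-slice step might look like this in Python (assuming photo ids are time-ordered, which is what the last_id pagination implies):

```python
import heapq

def latest_photos(per_shard_rows, limit=10):
    """Merge per-shard result sets (each already sorted by id DESC)
    and return the newest `limit` photos overall."""
    # nlargest over all rows; the key is the photo id (ids are time-ordered)
    return heapq.nlargest(
        limit,
        (row for rows in per_shard_rows for row in rows),
        key=lambda row: row['id'],
    )

shard_a = [{'id': 90}, {'id': 40}]
shard_b = [{'id': 75}, {'id': 60}]
page = latest_photos([shard_a, shard_b], limit=3)
print([row['id'] for row in page])  # -> [90, 75, 60]
```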

But what if I want the 2nd page?
SELECT p.* FROM photos p JOIN followers f ON (f.friend_id=p.user_id) WHERE f.user_id = 3306 AND p.id < [LAST_ID_FROM_FIRST_PAGE] ORDER BY p.id DESC LIMIT 10

In the application we pass the last_id from the 1st page, execute the same FANOUT on reads again, apply the same logic, and return the photos.

Your question might be: isn't this slow, because people with large networks will have to hit every shard each time, and you have to loop, execute and read on each connection?

This can be mitigated with memory, pipelining and parallel SQL execution.
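Parallel SQL execution across shards can be sketched like this (a toy example; in production each "shard" would be a pooled MySQL connection, which is an assumption here):

```python
from concurrent.futures import ThreadPoolExecutor

def fanout(shards, run_query):
    """Issue the same query against every shard in parallel and gather rows.

    run_query is a callable (shard -> list of rows); results come back in
    shard order, so the caller can still merge-sort them afterwards.
    """
    with ThreadPoolExecutor(max_workers=len(shards)) as pool:
        results = pool.map(run_query, shards)
    return [row for rows in results for row in rows]

# Toy stand-in: each "shard" just returns its own rows.
shards = [[1, 2], [3], [4, 5]]
print(fanout(shards, lambda shard: shard))  # -> [1, 2, 3, 4, 5]
```

This turns N sequential round trips into roughly one, which is the difference between a feed query being usable and not.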

If your social graph is like Twitter's, where active users follow 100K users and the feed doesn't change dynamically, writing the data to each shard may be for you. But, again, this is out of scope for this post.

What about answering the question of mutual connections?

On every write of a friend relationship, do a select to see if the followed person follows the follower. If so, mark the row on both shards as mutual.

For all my personal cases, this distributed friends table solves all my needs. Lots of friend writes (from importing friends out of an address book, email, or another social network's friend graph) combined with large concurrency is not going to affect me, SINCE the table has been removed from a single point and is now distributed across many servers.

Reads are fast because only a % of the data is on each shard, and 90% of the queries for a given user hit only that user's shard.

Feed type queries are fast because the SQL is executed in parallel if we have to go to the SQL Layer. Most data is cached, reducing the need to FANOUT on reads.

Finally, federating without downtime or users noticing requires a backfill script, plus writing to the old friends table as well as to the new friend tables. Once this is done, fix all the queries to use the new format. Then sit back and feel good that good work was done :)

Thursday, March 12, 2015

Long time since an update but great stuff coming along

So, it's been a long time since I contributed anything to my blog. That will end very soon. Coming up: the architecture of Shots, shard optimizations, data organization and grouping, Java, Golang and some other cool stuff. Also how to handle Justin Bieber's traffic, which is INSANE.

In the meantime, if you live in the San Francisco Bay Area, want to work with the coolest founders on the planet, make a big difference in people's lives, know mySQL / redis / memcache / some C-style language, or want to learn, contact me. I have a great job for you!

Thursday, July 10, 2014

Manually Switch Slaves to new Masters in mySQL 5.6 (XTRADB 5.6)

I'm really excited about Fabric, which was recently announced. Everything it does has been a variety of scripts or manual tasks for me, but before I can integrate Fabric into my system I must know more about it. When dealing with live data and moving servers around, I still do things manually, just because it makes me feel better to know that if data is lost, I was the cause by doing something dumb. Basically, I need to know everything about Fabric, including line-by-line execution, before I will deploy it.

Here are my steps for switching and replacing a Shard Slave.

Imagine having a setup in the following Config.

Shard Server - this is the master endpoint

The Global Shard which holds Friend Info to join against is --- replicates from --->

Now the Shard Server has 3 slaves, thus is set up to log-slave-updates
The 3 slaves are,, and I want to make the new master for the said Shard with as its slave. So, what I have is

3 slaves --- replicates from ---> --- replicates from --->

what I will end up with is -- replicates from ---> --->

I am getting rid of and

Here are the steps.

Tell and to SLAVE UNTIL the next binary log in
  • ssh to each box
  • STOP SLAVE (using mysql 5.6) on[6-7]
  • SHOW SLAVE STATUS\G -- get Master_Log_File : master-bin.000612
Now what I did here was tell the slaves to replicate until the next bin log is reached
Query OK, 0 rows affected, 2 warnings (0.01 sec)

| Level | Code | Message                                                                                                                                                                                  |
| Note  | 1278 | It is recommended to use --skip-slave-start when doing step-by-step replication with START SLAVE UNTIL; otherwise, you will get problems if you get an unexpected slave's mysqld restart |
| Note  | 1753 | UNTIL condtion is not supported in multi-threaded slave mode. Slave is started in the sequential execution mode.                                                                         |
2 rows in set (0.00 sec) // notice the minor bug in the spelling :)

I also get a warning that says my multiple SQL threads are now a single one, which is fine.

My next step is to ssh to

  • FLUSH LOGS - this tells to rotate all log files including mysql-bin.000613
Now on the slaves I wait until they stop

Once both stop, on I issue RESET MASTER // I don't care about what was replicated at this point and saved already in the binlogs - I've already verified that they are in-sync with CHECKSUM TABLE

On I issue the command

if you get an error  

Fatal error: The slave I/O thread stops because master and slave have equal MySQL server UUIDs; these UUIDs must be different for replication to work.
stop mysql, remove auto.cnf in your $DATADIR (/var/lib/mysql)

On I issue 


Now I wait until the SLAVE catches up to the MASTER (remember this works because of log-slave-update)

Next in my dbconfig.php file I change all references to to

Verify everything is in sync (USE CHECKSUM TABLE ACROSS TABLES/SERVERS ) and push out the new config

After the push Make sure to restart all daemons or queue workers, they may cache the database config

Now do this all over again to make replicate from

In conclusion, this is just too manual and screams for automation. Soon it will be automated with Fabric, which manages this process, once I get around to rolling that out.

Tuesday, June 03, 2014

CTO of Shots on Core Technology, Culture and Working on the greatest App in the World

MySQL has opened a lot of avenues and opportunities for me. I am the CTO of Shots, and I got here because I was a mySQL DBA who could code in a variety of languages, understand data access and layout, design fast backends that scale to well over 100 million users, manage a team, give back to the community and prove myself through constant good work. Plus I've made every single mistake, so I know what not to do.

At Shots we of course use Percona XtraDB 5.6, with Memcache, Redis, ElasticSearch, HAProxy, FluentD with Logstash plugins, Ruby, PHP 5.4, Go, Java, Erlang and AWS, all managed via a custom Chef build. We use Chef Server like Chef Solo :)

In four months we grabbed over 1 million ACTIVE users, all on iOS, mainly from the US, UK, Australia, Canada and Brazil. We are able to handle Justin Bieber's traffic, which is insane. Currently we have no downtime (yet; always plan for downtime). We moved our DC to AWS us-west-1, and from crappy S3-east to S3 west. We grow organically and are at the cusp of hitting our hockey-stick growth, all on 12 servers currently :)

There are 4 of us. Everything described here is what I handle, yet I say "we" throughout this post, and that is a good segue into our culture. As a team we build for our consumers, our Shotties: a positive, bully-free app that works across all platforms to keep humans interacting with humans, without all the other cruft you find on social networks. Team and community focus is our culture, with the confidence to build the best app in the world.

It's a photo status update, better known as a selfie app, which everyone from Shaq, King Bach and Justin Bieber to Floyd "Money" Mayweather and many others are using. It's cool to have these folks, but we are not building a platform for just them; we are building a platform for you. For the teens who are different and like cosplay, for the teen who loves Bieber, for the person who wants to remember the moment, for you.

I invite you all to use it. I also invite you, or your friends, to join us, because I am now hiring an iOS and an Android dev. If you or they want to work for a startup and be a part of something cool, with the purpose of changing the world and the way we interact with one another online, join us. The requirements: live in the Bay Area, can code, and want to make a change :)

Monday, October 28, 2013

MariaDB 10.0.4, BeanStalkD, Geographic Replication, Event Tracker for stats gathering at 60K stats a second

Every company needs to see stats to understand how the application is performing and how users are using the application(s). Typically a stat for most basic questions, and even some advanced questions, can be summarized as "What is said event over time?". We call this EventTracker.

To add to the complexity of generating stats, how do you get stat events from a DataCenter (DC) in Singapore, a DC in Western Europe, and a DC in Oregon to a database for querying in West Virginia, near real-time? I used multi-source replication and the BLACKHOLE storage engine to do so with MariaDB.

Above is an image that shows a webserver in some part of the world sending events for tracking various interrupts to a BeanstalkD queue at time T in the same region. Each region has a set of Python workers that grab events from BeanstalkD and write each event to a local DB. Then the TSDB database, a MariaDB 10.0.4 instance, replicates from each BLACKHOLE storage-engine BeanstalkD worker server.

The obvious question might be: why not use OpenTSDB? The TSDB daemon couldn't handle the onslaught of stats/second, and the current HBase TSDB structure is much larger than a compressed InnoDB row for the same stat. Additionally, a region may lose connectivity to another region for some time, so I would need to queue events in some form until the network was available again. Thus the need for a homegrown solution. Now back to my solution.

The Structure for the event has the following DDL.

Currently we are using 32 shards, defined by each bigdata_# database. This allows us to scale per database; our capacity planning is based not on disk IO but on disk space.

MariaDB [(none)]> show databases;
| Database           |
| bigdata_0          |
| bigdata_1          |
| bigdata_10         |
| bigdata_11         |
| bigdata_12         |
| bigdata_13         |
| bigdata_14         |
| bigdata_15         |
| bigdata_16         |
| bigdata_17         |
| bigdata_18         |
| bigdata_19         |
| bigdata_2          |
| bigdata_20         |
| bigdata_21         |
| bigdata_22         |
| bigdata_23         |
| bigdata_24         |
| bigdata_25         |
| bigdata_26         |
| bigdata_27         |
| bigdata_28         |
| bigdata_29         |
| bigdata_3          |
| bigdata_30         |
| bigdata_31         |
| bigdata_4          |
| bigdata_5          |
| bigdata_6          |
| bigdata_7          |
| bigdata_8          |
| bigdata_9          |
| information_schema |
| mysql              |
| performance_schema |
35 rows in set (0.16 sec)

The database bigdata_0 is the only database that is slightly different from the rest: it has a table, EventTags, that is not in the other databases. EventTags is the map of eventId to tagName, where eventId is just a numerical representation of part of the md5 of the tagName. Each numerical representation falls into an address space that denotes which database a tag belongs to. The front-end uses the EventTags table to search for a stat to plot on a graph.


CREATE TABLE `EventTags` (
  `eventId` bigint(20) unsigned NOT NULL DEFAULT '0',
  `tagName` varchar(255) NOT NULL DEFAULT '',
  `popularity` bigint(20) unsigned NOT NULL DEFAULT '0',
  PRIMARY KEY (`eventId`,`tagName`(25)),
  KEY `eventTag` (`tagName`(25))
)
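The eventId derivation described above might look like this in Python (a sketch: the post only says eventId is a numerical representation of part of the md5, so the exact digest slice here is an assumption):

```python
import hashlib

NUM_SHARDS = 32

def event_id_and_shard(tag_name):
    """Derive a numeric eventId from part of the md5 of the tag name,
    and map it to one of the bigdata_N databases.

    Taking the first 64 bits of the digest is an assumed choice; any
    fixed slice works as long as every writer uses the same one.
    """
    digest = hashlib.md5(tag_name.encode('utf-8')).hexdigest()
    event_id = int(digest[:16], 16)       # first 64 bits as an integer
    shard = event_id % NUM_SHARDS         # address space -> database
    return event_id, 'bigdata_%d' % shard

eid, db = event_id_and_shard('login.success')
print(db)
```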

EventDay contains the actual value of the stat, combined over the last minute (currently). Our granularity allows going down to a second, but we found that seeing events per minute is fine. The DDL for events is the following.

CREATE TABLE `EventDay` (
  `eventId` bigint(20) unsigned NOT NULL,
  `createDate` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT 'min Blocks',
  `count` bigint(20) unsigned NOT NULL DEFAULT '0',
  PRIMARY KEY (`eventId`,`createDate`)
)
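The per-minute rollup the workers perform can be sketched as follows (an assumed aggregation; the post doesn't show the worker code):

```python
from collections import Counter

def rollup(events, bucket_seconds=60):
    """Collapse raw (event_id, unix_ts) events into per-minute counts,
    the shape of an EventDay row: (eventId, createDate, count)."""
    counts = Counter()
    for event_id, ts in events:
        bucket = ts - (ts % bucket_seconds)   # floor the timestamp to the minute
        counts[(event_id, bucket)] += 1
    return [(eid, bucket, n) for (eid, bucket), n in sorted(counts.items())]

events = [(7, 120), (7, 121), (7, 185), (9, 130)]
print(rollup(events))  # -> [(7, 120, 2), (7, 180, 1), (9, 120, 1)]
```

Batching 60K raw events/second down to a few thousand per-minute rows is what makes the replicated updates manageable.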

MariaDB's multi-source replication then downloads the statement-based binary logs from each of the workers around the world and applies the SQL to a combined database that represents all regions. Note that the central database has the same structure, BUT the engine is compressed InnoDB with KEY_BLOCK_SIZE set to 8.

The front-end sits on top of the central database, and we record everything from a single item being sold to the load on our auto-scaling web farm. This allows us to do some interesting plots, like items sold as a function of load over time: I(L(t)).

Currently with this method we are producing 60K events a second, which translates to a few thousand database updates a second across 4 replication threads (8 threads total), keeping all data up to date to within the last minute, near real-time.

Sunday, July 07, 2013

Speaking at RAMP: Scale Patterns and handling exponential growth without downtime

I will be in Budapest talking about scale and rapid growth. I will start off with Flickr's five-minute conversation, then take a direction on how to scale the backend to 90 million users in 3 weeks after going viral.

RAMP will also have live streaming broadcast at TNW, HWSW and on USTREAM.