Wednesday, August 10, 2016

Tech Stack at Shots: Quick Post

At Shots we use the following technology to serve many millions of photos, videos, and cached links.

LAMP

Red Hat Enterprise Linux 6 on the front ends and DBs. Amazon Linux (CentOS-based) on the Elasticsearch and Go servers
Apache 2+
Percona 5.6 XtraDB with some minor custom stuff (sharded)
PHP

We also have a little bit of Python and Java, and a lot of Go!


One of our current features which has been wildly successful is sharing links on mobile, which is very hard. Mobile is not built for links, but fortunately Instagram and YouTube are. To make this work, we have the client read from the clipboard. The client makes a call home, where the link is sent to a distributed worker system which fetches the content of the HTML page, finds the media, manipulates the media, and then distributes the media on our CDN. Links only last for a few days.
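The fetch step is simple enough to sketch. Something like the following, assuming a worker that scrapes the Open Graph meta tags that Instagram and YouTube publish (the package and function names are illustrative, not our production code):

package linkworker

import (
	"fmt"
	"net/http"

	"golang.org/x/net/html"
)

// fetchMediaURL pulls a page and returns its og:image or og:video:url
// meta tag, which is how sites like Instagram and YouTube expose media.
func fetchMediaURL(link string) (string, error) {
	resp, err := http.Get(link)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	z := html.NewTokenizer(resp.Body)
	for {
		tt := z.Next()
		if tt == html.ErrorToken {
			return "", fmt.Errorf("no media found in %s", link)
		}
		if tt != html.StartTagToken && tt != html.SelfClosingTagToken {
			continue
		}
		tok := z.Token()
		if tok.Data != "meta" {
			continue
		}
		var prop, content string
		for _, a := range tok.Attr {
			switch a.Key {
			case "property":
				prop = a.Val
			case "content":
				content = a.Val
			}
		}
		if prop == "og:image" || prop == "og:video:url" {
			return content, nil
		}
	}
}

From there the worker manipulates the media and pushes it to the CDN.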

This is like a poor man's AMP and only took us a few days to write. We even re-transcode videos to make sure the format fits our timeline and doesn't hog too much bandwidth.

MySQL keeps state so the same link is not rebuilt, and everything is fronted with Redis. Since Redis supports pipelined commands, it is great for a feed our size. The next feed version will be a TAO-like system written in Go.
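To give a feel for why pipelining matters, here is a minimal sketch, assuming the go-redis client (key names are made up): a hundred GETs go out in one network round trip instead of a hundred.

package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// Queue one GET per feed item; the pipeline sends them all in a
	// single round trip instead of one network hop per key.
	pipe := rdb.Pipeline()
	cmds := make([]*redis.StringCmd, 0, 100)
	for i := 0; i < 100; i++ {
		cmds = append(cmds, pipe.Get(ctx, fmt.Sprintf("feed:item:%d", i)))
	}
	if _, err := pipe.Exec(ctx); err != nil && err != redis.Nil {
		panic(err)
	}
	for _, c := range cmds {
		fmt.Println(c.Val())
	}
}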

All in all, for 3 days of work the system works great and scales linearly. It is near real time. Give it a try: link an Instagram or YouTube URL and you will see for yourself.


Some things I'd like to do in the future: use QUIC under a websocket layer, to have a non-blocking messaging system which is blazing fast and works on spotty networks, and integrate RocksDB.

But that's another post.


Monday, September 28, 2015

Wish there were another string datatype called LIST, but there is not

I believe the future of SQL is to take a lot of primitives that are computer science fundamentals and add them as datatypes, expanding on the column types allowed today. The idea is a merging of NoSQL and SQL, making problems easier for a new developer to solve.


For instance, what would be awesome is a LIST type, where the list contains a distinct set of string items each mapped to a bit, much like SET, yet you don't need to predefine all the items in the set.


Here is a good example of how I would use a list type:


Imagine you need permissions on a per-row basis. Some rows are public, some are private, some are viewable by a small set of people (fewer than 64).

Let's take the example of finding all rows that are public or viewable by only me.

When creating a row

INSERT INTO resource_permissions (resource_id, perm_bit, list_dt) VALUES(1, 2, "dathan, sam, fred")

perm_bit: 0 = private, 1 = public, 2 = public to a list of people

When selecting rows that I "dathan" can see

SELECT resource_id FROM resource_permissions WHERE perm_bit = 1 UNION SELECT resource_id FROM resource_permissions WHERE perm_bit = 2 AND FIND_IN_LIST(list_dt, "dathan");


What the above statement says is: give me all the public resource_ids, plus the resource_ids that I ("dathan") can see.


Right now I can't do this; I have to use a MEDIUMBLOB and a LIKE:

SELECT resource_id FROM resource_permissions WHERE perm_bit = 1 UNION SELECT resource_id FROM resource_permissions WHERE perm_bit = 2 AND list_dt LIKE "%:dathan:%"


As you can see, I'm able to simulate the desired behavior, but I can't use an index. I don't want to use a FULLTEXT index due to overhead and other issues that are out of scope for this post, nor do I want to manage UDFs or stored procedures. The last two are not desirable yet can also simulate the behavior I am looking for.
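For what it's worth, here is a minimal sketch of the delimiter trick from the application side, assuming a database/sql handle (buildList and visibleTo are illustrative names, not a real API):

package perms

import "database/sql"

// buildList wraps each name in colons before storing it in the
// MEDIUMBLOB, so LIKE "%:dathan:%" cannot match a prefix of a
// longer name such as "dathanvp".
func buildList(names []string) string {
	out := ""
	for _, n := range names {
		out += ":" + n + ":"
	}
	return out
}

// visibleTo returns resource ids that are public (perm_bit = 1) or
// shared with the given name (perm_bit = 2).
func visibleTo(db *sql.DB, name string) (*sql.Rows, error) {
	return db.Query(`
		SELECT resource_id FROM resource_permissions WHERE perm_bit = 1
		UNION
		SELECT resource_id FROM resource_permissions
		WHERE perm_bit = 2 AND list_dt LIKE CONCAT('%:', ?, ':%')`, name)
}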


Some primitives from Redis or other NoSQL solutions would be awesome additions to SQL as a whole, IMHO.


My two cents.

Also, in 5.7 the JSON column type might be of some use: JSON_CONTAINS could replace the LIKE, and a generated column could even make it indexable.







Tuesday, September 22, 2015

Golang websockets (wss) and "OOP"

Golang is awesome. My first language back in 1994 was C. The following year my computer science department switched from C/Pascal to C++. I even like C++, but I like C more, mainly because of nostalgia.

Enter Go. The syntax for me is a mix between JSON and C. I love it. I've created 3 new servers, all doing a ton of TPS. What I would like to share with you is some Go code that handles websockets.


If you are building a server using websockets over secure TCP, your browser behaves slightly differently than a client-side application using a websocket library, specifically when working with wss (secure websockets) across domains.

It's up to the client to respect Origin, so a client implementing a websocket doesn't have to set the Origin header, but your browser does. This is done on purpose, and it's a good thing. To get websockets to work over secure sockets, let's make our assumptions consistent and not report Origin errors, by overriding the handshake method on the fly. The power of Go.

// going to override the handshake
server := websocket.Server{
   Handshake: func(config *websocket.Config, req *http.Request) error {
      return nil
   },
   Handler: websocket.Handler(nsp.handle),
}

The above says: override the Handshake method in the library (golang.org/x/net/websocket) with the supplied local function and return nil for the error, which means all is good.

Any time Origin is sent to the server (non-browser clients don't have to send it, and even the browser doesn't have to), the origin handshake check is skipped:

http.Handle(nsp.path, server) // websocket.Server implements http.Handler, keeping our Handshake override


Next we handle the websocket with the handler supplied in the server, nsp.handle. nsp.handle is a method that takes in a websocket connection. nsp.path means: for the given HTTP path, execute the handler.


This is awesome. Everything works, but what is cooler is how Go handles OOP. The term used in Go is embedding, and changing the executed method (method overriding) is called shadowing.


Here is an example

package main

import "datarepo"

type DataLayer struct {
   datarepo.DataRepoAccess
}

// https://github.com/luciotato/golang-notes/blob/master/OOP.md#golang-embedding-is-akin-to-multiple-inheritance-with-non-virtual-methods
func NewDataLayer(subject string, class string) DataLayer {
   ret := DataLayer{datarepo.DataRepoAccess{Subject: subject, Classof: class}}
   ret.New()
   return ret
}

// wrapper method to add in a counter
func (dl *DataLayer) Execute() ([]byte, error) { // shadowed
   Reporter.increment("api_layer_cmd", 1)
   var base = dl.DataRepoAccess
   return base.Execute()
}


DataLayer is a wrapper design pattern around datarepo.DataRepoAccess, a structure I wrote that handles talking to the backend. datarepo.DataRepoAccess has a method called Execute. In the example above, Execute is "shadowed", or overridden. The new method counts the number of times the base method is called.
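Called from application code, the wrapper is transparent. A fragment to illustrate (the argument values here are made up):

dl := NewDataLayer("user:3306", "photos")
data, err := dl.Execute() // bumps api_layer_cmd, then calls the embedded Execute

The caller never knows a counter fired, which is the point of shadowing.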


These months of coding Go have been so much fun. I love learning new things but also getting my work done on time. Go enables me to do both. Learning Go is like learning to snowboard: in the beginning it's like getting your ass smacked with a cold wet shovel, but once you get it, you've got it.

Wednesday, August 05, 2015

San Francisco mySQL Meetup August 26 2015

Shots Architecture and how we handle extreme load spikes


I invite you to come out and join me for a talk on the above heading. I will describe many things and walk through what technology is used where, why, and how. The event information is located here. I'll also touch on how cost is reduced, how we handle celebrity load when they promote, and what's next to make the system even more automatic and solid.

Thanks to sfmysql.org for all the work they do and for allowing me to give a talk.

Monday, June 01, 2015

Reporting Across Shards

If you have chosen to split your data across boxes, and architected your app to not query across boxes, there are still cases where you will need to. Data mining, reports, and data health checks require hitting all servers at some point. The case I am going over is sessions: figuring out the average session length without taking an average of averages, which is wrong.
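To see why averaging averages fails: say shard A has 9 sessions totaling 900 seconds (average 100s) and shard B has 1 session of 10 seconds (average 10s). The average of the averages is (100 + 10) / 2 = 55s, but the true average is 910 / 10 = 91s. Each shard must report its numerator and denominator separately.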


Let's assume you have a session table of the following form:

mysql> describe sessions;
+----------+---------------------+------+-----+---------+-------+
| Field    | Type                | Null | Key | Default | Extra |
+----------+---------------------+------+-----+---------+-------+
| user_id  | bigint(20) unsigned | NO   | PRI | 0       |       |
| added_ms | bigint(20) unsigned | NO   | PRI | 0       |       |
| appVer   | varchar(8)          | YES  |     | NULL    |       |
| device   | bigint(20) unsigned | YES  | MUL | NULL    |       |
| start    | int(10) unsigned    | NO   | MUL | NULL    |       |
| stop     | int(10) unsigned    | NO   |     | NULL    |       |
+----------+---------------------+------+-----+---------+-------+


The data is federated (distributed) by user_id. This table exists across 1000s of servers. How do you get the average session length for the month of May?

  • The question already scopes the process to hit every single server
  • Second, we can't just take AVG(stop-start) on each shard and then sum and divide by the number of shards
  • We can't pull all the data in memory
  • We don't want to have to pull the data and upload it to BigQuery or Amazon Redshift
  • We want a daily report at some point
The per-shard query is:

SELECT SUM(stop-start) AS sess_diff, COUNT(*) AS sess_sample FROM sessions WHERE start BETWEEN $start AND $stop AND stop > start
The above SQL statement says: for a single server, give me the sum of the session deltas and the count of rows in the set. The SUM of the per-shard sums (sess_diff) is the numerator, and the sum of sess_sample is the denominator.
Now run this across all servers, and finally write some client code to aggregate the resulting rows (< 1,000, one per shard) to report the number.

$total = 0;
$sessions_diff = 0;

// Sum the numerators and denominators across shards; bcadd avoids
// integer overflow when the totals get large.
foreach ($rows as $shard_id => $result) {

    $sessions_diff = \bcadd($sessions_diff, $result[0]['sess_diff']);
    $total = \bcadd($total, $result[0]['sess_sample']);
}


Now the session_avg = sessions_diff/total

Ta-da: a query that can take hours if done on a traditional mining server is done in milliseconds.


Wednesday, April 01, 2015

Federating THE friends table in a Sharded mySQL environment without downtime or users noticing


A friends table is the cornerstone of social applications. Its purpose is to define relationships and help answer the question: what are my friends doing?

Here is an example friends table:

 CREATE TABLE `friends` (
  `user_id` bigint(20) unsigned NOT NULL,
  `friend_id` bigint(20) unsigned NOT NULL,
  `auto_ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`user_id`,`friend_id`),
  KEY `user_id-auto_ts` (`user_id`,`auto_ts`),
  KEY `friend_id-auto_ts` (`friend_id`,`auto_ts`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci


With the table above we can get a list of user_ids a user follows (following), a list of people who follow said user (followers), or a list of mutual follows. This is a very simple table structure, yet very powerful.

The problem is that this table doesn't scale on a single server. When you have millions of users, each user has many friends, and all users are semi- to deeply connected, the table becomes a problem. Mix this with a huge request rate and lots of concurrency, and a single server just doesn't scale.

One can replicate the friends table, but lag starts to build when many users add or remove friends at once. So, how can we distribute this table across many servers, each holding a small % of the friend graph?

Let's look at the friends table. It defines whom a user follows and who follows the user, ordered by insertion time.

Let's create two tables:

CREATE TABLE `following` (
  `user_id` bigint(20) unsigned NOT NULL DEFAULT '0',
  `friend_id` bigint(20) unsigned NOT NULL DEFAULT '0',
  `mutual` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT 'Flag to denote mutual connections',
  `auto_ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`user_id`,`friend_id`),
  KEY `user_id-auto_ts` (`user_id`,`auto_ts`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=8


CREATE TABLE `followers` (
  `user_id` bigint(20) unsigned NOT NULL DEFAULT '0',
  `friend_id` bigint(20) unsigned NOT NULL DEFAULT '0',
  `mutual` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT 'Flag to denote mutual connections',
  `auto_ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`user_id`,`friend_id`),
  KEY `friend_id-auto_ts` (`friend_id`,`auto_ts`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=8


The 'following' table defines whom a given user follows. The table is federated by user_id, so each row exists on the user_id's shard.

The 'followers' table defines who is following the given user. On every follow, instead of writing one row, we now write two rows: one write on the following user's shard, one write on the followed user's shard. Thus the followers table is federated by friend_id.

This can be best described by an example on reads:

How many people am I (user_id 3306) following?

Connect to my Shard-x, execute the query

SELECT COUNT(*) FROM following WHERE user_id = 3306


How many people are following me (user_id 3306)?

Connect to my Shard-x, execute the following query

SELECT COUNT(*) FROM followers WHERE user_id = 3306


Now let's look at a write: me (user_id 3306) following friend_id 11211.

3306 is on Shard-x
11211 is on Shard-y

So, first we write the fact that 3306 is following 11211. We connect to Shard-x and execute the transaction:

BEGIN
INSERT INTO following (user_id, friend_id, mutual, auto_ts) VALUES(3306, 11211, 0, NOW());
// DO NOT COMMIT YET


Now connect to Shard-y to write the followers row. If the connection fails, roll back the transaction on 3306's Shard-x; otherwise:

BEGIN
INSERT INTO followers (user_id, friend_id, mutual, auto_ts) VALUES(3306, 11211, 0, NOW());
if affected rows == 1 (no error)
COMMIT on Shard-x
COMMIT on Shard-y
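
A sketch of that two-shard write in Go with database/sql; the shard handles, error handling, and retries are simplified, so treat this as the idea rather than our exact code:

package shards

import "database/sql"

// follow records user -> friend across both shards, holding the first
// transaction open until the second shard's write succeeds.
func follow(shardX, shardY *sql.DB, user, friend int64) error {
	txX, err := shardX.Begin()
	if err != nil {
		return err
	}
	if _, err = txX.Exec(
		`INSERT INTO following (user_id, friend_id, mutual, auto_ts)
		 VALUES (?, ?, 0, NOW())`, user, friend); err != nil {
		txX.Rollback()
		return err
	}

	txY, err := shardY.Begin()
	if err != nil {
		txX.Rollback() // could not reach the followed user's shard
		return err
	}
	if _, err = txY.Exec(
		`INSERT INTO followers (user_id, friend_id, mutual, auto_ts)
		 VALUES (?, ?, 0, NOW())`, user, friend); err != nil {
		txY.Rollback()
		txX.Rollback()
		return err
	}

	// Both rows are written; commit Shard-x first, then Shard-y.
	if err = txX.Commit(); err != nil {
		txY.Rollback()
		return err
	}
	return txY.Commit()
}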


Now we can answer the main questions.


But what about something like: give me my friends' photos sorted by last upload time, 10 at a time?

Well, here is the magic sauce. We are going to do a fan-out of reads and hit all the shards my friends are on. For my environment this is much better than a fan-out of writes, since we like to customize the feed in real time, and duplicating the data 10,000s of times becomes very expensive quickly as servers start turning cold. We can go into this topic a bit more in another post.


Now I execute the query across my friends' shards:

SELECT p.id FROM photos p JOIN followers f ON(f.friend_id=p.user_id) WHERE f.user_id = 3306 ORDER BY p.id DESC LIMIT 10;

If I have 1,000 friends spread across 100 shards, and each shard returns its latest 10 photos, I am going to get back 1,000 rows.

But that order is not what I am going to display, because I want the latest 10 photos overall. Thus I need to sort in memory on the application server and take a slice of the results.


But what if I want the 2nd page?
SELECT p.id FROM photos p JOIN followers f ON (f.friend_id=p.user_id) WHERE f.user_id = 3306 AND p.id < [LAST_ID_FROM_FIRST_PAGE] ORDER BY p.id DESC LIMIT 10

In the application we pass the last_id from the 1st page, execute the same fan-out of reads again, do the same logic, and return the photos.

Your question might be: isn't this slow, because people with large networks have to hit every shard each time, and you have to loop-execute-read on each connection?

This can be mitigated with memory, pipelining and parallel SQL execution.
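A sketch of what that parallel execution looks like in Go: one goroutine per shard, merge, sort, slice. The shard handles and the page size are illustrative:

package feed

import (
	"database/sql"
	"sort"
	"sync"
)

// feedPage fans out the feed query to every shard in parallel,
// merges the ids, and returns the newest 10.
func feedPage(shards []*sql.DB, userID int64) ([]int64, error) {
	var (
		mu  sync.Mutex
		wg  sync.WaitGroup
		ids []int64
	)
	for _, db := range shards {
		wg.Add(1)
		go func(db *sql.DB) {
			defer wg.Done()
			rows, err := db.Query(
				`SELECT p.id FROM photos p
				 JOIN followers f ON (f.friend_id = p.user_id)
				 WHERE f.user_id = ? ORDER BY p.id DESC LIMIT 10`, userID)
			if err != nil {
				return // a real version would surface the error
			}
			defer rows.Close()
			for rows.Next() {
				var id int64
				if rows.Scan(&id) == nil {
					mu.Lock()
					ids = append(ids, id)
					mu.Unlock()
				}
			}
		}(db)
	}
	wg.Wait()

	// Newest first, then slice one page.
	sort.Slice(ids, func(i, j int) bool { return ids[i] > ids[j] })
	if len(ids) > 10 {
		ids = ids[:10]
	}
	return ids, nil
}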

If your social graph is like Twitter's, where active users follow 100K users and the feed doesn't change dynamically, writing the data to each shard may be for you. But again, this is out of scope for this post.

What about answering the question of mutual connections?

On every write of a friend relationship, do a select to see if the followed person follows the follower. If so, mark the row on both shards as mutual.

For all my personal cases, this distributed friends table solves all my needs. Lots of friend writes, from importing friends from an address book, email, or another social network's friend graph, plus a large amount of concurrency, are not going to affect me, since the table has been removed from a single point and is now distributed across many servers.

Reads are fast because only a % of the data is on each shard, and 90% of the queries hit only the one shard for a given user.

Feed-type queries are fast because the SQL is executed in parallel if we have to go to the SQL layer. Most data is cached, reducing the need to fan out on reads.

Finally, federating without downtime or users noticing requires a backfill script, plus dual writes to the old friends table as well as to the new friend tables. Once this is done, fix all the queries to use the new format. Then sit back and feel good that good work was done :)

Thursday, March 12, 2015

Long time since an update but great stuff coming along

So, it's been a long time since I contributed anything to my blog. That will end very soon. Coming up: writing about the architecture of Shots, shard optimizations, data organization and grouping, Java, Golang, and some other cool stuff. Also how to handle Justin Bieber's traffic, which is INSANE.


In the meantime, if you live in the San Francisco Bay Area, want to work with the coolest founders on the planet, make a big difference in people's lives, and know MySQL / Redis / memcache / some C-style language (or want to learn), contact me. I have a great job for you!




Thursday, July 10, 2014

Manually Switch Slaves to new Masters in mySQL 5.6 (XTRADB 5.6)

I'm really excited about Fabric, which was recently announced. Everything it does has been a variety of scripts or manual tasks for me, but before I can integrate Fabric into my system I must know more about it. When dealing with live data and moving servers around, I still do things manually, just because it makes me feel better to know that if data is lost, I was the cause of doing something dumb. Basically, I need to know everything about Fabric, including line-by-line execution, before I will deploy it.


Here are my steps for switching and replacing a Shard Slave.

Imagine having a setup in the following config.

Shard Server: 10.0.30.123 - this is the master endpoint

The global shard which holds friend info to join against is:

10.0.1.1

10.0.30.123 --- replicates from ---> 10.0.1.1

Now, the Shard Server 10.0.30.123 has 3 slaves; thus 10.0.30.123 is set up with log-slave-updates.
The 3 slaves are 10.0.18.78, 10.0.22.76, and 10.0.22.77, and I want to make 10.0.22.76 the new master for the said shard, with 10.0.22.77 as its slave. So, what I have is

3 slaves --- replicates from ---> 10.0.30.123 --- replicates from ---> 10.0.1.1

what I will end up with is

10.0.22.77 --- replicates from ---> 10.0.22.76 --- replicates from ---> 10.0.1.1

I am getting rid of 10.0.30.123 and 10.0.18.78


Here are the steps.

Tell 10.0.22.77 and 10.0.22.76 to SLAVE UNTIL the next binary log on 10.0.30.123:
  • ssh to each box
  • STOP SLAVE (using mysql 5.6) on 10.0.22.7[6-7]
  • SHOW SLAVE STATUS\G -- get Master_Log_File : master-bin.000612
  • START SLAVE UNTIL MASTER_LOG_FILE='master-bin.000613', MASTER_LOG_POS=4
What I did here was tell the slaves to replicate until the next binlog file is reached:
mysql> START SLAVE UNTIL MASTER_LOG_FILE='master-bin.000613', MASTER_LOG_POS=4;
Query OK, 0 rows affected, 2 warnings (0.01 sec)

mysql> SHOW WARNINGS;
+-------+------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Level | Code | Message                                                                                                                                                                                  |
+-------+------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Note  | 1278 | It is recommended to use --skip-slave-start when doing step-by-step replication with START SLAVE UNTIL; otherwise, you will get problems if you get an unexpected slave's mysqld restart |
| Note  | 1753 | UNTIL condtion is not supported in multi-threaded slave mode. Slave is started in the sequential execution mode.                                                                         |
+-------+------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 rows in set (0.00 sec) // notice the minor bug in the spelling :)

I also get a warning that says my multiple SQL threads are now a single one, which is fine.


My next step is to ssh to 10.0.30.123

  • FLUSH LOGS - this tells 10.0.30.123 to rotate all log files, closing master-bin.000612 and creating master-bin.000613
Now, on the slaves, I wait until they stop.

Once both stop, on 10.0.22.76 I issue RESET MASTER. I don't care about what was replicated and already saved in the binlogs at this point; I've already verified that the servers are in sync with CHECKSUM TABLE.

On 10.0.22.77 I issue the command
 
STOP SLAVE; CHANGE MASTER TO MASTER_LOG_FILE='master-bin.000001', MASTER_LOG_POS=4, MASTER_HOST='10.0.22.76'; START SLAVE;

If you get an error:

Fatal error: The slave I/O thread stops because master and slave have equal MySQL server UUIDs; these UUIDs must be different for replication to work.

stop mysql, remove auto.cnf in your $DATADIR (/var/lib/mysql), and start mysql again; a new UUID will be generated.

On 10.0.22.76 I issue 

START SLAVE


Now I wait until the slave catches up to the master 10.0.30.123 (remember, this works because of log-slave-updates).


Next, in my dbconfig.php file, I change all references from 10.0.30.123 to 10.0.22.76.

Verify everything is in sync (use CHECKSUM TABLE across tables/servers) and push out the new config.
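That check is easy to script. A sketch in Go, assuming the go-sql-driver DSN format; the hosts come from the example above, and the credentials and table name are made up (a real version would loop over every table on every shard pair):

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/go-sql-driver/mysql"
)

// tableChecksum runs CHECKSUM TABLE and returns the checksum value.
func tableChecksum(dsn, table string) (int64, error) {
	db, err := sql.Open("mysql", dsn)
	if err != nil {
		return 0, err
	}
	defer db.Close()

	var name string
	var sum sql.NullInt64 // Checksum is NULL for some storage engines
	err = db.QueryRow("CHECKSUM TABLE " + table).Scan(&name, &sum)
	return sum.Int64, err
}

func main() {
	oldSum, _ := tableChecksum("app@tcp(10.0.30.123:3306)/shard", "friends")
	newSum, _ := tableChecksum("app@tcp(10.0.22.76:3306)/shard", "friends")
	if oldSum != newSum {
		fmt.Println("NOT in sync, do not push the new config")
	} else {
		fmt.Println("in sync")
	}
}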

After the push, make sure to restart all daemons and queue workers; they may cache the database config.

Now do this all over again to make

10.0.22.76 replicate from 10.0.1.1


In conclusion, this is just too manual and screams for automation. Soon it will be automated with Fabric, which manages this process, once I get around to rolling it out.