Friday, April 01, 2011

Building a Facebook-like Feed System on a Sharded MySQL System

Building a Feed can be broken down into a few key questions.

Who can see what?
How many people can see it?

The answers to these questions dictate the design of the data structure, from a global read method to a global write method. A global read method can be summarized with some SQL:

SELECT * FROM Activity WHERE userId IN (?,?,?,?,?) AND createDate > NOW() - INTERVAL 2 WEEK;

The query above is actually not optimal. What I really do in my case is, for each '?', run a parallel query against that user's shard group:

SELECT * FROM Activity WHERE userId = ? AND createDate > NOW() - INTERVAL 2 WEEK;

The reason the first query is not optimal is that there are two ranges in it: the IN clause is a range and the createDate predicate is a range, so a composite key on (userId, createDate) can't be used fully; the query ends up using only userId. The second query pins userId to a single value, leaving createDate as the only range.
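To make that concrete, here is the kind of composite index I am talking about, as a sketch; the post does not spell out the exact index definition, so the name is mine (the Activity table itself is defined below):

ALTER TABLE Activity ADD KEY idx_user_createDate (userId, createDate);

With userId pinned to one value per shard query, createDate is the only remaining range, so the optimizer can walk this index directly.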


A global write method can be summarized with the following query:

SELECT * FROM Activity WHERE rowOwnerId = ? AND createDate > NOW() - INTERVAL 2 WEEK;

Now you may be wondering: if you are doing a global write, why is there a SELECT? The writes happen up front: for each friend following the person involved in the activity (whether that person initiated it or was acted upon), you write a row into that friend's feed. One action can thus create 5,000+ writes; the number of writes is proportional to the number of friends following the person being acted upon. Writes are N+1. The SELECT above is then just a cheap read of the rows each viewer owns.

Global writes make your view of the feed fast, but they are really hard to keep in sync in real time. They are also very expensive: a copy of the pointer is written N times, and to get real speed and avoid additional reads, the content itself is copied into each friend's row. With global writes you also have to build queues, which succumb to queuing theory. On top of that, edits and deletes are very hard to keep in sync, and new friendships need special handling; all of these things can expose system lag constantly. That said, it can still be done; it's just a lot of work that is expensive in terms of hardware cost and developer time.
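To make the write amplification concrete, here is a rough sketch of the fan-out for the slice of followers living on a single shard. Friends is a hypothetical follower table, and this assumes a global-write variant of the Activity table keyed by rowOwnerId, as in the query above:

-- One action becomes one Activity row per follower on this shard:
INSERT INTO Activity
    (rowOwnerId, parentOwnerId, parentId, parentType, itemType, itemId, createDate)
SELECT f.friendId, ?, ?, ?, ?, ?, NOW()
FROM Friends f
WHERE f.userId = ?;

Since followers are spread across shards, this runs as a queued job issuing one such batch per shard rather than one statement.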


The system I have built supports both, yet currently implements a global read method. The MySQL table structure follows:

userId bigint unsigned NOT NULL DEFAULT 0 - the person being acted upon
parentOwnerId bigint unsigned NOT NULL DEFAULT 0 - the person who created the content
parentId bigint unsigned NOT NULL DEFAULT 0 - the pointer to the actual content
parentType smallint unsigned NOT NULL DEFAULT 0 - what the content is: a wall post, a friend event, etc.
itemType smallint unsigned NOT NULL DEFAULT 0 - the actual action, which could be a comment on the parentType or the parent itself
itemId bigint unsigned NOT NULL DEFAULT 0 - the pointer to the item for content retrieval
createDate timestamp NOT NULL DEFAULT 0 - when the event entered the system
modifiedDate timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP - when the item was last added to (for a later feature)


The primary key in this mix is (userId, itemType, parentId).

This means the user acted upon will have only one row per itemType for a given parentId. So if one user leaves 1,000 comments on a status update, the Activity table holds a single row for that user, even though the row was updated 1,000 times, because modifiedDate is a timestamp that refreshes on every action.
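Putting the columns and primary key together, here is a minimal DDL sketch plus the upsert that produces the one-row-per-(userId, itemType, parentId) behavior. This is my reconstruction from the column list above; the storage engine and secondary index are assumptions, not confirmed production DDL:

CREATE TABLE Activity (
    userId        bigint unsigned   NOT NULL DEFAULT 0,
    parentOwnerId bigint unsigned   NOT NULL DEFAULT 0,
    parentId      bigint unsigned   NOT NULL DEFAULT 0,
    parentType    smallint unsigned NOT NULL DEFAULT 0,
    itemType      smallint unsigned NOT NULL DEFAULT 0,
    itemId        bigint unsigned   NOT NULL DEFAULT 0,
    createDate    timestamp NOT NULL DEFAULT 0,
    modifiedDate  timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
                  ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (userId, itemType, parentId),
    KEY idx_user_createDate (userId, createDate)
) ENGINE=InnoDB;

-- Each of the 1,000 comments re-runs this statement and touches the same
-- row: itemId moves to the newest comment, and modifiedDate refreshes
-- automatically because the row changed.
INSERT INTO Activity
    (userId, parentOwnerId, parentId, parentType, itemType, itemId, createDate)
VALUES (?, ?, ?, ?, ?, ?, NOW())
ON DUPLICATE KEY UPDATE itemId = VALUES(itemId);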

As you can tell, the Activity table is central to the entire system, and the actual content tables are what I call branches off Activity. Let's take an example to explain this.

user 1000 adds a status update; status updates live in a table called WallPosts.

WallPosts.postId - 88888
WallPosts.itemOwnerId - 1000
WallPosts.createDate - NOW()
WallPosts.posterId - 1000
WallPosts.post - This is my status update.

Activity.userId - 1000
Activity.parentOwnerId - 1000
Activity.parentId - 88888
Activity.parentType - 1
Activity.itemType - 1
Activity.itemId - 88888
Activity.createDate - NOW()
Activity.modifiedDate - NOW()
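In SQL form, that one status update is two writes, both landing on user 1000's shard. The WallPosts DDL is not shown in this post, so the column list here just mirrors the example above:

-- The content row:
INSERT INTO WallPosts (postId, itemOwnerId, posterId, post, createDate)
VALUES (88888, 1000, 1000, 'This is my status update.', NOW());

-- The feed row pointing at it:
INSERT INTO Activity
    (userId, parentOwnerId, parentId, parentType, itemType, itemId, createDate)
VALUES (1000, 1000, 88888, 1, 1, 88888, NOW())
ON DUPLICATE KEY UPDATE itemId = VALUES(itemId);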


So now friends of 1000 will query userId 1000's shard for the WallPost. Next, let's have userId 2000 leave a comment, stored in a table called Comments:

Comments.commentId - 88889
Comments.itemOwnerId - 1000
Comments.itemType - 1
Comments.commenterId - 2000
Comments.createDate - NOW()
Comments.comment - "I left a comment on your status update!"

This says that friend 2000 left the comment "I left a comment on your status update!" on userId 1000's status update, stored on 1000's shard.


Activity.userId - 2000
Activity.parentOwnerId - 1000
Activity.parentType - 1
Activity.parentId - 88888
Activity.itemType - 2 //(left a comment)
Activity.itemId - 88889
Activity.createDate - NOW()
Activity.modifiedDate - NOW()
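In SQL form, the comment content lands on 1000's shard, next to the parent post, while the activity row lands on 2000's shard, where 2000's friends will read it. Again, the Comments column list simply mirrors the example:

-- On userId 1000's shard (with the parent content):
INSERT INTO Comments (commentId, itemOwnerId, itemType, commenterId, comment, createDate)
VALUES (88889, 1000, 1, 2000, 'I left a comment on your status update!', NOW());

-- On userId 2000's shard (for 2000's friends to read):
INSERT INTO Activity
    (userId, parentOwnerId, parentId, parentType, itemType, itemId, createDate)
VALUES (2000, 1000, 88888, 1, 2, 88889, NOW())
ON DUPLICATE KEY UPDATE itemId = VALUES(itemId);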


This tells friends of userId 2000 that 2000 left a comment on userId 1000's status update, where the comment and the status update both reside on 1000's shard. The content can be reached via

88889 == CommentId

and friends of 2000 now know that 2000 left a comment for 1000. Thus the viral effect spreads within one to two degrees of userId 1000.
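Reading this back is the two-step, fan-out-on-load path: first pull the activity rows from each friend's shard, then dereference itemId on the shard named by parentOwnerId. A rough sketch for a viewer who is friends with 2000:

-- Step 1, per friend shard (here 2000's):
SELECT * FROM Activity WHERE userId = ? AND createDate > NOW() - INTERVAL 2 WEEK;

-- Step 2, on parentOwnerId 1000's shard; itemType picks the content
-- table (itemType 2 means a comment in this example):
SELECT * FROM Comments WHERE commentId = ?;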


There is a lot more going on behind the scenes, like children of parents, using the combined read throughput of memcache and the sharded MySQL system, and permission filtering, but this is the general idea.


I am going to continually update this post, so keep checking back as I add diagrams, flows, links, and detailed looks at grouping events.

Note: By no means am I saying that a global write, or Fan-Out-on-Write, approach is not as good as a Fan-Out-on-Load, or global read, approach. I'm simply taking the approach that keeps it cheap and not trying to optimize too soon.

Here are some good links that I found after writing this post:

http://www.quora.com/What-are-the-scaling-issues-to-keep-in-mind-while-developing-a-social-network-feed


http://www.quora.com/What-are-best-practices-for-building-something-like-a-News-Feed?q=news+feed

4 comments:

Mark said...

I've used a global write system when I've encountered the problem. I get around the write lag by queueing the writes so the posting action is quick.

I break fetching content down into two steps. The first step gets all the content identifiers, sorted in the appropriate order, and the second step does the actual retrieval.

By performing the retrieval as a second step, I get several wins.

1. No primary data duplication. Updates only have to happen in a single spot, and the distributed writes consist of storing pointers along with any static information needed for sorting (e.g. creation date) only. Metadata is duplicated in a couple places; more on that below.

2. Common caching. Every data retrieval operation uses the same strategy, so there is exactly one cache entry for each piece of content, shared by the entire system. Data is almost always found in the cache from a previous retrieval operation. Updates to the data need only update a single cache entry.

3. Orthogonal caching. Any caching of a list of items is separate from the items themselves. Thus, I can present an old list of items, with each item being completely up to date.

The downfalls are:

1. If the cache system is down, distributed data makes data retrieval painful from the distributed reads.

2. Indexed sorting is limited to static values only: sorting by creation date is easy, sorting by views is not.

To get around the sorting-by-views, comments, likes, etc. issue, I use different strategies. Most other data retrieval operations are contextual. If I'm looking at a piece of content or a user, that is my local context. Generally a piece of content will only fit into three contexts: a) as something created by a user, b) as something in response to another piece of content, and c) as a "featured" item on some global page (possibly human edited, or a newest or other list page). For each "local" context, I keep a table much like the one used for the feed concept. I include additional data that I need to sort by. Because the contexts are few in number, it is simple to update them as needed.

For instance, while content is stored on the creator's shard, when I'm looking at a blog, I have a local table that contains all the response identifiers, plus the number of likes, creation date, etc., of each response. This makes it trivial to sort the responses.

For site wide "global" stuff, like most popular content, lists are continually recalculated in the background and stored in a table. It's okay if these tables are stale.

Each list is also cached, and the cache value is recalculated on update. This makes the writes more expensive, but keeps reading fresh and very fast. The overhead for the cache recreation is not that bad. Each sorting of the list is covered by an index.

I haven't yet come up with a good way to sort non-local contexts by updating values, such as a feed. Global writes are acceptable at creation since they only happen once, but updating for view counts, etc., would be insane. I imagine the answer lies in using a map-reduce solution.

Dathan Vance Pattishall said...

Wow, great post Mark! We should follow up on this discussion. For a real-time feel to the feed, and to filter the need to query all your friends, I was going to keep a list of active people, which narrows the friend fetch to grab only the things people added to the system since the last fetch. This gets past my concern about constantly polling the entire friend-graph slice of data. I'm going to really focus on this in the next few weeks to explore everything I can prior to launch. Keep sharing!


Mark said...

Killing my posts? Not fair lol


The big issue I see with pulling things in on request is latency. There's the connection overhead to each shard, the waiting for the query to return on each shard, and the writing of the local copy. None of that can happen in parallel while using a language like PHP, though you could batch the local writes.

I would also avoid duplicating the data if at all possible. If you're talking about substantial info, such as a 10 KB blog post, then multiply that by, say, 1000 friends... you're talking 10 MB of storage for 10 KB of text. Western Digital will love you. Granted, storing a bigint pointer isn't free, but it's a lot smaller.

That's another thing: I have one central table where I can look up the shard for an item. I need this to get the content from a URL anyway. So if I'm looking at a page, the flow goes like this:

1. Parse the ID.
2. Attempt to get the item from cache. If in cache, go to step 6.
3. Attempt to get the shard pointer from cache. If in cache, go to 5.
4. Get the shard pointer from master lookup table and cache it.
5. Get the content from the shard and cache it.
6. Display data.

That makes the user happy. If there are responses or other similar things for the page, calculate those:

7. Attempt to get list of responses from cache. If in cache, go to 9.
8. Get list from the same shard the content is on and cache it.
9. Attempt to get the items from cache. If all in cache, go to 13.
10. Attempt to get the shard pointers from cache. If all cached, go to 12.
11. Get the uncached shard pointers from the master lookup table and cache them.
12. Get the uncached items from the shards and cache them.
13. Display data.

And run 7 through 13 in parallel for whatever other lists of content need to be shown.

Some of the cache lookups can be combined to save on cache round-trips. For instance, it's a good idea to request the shard pointers at the same time as requesting the content itself in case the items aren't found. When the site's busy, the majority of read requests come entirely from cache, and the database is only touched to increment a view count (which doesn't trigger a recache; item caches expire every hour). The number of round trips to memcache can add up, so combining them whenever possible is a good idea.

That's the best I've come up with, anyway.

Connor Gurney said...

Thanks for this. Very useful for my new social network!