Tuesday, May 23, 2017

Golang (Go) and BoltDB

I've been using Go for some time now (3 years) and I am constantly impressed with the language's ease of use. I originally started my career in C Unix systems programming, then Java, then PHP, and now I am rather language agnostic. Out of all the languages I know, Go is the most fun and there is a strong community behind it.

BoltDB is yet another NoSQL key-value store, designed to be embedded, and I happened across it for a small use case. I use Go to crawl sites and parse the HTML DOM in a very concurrent manner to gather data for analysis from a variety of remote web sources. BoltDB is used to keep state as I transfer from my local MacBook to a remote server, and it is very easy to use. Basically, I needed a portable embedded database that is fast and resilient, without setting up MySQL and keeping the schema in sync between dev and production. This is not user facing; it is just a set of Go packages that help me keep state so I know where to pick up from after some sort of error, like turning off my laptop or some random panic.

Let's look at BoltDB usage. Below is my struct; everything is a string because I am not formatting or typing things yet.

type TableRow struct {
       Title    string
       Time     string
       Anchor   string
       Price    string
       Notified string // could make this a time.Time but let's be simple
}

Next, I create my.db if it doesn't exist. The function check looks at an error and panics if it is not nil. The line defer db.Close() closes the db at the end of the function these calls are made from. The function addRecord creates a bucket called parser_bucket (a const) if this is the first run, and then stores the value under the given key. It is something fast to make a point, and yes, there are more efficient ways to do this.

db, err := bolt.Open("my.db", 0644, &bolt.Options{Timeout: 10 * time.Second})
check(err) // panic if the database could not be opened
defer db.Close()
err = addRecord(db, []byte("start"), "starting") // create bucket when it doesn't exist
check(err)

The function addRecord takes 3 arguments: db, the BoltDB handle; key, a byte slice; and a value which can be anything, in our case the TableRow struct above. The function name is lower case, so it is not exported ("public"). The interface v is marshaled into a byte slice and stored in BoltDB after checking that the bucket exists. Finally, addRecord returns an error if one occurred.

func addRecord(db *bolt.DB, key []byte, v interface{}) error {
       value, err := json.Marshal(v)
       if err != nil {
              return err
       }
       return db.Update(func(tx *bolt.Tx) error {
              bkt, err := tx.CreateBucketIfNotExists([]byte(bucket))
              if err != nil {
                     return err
              }
              fmt.Printf("Adding KEY %s\n", key)
              return bkt.Put(key, value)
       })
}

To get a TableRow out of the database, a read transaction is performed in BoltDB. This function is capitalized, so it is exported from the package. GetRecord returns a table row or panics if an error occurred.

func GetRecord(db *bolt.DB, key string) *TableRow {
       row := TableRow{}
       err := db.View(func(tx *bolt.Tx) error {
              bkt := tx.Bucket([]byte(bucket))
              if bkt == nil {
                     return fmt.Errorf("bucket %q not found", bucket)
              }

              val := bkt.Get([]byte(key))
              if len(val) == 0 {
                     fmt.Printf("key %s does not exist\n", key)
                     return nil // missing key: return the zero-valued row
              }
              return json.Unmarshal(val, &row)
       })
       check(err) // panics on error, as described above
       return &row
}
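The marshal and unmarshal steps that addRecord and GetRecord wrap around the bucket calls can be tried without BoltDB at all. The sketch below is only an illustration: encodeRow and decodeRow are hypothetical helper names, not part of the code above, and the byte slice stands in for what bkt.Put stores and bkt.Get returns.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TableRow mirrors the struct from the post.
type TableRow struct {
	Title    string
	Time     string
	Anchor   string
	Price    string
	Notified string
}

// encodeRow (hypothetical name) does what addRecord does before bkt.Put:
// any value becomes JSON bytes.
func encodeRow(v interface{}) ([]byte, error) {
	return json.Marshal(v)
}

// decodeRow (hypothetical name) does what GetRecord does after bkt.Get:
// JSON bytes become a TableRow. An empty value, which is what a missing
// key looks like, yields a zero-valued row.
func decodeRow(val []byte) (*TableRow, error) {
	row := TableRow{}
	if len(val) == 0 {
		return &row, nil
	}
	if err := json.Unmarshal(val, &row); err != nil {
		return nil, err
	}
	return &row, nil
}

func main() {
	raw, err := encodeRow(TableRow{Title: "example", Price: "10"})
	if err != nil {
		panic(err)
	}
	out, err := decodeRow(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s -> %+v\n", raw, *out)
}
```

The stored bytes are plain JSON, which is why the strings sanity check mentioned later shows readable output.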

Calling this function returns a pointer to a TableRow. Go does have pointers, just without pointer arithmetic, so the caller gets a reference to the row rather than a copy.

This is it. This is all there really is to BoltDB: read transactions and write transactions that are concurrency-safe. You can even run the Unix command strings on the database file to see if you stored the data correctly as a sanity check; you should see JSON in the output (if that is your serializer).

In conclusion, BoltDB is fast, so far safe, and does exactly what I need: store state without expecting an external DB. Embedded databases are awesome and Go is awesome. Give it a try.

Tuesday, November 22, 2016

INNODB Tablespace Copy in Go Lang

I just uploaded a quick tool that I think you will find useful if you need to consolidate or expand InnoDB databases when tablespaces are in use.

This golang application will copy an entire innodb database from one server to another server via scp.
innodb-tablespace-copy follows the algorithm described here. It copies 4 tables in parallel after setting up the remote environment, then imports the tablespaces in parallel. I've only used this application on Percona XtraDB 5.6, but it should work for all flavors of InnoDB that are out there.

Note: recovering from an interruption is manual. Either discard the tablespace or drop the remote database.
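Copying a fixed number of tables at a time is a bounded-concurrency pattern. Here is a minimal sketch of it in Go, not the tool's actual code: copyAll is a hypothetical name, and the copyTable callback stands in for whatever really transfers one table.

```go
package main

import (
	"fmt"
	"sync"
)

// copyAll runs copyTable for every table with at most `limit` copies in
// flight at once and returns the tables whose copy failed.
// copyTable is a hypothetical stand-in for the real transfer of one table.
func copyAll(tables []string, limit int, copyTable func(string) error) []string {
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex
		failed []string
	)
	sem := make(chan struct{}, limit) // counting semaphore
	for _, t := range tables {
		wg.Add(1)
		sem <- struct{}{} // blocks while `limit` copies are already running
		go func(table string) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot for the next table
			if err := copyTable(table); err != nil {
				mu.Lock()
				failed = append(failed, table)
				mu.Unlock()
			}
		}(t)
	}
	wg.Wait()
	return failed
}

func main() {
	tables := []string{"users", "photos", "likes", "comments", "friends"}
	failed := copyAll(tables, 4, func(table string) error {
		fmt.Println("copying", table) // a real tool would move the tablespace file here
		return nil
	})
	fmt.Println("failed:", failed)
}
```

Returning the failed tables fits the manual recovery described above: those are the ones whose tablespace would need to be discarded before a retry.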

Feel free to add to it and make it better :)

Wednesday, August 10, 2016

Tech Stack at Shots Quick Post

For the Shots app, we use the following technology to serve many millions of photos, videos and cached links.


RedHat Enterprise 6 on the Front ends and DBs. Amazon Linux (Centos) on Elastic Search and Go servers
Apache 2+
Percona 5.6 XTRADB with some minor custom stuff (sharded)

We have a little bit of Python and Java, and a lot of Go!

One of the current features which has been wildly successful is sharing links on mobile, which is very hard. Mobile is not built for links, but fortunately Instagram and YouTube are. To make this work, we have the client read from the clipboard. The client makes a call home, where the link is sent to a distributed worker system which fetches the content of the HTML page, finds the media, manipulates the media and then distributes the media on our CDN. Links only last for a few days.

This is like a poor man's AMP and only took us a few days to write. We even re-transcode videos to make sure the format fits our timeline and doesn't hog too much bandwidth.

MySQL keeps state so the same link is not rebuilt and everything is fronted with Redis - since redis supports pipelined commands - which is great for a feed our size. The next feed version will be a go-tao-like system.

All in all, for 3 days of work, the system works great and scales linearly. It is near real time. Give it a try: link an Instagram or YouTube URL and you will see for yourself.

Some things that I'd like to do in the future: use QUIC on a websocket layer to get a non-blocking messaging system which is blazing fast and works on spotty networks, and integrate RocksDB.

But that's another post.

Monday, September 28, 2015

Wish there is another String DataType called LIST but there is not

I believe the future of SQL is to take a lot of primitives that are Computer Science fundamentals and add them as datatypes, expanding on the column types allowed today. The idea is a merging of NoSQL and SQL that makes problem solving easier for someone new to development.

For instance, what would be awesome is a LIST type, where the list contains a distinct number of string items mapped to a bit, much like SET yet you don't need to predefine all the items in a set.

Here is a good example as how I would use a list type:

Imagine you need permissions on a per row basis. Some rows are public, some are private, some are viewable by a small set of people. (Less than 64).

Let's take the example of Finding all rows that are public or are viewable by only me.

When creating a row

INSERT INTO resource_permissions (resource_id, perm_bit, list_dt) VALUES(1, 2, "dathan, sam, fred")

perm_bit is 0 = private, 1 = public, 2 = public to a list of people

When selecting rows that I "dathan" can see

SELECT resource_id FROM resource_permissions WHERE perm_bit = 1 UNION SELECT resource_id FROM resource_permissions WHERE perm_bit = 2 AND FIND_IN_LIST(list_dt, "dathan");

What the above statement says is give me all the public resource_ids and resource_ids that I "dathan" can see.

Right now I can't do this; I have to use a MEDIUMBLOB and a LIKE:

SELECT resource_id FROM resource_permissions WHERE perm_bit = 1 UNION SELECT resource_id FROM resource_permissions WHERE perm_bit = 2 AND list_dt LIKE "%:dathan:%"

As you can see, I'm able to simulate the desired behavior, but I can't use an index. I don't want to use a FULLTEXT index due to overhead and other issues that are out of scope for this post, nor do I want to manage UDFs or stored procedures. The last two are not desirable, yet they can also simulate the behavior I am looking for.
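To make the wished-for behavior concrete, here is a rough model of a LIST in Go: each distinct string gets the next free bit (up to 64, matching the "less than 64" people above), with no predefined set of items. The List type and its Mask and Contains methods are hypothetical names for illustration; nothing like this exists in MySQL.

```go
package main

import "fmt"

// List is a hypothetical model of the proposed LIST type: each distinct
// string is assigned the next free bit (at most 64), with no predefined set.
type List struct {
	bits map[string]uint // item -> bit position
}

// Mask returns the bitmask for the given items, registering new items as
// they appear. It fails once more than 64 distinct items have been seen.
func (l *List) Mask(items ...string) (uint64, error) {
	if l.bits == nil {
		l.bits = make(map[string]uint)
	}
	var mask uint64
	for _, it := range items {
		pos, ok := l.bits[it]
		if !ok {
			if len(l.bits) >= 64 {
				return 0, fmt.Errorf("list is full, cannot add %q", it)
			}
			pos = uint(len(l.bits))
			l.bits[it] = pos
		}
		mask |= 1 << pos
	}
	return mask, nil
}

// Contains reports whether item is set in mask. This plays the role of the
// imagined FIND_IN_LIST check.
func (l *List) Contains(mask uint64, item string) bool {
	pos, ok := l.bits[item]
	return ok && mask&(1<<pos) != 0
}

func main() {
	var l List
	row, err := l.Mask("dathan", "sam", "fred") // the list_dt value of one row
	if err != nil {
		panic(err)
	}
	fmt.Println(l.Contains(row, "dathan"), l.Contains(row, "alice"))
}
```

A membership test is then a single bitwise AND on an integer, which is the kind of comparison that avoids the LIKE scan.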

Some primitives from Redis or other NoSQL solutions would be awesome additions for SQL as a whole, IMHO.

My two cents.

Also in 5.7 maybe the JSON Column Type might be of some use.

Tuesday, September 22, 2015

Golang websockets (wss) and "OOP"

Golang is awesome. My 1st Language back in 1994 was C. Then the following year my Computer Science Dept. switched from C/Pascal to C++. I even like C++ but I like C more mainly because of nostalgia.

Enter Go. The syntax for me is a mix between JSON and C. I love it. I've created 3 new servers, all doing a ton of TPS. What I would like to share with you is some Go code that handles websockets.

If you are building a server using websockets, over secure TCP your browser behaves slightly differently than a client side application using a websocket library. Specifically when working with wss (secure websockets) across domains.

It's up to the client to respect Origin, so a client implementing a websocket doesn't have to set the Origin header, but your browser does. This is done on purpose and it's a good thing. To get websockets to work over secure sockets, let's make our assumptions consistent and not report Origin errors, by overriding the handshake method on the fly. The power of Go.

// going to override the handshake
server := websocket.Server{
   Handshake: func(config *websocket.Config, req *http.Request) error {
      return nil
   },
}

The above says: override the method in the library (golang.org/x/net/websocket) with the supplied local function and return nil for the error, which means all is good.

Whether or not Origin is sent to the server (non-browser clients don't have to send it), the server now ignores the Origin check during the handshake.

// register the Server itself; wrapping server.Handler in websocket.Handler would bring back that type's own Origin check
http.Handle(nsp.path, server)

Next we handle the websocket with the supplied handler in the server called nsp.handle. nsp.handle is a string of a function name that takes in a websocket connection. nsp.path means for a given http connected path execute the handler.

This is awesome. Everything works, but what is cooler is how Go handles OOP. The term used in Go is embedding; changing the type or the executed method (method overriding) is called shadowing.

Here is an example

package main

import "datarepo"

type DataLayer struct {
   datarepo.DataRepoAccess // embedded struct
}

// https://github.com/luciotato/golang-notes/blob/master/OOP.md#golang-embedding-is-akin-to-multiple-inheritance-with-non-virtual-methods
func NewDataLayer(subject string, class string) DataLayer {
   ret := DataLayer{datarepo.DataRepoAccess{Subject: subject, Classof: class}}
   return ret
}

// wrapper method to add in a counter
func (dl *DataLayer) Execute() ([]byte, error) { // shadowed
   Reporter.increment("api_layer_cmd", 1)
   var base = dl.DataRepoAccess
   return base.Execute()
}

DataLayer is a Wrapper Design Pattern Around datarepo.DataRepoAccess a structure I wrote that handles talking to the backend. datarepo.DataRepoAccess has a method called Execute. In the example above Execute is "Shadowed" or overridden. This new method counts the number of times the base class is called.
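The same idea can be run in isolation with nothing but the standard library. In this sketch Repo and Layer are made-up stand-ins for datarepo.DataRepoAccess and DataLayer, and a plain counter field replaces the Reporter call. It shows that the shadowing method wins on the outer type, that the embedded method is still reachable by naming the embedded field, and that methods which are not shadowed are promoted unchanged.

```go
package main

import "fmt"

// Repo stands in for datarepo.DataRepoAccess (hypothetical here).
type Repo struct{ Subject string }

// Execute is the "base" method.
func (r Repo) Execute() string { return "executed " + r.Subject }

// Describe is promoted to any struct that embeds Repo.
func (r Repo) Describe() string { return "repo for " + r.Subject }

// Layer embeds Repo and counts calls to Execute.
type Layer struct {
	Repo
	Calls int
}

// Execute shadows Repo.Execute. The embedded method is still reachable
// through the explicit field name.
func (l *Layer) Execute() string {
	l.Calls++
	return l.Repo.Execute()
}

func main() {
	l := &Layer{Repo: Repo{Subject: "users"}}
	fmt.Println(l.Execute())  // goes through the shadowing wrapper
	fmt.Println(l.Describe()) // promoted from Repo, not shadowed
	fmt.Println(l.Calls)
}
```

Unlike virtual methods in C++ or Java, code inside Repo that calls Execute would still run Repo's own version; shadowing only affects calls made through Layer.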

These months of coding Go have been so much fun. I love learning new things but also getting my work done on time, and Go enables me to do both. The analogy I can compare learning Go to is learning to snowboard: in the beginning it's like getting your ass smacked with a cold wet shovel, but once you get it you got it.

Wednesday, August 05, 2015

San Francisco mySQL Meetup August 26 2015

Shots Architecture and how we handle extreme load spikes

I invite you to come out and join me for a talk on the topic above. I will walk through what technology is used where, why and how. The event information is located here. I'll also touch on how cost is reduced, how we handle the load when celebrities promote, and what's next to make the system even more automatic and solid.

Thanks to sfmysql.org for all the work they do and for allowing me to give a talk.

Monday, June 01, 2015

Reporting Across Shards

If you have chosen to split your data across boxes, and architected your app to not query across boxes there is still a case where you will need to. Data mining, reports and data health checks require hitting all servers at some point. The case I am going over is sessions and figuring out the Session Length without taking averages of averages which is wrong.

Let's assume you have a session table of the following

mysql> describe sessions;
| Field    | Type                | Null | Key | Default | Extra |
| user_id  | bigint(20) unsigned | NO   | PRI | 0       |       |
| added_ms | bigint(20) unsigned | NO   | PRI | 0       |       |
| appVer   | varchar(8)          | YES  |     | NULL    |       |
| device   | bigint(20) unsigned | YES  | MUL | NULL    |       |
| start    | int(10) unsigned    | NO   | MUL | NULL    |       |
| stop     | int(10) unsigned    | NO   |     | NULL    |       |

The data is federated (distributed) by user_id. This table exists across 1000s of servers. How do you get the average session length for the month of May?

  • The question already scopes the process to hit every single server
  • Second we can't just take AVG((stop-start)) and then sum and divide that by the number of shards
  • We can't pull all the data in memory
  • We don't want to have to pull the data and upload it to BigQuery or Amazon RedShift
  • We want a daily report at some point

SELECT SUM((stop-start)) as sess_diff, count(*) as sess_sample FROM sessions WHERE start BETWEEN $start AND $stop AND stop>start

The above SQL statement says: for the connection to a single server, give me the sum of the session deltas and the count of the corresponding rows. The sum of the per-shard sums (sess_diff) is the numerator and the sum of the per-shard counts (sess_sample) is the denominator.

Now do this across all servers, and finally write some client code that combines the few rows returned (< 1000) to report the number.

$total = '0';
$sessions_diff = '0';

foreach ($rows as $shard_id => $result) {
    // bcadd works on numeric strings, so the running sums stay exact
    $sessions_diff = \bcadd($sessions_diff, $result[0]['sess_diff']);
    $total = \bcadd($total, $result[0]['sess_sample']);
}

Now the session_avg = sessions_diff/total

Tada: a query that can take hours if done on a traditional mining server is done in ms.
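The reason for summing the numerators and denominators separately shows up as soon as shards hold different numbers of sessions. A small sketch of the same reduction in Go, with made-up numbers; shardResult, sessionAverage and averageOfAverages are illustrative names, and uint64 is assumed to be wide enough here, whereas the PHP above uses bcadd.

```go
package main

import "fmt"

// shardResult is one row returned by the per-shard SUM/COUNT query.
type shardResult struct {
	SessDiff   uint64 // SUM(stop-start) on this shard
	SessSample uint64 // COUNT(*) on this shard
}

// sessionAverage returns total duration / total sessions across all shards.
func sessionAverage(rows []shardResult) float64 {
	var diff, total uint64
	for _, r := range rows {
		diff += r.SessDiff
		total += r.SessSample
	}
	if total == 0 {
		return 0
	}
	return float64(diff) / float64(total)
}

// averageOfAverages is the wrong approach, shown for comparison only:
// every shard counts equally no matter how many sessions it holds.
func averageOfAverages(rows []shardResult) float64 {
	var sum float64
	var n int
	for _, r := range rows {
		if r.SessSample == 0 {
			continue
		}
		sum += float64(r.SessDiff) / float64(r.SessSample)
		n++
	}
	if n == 0 {
		return 0
	}
	return sum / float64(n)
}

func main() {
	rows := []shardResult{
		{SessDiff: 100, SessSample: 1},  // one 100-second session
		{SessDiff: 900, SessSample: 99}, // 99 short sessions
	}
	fmt.Println(sessionAverage(rows))    // 1000 seconds over 100 sessions
	fmt.Println(averageOfAverages(rows)) // skewed by the one-session shard
}
```

With these two rows the correct average is 10 seconds, while the average of averages lands above 50 because the single long session is given the same weight as the other 99 combined.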

Wednesday, April 01, 2015

Federating THE friends table in a Sharded mySQL environment without downtime or users noticing

A friends table is the cornerstone of social applications. Its purpose is to define relationships and help answer the question what are my friends doing.

Here is an example friends table:

CREATE TABLE `friends` (
  `user_id` bigint(20) unsigned NOT NULL,
  `friend_id` bigint(20) unsigned NOT NULL,
  `auto_ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- assumed definition: the column is indexed below but missing from the original listing
  PRIMARY KEY (`user_id`,`friend_id`),
  KEY `user_id-auto_ts` (`user_id`,`auto_ts`),
  KEY `friend_id-auto_ts` (`friend_id`,`auto_ts`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci

With the table above we can get a list of user_ids a user follows (following), or a list of people who follow said user (followers), or get a list of mutual follows. This is a very simple table structure yet very powerful.

The problem is that this table doesn't scale on a single server. When you have millions of users, each user has many friends, and all users are semi to deeply connected, the table becomes a problem. Mix this with a huge request rate and lots of concurrency, and a single server just doesn't scale.

One can replicate the friends table but what starts to cause lag is when many users start adding or removing friends at once. So, how can we distribute this table across many servers holding a small % of the friend graph?

Let's look at the friends table.  It defines whom a user follows and who follows the user ordered by insertion time.

Let's create two tables:

CREATE TABLE `following` (
  `user_id` bigint(20) unsigned NOT NULL DEFAULT '0',
  `friend_id` bigint(20) unsigned NOT NULL DEFAULT '0',
  `mutual` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT 'Flag to denote mutual connections',
  `auto_ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- assumed definition: used by the key and the INSERT but missing from the original listing
  PRIMARY KEY (`user_id`,`friend_id`),
  KEY `user_id-auto_ts` (`user_id`,`auto_ts`)
)

CREATE TABLE `followers` (
  `user_id` bigint(20) unsigned NOT NULL DEFAULT '0',
  `friend_id` bigint(20) unsigned NOT NULL DEFAULT '0',
  `mutual` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT 'Flag to denote mutual connections',
  `auto_ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- assumed definition: used by the key and the INSERT but missing from the original listing
  PRIMARY KEY (`user_id`,`friend_id`),
  KEY `friend_id-auto_ts` (`friend_id`,`auto_ts`)
)

The 'following' table defines whom a said user follows. The table is federated by the user_id so this table exists on the user_id's shard.

The 'followers' table defines who is following the said user. On every follow, instead of writing one row, we now write two rows: one on the following user's shard and one on the followed user's shard. Thus the followers table is federated by friend_id.

This can be best described by an example on reads:

How many people am I user_id 3306 following?

Connect to my Shard-x, execute the query

SELECT COUNT(*) FROM following WHERE user_id = 3306

How many people are following me (user_id 3306)?

Connect to my Shard-x and execute the following query (in the followers table, the followed user is stored in friend_id)

SELECT COUNT(*) FROM followers WHERE friend_id = 3306

Now let's look at a write, of me (user_id:3306) following friend_id:11211

3306 is on Shard-x
11211 is on Shard-y

So, first we record the fact that 3306 is following 11211. We connect to Shard-x and execute the transaction

INSERT INTO following (user_id, friend_id, mutual, auto_ts) VALUES(3306, 11211, 0, NOW());

Now connect to Shard-y to write the followers row. If the connection fails rollback the transaction on 3306's Shard-x, otherwise

INSERT INTO followers (user_id, friend_id, mutual, auto_ts) VALUES(3306, 11211, 0, NOW());
if affected rows == 1 (no error)
COMMIT on Shard-x
COMMIT on Shard-y
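The order of operations matters more than the SQL here, so this is a small in-memory sketch of the write path in Go. The shard type, the follow function and the down flag are all hypothetical stand-ins for real connections and transactions, and deleting the first row plays the role of the ROLLBACK on Shard-x.

```go
package main

import (
	"errors"
	"fmt"
)

// edge is one follow relationship: UserID follows FriendID.
type edge struct{ UserID, FriendID uint64 }

// shard is a hypothetical in-memory stand-in for one MySQL server.
type shard struct {
	down      bool // simulates a failed connection
	following map[edge]bool
	followers map[edge]bool
}

func newShard() *shard {
	return &shard{following: map[edge]bool{}, followers: map[edge]bool{}}
}

var errShardDown = errors.New("shard unavailable")

// follow writes the following row on the follower's shard (x) and the
// followers row on the followed user's shard (y). If the second write
// cannot happen, the first one is undone.
func follow(x, y *shard, userID, friendID uint64) error {
	e := edge{UserID: userID, FriendID: friendID}
	if x.down {
		return errShardDown
	}
	x.following[e] = true
	if y.down {
		delete(x.following, e) // roll back the first write
		return errShardDown
	}
	y.followers[e] = true
	return nil
}

func main() {
	x, y := newShard(), newShard() // Shard-x holds 3306, Shard-y holds 11211
	fmt.Println(follow(x, y, 3306, 11211))
	y.down = true
	fmt.Println(follow(x, y, 3306, 42), len(x.following))
}
```

This is not a full two-phase commit: a crash between the two COMMITs can still leave one row without its twin, which a periodic consistency check would have to repair.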

Now we can answer the main questions.

But what about something like: give me my friends' photos sorted by last upload time, 10 at a time?

Well, here is the magic sauce. We are going to do a FANOUT of reads and hit all the shards that my friends are on. For my environment this is much better than a FANOUT of writes, since we like to customize the feed in real time, and duplicating the data 10000s of times becomes very expensive quickly as servers start turning cold. We can go into this topic a bit more in another post.

Now I execute the query across my friends' shards

SELECT p.id FROM photos p JOIN followers f ON(f.friend_id=p.user_id) WHERE f.user_id = 3306 ORDER BY p.id DESC LIMIT 10;

If I have 1000 friends spread over 100 shards and each friend has 10 photos, each shard returns at most 10 rows, so I am going to get back up to 1000 rows.

But the Order is not what I am going to display because I want to display the latest 10 photos. Thus I will need to sort in memory on the application server and take a slice of the results.

But what if I want the 2nd page?

SELECT p.id FROM photos p JOIN followers f ON (f.friend_id=p.user_id) WHERE f.user_id = 3306 AND p.id < [LAST_ID_FROM_FIRST_PAGE] ORDER BY p.id DESC LIMIT 10

In the application we pass the last_id from the 1st page, execute the same FANOUT on reads again, apply the same logic and return the photos.
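The read path boils down to: ask every shard in parallel for its newest ids below a cursor, merge, sort in memory, and keep one page. Here is a sketch of that in Go, where shardQuery, fanOutPage and memShard are hypothetical names and an in-memory slice stands in for running the SQL above on each shard.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// shardQuery returns up to limit photo ids below lastID, newest first.
// lastID == 0 means "first page". It stands in for the SQL run on one shard.
type shardQuery func(lastID uint64, limit int) []uint64

// fanOutPage queries every shard in parallel, merges the answers and
// returns the newest pageSize ids overall.
func fanOutPage(shards []shardQuery, lastID uint64, pageSize int) []uint64 {
	results := make([][]uint64, len(shards))
	var wg sync.WaitGroup
	for i, query := range shards {
		wg.Add(1)
		go func(i int, query shardQuery) {
			defer wg.Done()
			results[i] = query(lastID, pageSize) // each goroutine owns one slot
		}(i, query)
	}
	wg.Wait()

	var merged []uint64
	for _, r := range results {
		merged = append(merged, r...)
	}
	// sort in memory on the application server, newest first
	sort.Slice(merged, func(a, b int) bool { return merged[a] > merged[b] })
	if len(merged) > pageSize {
		merged = merged[:pageSize]
	}
	return merged
}

// memShard builds an in-memory shard from photo ids given in any order.
func memShard(ids ...uint64) shardQuery {
	sorted := append([]uint64(nil), ids...)
	sort.Slice(sorted, func(a, b int) bool { return sorted[a] > sorted[b] })
	return func(lastID uint64, limit int) []uint64 {
		var out []uint64
		for _, id := range sorted {
			if lastID != 0 && id >= lastID {
				continue // already shown on an earlier page
			}
			if len(out) == limit {
				break
			}
			out = append(out, id)
		}
		return out
	}
}

func main() {
	shards := []shardQuery{memShard(9, 4, 1), memShard(8, 7, 2), memShard(6, 5, 3)}
	page1 := fanOutPage(shards, 0, 3)
	page2 := fanOutPage(shards, page1[len(page1)-1], 3)
	fmt.Println(page1, page2)
}
```

Each shard only ever needs to return one page worth of rows, because no shard can contribute more than pageSize entries to the final page.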

Your questions might be, but isn't this slow because people with large networks will have to hit every shard each time and you have to loop - execute - read on each connection?

This can be mitigated with memory, pipelining and parallel SQL execution.

If your social graph is like Twitter, where all active users follow 100K users and the feed doesn't change dynamically, writing the data to each shard may be for you. But, again, this is out of scope for this post.

What about answering the question of mutual connections?

On every write of a friend relationship, do a select to see if the followed person follows the follower. Then mark the row on both shards as mutual.

For all my personal cases, this distributed friends table solves all my needs. Lots of friend writes from importing friends from say an address book or email or other social network friend graph and a large concurrency is not going to affect me SINCE the table has been removed from a Single Point and is now distributed across many servers.

Reads are fast because only a % of data is on each shard, 90% of the queries hit only that shard for a given user.

Feed type queries are fast because the SQL is executed in parallel if we have to go to the SQL Layer. Most data is cached, reducing the need to FANOUT on reads.

Finally, federating without downtime or users noticing requires a backfill script and writes to the old friends table as well as to the new friend tables. Once this is done, fix all the queries to use the new format. Then sit back and feel good that good work was done :)