Datalab is an internal analytics team that powers data-driven decisions at Hootsuite. We build internal tools, data pipelines, and warehouses for other teams and analysts around the company. When I started on Datalab and got introduced to all the technologies we use here, one of my questions was “Why do we have multiple databases? Why can’t we just use one?”, especially after I learnt that a lot of the data is duplicated to both our Amazon Redshift cluster and our MySQL database. There are numerous databases out there, from MongoDB, MySQL, and SQLite to CockroachDB and Cassandra, and sometimes a combination of different DBMSs can benefit your workflow and boost your productivity. So maybe you don’t really have to choose between a row store and a column store!

On Datalab we mainly use MySQL as the authoritative source of truth and Amazon Redshift as a mostly read-only analytical data warehouse. In this blog post, I will show you how we use them together, how that benefits our productivity, and what differences you should be aware of if you’re thinking of integrating either one (or both) into your workflow.

 

Column Store vs Row Store

The two most common types of databases are row-oriented and column-oriented systems. They have fundamentally different designs which make them efficient in different use cases.

On Datalab, as you might guess, we deal with lots of data. We have dedicated analysts who perform all kinds of manipulations on the data we collect from all our products, both to draw a big picture of product usage and to run detailed analyses of particular features and metrics for data-driven decisions. To let our analysts perform those complicated analyses, we have engineers on the team who build services and ETLs to store, update, and transfer the data between our databases. In order to optimize the work we do, we use not one but two data warehouses: a row store and a column store; in our case, MySQL and Amazon Redshift, respectively.

MySQL, a row-based database, was our first data warehouse. It has been around since 1995 and has become the world’s most popular open source database. As a traditional row-oriented DBMS, it stores data as rows on disk, meaning that all columns of a table are stored together, so accessing or altering any row of your table can be done very easily and efficiently. Because of how records are stored on disk (row by row, remember?) it is very fast to fetch entire rows.

Let’s imagine we have a user table with the following columns: user_id, username, email, address, country, region, and join_date, with an index on join_date. It would look like this:

user_id   username    email           address                 country   region          join_date
10034     David Kim   dkim@mail.com   NULL                    Canada    North America   2014-08-02
10035     Marie Joy   NULL            1723 Secret Garden Rd   France    Europe          2012-09-13
...       ...         ...             ...                     ...       ...             ...
 

Now, if we want to find all users that joined Hootsuite after June 2014 we would run the following query:

SELECT * FROM hsusers WHERE join_date >= '2014-07-01';

It would be fast, assuming an index on join_date. Likewise, inserting, updating, or deleting individual rows in this table is very quick. Say you wanted to update the addresses of just a few users: that would also be fast, since all of a user’s information is stored in one place, so you would only have to alter a few tuples on the disk.
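For example, a one-off address change like this (the new address is made up) only touches a single record on disk:

UPDATE hsusers SET address = '221B Baker St' WHERE user_id = 10035;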

However, these are not always the queries we run. A lot of our tables are very sparse, with numerous fields in them, and we don’t necessarily need to access all of that information. Now let’s say that, in addition to our user table, we also have tables that store information about events and user payments.

Events table:

event          user_id   timestamp
Message Sent   10035     2017-04-26 22:50:30
Image Viewed   10034     2017-03-26 18:48:37
...            ...       ...
 

Payments table:

user_id   amount   currency   payment_date          member_plan
10034     12.00    CAD        2017-04-28 22:50:30   3
10035     134.13   CAD        2017-04-28 10:01:45   1
...       ...      ...        ...                   ...
 

Now we want to find all users who made payments in the last two years and had a low product usage rate. The database has to join all those tables (users, events, and payments) and access all the fields, even the ones that aren’t related to the final result. The tables need to be joined on user_id before the final result can be returned. This gets inefficient as the data scales up and the queries get more complex.
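A rough sketch of what such a query might look like (the cutoff date and the “low usage” threshold of 10 events are made up):

SELECT u.user_id, u.username
FROM hsusers u
JOIN (SELECT DISTINCT user_id
      FROM payments
      WHERE payment_date >= '2015-07-01') p ON p.user_id = u.user_id
LEFT JOIN events e ON e.user_id = u.user_id
GROUP BY u.user_id, u.username
HAVING COUNT(e.event) < 10;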

Accessing individual columns without touching the whole row is practically impossible with a row-oriented architecture, since under the hood the database still has to read entire records from disk.

This is where Amazon Redshift comes into play!

Column-oriented databases have been around since the late 1960s as a niche technology, but have become dramatically more popular for analytical workloads only in the last 5-10 years. This type of database has the advantages of data compression, late materialization of tuples, block iteration, code pipelining, and cache locality, which make it faster and more efficient for analytics-style queries. As you have probably guessed, data on disk is stored column by column and, as a result, accessing individual fields is extremely fast. So finding all users that made payments in the last two years and had a low product usage rate would be very fast in Redshift.

This is why Redshift is great for data analytics and why we use it as our data warehouse: very often we want to know things like the average user age, the revenue for a given period of time, or the correlation between plan and the size and industry of an organization. Redshift is great for that!

Now, let’s try a more realistic example. Suppose we still have our users and payments tables, and imagine someone asked us to calculate the monthly revenue of all Hootsuite users per plan.
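A query along the following lines would give us the results we want (a rough sketch using the column names from the payments table above; the date bucketing is written Redshift-style, and MySQL would use DATE_FORMAT(payment_date, '%Y-%m') instead of DATE_TRUNC):

SELECT member_plan,
       DATE_TRUNC('month', payment_date) AS month,
       SUM(amount) AS monthly_revenue
FROM payments
GROUP BY 1, 2
ORDER BY 2, 1;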

Running an analogous query on our production data with ~16 million records in the user table and ~18 million records in the payment table took 7 minutes to finish in MySQL and (ta-daa) 4.8 seconds in Redshift!

Yes, the difference is huge!

We already talked about the benefits of a column store for these kinds of queries, but let’s dive a little deeper into what’s happening under the hood that makes Redshift so much more efficient in this case.

First is late materialization: with late materialization of tuples, Redshift can select only the columns it needs, operate on the compressed column data while carrying just row positions along, and reconstruct the final tuples only at the end, right before returning the result, while MySQL has to read the full rows, join the tables together, and then perform the needed operations.

Another benefit of Redshift’s system design is efficient computing. Redshift is a cluster-based system, meaning that it consists of multiple nodes, and the computation can be distributed across those nodes in the way that best optimizes the query.

But..

Well.. Redshift is not perfect for all queries. For the same design reasons, it is not efficient for huge data updates or for queries like the one below.
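For example, fetching a single user’s entire record (a sketch, using a user_id from the example table above):

SELECT * FROM hsusers WHERE user_id = 10034;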

In real life our user table would be pretty sparse, containing all sorts of information about our users. Since Redshift stores it by columns, it has to reconstruct each full user record by accessing the individual columns, which might even be stored on different machines, and therefore it may have to send data over the network to assemble the final result. This is a much slower process than fetching rows that are stored together in MySQL.

But this is not all..

Another thing that is good to know about Redshift is Vacuuming.

If you delete a record in Redshift, it doesn’t actually get deleted from the disk; instead it gets marked as deleted, or “dead”. Same thing with updates: the original record is marked as deleted and, instead of modifying it, a new record is created with the updated information. You can imagine how much garbage data we would have after thousands and thousands of updates and deletes. This is what vacuuming is used for: to get rid of all that garbage and free up the disk space.

Another use case is making sure your records are always stored in order. If you specified a sort key on a table, your data will be sorted, but as you insert new records Redshift doesn’t go and look for the proper place to put each one; instead it just places them in an unsorted area on the disk, until you run a vacuum that sorts the unsorted area and puts the records into their proper places for high efficiency. The only catch is that this is not done automatically: you have to schedule and perform vacuuming yourself, which is one more thing to worry about.
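The command itself is a one-liner; a minimal sketch, run against our events table:

-- Reclaim space from deleted rows and re-sort the unsorted region
-- (this is Redshift's default VACUUM behaviour).
VACUUM FULL events;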

This diagram illustrates how the unsorted region of your table grows as new records get inserted.

And everything is not that simple..

Let’s get back to our events table, and let’s say we have a sort key on the timestamp column. In this case, as new records get added, their timestamps are likely to be greater than the existing records’ timestamps. Say we had an outage and lost events data for the last four days (that would be a lot!), but we are lucky enough to have it somewhere else and are able to backfill the table. In this case, when we insert the lost data into our events table and then vacuum to make sure it’s sorted, only the unsorted region needs to be rewritten, since Redshift knows that everything above it is already sorted, so vacuuming is very fast.
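A sketch of that backfill, assuming the recovered events were staged in a hypothetical events_backfill table:

-- Re-insert the lost four days of events, then re-sort the table.
INSERT INTO events SELECT * FROM events_backfill;
VACUUM FULL events;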

If the inserted data overlaps with the existing data, then more regions of the table need to be rewritten. In this case our new records are likely to have increasing timestamps, which keeps the data more or less in order and results in minimal overlap. But what if we also wanted to sort our data on user_id, so that Redshift could find users efficiently? New records will have both very high and very low user_id values. Now the data overlap is much bigger, and if we insert many new records, the vacuum operation will need to rewrite nearly the entire table to keep all the records sorted on disk by both timestamp and user_id.
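For reference, this is roughly what such a table definition looks like in Redshift (a sketch; the column types are assumed, and the timestamp column is named event_ts here to avoid clashing with the type name):

-- Events sorted on disk by timestamp first, then user_id.
CREATE TABLE events (
    event     VARCHAR(64),
    user_id   BIGINT,
    event_ts  TIMESTAMP
)
COMPOUND SORTKEY (event_ts, user_id);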

 

And as the table grows, vacuuming costs will grow too, since the cost of the merge step is proportional to the size of the table; we have one table that takes at least 6 hours to vacuum!

There are strategies to get around this and reduce the cost: for example, if you have a huge table and you really want the data sorted by both timestamp and user_id, you can split that table into smaller tables by timestamp ranges, and then create a view that unions those tables together. Then, when you insert new data, you will most likely only ever need to modify the table with the most recent timestamp range, so you only need to vacuum that small sub-table (much faster). But everything comes at a cost: this also makes your maintenance process more complex, since you need to create new tables as the dataset grows, switch vacuuming over to those tables, and update the views, and certain types of queries will perform worse.
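A sketch of that layout (the range-table names are hypothetical):

-- Time-range sub-tables unified behind a single view for querying.
CREATE VIEW events_all AS
    SELECT * FROM events_2016
    UNION ALL
    SELECT * FROM events_2017;

-- New data mostly lands in the latest sub-table, so only it needs regular vacuuming.
VACUUM FULL events_2017;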

Now, let’s get back to MySQL for a minute.

Another great thing about MySQL is its ability to validate and restrict the data; Redshift lacks many of these features. One of the things I learnt the hard way is that a Redshift UNIQUE key does not enforce actual uniqueness of records… You can define a UNIQUE key on a table, but it serves more as a note to yourself that the column is supposed to contain unique member_ids, or names, or something else, whereas MySQL would actually ensure that and would not let you insert an existing value into the table. The reason for this is, again, the lack of traditional indexes in Redshift: it would be very inefficient for Redshift to check whether a given value is unique, so there are no uniqueness constraints, and enforcing uniqueness is left to the application layer.
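To make this concrete, here is a sketch (the table name and values are illustrative): Redshift happily accepts both inserts below, while MySQL rejects the second one.

CREATE TABLE members (
    member_id BIGINT UNIQUE,   -- informational only in Redshift; enforced in MySQL
    name      VARCHAR(128)
);

INSERT INTO members VALUES (1, 'Alice');
INSERT INTO members VALUES (1, 'Alice again');   -- duplicate key: Redshift allows it, MySQL raises an error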

In other words, if you want to enforce some restrictions on your data, MySQL does a great job. This is why a lot of the time we load the data into MySQL first and then copy it over to our Redshift cluster for the analysts to use. Luckily, we can dump data from MySQL to S3 quite efficiently, and bulk load it from S3 into Redshift blazingly fast, so this is actually a fairly simple and quick process.
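On the Redshift side, that bulk load is a single COPY statement; a sketch with a placeholder bucket, path, and IAM role:

-- Bulk load the gzipped CSV files that the MySQL export wrote to S3.
COPY payments
FROM 's3://example-bucket/exports/payments/'
IAM_ROLE 'arn:aws:iam::123456789012:role/example-redshift-copy'
FORMAT AS CSV
GZIP;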

If you are wondering why no one has come up with a database that sits somewhere in the middle and has the benefits of both row- and column-based DBMSs, I would recommend reading this paper, which discusses the fundamental architectural differences between the two and shows that a row-oriented database cannot gain all the advantages of a column store simply by applying a number of column-oriented optimizations. The long and short of it is that storing data as rows vs. columns, both on disk and in memory, comes with largely inescapable tradeoffs: the different designs will always have their pros and cons.

In the meantime, Hootsuite, like a lot of other companies, uses a combination of databases for analytics. Our Redshift cluster serves primarily as a query layer, allowing us to perform analytical queries and generate reports relatively fast, and our MySQL database lets us validate data and do work that requires regularly inserting, updating, and deleting individual records (or small batches). When developing new ETLs and services we might use both Redshift and MySQL, depending on the types of queries they require. Generally speaking, having that flexibility and the advantages of both DBMSs lets our team be more productive, without waiting hours to retrieve certain data, and we find it well worth the overhead of copying data from one system to another.

About the Author

Daria is a co-op student on Datalab. She is in the Computer Science program with the Software Engineering option at the University of Victoria. In her spare time she likes drinking good coffee, traveling, and capturing nature with her camera.


While working on Hootsuite’s Facebook Real-Time service for the past few months, I have had an extremely eye-opening experience with back-end development and the underlying architecture that makes all of our internal services work together. I will highlight the architecture of a Hootsuite service with reference to Facebook Real-Time.

Overview

The Facebook Real-Time service consists of two microservices: Facebook Streaming and Facebook Processing. Microservices are an approach to service-oriented architecture that divides a service into smaller, more flexible, specialized pieces of deployable software. We chose this design to ensure that each service does one thing very well. It lets us scale each part of the service according to the resources it needs and gives us greater control over it; one key reason is that Facebook Processing does a lot of JSON parsing, which consumes more CPU. In the case of Facebook Real-Time, each microservice is a specialized piece of software: Facebook Streaming specializes in collecting event payload data from Facebook, while Facebook Processing parses this data into specific event types.

Facebook Streaming

Facebook Streaming collects large payloads of information from Facebook when certain real-time events occur; it is a streaming service. Data streaming, in this case, means that Facebook generates dynamic data on a continual basis and sends it to the Facebook Streaming microservice. These events can include actions such as a post on a Facebook page, or a simple press of the ‘like’ button on a post. Facebook Processing then parses these large payloads of data into specific event types.

Webhooks are used to collect events from a Facebook page. We register callbacks with Facebook to receive real-time events: when data is updated on Facebook, its webhook sends an HTTP POST request, carrying the event payload, to a callback URL belonging to the Facebook Streaming microservice.

 

Facebook Processing

The Facebook Processing microservice parses the event payloads from Facebook Streaming into more specific events, so a published post is one kind of event and a ‘like’ is another. Here, the number of events that get assigned event types at a time can be controlled. This is important because event payloads are large and require a lot of CPU to parse; limiting the number of event payloads parsed at once reduces CPU usage. So instead of parsing event payloads at the rate they are received from Facebook Streaming, the service consumes a set of these payloads at a time, while the rest stay queued until the current set has been parsed.

We have also built a registration endpoint into the Facebook Processing service. Instead of manually adding Facebook pages to the database of pages registered for streaming, another service can call the endpoint to register the specified Facebook page.

 

Event Bus

A payload consists of batches of events from different Facebook pages; we call this a ‘Raw Event’. These raw events are published from Facebook Streaming to the Event Bus. The Event Bus is a message bus that allows different microservices to communicate; it is built on Apache Kafka and consists of a set of Kafka clusters with topics. A topic corresponds to a specific event type, and consumers can subscribe to these topics; the event data corresponding to those topics is then collected by the consumer. A service can consume events from the Event Bus, produce events to the Event Bus, or both! Each service is configured to know which topics it consumes or produces.

Event messages are formatted using protocol buffers. Protocol buffers (Protobuf) are a mechanism for serializing structured data: the structure of a type of event only needs to be defined once, and it can then be read and written easily across a variety of data streams and languages. We decided to use protocol buffers because they have an efficient binary format and can be compiled into implementations in all of our supported languages, which include Scala, PHP, and Python. Event payloads can also easily be made backwards compatible, which avoids a lot of ‘ugly code’ compared to using JSON. These are just some key examples; there are many other benefits to using Protobuf. With the growing number of services at Hootsuite, we had the problem that the occurrence of an event needed to be communicated to other services asynchronously. The value the Event Bus provides is that the service where the event happens does not need to know about all the other services the event might affect.

To give an overview of how the Facebook Real-Time service interacts with the Event Bus: raw events are first published to the Event Bus by the Facebook Streaming microservice and consumed by Facebook Processing, where the events are assigned specific event topics and sent back to the Event Bus. This allows specific events to be consumed by other services. Additionally, events stay on the Event Bus for a period of time until they expire. In our case, because we wanted to publish raw events and then parse them at a different rate than they are sent to us, this let us use the Event Bus as temporary storage and let Facebook Real-Time consist of two separate microservices. It also allows us to decouple the producer load from the consumer load used for processing events in the Facebook Processing service.

 

Service Discovery

Skyline routing is used to send HTTP requests to the Facebook Real-Time service. Skyline is a service discovery mechanism that ‘glues’ services together: it allows services to communicate with each other and to indicate whether they are available and in a healthy state. This makes our services more reliable and lets us build features faster. Skyline routing allows a request to be sent to a service without knowing the specific server the service is hosted on; the request is sent and then routed to the appropriate server for that service. Because it routes requests from clients to instances of a service, it can reroute a request to another server if a service worker fails. The same applies when a service is being restarted: if several instances of the service are running, requests go to another instance while one is restarting. This also allows new instances of a service to be spun up if it is overloaded and there are too many events in the queue, improving response time.

In addition, the client can access all the services routed by Skyline via a single URL-based namespace (localhost:5040) by specifying the service name and the associated path. The request is then routed to the corresponding address where the service is hosted.

In conclusion, a microservice publishes events to the Event Bus, which holds a large pool of events. These events can be consumed by another microservice, which then makes use of the event data. And the services can communicate with each other via HTTP using Skyline routing, a service discovery mechanism.

 

About the Author

Xintong is a Co-op Software Developer on Hootsuite Platform’s Promise team. She works closely with the Facebook Streaming microservice, which is built on Hootsuite’s deployment pipeline and uses the Event Bus to communicate event information to the Inbound Automation Service. Xintong also enjoys the arts and likes to invest her creativity into fine arts and music.