When I started at Credit Karma in late 2014, we were introducing Kafka into our data architecture. We were building out what would become the core data infrastructure for a company that was, and still is, growing rapidly. Within a mere three months we were pushing 175k JSON events per minute into Kafka. At that time, we also introduced a new data warehouse, Vertica, to help us scale our analytics and reporting.
As we pondered the design for the application to read from Kafka and push data to Vertica, some challenges became apparent:
- Kafka only provides at-least-once messaging semantics, which meant we had to remove duplicates as we read messages from Kafka.
- Our data in Kafka was semi-structured JSON, while Vertica is a SQL-compliant columnar store.
- As with most columnar stores, the most efficient way to load data is with a COPY command and a TSV file containing a few hundred thousand events, or rows. Data from Kafka therefore had to be written to disk, which is an inherently slow operation compared to anything operating in memory or over the network.
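For context, the load itself looks something like the following. This is a minimal sketch of issuing a Vertica COPY over JDBC from Scala; the connection string, credentials, table name, and file path are placeholders rather than our actual configuration.

```scala
import java.sql.DriverManager

object VerticaCopySketch extends App {
  // Placeholder connection details; the Vertica JDBC driver must be on the classpath.
  val conn = DriverManager.getConnection(
    "jdbc:vertica://vertica-host:5433/warehouse", "loader", "secret")
  val stmt = conn.createStatement()

  // COPY bulk-loads an entire TSV file in one pass, which is far more efficient
  // for a columnar store than inserting rows one at a time.
  stmt.execute(
    "COPY events FROM LOCAL '/tmp/events_batch.tsv' DELIMITER E'\\t' ABORT ON ERROR")

  stmt.close()
  conn.close()
}
```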
Akka Actors take the stage
To ensure our new application could scale as our data ingest rates grew, we opted to use Akka Actors, a framework that uses the Actor model as its core concurrency abstraction. We chose it because it had been proven to scale and we loved the simplicity of the API. While the basic Actor model provides a simple, scalable concurrency paradigm, you still need to think about how work can pile up in your system. The sole way to interact with an actor is via a message, which means slow actors can have messages queue up in memory while fast actors continue to send them messages.
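To make the pile-up concrete, here is a minimal sketch (not our production code) of a fast sender flooding a slow actor: the sends never block, so messages simply accumulate on the heap in the actor's unbounded mailbox.

```scala
import akka.actor.{Actor, ActorSystem, Props}

// A deliberately slow consumer, standing in for anything that writes to disk.
class SlowWriter extends Actor {
  def receive = {
    case line: String =>
      Thread.sleep(5) // simulate a slow disk write
  }
}

object MailboxPileUp extends App {
  val system = ActorSystem("pile-up")
  val writer = system.actorOf(Props[SlowWriter], "writer")

  // Nothing slows this loop down: every message lands in the writer's
  // unbounded mailbox, so the backlog (and heap usage) grows without limit.
  (1 to 10000000).foreach(i => writer ! s"row $i")
}
```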
We built out the initial implementation using Akka Actors, with each actor owning one of the three challenges above. Our initial version was fairly effective.
We made an Extractor for each Kafka topic; each Extractor created three child actors:
- A Reader for the direct Kafka interaction, which grabbed the next item from a Kafka Consumer iterator.
- A Deduplicator, which kept a sliding hash table in memory to check for duplicates and aged out old entries so we didn't eventually run out of memory
- A Processor to convert our JSON into a line of TSV in preparation for writing to disk
The Extractor also wrote to disk and signaled a DB Loader actor to load the file once it was ready (when it was big enough or a certain amount of time had passed). This worked well for a while. The setup is shown below:
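A simplified sketch of that hierarchy follows. The message types, helper functions, and the wiring of the children back through the Extractor are hypothetical; our production code differed, but the responsibilities were split the same way.

```scala
import akka.actor.{Actor, ActorRef}
import scala.collection.mutable

// Hypothetical messages passed between the actors
case object ReadNext
case class RawEvent(json: String)
case class UniqueEvent(json: String)
case class TsvRow(line: String)

// Reader: pulls the next item off the Kafka consumer iterator
class Reader(consumer: Iterator[String], extractor: ActorRef) extends Actor {
  def receive = {
    case ReadNext if consumer.hasNext => extractor ! RawEvent(consumer.next())
  }
}

// Deduplicator: sliding in-memory hash table of recently seen events,
// aged out over time so memory use stays bounded
class Deduplicator(extractor: ActorRef, ttlMillis: Long) extends Actor {
  private val seen = mutable.LinkedHashMap.empty[Int, Long]
  def receive = {
    case RawEvent(json) =>
      val now = System.currentTimeMillis()
      val expired = seen.collect { case (key, ts) if now - ts > ttlMillis => key }
      expired.foreach(seen.remove)
      if (!seen.contains(json.hashCode)) {
        seen.put(json.hashCode, now)
        extractor ! UniqueEvent(json)
      }
  }
}

// Processor: converts a JSON event into a single line of TSV
class Processor(extractor: ActorRef, toTsv: String => String) extends Actor {
  def receive = {
    case UniqueEvent(json) => extractor ! TsvRow(toTsv(json))
  }
}

// The Extractor (not shown) created these as child actors, buffered TsvRow
// messages, wrote them to disk, and signaled the DB Loader when a file was ready.
```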
Our goal with this flow was to push as many rows into Vertica as quickly as possible while accounting for all of the problems mentioned earlier. It ran for a while without any real issues, loading data into Vertica from Kafka. However, as time passed, we added more topics with additional events. Because writing to disk was the slowest part of the process, we would eventually find messages piling up at the Extractor.
Running into a wall
Fast-forward 14 months. Our system was using predictive modeling to match our 60 million members with thousands of available financial offers, and we began logging the result of every single prediction as an event. Once this topic came online, we effectively doubled the number of events being generated: before it, we were pushing about 350k events per minute; afterwards it was closer to 700k. All of a sudden the system fell over, halting and barely processing anything.
As you can see in the graph, we would process tens of millions of events and then grind to a halt. The red line is our target 750k mark, so it's easy to see we were not keeping up with the incoming data. It turned out we were getting slammed with garbage collection pauses.
Actors were getting overwhelmed, creating objects faster than we could process them and write them to disk, and faster than garbage collection could keep up. Long garbage collection pauses killed the system. You can see in the Kamon-StatsD metrics below that we often spent more time in garbage collection than out of it.
Our system had no concept of backpressure.
Akka Streams to drive high throughput
Luckily, by the time we found the problem, Lightbend had introduced the Akka Streams API, a framework optimized for high throughput that comes with backpressure out of the box. Converting our existing actors to streams required only the following steps (a simplified sketch follows the list):
- The Reader was removed and became the Source of the stream, which we created with Source.fromIterator
- The Deduplicator became a custom processing stage (since this took an item and returned 0 or 1 items)
- Because our processor was effectively stateless, we merely converted the actor into a map stage
- We wound up not changing the Extractor/Vertica interaction due to time constraints. However, we were still able to integrate our disk writing actor with the stream and get backpressure by using the Subscriber trait
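Put together, the converted pipeline looked roughly like the sketch below. The names (`consumerIterator`, `toTsv`, `diskWriter`) are placeholders, and deduplication is shown here with `statefulMapConcat` as a stand-in for our custom processing stage, but the shape is the same.

```scala
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.reactivestreams.Subscriber
import scala.collection.mutable

object ExtractorStream {
  // consumerIterator, toTsv, and diskWriter stand in for the real Kafka
  // consumer iterator, JSON-to-TSV conversion, and disk-writing actor.
  def run(consumerIterator: () => Iterator[String],
          toTsv: String => String,
          diskWriter: Subscriber[String])
         (implicit mat: ActorMaterializer): Unit = {

    // 1. The Reader became the Source of the stream
    val events = Source.fromIterator(consumerIterator)

    // 2. Deduplication as a stage that emits zero or one items per input
    //    (the real stage also aged out old entries; omitted here for brevity)
    val dedup = Flow[String].statefulMapConcat { () =>
      val seen = mutable.HashSet.empty[Int]
      json => if (seen.add(json.hashCode)) List(json) else Nil
    }

    // 3. The stateless Processor became a plain map stage
    val toTsvRows = Flow[String].map(toTsv)

    // 4. The disk-writing actor, wrapped as a Reactive Streams Subscriber,
    //    gives us backpressure all the way back to the Kafka iterator
    events.via(dedup).via(toTsvRows).runWith(Sink.fromSubscriber(diskWriter))
  }
}
```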
This was a big improvement! We ended up with four distinct pieces that the Akka Streams API let us snap together. With raw actors you have to connect all the pieces yourself, passing actor refs around and hoping the messages you send are correct. With the streams API you can compose many small pieces out of the built-in processing stages.
Once the stream solution was deployed, our long garbage collection pauses were gone and we were loading data into Vertica consistently and in large batches. Here are our garbage collection times after the switch to Akka Streams:
We were excited. This was an order-of-magnitude difference: we were now consistently sitting at the highest levels from our earlier graph. The streams performed significantly better:
Data loading speeds as of writing this. Our ingest rates continue to go up.
Closing thoughts
Akka Actors are an extremely powerful and configurable abstraction. However, if you're not careful, your application can spend too much time garbage collecting message objects. When your application requires high throughput, take a look at Akka Streams to help manage the workload.
Thank you to Dustin Lyons, Aris Vlasakakis, Andy Jenkins, and Eva Vander Giessen.