Video Transcript

Building a Service-oriented Architecture with Rails and Kafka

This blog post is adapted from a talk given by Stella Cotton at RailsConf 2018 titled "So You’ve Got Yourself a Kafka."

Hey, everybody. We're gonna get started. I hope that you're here to listen to me talk about Kafka, 'cause that's the room that you are in. So, yeah. First things first, my name is Stella Cotton. I am an engineer at Heroku. And like I said, I'm gonna talk to you today about Kafka. You might have heard that Heroku offers Kafka as a service. We have got a bunch of hosted plans, from tiny plans to giant plans. We have an engineering team that's strictly dedicated to doing cool stuff to get Kafka running on Heroku in super high capacity. I am not on that team. If you were here to see that talk, this is the wrong talk. I don't actually know anything about running a Kafka cluster or tuning Kafka to handle super high load. So who am I? I am a super regular Rails engineer, like many of you. I wasn't actually familiar with Kafka at all when I joined Heroku. It was like this mysterious technology that suddenly was everywhere. And all these hosted Kafka solutions, not just on Heroku, but on other providers, and Kafka-like systems like Kinesis, they just sprang up. And it seemed important, but I wasn't sure why. And then when I joined Heroku, I am suddenly in this world where not only is Kafka an important part of our product offering, but it's actually a really integral part of our system architecture overall. So today I'd like to talk about three areas that I hope will help other Rails engineers become more familiar with Kafka. We're gonna start with what Kafka is. We'll talk about how Kafka can power your services. And a few, just like two practical considerations and challenges that were kind of unfamiliar to me when I started using event-driven systems.

So what is Kafka? The docs on kafka.apache.org, that's actually, the docs are super good, by the way. They describe it as a distributed streaming platform. But that doesn't really mean a lot to me as a web developer. But one of the classic use cases that people talk about for Kafka is data flow. So if you're running an e-commerce website, for example, you want to know more about what your users are doing on that platform. You want to track each page they visit, each button they click. If you have a lot of users, this can be a lot of data. And if we want to be able to send that data from our web applications to a data store that our analytics team uses, how could we record and stream that high volume of data? One way is to use Kafka. And the basic data structure that powers Kafka is this idea of an append-only log. And when you think about logs, people here, what do you think about? For most web developers, that's gonna be an application log. When something notable happens in our web applications, we're gonna log it in chronological order, we're gonna append each record to the record prior. And then once that record is persisted in this application log, it's gonna be there indefinitely until we truncate earlier versions of our logs. So in a similar fashion, Kafka is also an append-only log. Kafka has an idea of producers. And those are gonna be applications that produce log events. You can have one producer of events or you can have multiple producers of events. But unlike an application log, which is typically written so that you as a human can consume the log later on using your eyeballs, in the Kafka world, applications are gonna be the consumers of these events. Like with producers, you can have one consumer. You can have a bunch of consumers. And a big question for web developers is often what's different about Kafka than something like Sidekiq or Resque? And in Sidekiq and Resque, events are typically gonna be added to a queue. 
But once something picks it off to actually do the work, that disappears. In Kafka, it doesn't matter how many consumers are reading these events, those events are gonna continue to persist for other consumers to consume them until a specific retention period is over. So let's go back to our original example, our e-commerce app. We want this e-commerce application to create an event each time a user does something on the platform. We write each of these events, or records, which is what Kafka calls events, to a user event log. And Kafka is gonna call a log a topic, basically. And if you have multiple services, they can all write events to this user event topic. And each of these Kafka records that we write, it's gonna have a key and a value, like a hash, and a timestamp. And Kafka's not gonna do any validation on this data. It's just gonna pass along binary data no matter what kind of format it's in. So in this scenario, we're using JSON. But you can use a lot of different kinds of data formats. And the communication that happens between these clients that are writing to Kafka and reading off of Kafka is gonna happen over a persistent TCP socket connection. So it's not gonna have a TCP handshake for every single event.
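To make the log-versus-queue difference concrete, here is a minimal in-memory sketch in plain Ruby. It is not how Kafka is implemented; the `AppendOnlyLog` and `LogConsumer` classes are hypothetical names made up for illustration, and a real broker persists records to disk and enforces a retention period. What it tries to show is that reading never removes a record, and that each consumer keeps its own position.

```ruby
# A toy stand-in for one Kafka topic. Illustration only.
Record = Struct.new(:key, :value, :timestamp)

class AppendOnlyLog
  def initialize
    @records = []
  end

  # Producers only ever append; the return value is the new record's offset.
  def append(key:, value:, timestamp: Time.now)
    @records << Record.new(key, value, timestamp)
    @records.size - 1
  end

  # Reading does not delete anything, so any number of consumers
  # can read the same records.
  def read(from_offset)
    @records[from_offset..] || []
  end
end

class LogConsumer
  attr_reader :offset

  def initialize(log)
    @log = log
    @offset = 0 # each consumer keeps its own bookmark
  end

  # Returns every record this consumer has not seen yet.
  def poll
    records = @log.read(@offset)
    @offset += records.size
    records
  end
end

log = AppendOnlyLog.new
log.append(key: "user-1", value: '{"event":"page_view","path":"/cart"}')
log.append(key: "user-1", value: '{"event":"click","button":"checkout"}')

analytics = LogConsumer.new(log)
billing   = LogConsumer.new(log)

analytics_values = analytics.poll.map(&:value)
billing_values   = billing.poll.map(&:value)
```

Both consumers end up with the same two values, which is the behavior a Sidekiq-style queue would not give you, since a job disappears once a worker picks it up.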

So how can our Rails apps specifically interact with a Kafka cluster? We can use one of a few Ruby libraries. Ruby Kafka is a lower-level library for producing and consuming events. It's gonna give you a lot of flexibility, but it also has a lot more configuration. So if you want to use a simpler interface without a lot of boilerplate setup, there's DeliveryBoy and Racecar, which are also maintained by Zendesk. And also Phobos. And these are gonna be wrappers around Ruby Kafka to kind of abstract away a lot of that configuration. You've also got Karafka, which is a different standalone library. And similarly to Ruby Kafka, it has a wrapper called WaterDrop. And it's based on the same implementation that we saw before with DeliveryBoy. So my team, we actually use a custom gem that pre-dates Ruby Kafka. So I haven't actually used these in production, but Ruby Kafka is gonna be the gem that Heroku is gonna recommend that you use if you look at our dev center documentation. So a brief look at how you can use DeliveryBoy. We're gonna use the simple version. Like I mentioned earlier, built on top of Ruby Kafka. And we can use it to publish events to our Kafka topic. First, we're gonna install the gem. The usual, run a generator. And it's gonna generate this config file, which is probably pretty familiar if you've used databases. And DeliveryBoy is meant for getting things up and running very, very quickly. So you don't have to do any other configuration except the brokers. Like, give them the address and the port, which might be localhost if you're running it locally. And the DeliveryBoy docs will tell you how you can configure more things, but this is like the MVP. And using DeliveryBoy, we can write an event outside the thread of our web execution to a user event topic. You can see we passed in user event as the topic. And each of these topics can be made of one or more partitions. And in general, you want two or more partitions.
Partitions are a way to partition the data that you're sending in a topic. And that allows you to scale your topic up as you have more and more events being written to that topic. And you can have multiple services that are writing to the same partitions. And it's the producer's job to say which partition you're gonna send those events to. DeliveryBoy is gonna help you balance events across those partitions. You can let it just assign your event randomly to a partition or you can actually give it a specific key. And why would you want to give a partition key? It's so that specific kinds of events go inside a single partition. Because Kafka only guarantees that events are delivered in order, like in our application log, inside a partition, not inside a whole topic. So for example, if you want to make sure that your user events related to a specific user all go to the same partition, you'd pass in a user ID there. And that way you would make sure that every event related to a user shows up in order so it doesn't look like they're clicking all around on the website out of order because it's going into different partitions. And under the hood, Ruby Kafka is using a hashing function. So it's gonna convert this into an integer and do a mod to divide by the number of partitions. So as long as your partition count stays the same, you can guarantee that it always goes to the same place. It's a little tricky if you start to increase your partitions. So now we're writing events to our user topic. That's really the only thing we really need to do if you have a Kafka cluster running. You can also use another gem called racecar. Same thing, wrapper around Ruby Kafka maintained by Zendesk. And it's the same, very little upfront configuration. Just this generator. This config file will look familiar 'cause it's pretty much the same. But this will also create a folder called consumers in your application too.
And an event consumer subscribes to our user event topic with that subscribes_to method at the topic. And this is just gonna print out any data that it returns. So we're gonna run that consumer code inside of its own process. So it's gonna be running separately from the process that runs our web application. And in order to consume events, racecar is gonna create a group of consumers. It's gonna be a collection of one or more Kafka consumers. And it's gonna read off of that user event topic. And each consumer inside that consumer group is gonna be assigned one or more partitions. And each of these are gonna keep track of where they are in these partitions using an offset, which is like a bookmark. A digital bookmark. And the best part is that when one consumer goes away if it fails, those partitions are gonna get reassigned to other consumers. So as long as you've got at least one consumer process running, you've got a good availability story. So we talked a little bit about what is Kafka under the hood. And let's talk more about the technical features that make it valuable. So one is that Kafka can handle extremely high throughput.
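The partition-key idea described above, hash the key to an integer and take it modulo the partition count, can be sketched in a few lines of plain Ruby. The `partition_for` helper is a hypothetical name, and the choice of CRC32 as the hash is an assumption for illustration rather than a statement about what any particular client library does internally.

```ruby
require "zlib"

# Hypothetical helper: maps a partition key to a partition number.
# CRC32 is just one example of a stable hash (an assumption here).
def partition_for(key, partition_count)
  # With no key there is no ordering requirement, so any partition will do.
  return rand(partition_count) if key.nil?

  Zlib.crc32(key.to_s) % partition_count
end

# The same key always lands on the same partition while the count is fixed.
first_lookup  = partition_for("user-42", 4)
second_lookup = partition_for("user-42", 4)

# With a different partition count the same key may map somewhere else,
# which is why increasing partitions later is tricky for ordering.
after_resize = partition_for("user-42", 6)
```

Because the result depends on the partition count, events for one user written before and after a resize can end up in different partitions, so per-user ordering is only guaranteed while the count stays the same.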

One of the key performance characteristics that I thought was pretty interesting is that Kafka does not use the message broker, like the Kafka cluster, to track where all the consumers are. Like a traditional enterprise queuing system like AMQP is going to actually have the event infrastructure itself keep track of all those consumers. And so as your number of consumers scales up, your event infrastructure is actually going to have more load on it. 'Cause it's gonna have to track a larger and larger state. And another thing is that consumer agreement is actually not trivial. It's not easy for the broker itself to know where the consumers are. Because do you mark that message has been processed as soon as it gets sent over the network? Well if you do and the consumer is down and it can't process it, how does the consumer say hey, event infrastructure, actually can you resend that? You could try a multi-acknowledgment process like with TCP. But it's gonna add performance overhead. So how does Kafka get around this? It just pushes all of that work out to the consumer itself. The consumer service is in charge of telling where am I, what bookmark am I at in that ordered commit log. And so this is kind of cool because it means that reading and writing event data is constant time, O(1). So the consumer either knows that it wants, it has an offset, it says give me this specific place in the log. Or I want to start at the very beginning and read all of the events. Or I want to start at the very end. And so there's no scanning over large sets of data to figure out where it needs to be. And so the more data that exists, it doesn't matter. It doesn't change the amount of time to look up. And so Kafka is going to perform similarly whether you have a very small amount of data in your topics or a large amount of data. And it also runs as a cluster on one or more servers. It could be scaled out horizontally by adding more machines.
Extremely reliable. And data is written to disk and then replicated across multiple brokers. So it has this scalable and fault-tolerant profile. So if you'd like to know more about the actual data distribution replication side of Kafka, this is a pretty good blog post. And by the way, the slides will be on the website and I'll tweet them out at the end. So don't bother trying to write down these links, 'cause that would be a little complicated. And so to give you a more concrete number around what scalable looks like, Netflix, LinkedIn, and Microsoft are literally sending over a trillion messages per day through their Kafka clusters.
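As a rough sketch of the "consumer owns its bookmark" idea from this section, the plain-Ruby toy below models the log as an Array and has the consumer advance its own offset only after a record has been processed. The `OffsetTrackingConsumer` class and its method names are hypothetical, and a real client commits offsets back to the cluster rather than holding them in memory.

```ruby
# Toy model: the consumer, not the broker, remembers where it is.
class OffsetTrackingConsumer
  attr_reader :committed_offset, :processed

  def initialize(log, start: :beginning)
    @log = log
    @committed_offset = start == :end ? log.size : 0
    @processed = []
  end

  # Jump straight to a position; indexing does not scan earlier records.
  def seek(offset)
    @committed_offset = offset
  end

  # Process one record, then commit. If the block raises, the offset is
  # not advanced, so the same record is read again on the next attempt.
  def process_next
    return false if @committed_offset >= @log.size

    record = @log[@committed_offset]
    yield record
    @processed << record
    @committed_offset += 1
    true
  end
end

log = ["order-1", "order-2", "order-3"]
consumer = OffsetTrackingConsumer.new(log)

consumer.process_next { |record| record } # handles order-1
begin
  consumer.process_next { |_record| raise "email service is down" }
rescue RuntimeError
  # The offset is still 1, so order-2 will be read again.
end
consumer.process_next { |record| record } # order-2, second attempt
```

Committing only after the work succeeds is what makes a record show up again after a failure, which is one way duplicate deliveries come about.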

So Kafka is cool. It's awesome. It can get data from one place to another. But we're not in a data science track. We're a track about services. So why do you care about this technology in the context of services? So some of the properties that make Kafka valuable for event pipeline systems are also gonna make it a pretty interesting fault tolerant replacement for RPC between services. If you aren't familiar with the term RPC, there's a lot of different arguments about what it means. But it means Remote Procedure Call. And tl;dr, it's a shorthand for one service talking to another service using a request, like an API call. So what does this mean in practice? Let's go back to our example of the e-commerce site. User is gonna place an order. It's gonna hit this create API in our order service. And when that happens, you'll want to create an order record, charge their credit card, send out a confirmation email. And so in a monolithic system, this is gonna usually be like this method call. It's gonna have blocking execution. You might be using Sidekiq or something to handle sending the email. But as your system grows more and more complex, you might start extracting these out to their own service. And you can use RPC, Remote Procedure Calls, to talk between these services. So what are some of the challenges that you might find with using RPC? And so one is that the upstream service is responsible for the downstream service's availability. If the email system is having a really bad day, the upstream service is responsible for knowing whether or not that email service is available. And if not, retrying any failing requests. Or failing altogether. So how might an event stream help in this situation? So in this event-oriented world, an upstream service, our orders API, is going to write an event to Kafka saying that an order was created.
Because Kafka has this at-least-once guarantee, it means that that event is gonna be written to Kafka at least once and will be available for a downstream consumer to consume. So if our email service is down, that event is still there, that request is still there. And when the downstream consumer comes back online, it can pick back up, it can use its bookmark and say, I need to pick back up from XYZ and continue to process those events in order. So another challenge that larger and larger organizations find is coordination. In increasingly complex systems, integrating a new downstream service means a change to an upstream service. So if you'd like to integrate a new fulfillment provider, for example, it's gonna kick off a fulfillment process when an order is created. In an RPC world, you need to change that upstream service to make an API call out to your new fulfillment service. In an event-oriented world, you would add a new consumer inside the fulfillment service that's gonna consume the order created event topic. So there are some upsides. What's the big downside? In our first example, the dependency between those services is super clear. You look at that method and you see oh, this, this, this, and this. The upstream service knows exactly which downstream services depend on knowing that the order was created. By abstracting away that connection, you gain speed, you gain independence, but you do sacrifice clarity.
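The coordination point above can be illustrated with a small plain-Ruby sketch. The `Topic` class here is a hypothetical in-memory stand-in, not a Kafka client, and the service names are made up. The thing to notice is that adding the fulfillment consumer does not change the code that publishes the order created event.

```ruby
# Toy topic: producers append, and each subscriber reads independently.
class Topic
  def initialize
    @events = []
    @subscribers = {}
  end

  def publish(event)
    @events << event
  end

  # Registers a named consumer with its own offset and handler.
  def subscribe(name, &handler)
    @subscribers[name] = { offset: 0, handler: handler }
  end

  # Hands any unread events to every subscriber.
  def deliver
    @subscribers.each_value do |subscriber|
      @events[subscriber[:offset]..].each { |event| subscriber[:handler].call(event) }
      subscriber[:offset] = @events.size
    end
  end
end

order_created = Topic.new
emails_sent = []
shipments = []

order_created.subscribe(:email) { |event| emails_sent << "confirmation for order #{event[:order_id]}" }
order_created.publish({ order_id: 1 })
order_created.deliver

# Later, a fulfillment service is added. The publish calls stay the same.
order_created.subscribe(:fulfillment) { |event| shipments << event[:order_id] }
order_created.publish({ order_id: 2 })
order_created.deliver
```

In this toy, the new consumer starts from the beginning of the log, so it also sees order 1. The flip side, as noted above, is that nothing in the publishing code tells you who is listening.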

So you've got a Kafka, woo! You're comfortable with the trade-offs, you understand what you're getting into. You'd like to start incorporating events into your service-oriented architecture. So what might this look like from an architectural perspective? Martin Fowler discusses how when people talk about event-driven applications, they can actually be talking about incredibly different kinds of applications. And I found this personally to be true, even inside of Heroku. And it can be a big pain point when you're having a discussion about challenges, about trade-offs, in these kind of systems. So he's trying to bring a shared understanding to what an event-driven system is. And he's started outlining a few architectural patterns that he sees most frequently. So I'm gonna kind of zoom through this to cover these. But you can learn more on his website. Or even better, he gave a keynote at GOTO Chicago that really covers this in-depth. So the first pattern he talks about is an Event Notification pattern. This is the absolute bare minimum, simplest, event-driven architecture. One service simply notifies the downstream services that an event happened. The event has very little information, it's just saying an order was created at this time. If the downstream service needs more information about what happened, it's gonna need to make a network call back up to the order service to fill that information out. The second pattern he talks about is Event Carried State Transfer. And in this pattern, the upstream is actually gonna augment that event with additional information so that your downstream consumer can keep a local copy of that data and not have to do that network call. And it can be actually super straightforward when everything that the downstream service needs is encapsulated inside that event generated by the upstream service, which is awesome. Except one of the challenges here is that you might need data from multiple systems.
So for example, our order service, it's gonna create an order, write the event. Fulfillment service is gonna consume that event. And the fulfillment service is gonna need certain details about the order, which is totally fine, 'cause we have that order information. And it's gonna be passed along inside the event. But if it also needs to talk to a customer service, for example, to know who to send a package to, it's gonna either need to make a network call to actually retrieve that information like we saw earlier or it needs to find a way to persist a local copy of the data that it needs. And the idea is that you can also consider building that local copy off of events that the customer service is gonna write. And so the fulfillment service is gonna consume a separate set of events from your customer service that it can then persist locally and join it inside of its own database. A third pattern that Fowler talks about is Event-Sourced Architecture. This takes the idea of an event-driven system even further. And it's saying that not only is each piece of communication between your services kicked off by an event, but it says that by storing every single event or storing a representation of every single event and replaying all of these events, you could drop all your databases, completely rebuild the state of your application as it exists in this moment just by replaying that event stream. So Fowler talks about an audit log being a good use for this scenario. He also talks about this really interesting high-performant trading system, which is not really something that's relevant to my interest but might be relevant to yours. But having an audit log of everything that happens and actually relying on it to rebuild your application state is two very different levels of technical commitment. Like, seriously different. There are gonna be additional challenges that come into play when you're looking to be able to recapture this state of the world. So one is code changes. 
So I worked on a payment system in the past. And a big challenge that we had was if you tried to recalculate prior financial statements, they depended on business logic that lived inside the code, hardcoded in the past. And that had changed. And so if you tried to recalculate all that money that you needed to send out to users based on the state of the world that day, you might actually get different values. And that's a problem. And that was not an event-driven system, so this is an issue even in regular systems. But it's something that you really need to understand when you're relying on the ability to rebuild your state of the world from events. And in a similar fashion, another challenge is that that state of the world might not be the same for third-party integrations. Like, that might have changed and an API callout to a third-party provider might return a different set of data, or even an internal provider. Fowler talks about some more strategies for handling these issues. It's not an insurmountable task, but it is a big deal. And the final pattern that Fowler talks about is Command Query Responsibility Segregation, which if you've ever talked to anybody about event-driven architectures, you might, like I would say 50% of the time, they think you're talking about this. And you may or may not be talking about this. So you might be familiar with CRUD. And that's a way of encapsulating logic for creating, reading, updating, deleting a record in a database inside your service. And it's at the heart of Rails controllers, like this is very Railsy. But the idea of CQRS is that instead of thinking about all those things as existing in the same domain, that you can actually split them out. So the service that writes to your system and the service that reads from your system are actually split. And at its simplest, one service is responsible for writing orders. 
So any method or API call that feels like an order.create, an order.update_taxes, like whatever, that's gonna go into your order updater service. It's gonna go one direction. And reading methods or API calls like get order by ID, those are gonna live in another service. And it's not just an event-oriented architecture thing, you can do this with API calls. But you'll often see event-oriented architectures, you'll see event systems nestled in the diagrams usually at the place where commands are actually written. The writer service, this command handler, is gonna read off of the event stream, process these commands, store them to a write database, and then any queries are gonna happen to a read-only database. When do you want to use it? CRUD is much simpler. Separating out read and write logic into two different services really does add a lot of overhead. The biggest reason I think people use it is performance. So if you're in a system where you have a large difference between reads and writes or a system where your writes are super, super slow compared to your reads, you can optimize performance separately for those systems. But it's an increase in complexity, so you really need to understand the trade-offs. I found this blog post to be pretty good. Succinct, like the title says, when to use and not use CQRS. And I would also, if you're thinking about implementing any of these patterns in your system, take a look at Fowler's blog posts and watch his keynote to get a really deep dive into the trade-offs and considerations for these.
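Here is a minimal plain-Ruby sketch of the CQRS split described above: a command handler that reads commands off a stream and writes to a write store, and a separate query object that only ever touches a read store. All class names, command shapes, and the Hash-backed "databases" are hypothetical, and copying each write into the read store inline is a simplification standing in for whatever replication or event mechanism a real system would use.

```ruby
# Write side: the only code allowed to change orders.
class OrderCommandHandler
  def initialize(write_db, read_db)
    @write_db = write_db
    @read_db = read_db
  end

  def handle(command)
    case command[:type]
    when :create_order
      @write_db[command[:id]] = { total_cents: command[:total_cents], tax_cents: 0 }
    when :update_taxes
      @write_db.fetch(command[:id])[:tax_cents] = command[:tax_cents]
    else
      raise ArgumentError, "unknown command #{command[:type]}"
    end

    # Simplification: sync a frozen copy to the read store right away.
    @read_db[command[:id]] = @write_db[command[:id]].dup.freeze
  end
end

# Read side: queries only, no methods that modify anything.
class OrderQueries
  def initialize(read_db)
    @read_db = read_db
  end

  def find(id)
    @read_db[id]
  end
end

write_db = {}
read_db = {}
handler = OrderCommandHandler.new(write_db, read_db)
queries = OrderQueries.new(read_db)

command_stream = [
  { type: :create_order, id: 1, total_cents: 2_500 },
  { type: :update_taxes, id: 1, tax_cents: 200 }
]
command_stream.each { |command| handler.handle(command) }

order_view = queries.find(1)
```

Even in a toy this small you can see the extra moving parts compared with a single CRUD model, which is the overhead the talk is warning about.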

So we talked about what Kafka is. We talked about how this technology can change the way your services communicate with each other. So next, let's talk about two practical considerations that you might run into in integrating events into your Rails applications. It's definitely not a comprehensive list, but it was like, two things that really surprised me when I got started. So the first thing to consider is slow consumers. The most important thing to keep in mind in an event-driven system is that your service needs to be able to process events as quickly as the upstream service produces them. Otherwise, you are gonna drift further and further behind and you may not realize it because you're not seeing timeouts, you're not seeing API calls fail. You're just slower, you're just at a different state of the world than everybody upstream. And also, you might start to see timeouts. So one place where you will see timeouts are gonna be on your socket connection with the Kafka brokers. If you're not processing events fast enough and completing that round trip, your socket connection can time out. And then reestablishing that actually has a time cost. It's expensive to create those sockets. So that can add latency to an already slower system, making things worse. So if your consumer is slow, how do you speed it up? So there's gonna be a lot of talks at RailsConf today and RailsConf prior about performance in Rails. But a more Kafka-specific example is that you can increase the number of consumers in your consumer group so that you can process more events in parallel. So what does that look like in practice? So you remember in racecar, you run a consumer process by passing in a class name like UserEventConsumer. If you're using a Procfile for something like foreman or on Heroku, you can actually start racecar with the same class multiple times.
And it's gonna automatically have each of those processes be an individual consumer that's gonna join the same consumer group. And so you want at least two of these consumer processes running. So if one goes down, you're gonna have the failed consumer's partitions be reassigned to the other. But it also means that you can parallelize work across as many consumers as you have topic partitions. So of course, with any scaling issue, you can't just add consumers forever. Eventually you're going to hit scaling limits on shared resources like databases. So you just can't scale forever, but it will give you a little bit of wiggle room. And if you don't actually care about the strict ordering inside those topic partitions, you can use a queuing system like Sidekiq to help manage work in parallel and take a little bit of pressure off of the system. And it's extremely valuable, in this case, to have metrics and alerting like paging around how far behind you are from when an event was added to the queue. So Ruby Kafka is instrumented with ActiveSupport notifications, but it also has StatsD and Datadog reporters that are automatically included. And you want to know if you're drifting a certain amount behind when the events are added. And the Ruby Kafka documentation, which is also super good, has a lot of suggestions for what you should be monitoring in your Rails systems. And going one step further, on my team, before we put a new service into production, 'cause we use Kafka for a lot of our services, we'll run gameday failure scenarios on staging but consuming off the production stream where we move our offset back by a certain amount to pretend like our service has been down for a day or two and see how long it takes us to catch up. Because if you have a serious outage on your downstream service, you're gonna want to know, how much wiggle room do you have? And talk about what failure modes are acceptable for your service.
Because if you're running an order service, for example, you might want to optimize for finishing those orders no matter what. It might take you four hours, it might take you a day. But you want it to be done. But if you're running a chat bot service, users are going to be extremely confused if four hours later they start seeing things appear inside of their chat. So you'll want to talk about what that means to process data at a delay. And there are also, so a big thing in Kafka to talk about is this exactly once versus at least once. You can actually design a Kafka system to have an exactly once guarantee with newer versions of Kafka. So it means that consumers could assume that a message in a Kafka queue has only been sent once. There's never a chance that a message will not be sent at all or that it'll be sent more than once. And Confluent has a blog that talks a little bit more about this. But the reality is with Ruby Kafka, you should assume that your messages will be delivered at least once. So it's gonna be there, but you might see it more than once. So it's really important. And one thing that this means is that you need to design consumers to expect duplicated events. So you can either rely on your database and use UPSERT for idempotency. Or, hmm. Yes, thank you so much. Got the dance in too. All right, so design your consumers for failure. Rely on UPSERT to lean on your database. Or you can include a unique identifier in each event. And you could just skip events that you've already seen before. But either of those things is gonna make your application more resilient in the face of failure. Because you can move back an offset and replay events and not be worried that you're gonna duplicate the same one twice. And the second thing that was really surprising to me was Kafka's very permissive attitude towards data. You can send anything in bytes and it's gonna send it back out, it's not gonna do any verification.
And this feature makes it extremely flexible because you don't need to adopt a specific format, like a serialization format, to get started. Like we talked about before, freedom isn't free. Freedom is a blessing and a curse. What happens when a service upstream decides to change an event that it produces? If you just change that event payload, there's a really good chance that one of your downstream consumers is gonna break and your colleagues are gonna be really upset. And they're gonna start seeing exceptions, it could take down their service. But you have no idea because you didn't really know that anybody was using that data. So before you start using events in your architecture, choose a data format, preferably one, not multiple ones, and evaluate how that data format can help you register schemas and evolve them over time. It's an issue in RPC systems as well. But the explicitness of those calls means that somebody changing an upstream service has a built-in method for being like, oh wait, somebody might actually be using this information. Especially when that's like an outgoing event. Schema registries can provide a centralized view of schemas that are in use in your system. This is also, it's just like much easier to think about validation, about evolution of schemas, before you actually do all of this event-driven architecture than afterwards. And there are a lot of different data formats. But I'm gonna talk about two. JSON, which most people in this room are probably familiar with. It's the format of the web. And Avro, which might be new to some of you. And I know that there are probably at least two Australians in the audience. There's like a 75% chance I'm gonna slip up on the next two slides and call it Arvo, but I do not mean afternoon, I mean Avro. So ignore me. So JSON's a pretty common data format. Pros: human readable, super nice. As a human, I really appreciate that. And there's also JSON support in basically every language. 
But there are a couple of cons. The payloads, like the actual size of JSON payloads, can be really large. It requires you to send keys along with the values, which is nice 'cause it's flexible, so it doesn't matter what the order is in. But it's also the same for every payload. So when you send them over, it can mean that those payloads are larger than other formats. So if size is a big concern for you, JSON could be a bad fit. There's still not built-in documentation inside that payload. So you just see a value, but you don't actually know what it means. And schema evolution is challenging. There's no built-in support for aliasing one key to another if you want to rename a field or sending a default key value so that you can evolve over time. So Confluent, they're the team that built Apache Kafka. And they now have a hosted Kafka product. They recommend using Avro, like many other protocol formats. It's not human readable over the wire, you have to serialize and de-serialize it because it's sent in binary. But the upside is that there is super robust schema support. A full Avro object is sent over and it includes this idea of the schema and the data. So it has support for types. It's got primitive ones like ints, complex ones like dates. And it includes embedded documentation, so you can say what each of those fields actually do or mean in your system. And it has some built-in tools that help you evolve your schemas in backwards-compatible ways over time. So that's pretty cool. Avro::Builder is a gem created by Salsify. And that actually creates a very Ruby-ish DSL that helps you create these schemas. So if you're curious to learn more about Avro, the Salsify engineering blog here has a really great write-up on Avro and Avro::Builder.
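To give a feel for what aliases, defaults, types, and embedded documentation buy you, here is a hand-rolled plain-Ruby sketch over JSON payloads. It is not Avro and does not use the Avro gem or Avro::Builder; the `USER_EVENT_SCHEMA` constant, the field names, and the `decode_user_event` helper are all hypothetical. It only mimics the kind of rules a schema format can apply for you.

```ruby
require "json"

# Hypothetical schema for a user event. :aliases lets a renamed field still
# be read under its old name, :default fills in a field older producers omit.
USER_EVENT_SCHEMA = [
  { name: "user_id",  type: Integer, doc: "ID of the user who acted" },
  { name: "action",   type: String,  doc: "What the user did", aliases: ["event_name"] },
  { name: "platform", type: String,  doc: "Client platform", default: "web" }
].freeze

def decode_user_event(json)
  raw = JSON.parse(json)

  USER_EVENT_SCHEMA.each_with_object({}) do |field, event|
    names = [field[:name], *field.fetch(:aliases, [])]
    found = names.find { |name| raw.key?(name) }

    value =
      if found
        raw[found]
      elsif field.key?(:default)
        field[:default]
      else
        raise KeyError, "missing required field #{field[:name]}"
      end

    raise TypeError, "#{field[:name]} must be a #{field[:type]}" unless value.is_a?(field[:type])

    event[field[:name]] = value
  end
end

# An older producer still uses the old field name and omits platform.
old_event = decode_user_event('{"user_id": 7, "event_name": "click"}')

# A newer producer uses the renamed field and sends platform.
new_event = decode_user_event('{"user_id": 7, "action": "click", "platform": "ios"}')
```

Both payloads decode to the same shape, so a consumer written against the new field names keeps working while older producers catch up. With plain JSON you have to write and maintain this kind of logic yourself in every consumer.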

So that is it for my talk today. I have a couple more slides though, so don't start leaving. I always feel really weird when people are like, and thank you, and then they have a couple more slides. But, so if you're curious to learn more actually about Heroku and how we run our hosted Kafka products or how we use it internally, we do have two talks that my co-workers, Jeff and Pavel, have given that you can watch. And if you're interested in Postgres, related to Kafka, but different, my co-worker Gabe is giving a talk literally in the next talk spot in room 306 about Postgres 10, performance, and you. And then finally, I will be at the Heroku booth today after this and during lunch if you want to come by and say hi. You can ask me questions, you can get swag. We've got T-shirts and socks. And we also have some jobs, the usual. So yeah, thank you so much.

In recent years, designing software as a collection of services, rather than a single, monolithic codebase, has become a popular way to build applications. In this post, we'll learn the basics of Kafka and how its event-driven process can be used to power your Rails services. We’ll also talk about practical considerations and operational challenges that your event-driven Rails services might face around monitoring and scaling.

What is Kafka?

Suppose you want to know more information about how your users are engaged on your platform: the pages they visit, the buttons they click, and so on. A sufficiently popular app could produce billions of events, and sending such a high volume of data to an analytics service could be challenging, to say the least.

Enter Kafka, an integral piece for web applications that require real-time data flow. Kafka provides fault-tolerant communication between producers, which generate events, and consumers, which read those events. There can be multiple producers and consumers in any single app. In Kafka, every event is persisted for a configured length of time, so multiple consumers can read the same event over and over. A Kafka cluster is made up of several brokers, which is just a fancy name for any instance running Kafka.

Slide 16

One of the key performance characteristics of Kafka is that it can process an extremely high throughput of events. Traditional enterprise queuing systems, like those built on AMQP, have the event infrastructure itself keep track of the events that each consumer has processed. As your number of consumers scales up, that infrastructure will suffer under a greater load, as it needs to keep track of more and more state. And even establishing an agreement with a consumer is not trivial. Should a broker mark a message as "done" once it's sent over the network? What happens if a consumer goes down and needs a broker to re-send an event?

Kafka brokers, on the other hand, do not track any of their consumers. The consumer service itself is in charge of telling Kafka where it is in the event processing stream, and what it wants from Kafka. A consumer can start in the middle, having provided Kafka an offset of a specific event to read, or it can start at the very beginning or even the very end. Reading event data by offset takes constant time, O(1); as more events arrive, the amount of time to look up information from the stream doesn't change.
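To make the offset idea concrete, here is a toy in-memory sketch in plain Ruby. It is not how Kafka is implemented and it uses no Kafka library; `ToyLog` and `ToyConsumer` are hypothetical names made up for this illustration. The only point is that the log stores events by position, while each consumer remembers its own offset.

```ruby
# A toy append-only log: events are only ever added to the end.
class ToyLog
  def initialize
    @events = []
  end

  # Appends an event and returns the offset it was stored at.
  def append(event)
    @events << event
    @events.size - 1
  end

  # Array indexing makes a read by offset constant time.
  def read(offset)
    @events[offset]
  end

  def end_offset
    @events.size
  end
end

# The consumer, not the log, remembers how far it has read.
class ToyConsumer
  attr_reader :offset

  def initialize(log, start_at: 0)
    @log = log
    @offset = start_at
  end

  # Returns the next unread event, or nil when caught up.
  def poll
    return nil if @offset >= @log.end_offset

    event = @log.read(@offset)
    @offset += 1
    event
  end
end

log = ToyLog.new
%w[page_view click purchase].each { |event| log.append(event) }

from_start  = ToyConsumer.new(log)               # starts at the beginning
from_middle = ToyConsumer.new(log, start_at: 1)  # skips the first event

first_seen  = from_start.poll
middle_seen = from_middle.poll
```

Because the log never tracks who has read what, adding a third or a thirtieth consumer costs the log nothing extra.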

Kafka also has a scalable and fault-tolerant profile. It runs as a cluster on one or more servers that can be scaled out horizontally by adding more machines. The data itself is written to disk and then replicated across multiple brokers. For a concrete number around what scalable looks like, companies such as Netflix, LinkedIn, and Microsoft all send over a trillion messages per day through their Kafka clusters!

Setting Kafka up in Rails

Heroku provides a Kafka cluster add-on that can be used in any environment. For Ruby apps, we recommend using the ruby-kafka gem for real-world use cases. A bare minimum implementation only requires you to provide hostnames for your brokers:

# config/initializers/kafka_producer.rb
require "kafka"

# Configure the Kafka client with the broker hosts and the Rails
# logger.
$kafka = Kafka.new(["kafka1:9092", "kafka2:9092"], logger: Rails.logger)

# Set up an asynchronous producer that delivers its buffered messages
# every ten seconds:
$kafka_producer = $kafka.async_producer(
  delivery_interval: 10,
)

# Make sure to shut down the producer when exiting.
at_exit { $kafka_producer.shutdown }

After setting up the Rails initializer, you can start using the gem to send event payloads. Because of the asynchronous behavior of sending events, we can write an event outside the thread of our web execution, like this:

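class OrdersController < ApplicationController
  def create
    # order_params: a strong-parameters helper, assumed to be defined elsewhere.
    @order = Order.create!(order_params)

    # Assumes Order has a user_id column; the key is sent as a string.
    $kafka_producer.produce(
      @order.to_json,
      topic: "user_event",
      partition_key: @order.user_id.to_s
    )
  end
end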

We'll talk more about payload formats below, but in this scenario, we're using good old JSON. The topic keyword argument refers to the log where Kafka is going to write the event. Topics themselves are divided into partitions, which allow you to "split" the data in a particular topic across multiple brokers for scalability and reliability. It's a good idea to have two or more partitions per topic so that if one partition fails, your events can still be written and consumed. Kafka guarantees that events are delivered in order inside a partition, but not across a whole topic. If the order of the events is important, passing in a partition_key will ensure that all events with the same key go to the same partition.
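Why does a partition key keep related events together? A keyed partitioner typically hashes the key and takes the result modulo the number of partitions. The sketch below illustrates that idea in plain Ruby; the `partition_for` helper is hypothetical, and CRC32 is only an assumption standing in for whatever hash your client library actually uses.

```ruby
require "zlib"

# Picks a partition for a key: the same key always maps to the same partition.
# CRC32 is used here only as a stable, illustrative hash.
def partition_for(key, partition_count)
  Zlib.crc32(key.to_s) % partition_count
end

PARTITIONS = 4

events = [
  { user_id: 42, action: "added_to_cart" },
  { user_id: 7,  action: "page_view" },
  { user_id: 42, action: "checked_out" },
]

# Map each action to the partition its user's key hashes to.
assignments = events.map do |event|
  [event[:action], partition_for(event[:user_id], PARTITIONS)]
end.to_h
```

Both of user 42's events land in the same partition, so a consumer sees "added_to_cart" before "checked_out". Events for different users may or may not share a partition, and no ordering is implied between them.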

Kafka for your services

Some of the properties that make Kafka valuable for event pipeline systems also make it a pretty interesting fault-tolerant replacement for RPC between services. Let's use an example of an e-commerce application to illustrate what this means in practice:

def create_order
  create_order_record
  charge_credit_card # call to Payments Service
  send_confirmation_email # call to Email Service
end

Let's assume that when a user places an order, this create_order method is going to be executed. It'll create an order record, charge the user's credit card, and send out a confirmation email. Those last two steps have been extracted out into services.

Slide 50

One challenge with this setup is that the upstream service is responsible for monitoring the downstream availability. If the email system is having a really bad day, the upstream service is responsible for knowing whether that email service is available. And if it isn't available, it also needs to be in charge of retrying any failing requests. How might Kafka's event stream help in this situation? Let's take a look:

Slide 52

In this event-oriented world, the upstream service can write an event to Kafka indicating that an order was created. Because Kafka has an "at least once" guarantee, the event is going to be written to Kafka at least once, and will be available for a downstream consumer to read. If the email service is down, the event is still persisted for it to consume. When the downstream email service comes back online, it can continue to process the events it missed in sequence.

Another challenge with an RPC-oriented architecture is that, in increasingly complex systems, integrating a new downstream service means also changing an upstream service. Suppose you'd like to integrate a new service that kicks off a fulfillment process when an order is created. In an RPC world, the upstream service would need to add a new API call out to your new fulfillment service. But in an event-oriented world, you would add a new consumer inside the fulfillment service that consumes the order once the event is created inside of Kafka.
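Both points (a downstream service catching up after an outage, and a new service being added without touching the upstream one) can be sketched with a plain Ruby array standing in for a Kafka topic. Everything here is a simplified assumption: the `Subscriber` class and the `create_order` helper are hypothetical, and a real consumer would use a Kafka client rather than reading an array.

```ruby
require "json"

# Stand-in for a Kafka topic: an append-only array of JSON strings.
topic = []

# Upstream: the orders service only records that something happened.
def create_order(topic, order_id)
  topic << JSON.generate("event" => "order_created", "order_id" => order_id)
end

# Each downstream service tracks its own position in the topic.
class Subscriber
  attr_reader :handled

  def initialize
    @offset = 0
    @handled = []
  end

  # Processes every event this subscriber has not yet seen, in order.
  def catch_up(topic)
    while @offset < topic.size
      event = JSON.parse(topic[@offset])
      @handled << event["order_id"]
      @offset += 1
    end
  end
end

email = Subscriber.new
create_order(topic, 1)
email.catch_up(topic)

# The email service goes down; orders keep arriving.
create_order(topic, 2)
create_order(topic, 3)

# A fulfillment service is added later, with no change to create_order.
fulfillment = Subscriber.new
fulfillment.catch_up(topic)

# The email service comes back and resumes where it left off.
email.catch_up(topic)
```

Notice that `create_order` never mentions email or fulfillment. The upstream service doesn't know, or need to know, who is listening.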

Slide 54

Incorporating events in a service-oriented architecture

In a blog post titled "What do you mean by “Event-Driven”?", Martin Fowler discusses the confusion surrounding "event-driven applications." When developers discuss these systems, they can actually be talking about incredibly different kinds of applications. In an effort to bring a shared understanding to what an event-driven system is, he's started defining a few architectural patterns.

Let's take a quick look at what these patterns are! If you'd like to learn more about them, check out his keynote at GOTO Chicago 2017 that covers these in-depth.

Event Notification

The first pattern Fowler talks about is called Event Notification. In this scenario, one service simply notifies the downstream services that an event happened with the bare minimum of information:

{
  "event": "order_created",
  "published_at": "2016-03-15T16:35:04Z"
}

If a downstream service needs more information about what happened, it will need to make a network call back upstream to retrieve it.

Event-Carried State Transfer

The second pattern is called Event-Carried State Transfer. In this design, the upstream service augments the event with additional information, so that a downstream consumer can keep a local copy of that data and not have to make a network call to retrieve it from the upstream service:

{
  "event": "order_created",
  "order": {
    "order_id": 98765,
    "size": "medium",
    "color": "blue"
  },
  "published_at": "2016-03-15T16:35:04Z"
}
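On the consuming side, this pattern might look something like the following sketch. The `local_orders` hash is a hypothetical stand-in for whatever local store the downstream service keeps; in a real service it would more likely be a database table.

```ruby
require "json"

payload = <<~JSON
  {
    "event": "order_created",
    "order": { "order_id": 98765, "size": "medium", "color": "blue" },
    "published_at": "2016-03-15T16:35:04Z"
  }
JSON

# Hypothetical local copy of order data kept by the downstream service.
local_orders = {}

event = JSON.parse(payload)
if event["event"] == "order_created"
  order = event["order"]
  local_orders[order["order_id"]] = order
end

# Later lookups are served locally, with no call back upstream.
color = local_orders.fetch(98765)["color"]
```

The trade-off is that the downstream copy is only as fresh as the last event it processed.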

Event-Sourced

A third designation from Fowler is an Event-Sourced architecture. This implementation suggests that not only is each piece of communication between your services kicked off by an event, but that by storing a representation of an event, you could drop all your databases and still completely rebuild the state of your application by replaying that event stream. In other words, replaying the stream up to a given event reproduces the exact state of your system at that moment.

An enormous challenge to this approach is that, over time, code changes. A future API call to a downstream service might return a different set of data than was previously available, which makes it difficult to recalculate the state at that moment.
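The replay idea is easier to see in code. Here's a minimal sketch, assuming a made-up set of event types for a single order; the `apply` function and the event shapes are hypothetical and not taken from Fowler's material. State is never stored directly: it is always derived by folding over the events.

```ruby
# Hypothetical event stream for a single order.
events = [
  { type: "order_created", order_id: 1, total_cents: 2500 },
  { type: "item_added",    order_id: 1, price_cents: 1000 },
  { type: "order_paid",    order_id: 1 },
]

# Applies one event to the current state and returns the new state.
def apply(state, event)
  id = event[:order_id]
  case event[:type]
  when "order_created"
    state.merge({ id => { total_cents: event[:total_cents], paid: false } })
  when "item_added"
    order = state.fetch(id)
    new_total = order[:total_cents] + event[:price_cents]
    state.merge({ id => order.merge({ total_cents: new_total }) })
  when "order_paid"
    state.merge({ id => state.fetch(id).merge({ paid: true }) })
  else
    state # unknown event types are ignored
  end
end

# Rebuild the current state from nothing by replaying every event in order.
state = events.reduce({}) { |acc, event| apply(acc, event) }

# Replaying only a prefix gives the state at an earlier moment.
state_before_payment = events.first(2).reduce({}) { |acc, event| apply(acc, event) }
```

This works cleanly here because `apply` depends only on the events themselves. The challenge described above appears as soon as `apply` changes between versions or needs data from outside the stream.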

Command Query Responsibility Segregation

The final pattern mentioned is Command Query Responsibility Segregation, or CQRS. The idea here is that actions you might need to perform on a record--creating, reading, updating--are split out into separate domains. That means that one service is responsible for writing and another is responsible for reading. In event-oriented architectures, you'll often see event systems nestled in the diagrams at the place where commands are actually written.

Slide 72

The writer service is going to read off of the event stream, process commands, and store them to a write database. Any queries happen on a read-only database. Separating out read and write logic into two different services adds complexity, but it does allow you to optimize performance separately for those systems.

Practical considerations

Let's talk about a few practical considerations that you might run into while integrating Kafka into your service-oriented application.

The first thing to consider is slow consumers. In an event-driven system, your services need to be able to process events as quickly as the upstream service produces them. Otherwise, they will slowly drift behind, without any indication that there's a problem, because there won't be any timeouts or call failures. One place where you can identify timeouts will be on the socket connection with the Kafka brokers. If a service is not processing events fast enough, that connection can time out, and reestablishing it has an additional time cost since it's expensive to create those sockets.

If a consumer is slow, how do you speed it up? For Kafka, you can increase the number of consumers in your consumer group so that you can process more events in parallel. You'll want at least two consumer processes running per service, so that if one goes down, its partitions can be reassigned to another. Essentially, you can parallelize work across as many consumers as you have topic partitions. (As with any scaling issue, you can't just add consumers forever; eventually you're going to hit scaling limits on shared resources, like databases.)
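To see why the partition count caps your parallelism, here's a small sketch of how partitions might be divided among the members of a consumer group. The `assign_partitions` helper is hypothetical and uses a simple round-robin; Kafka's real assignment strategies are more involved, so treat this as an illustration of the constraint rather than of the actual algorithm.

```ruby
# Spreads partitions across consumers round-robin.
# Returns a hash of consumer name => array of partition numbers.
def assign_partitions(partition_count, consumers)
  assignment = consumers.to_h { |consumer| [consumer, []] }
  partition_count.times do |partition|
    owner = consumers[partition % consumers.size]
    assignment[owner] << partition
  end
  assignment
end

# Two consumers share four partitions evenly.
two = assign_partitions(4, %w[c1 c2])

# Six consumers, but still only four partitions: two consumers sit idle.
six = assign_partitions(4, %w[c1 c2 c3 c4 c5 c6])

# If c1 goes down, the survivor picks up every partition.
after_failure = assign_partitions(4, %w[c2])
```

With four partitions, a fifth and sixth consumer receive nothing to do, which is why adding consumers beyond the partition count doesn't help.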

It's also extremely valuable to have metrics and alerts around how far your consumers lag behind the moment an event was added to the topic. ruby-kafka is instrumented with ActiveSupport notifications, but it also has StatsD and Datadog reporters that are automatically included. You can use these to report on whether you're lagging behind when the events are added. The ruby-kafka gem even provides a list of recommended metrics to monitor!
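If your events carry a timestamp like the published_at field in the payloads shown earlier, one simple lag measurement is the gap between that timestamp and the time you processed the event. The sketch below assumes exactly that; the `consumer_lag_seconds` helper and the 60-second threshold are made up for illustration and are not part of ruby-kafka.

```ruby
require "time"

# Seconds between when an event was published and when it was processed.
def consumer_lag_seconds(published_at, processed_at)
  processed_at - Time.iso8601(published_at)
end

LAG_ALERT_THRESHOLD = 60 # seconds; an arbitrary example value

published_at = "2016-03-15T16:35:04Z"
processed_at = Time.iso8601("2016-03-15T16:37:34Z")

lag = consumer_lag_seconds(published_at, processed_at)
falling_behind = lag > LAG_ALERT_THRESHOLD
```

In a real consumer you would pass `Time.now` as the processing time and send the result to your metrics system rather than compare it inline.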

Another aspect of building systems with Kafka is designing your consumers for failure. Kafka is guaranteed to send an event at least once; there's never a chance that messages will not be sent at all. But you need to design consumers to expect duplicated events. One way to do that is to always rely on UPSERT to add new records to your database. If a record already exists with the same attributes, the call will essentially be a no-op. Alternatively, you can include a unique identifier in each event, and just skip operating on events that have already been seen before.
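The second approach, skipping events you've already seen, might look roughly like this. It's a sketch under a few assumptions: each event carries an `event_id` field (a name invented here), and the set of seen IDs lives in memory. A real service would keep those IDs somewhere durable, such as a database table with a unique index.

```ruby
require "set"

# Processes each event at most once, keyed by a unique event ID.
class IdempotentHandler
  attr_reader :emails_sent

  def initialize
    @seen_ids = Set.new
    @emails_sent = []
  end

  # Returns true if the event was processed, false if it was a duplicate.
  def handle(event)
    # Set#add? returns nil when the ID is already present.
    return false unless @seen_ids.add?(event[:event_id])

    @emails_sent << event[:order_id]
    true
  end
end

handler = IdempotentHandler.new
deliveries = [
  { event_id: "a1", order_id: 98765 },
  { event_id: "a1", order_id: 98765 }, # redelivered duplicate
  { event_id: "b2", order_id: 98766 },
]
results = deliveries.map { |event| handler.handle(event) }
```

The duplicate delivery is acknowledged but does nothing, so the customer gets one confirmation email instead of two.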

Payload formats

One surprising aspect to Kafka is its very permissive attitude towards data. You can send it anything in bytes and it will simply send that back out to consumers without any verification. This feature makes its usage extremely flexible because you don't need to adopt a specific format. But what happens when an upstream service decides to change an event that it produces? If you just change that event payload, there's a really good chance that one of your downstream consumers will break.

Before you begin adopting an event-driven architecture, choose a data format and evaluate how it can help you register schemas and evolve them over time. It's much easier to think about validation and evolution of schemas before you actually implement them.

One format to use is, of course, JSON, the format of the web. It's human readable and supported in basically every programming language. But there are a few downsides. For one, the actual size of JSON payloads can be really large. The payloads require you to send key-value pairs, which are flexible, but are also often duplicated across every event. There's no built-in documentation inside a payload, such that, given a value, you might not know what it means. Schema evolution is also a challenge, since there's no built-in support for aliasing one key to another if you need to rename a field.

Confluent, the company founded by the original creators of Apache Kafka, recommends using Avro as a data serialization system. The data is sent over in binary, so it's not human-readable. But the upside is more robust schema support. A full Avro object includes its schema and its data. Avro comes with support for simple types, like integers, and complex ones, like dates. It also embeds documentation into the schema, which allows you to comprehend what a field does in your system. It provides built-in tools that help you evolve your schemas in backwards-compatible ways over time.
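To give a feel for what those schema features look like, here's an Avro record schema written out as a Ruby hash and serialized to the JSON form that Avro schemas use. The record and its fields are invented for illustration (loosely following the order payload from earlier), and the snippet only builds the schema with Ruby's standard library; it doesn't use an Avro gem to encode any data.

```ruby
require "json"

# An Avro record schema as a Ruby hash. Record and field names are illustrative.
order_created_schema = {
  "type" => "record",
  "name" => "OrderCreated",
  "namespace" => "com.example.orders",
  "doc" => "Published when a customer places an order.",
  "fields" => [
    { "name" => "order_id", "type" => "long", "doc" => "Primary key of the order." },
    { "name" => "size", "type" => "string", "doc" => "Size of the item ordered." },
    # Renamed from "colour"; the alias lets readers resolve older data.
    { "name" => "color", "type" => "string", "aliases" => ["colour"] },
    # Added later; the default keeps older records readable.
    { "name" => "gift_wrapped", "type" => "boolean", "default" => false },
  ],
}

schema_json = JSON.pretty_generate(order_created_schema)
```

The "doc", "aliases", and "default" attributes are what provide the embedded documentation and evolution support that plain JSON payloads lack.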

avro-builder is a gem created by Salsify that offers a very Ruby-ish DSL to help you create your schemas. If you're curious to learn more about Avro, the Salsify engineering blog has a really great writeup on Avro and avro-builder!

More information

If you're curious to learn more about how we run our hosted Kafka products, or how we use Kafka internally at Heroku, we do have two talks other Heroku engineers have given that you can watch!

Jeff Chao's talk at DataEngConf SF '17 was titled "Beyond 50,000 Partitions: How Heroku Operates and Pushes the Limits of Kafka at Scale," while Pavel Pravosud spoke at Dreamforce '16 about "Dogfooding Kafka: How We Built Heroku's Real-Time Platform Event Stream." Enjoy!

Originally published: January 14, 2019
