Being able to pass data around within a distributed system is one of the most crucial aspects of your business's success, especially when you are dealing with a large number of users, reads and writes. For a given write to an entity, it's common to have N read patterns, not just one. Apache Kafka is one of the most effective ways to enable that data distribution within a complex system. I have had the chance to use Kafka at work for more than a year now. However, its use has always been implicit, and I never needed to understand its intrinsic semantics (standing on the shoulders of giants). I spent this extended weekend reading the Kafka documentation and running some local examples to understand it in detail, not just at a high level.
Kafka already has great documentation, which is very detailed and clear. The intention of this post is not to replicate it. Instead, it's to pull out the bits and pieces which helped me understand Kafka better and increased my trust in it. As it has been said in the movie Batman Begins (one of my all-time favourites): "You always fear what you don't understand", and the main outcome here is to remove that fear :) The post is written by someone (me) with previous experience of messaging systems such as RabbitMQ, Amazon SQS, and Azure Service Bus. So, I might be overlooking some important aspects which you may need if you don't have this background. If that's the case, it might be useful to first understand some use cases where Kafka might fit in.
Let's first understand some of the high-level concepts of Kafka, which will allow us to get started and work on a sample later on. This is by no means an exhaustive list of concepts in Kafka, but it will be enough to get us going by allowing us to extract some facts and to make some assumptions with high confidence.
The most important concept in Kafka is the topic. A topic is a place where you can logically group your messages. When I say logically, I don't mean a schema or anything. You can think of it as just a bucket where your data ends up in the order it arrives, and from which it can be retrieved in the same order (i.e. it's continually appended to a structured commit log; strictly speaking, this ordering holds within each partition of the topic). Topics can be subscribed to by one or more consumers, which we will touch on a few points later. This means that Kafka doesn't have strict message queue semantics, where the data is gone as soon as one consumer processes it.
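To make the "append-only log" idea concrete, here is a toy, in-memory Python model of a single topic partition. It's a sketch under simplifying assumptions (one partition, no brokers, no persistence), and the TopicLog class and the reader names are hypothetical, not part of Kafka's API. It shows the point above: reading doesn't remove anything, and each reader keeps track of its own position (offset) in the log.

```python
from dataclasses import dataclass, field


@dataclass
class TopicLog:
    """Hypothetical single-partition model of a topic: an append-only list.

    Illustration only; this is not Kafka's storage format or client API.
    """
    records: list = field(default_factory=list)

    def append(self, value):
        # New records always go to the end; the returned offset is their position.
        self.records.append(value)
        return len(self.records) - 1

    def read_from(self, offset, max_records=10):
        # Reading never removes anything; it returns records from `offset` onwards.
        return self.records[offset:offset + max_records]


log = TopicLog()
for event in ["created", "updated", "deleted"]:
    log.append(event)

# Two independent readers, each tracking its own offset in the same log.
positions = {"billing": 0, "search-indexer": 0}

billing_batch = log.read_from(positions["billing"])
positions["billing"] += len(billing_batch)

# The second reader still sees every record, because the first read deleted nothing.
indexer_batch = log.read_from(positions["search-indexer"], max_records=2)
positions["search-indexer"] += len(indexer_batch)

print(billing_batch)  # ['created', 'updated', 'deleted']
print(indexer_batch)  # ['created', 'updated']
print(positions)      # {'billing': 3, 'search-indexer': 2}
```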
These messages are called records, and they are durably persisted in the Kafka cluster regardless of whether they have been consumed or not. This differentiates Kafka from queuing systems such as RabbitMQ or SQS, where messages vanish after they have been consumed and processed. Using Kafka to store records permanently is a perfectly valid choice. However, if this is not desired, Kafka also gives you retention configuration options to specify how long you want to hold onto records on a per-topic basis.
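Retention can be sketched the same way. In Kafka, the actual knobs are topic-level settings such as retention.ms and retention.bytes; the apply_time_retention function below is a hypothetical, simplified model of time-based retention only. It shows that whether a record was consumed plays no part in when it gets deleted. (Simplification: real brokers delete whole log segments, not individual records.)

```python
from dataclasses import dataclass


@dataclass
class StoredRecord:
    timestamp_ms: int
    value: str
    consumed: bool = False  # tracked only to show it plays no role in retention


def apply_time_retention(records, now_ms, retention_ms):
    """Hypothetical sketch: drop records older than retention_ms, consumed or not.

    A negative retention_ms models "keep forever", like retention.ms=-1.
    """
    if retention_ms < 0:
        return list(records)
    cutoff = now_ms - retention_ms
    return [r for r in records if r.timestamp_ms >= cutoff]


records = [
    StoredRecord(timestamp_ms=1_000, value="a", consumed=True),
    StoredRecord(timestamp_ms=5_000, value="b", consumed=False),
    StoredRecord(timestamp_ms=9_000, value="c", consumed=True),
]

# "a" is dropped because it's old, not because it was consumed; "c" was consumed and stays.
kept = apply_time_retention(records, now_ms=10_000, retention_ms=6_000)
print([r.value for r in kept])  # ['b', 'c']

forever = apply_time_retention(records, now_ms=10_000, retention_ms=-1)
print([r.value for r in forever])  # ['a', 'b', 'c']
```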
Records get into (i.e. are written to) a topic through a producer, which is also responsible for choosing which partition within the topic each record is assigned to. In other words, data sharding is handled by the clients which publish data to a particular topic. Depending on the client you use, you may have different options for distributing data across partitions, e.g. round robin, your own custom sharding strategy, etc.
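Here is a rough sketch of what producer-side partitioning strategies can look like. These are plain Python functions written for illustration, not any client's partitioner interface: zlib.crc32 stands in for the real hash (Kafka's Java client uses murmur2 for keyed records), and region_partitioner is a hypothetical custom strategy.

```python
import itertools
import zlib


def key_hash_partitioner(key: bytes, num_partitions: int) -> int:
    # Same key -> same partition. crc32 is a stand-in for the client's real hash.
    return zlib.crc32(key) % num_partitions


def make_round_robin_partitioner(num_partitions: int):
    # Ignores the key and cycles through partitions 0, 1, ..., N-1, 0, ...
    counter = itertools.count()
    return lambda _key: next(counter) % num_partitions


def region_partitioner(key: bytes, num_partitions: int) -> int:
    # Hypothetical custom strategy: pin "eu-" keys to partition 0 and
    # hash every other key over the remaining partitions.
    if num_partitions < 2:
        raise ValueError("this strategy needs at least 2 partitions")
    if key.startswith(b"eu-"):
        return 0
    return 1 + zlib.crc32(key) % (num_partitions - 1)


NUM_PARTITIONS = 3
round_robin = make_round_robin_partitioner(NUM_PARTITIONS)

keys = [b"user-1", b"user-2", b"user-1", b"eu-42"]
print([round_robin(k) for k in keys])  # [0, 1, 2, 0]
print([key_hash_partitioner(k, NUM_PARTITIONS) for k in keys])
print([region_partitioner(k, NUM_PARTITIONS) for k in keys])
```

The trade-off in a nutshell: round robin spreads load evenly but scatters records for the same key across partitions, while key hashing keeps each key's records together (and therefore ordered) at the cost of possible hot partitions.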
Records within a specific topic are consumed (i.e. read) by a consumer, which is part of a consumer group. Consumer groups allow records to be processed in parallel by the consumer instances associated with that group (which can live in separate processes or machines), with a guarantee that each record is delivered to only one consumer instance within the group. A consumer instance within a consumer group owns one or more partitions exclusively, which means that you can have at most N active consumers in a group if you have N partitions; any extra consumers will sit idle.
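The exclusive ownership of partitions can be illustrated with a small assignment function. This is a simplified sketch modelled loosely on Kafka's range assignment strategy for a single topic; assign_partitions and the consumer names are hypothetical, and real assignment happens inside the clients and the group coordinator during a rebalance.

```python
def assign_partitions(partitions, consumers):
    """Hypothetical range-style assignment for one topic.

    Each partition goes to exactly one consumer. Consumers are sorted, each gets
    a contiguous block, and the first len(partitions) % len(consumers) consumers
    get one extra partition.
    """
    consumers = sorted(consumers)
    per_consumer, extra = divmod(len(partitions), len(consumers))
    assignment, start = {}, 0
    for i, consumer in enumerate(consumers):
        count = per_consumer + (1 if i < extra else 0)
        assignment[consumer] = partitions[start:start + count]
        start += count
    return assignment


partitions = [0, 1, 2]

print(assign_partitions(partitions, ["c1", "c2"]))
# {'c1': [0, 1], 'c2': [2]}

# More consumers than partitions: the fourth consumer has nothing to read.
print(assign_partitions(partitions, ["c1", "c2", "c3", "c4"]))
# {'c1': [0], 'c2': [1], 'c3': [2], 'c4': []}
```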
So, based on these, here are some takeaways which I was able to further unpack by following up:
For keyed records, the partition is typically derived from the key (e.g. hash(key) % number_of_partitions). It's also important to know that Kafka will not attempt to automatically redistribute data in any way, so this onus is on you, too.

On the data producing side, we need to know the topic name and the approach to use for distributing data across partitions (it's likely that your client will help with this through some out-of-the-box strategies, such as the ones offered by the Confluent clients). Apart from this, there are quite a few producer-level configuration settings we can apply to influence the semantics of data publishing.
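The hash(key) % number_of_partitions mapping has a consequence worth seeing in numbers. Kafka doesn't move existing records when partitions are added to a topic, so growing the partition count changes where new records for many keys will go, while older records for those keys stay where they were. The sketch below is a hypothetical illustration that uses zlib.crc32 as a stand-in hash, not the hash your client actually uses.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    # hash(key) % number_of_partitions, with crc32 standing in for the client's hash.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


keys = [f"order-{i}" for i in range(1000)]

before = {k: partition_for(k, 3) for k in keys}
after = {k: partition_for(k, 4) for k in keys}

# Keys whose records would now land in a different partition after growing 3 -> 4.
moved = [k for k in keys if before[k] != after[k]]
print(f"{len(moved)} of {len(keys)} keys now map to a different partition")
```

With a reasonably uniform hash, roughly three quarters of the keys are expected to move when going from 3 to 4 partitions: a hash value h keeps its partition only when h % 3 == h % 4, i.e. when h % 12 is 0, 1 or 2.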
When I am working with messaging systems, the first thing I want to understand is how message delivery and durability guarantees can be influenced, and what the default behaviour is for these. In Kafka, I found this story a bit more confusing than it probably should be, because a few configuration settings need to be aligned to favour durability and prevent message loss. Here are some important configuration settings for this:
- acks: This setting indicates the number of acknowledgments the producer requires before a publish is deemed successful. It can be set to 0, meaning that the producer won't wait for an ack from any of the servers, which gives us no guarantee that the message was received by the server. This option could be preferable where we need high throughput on the producing side and data loss is not critical (e.g. sensor data, where losing a few seconds of data from a source won't spoil our world). For cases where record durability is important, this can be set to all. This means the leader will wait for the full set of in-sync replicas to acknowledge the record, where the minimum number of required in-sync replicas is configured separately.
- min.insync.replicas: Quoting from the doc directly: "When a producer sets acks to "all" (or "-1"), min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful". This setting is topic level but can also be specified at the broker level. Setting it to the correct value is really important. It's set to 1 by default, which is probably not what you want if you care about the durability of your messages and the topic has a replication factor of 3 or more.
- flush.messages: In Kafka, messages are immediately written to the filesystem, but by default we only fsync() to sync the OS cache lazily. This means that even if we have set acks and min.insync.replicas to optimise for durability, there is still a theoretical chance that we can lose data with this behaviour. I explicitly said "theoretical" here as it's quite unlikely to lose data with appropriate settings that rely on replication for data durability. For instance, with acks=all and min.insync.replicas=2 for a topic which has a replication factor of 3, we would only lose a write that was reported as successful if all 3 machines (1 leader and 2 replicas) failed at the same time before having a chance to flush that particular record to disk, which is pretty unlikely. This is why Kafka recommends against setting this value, as well as the flush.ms value. So, we need to think a bit harder before setting these configuration values, as they come with trade-offs.

So, there is a lot to think about here just to get message durability right. The good side of this complexity is that Kafka is not trying to provide one way to solve all problems, which is not really possible, especially when you want to optimise for different aspects (e.g. durability, throughput, etc.) depending on the problem at hand. There is some further information on message delivery guarantees in the Kafka documentation.
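To make the interplay between acks and min.insync.replicas more concrete, here is a toy Python model of how a partition leader treats a write. It's a sketch under simplifying assumptions (no timeouts, no leader elections, and acks=1 is left out), and write_outcome is a hypothetical function, not part of any Kafka client. The two dictionaries only show the real setting names with durability-oriented example values. With the confluent-kafka Python client, a producer dictionary like this (plus bootstrap.servers) is what you would pass to confluent_kafka.Producer, but no client is used here.

```python
# Real Kafka setting names with durability-oriented example values.
producer_config = {"acks": "all"}
topic_config = {"min.insync.replicas": "2"}  # assumed topic with replication factor 3


def write_outcome(acks, replication_factor, in_sync_replicas, min_insync_replicas=1):
    """Hypothetical, simplified model of a produce request to one partition.

    in_sync_replicas counts the leader plus the followers currently in sync.
    min_insync_replicas defaults to 1, mirroring Kafka's default.
    """
    if not 1 <= in_sync_replicas <= replication_factor:
        raise ValueError("in-sync replicas must include the leader and fit the replication factor")
    if acks == "0":
        # The producer doesn't wait for a response, so it never learns whether the write landed.
        return "sent, no guarantee"
    if acks == "all":
        if in_sync_replicas < min_insync_replicas:
            # The broker refuses the write (NotEnoughReplicas) rather than storing too few copies.
            return "rejected"
        # The leader waits for every replica currently in the in-sync set.
        return f"acknowledged by {in_sync_replicas} replica(s)"
    raise ValueError(f"acks={acks!r} is not modelled in this sketch")


min_isr = int(topic_config["min.insync.replicas"])

# Healthy cluster: all 3 replicas are in sync.
print(write_outcome(producer_config["acks"], 3, 3, min_isr))  # acknowledged by 3 replica(s)

# Both followers fell behind: only the leader is in sync, so the write is refused.
print(write_outcome(producer_config["acks"], 3, 1, min_isr))  # rejected

# Same situation with the default min.insync.replicas=1: the write "succeeds"
# even though a single machine holds the record.
print(write_outcome("all", 3, 1))  # acknowledged by 1 replica(s)

print(write_outcome("0", 3, 3))  # sent, no guarantee
```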
There are some other producer semantics that require understanding, since not understanding them might be costly depending on your needs. For example, it's really important to understand producer retries correctly, as they can affect message ordering even within a single partition. Another one is the batch size configuration, which influences how many records are batched into one request whenever multiple records are being sent to the same partition. This might mean that sends are performed asynchronously, which may not be suitable for your needs. Finally, log compaction is another concept worth knowing about in advance, especially for cases where you publish the current state of an entity to a topic instead of publishing fine-grained events.
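Log compaction (enabled per topic with cleanup.policy=compact) is easiest to picture with a toy model. The compact function below is a hypothetical sketch of the retained end state only: the latest value per key survives with its original offset, and a record with a null value (a tombstone) eventually removes the key altogether. Simplifications: the real log cleaner never compacts the active segment, and it keeps tombstones around for a while (delete.retention.ms) before removing them, whereas this sketch drops them immediately.

```python
def compact(log):
    """Hypothetical model of log compaction: keep only the latest record per key.

    `log` is a list of (offset, key, value) tuples in offset order; value None
    is a tombstone (delete marker) and is dropped here right away.
    """
    latest = {}
    for offset, key, value in log:
        latest[key] = (offset, key, value)  # later offsets overwrite earlier ones
    # Survivors keep their original offsets and relative order.
    return sorted(r for r in latest.values() if r[2] is not None)


log = [
    (0, "user-1", "email=ann@old.example"),
    (1, "user-2", "email=bob@example.com"),
    (2, "user-1", "email=ann@new.example"),
    (3, "user-2", None),  # tombstone: user-2 was deleted
    (4, "user-3", "email=cy@example.com"),
]

print(compact(log))
# [(2, 'user-1', 'email=ann@new.example'), (4, 'user-3', 'email=cy@example.com')]
```

This is why compaction suits the "current state of an entity" style of publishing: a consumer reading the compacted topic from the beginning still ends up with the latest state of every live entity.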