KAFKA DOCUMENTATION
Apache Kafka is a community-maintained, distributed event streaming platform capable of handling trillions of events a day. Initially conceived as a messaging queue, Kafka is based on the abstraction of a distributed commit log. Since being created and open-sourced by LinkedIn in 2011, Kafka has quickly evolved from a messaging queue to a full-fledged event streaming platform.
TOPIC:
A particular stream of data
* Similar to a table in a database (without all the constraints)
* You can have as many topics as you want
* A topic is identified by its name
* Topics are split into Partitions
* Each partition is ordered
* Each message within a partition gets an incremental id, called an offset
Example:
*Say you have a fleet of trucks; each truck reports its GPS position to Kafka
*You can have a topic truck-GPS that contains the position of all trucks
*Each truck will send a message to Kafka every 20 seconds; each message will contain the Truck ID & the truck position (latitude & longitude)
*We choose to create that topic with 10 partitions (arbitrary number)
*Offsets only have a meaning for a specific partition, e.g. offset 3 in partition 0 doesn't represent the same data as offset 3 in partition 1
*Order is guaranteed only within a partition (not across partitions)
*Data is kept only for a limited time (default is one week)
*Once the data is written to a partition, it can't be changed (immutability)
*Data is assigned randomly to a partition unless a key is provided (more on this later; a minimal producer sketch for this example follows below)
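As an illustration of the truck example, here is a minimal sketch using the kafka-python client shown later in these notes; the broker address and the truck_id / latitude / longitude field names are assumptions for illustration only.
# Sketch: publish one truck position to the "truck-GPS" topic.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)

position = {'truck_id': 'truck-42', 'latitude': 48.8566, 'longitude': 2.3522}
# Using the truck id as the key means all positions of one truck land in the same partition (see the key section later)
producer.send('truck-GPS', key=position['truck_id'], value=position)
producer.flush()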
BROKERS:
*A Kafka cluster is composed of multiple brokers (servers)
*Each broker is identified by its ID (integer)
*Each broker contains certain topic partitions
*After connecting to any broker (called a bootstrap broker), you will be connected to the entire cluster
*A good number to get started is 3 brokers, but some big clusters have over 100 brokers
*In these examples we choose to number brokers starting at 100 (arbitrary)
BROKER & TOPICS
* Example of Topic-A with 3 Partitions
* Example of Topic-B with 2 Partitions
*NOTE: Data is distributed, & Broker 103 doesn't have any Topic-B data
Topic Replication Factor
*Topics should have a replication factor > 1 (usually between 2 & 3)
*This way if a broker is down, another broker can serve the data
*Example: Topic-A with 2 partitions & a replication factor of 2
*Example: we lose Broker 102
*Result: Broker 101 & 103 can still serve the data
*At any time only ONE broker can be the leader for a given partition
*Only that leader can receive & serve data for that partition
*The other brokers will synchronize the data
*Therefore each partition has one leader & multiple ISR (in-sync replicas); a quick CLI illustration follows below
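As a hedged illustration (assuming a local cluster with at least 2 brokers; the topic name topic-a is a placeholder), replication can be observed with the same kafka-topics.sh commands used in the commands section later:
kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic topic-a --partitions 2 --replication-factor 2
kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic topic-a --describe
#the describe output lists, for each partition, its Leader broker, Replicas & Isr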
PRODUCERS
*Producers write data to topics (which are made of partitions)
*Producers automatically know which broker & partition to write to
*In case of broker failures, producers will automatically recover
*Producers can choose to receive acknowledgment of data writes:
*acks=0: the producer doesn't wait for an acknowledgment (possible data loss)
*acks=1: the producer waits for the leader's acknowledgment (limited data loss)
*acks=all: leader + replicas acknowledgment (no data loss)
Producers: Message keys
*Producers can choose to send a key with the message (string, number, etc.)
*If key=null, data is sent round-robin (broker 101, then 102, then 103...)
*If a key is sent, then all messages for that key will always go to the same partition
*A key is basically sent if you need message ordering for a specific field (ex: truck_id)
CONSUMERS & CONSUMER GROUPS
CONSUMERS
*Consumers read data from a topic (identified by name)
*Consumers know which broker to read from
*In case of broker failures, consumers know how to recover
*Data is read in order within each partition
CONSUMER GROUP
*Consumers read data in consumer groups
*Each consumer within a group reads from exclusive partitions
*If you have more consumers than partitions, some consumers will be inactive
CONSUMER OFFSETS
*Kafka stores the offsets at which a consumer group has been reading
*The committed offsets live in a Kafka topic named __consumer_offsets (double underscore prefix)
*When a consumer in a group has processed data received from Kafka, it should be committing the offsets
*If a consumer dies, it will be able to read back from where it left off thanks to the committed consumer offsets
Delivery Semantics for Consumers
*Consumers choose when to commit offsets
*There are 3 delivery semantics:
*At most once:
>Offsets are committed as soon as the message is received.
>If the processing goes wrong, the message will be lost (it won't be read again).
*At least once (usually preferred):
>Offsets are committed after the message is processed.
>If the processing goes wrong, the message will be read again.
>This can result in duplicate processing of messages, so make sure your processing is idempotent (i.e. processing the messages again won't impact your systems).
*Exactly once:
>Can be achieved for Kafka => Kafka workflows using the Kafka Streams API
>For Kafka => external system workflows, use an idempotent consumer.
(A small kafka-python sketch of the at-least-once pattern follows below.)
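A minimal sketch of the at-least-once pattern with the kafka-python client used later in these notes (topic, group and broker names are illustrative): offsets are committed only after processing succeeds, so a crash mid-processing causes a re-read rather than a loss.
# At-least-once sketch: disable auto-commit & commit only after processing.
from kafka import KafkaConsumer

consumer = KafkaConsumer('first_topic',
                         group_id='my-first-application',
                         bootstrap_servers=['localhost:9092'],
                         enable_auto_commit=False)

for message in consumer:
    # processing should be idempotent: a crash before commit() means the message is read again
    print(message.topic, message.partition, message.offset, message.value)
    consumer.commit()  # committing after processing => at-least-once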
Kafka Broker Discovery
· Every Kafka broker is also called a "bootstrap server"
· That means that you only need to connect to one broker, & you will be connected to the entire cluster
· Each broker knows about all brokers, topics & partitions (metadata)
ZOOKEEPER:
· Zookeeper manages brokers (keeps a list of them)
· Zookeeper helps in performing leader election for partitions
· Zookeeper sends notifications to Kafka in case of changes (e.g. new topic, broker dies, broker comes up, delete topic, etc...)
· Kafka can't work without Zookeeper
· Zookeeper by design operates with an odd number of servers (3, 5, 7)
· Zookeeper has a leader (handles writes); the rest of the servers are followers
· Zookeeper does not store consumer offsets with Kafka > v0.10
KAFKA GUARANTEES:
· Messages are appended to a topic-partition in the order they are sent.
· Consumers read messages in the order stored in a topic-partition.
· With a replication factor of N, producers & consumers can tolerate up to N-1 brokers being down.
· This is why a replication factor of 3 is a good idea.
· Allows for one broker to be taken down for maintenance.
· Allows for another broker to be taken down unexpectedly.
· As long as the number of partitions remains constant for a topic (no new partitions), the same key will always go to the same partition.
COMMANDS TO START KAFKA
#Start the Zookeeper server
zookeeper-server-start.sh config/zookeeper.properties
#Start the Kafka broker
kafka-server-start.sh config/server.properties
#Create Topic
kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic third-topic --create --partitions 3 --replication-factor 1
#To check whether the kafka topic was created or not
kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
#To check the details of topic
kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic first_topic --describe
#Delete topic
kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic second_topic --delete
#kafka console producer CLI
$kafka-console-producer.sh (running it with no arguments shows the list of options)
#Produce messages with the console producer
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic
>Hello
>How are You
#Producer with a producer property
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic --producer-property acks=all
#If you produce to a topic that doesn't exist yet, it is automatically created (you get a WARN on the first message)
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic new_topic
>hey this topic does not exist!
WARN [Producer clientId=console-producer] ...
>another message
#check the list of kafka topics
kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
#To change the default number of partitions for auto-created topics, set num.partitions (e.g. to 3 or more) in config/server.properties
nano config/server.properties
#kafka console consumer CLI
kafka-console-consumer.sh
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning
#kafka consumers in a group (from IDE)
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --group my-first-application
$kafka-consumer-groups.sh
$kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
my-first-application
console-consumer-10824
my-second-application
console-consumer-1052
#Resetting offsets
Options to reset (--to-datetime, --by-duration, --to-earliest, --to-latest, --shift-by, --from-file, --to-current)
$kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --to-earliest --execute --topic first_topic
$kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-first-application
#Shift-by (a negative value moves the group's offsets back by that many messages per partition)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --shift-by -2 --execute --topic first_topic
Kafka Producer:
import json
import logging
import msgpack  # third-party package, used in the msgpack example below
from kafka import KafkaProducer
from kafka.errors import KafkaError

log = logging.getLogger(__name__)
producer = KafkaProducer(bootstrap_servers=['broker1:1234'])
# Asynchronous by default
future = producer.send('my-topic', b'raw_bytes')
# Block for 'synchronous' sends
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if the produce request failed...
    log.exception('produce request failed')
    pass
# Successful result returns assigned partition and offset
print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)
# produce keyed messages to enable hashed partitioning
producer.send('my-topic', key=b'foo', value=b'bar')
# encode objects via msgpack
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})
# produce json messages
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
producer.send('json-topic', {'key': 'value'})
# produce asynchronously
for _ in range(100):
    producer.send('my-topic', b'msg')

def on_send_success(record_metadata):
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)

def on_send_error(excp):
    log.error('I am an errback', exc_info=excp)
    # handle exception

# produce asynchronously with callbacks
producer.send('my-topic', b'raw_bytes').add_callback(on_send_success).add_errback(on_send_error)
# block until all async messages are sent
producer.flush()
# configure multiple retries
producer = KafkaProducer(retries=5)
Kafka Consumer:
import json
import msgpack  # third-party package, used in the msgpack example below
from kafka import KafkaConsumer
# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))
# consume earliest available messages, don't commit offsets
KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)
# consume json messages
KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))
# consume msgpack
KafkaConsumer(value_deserializer=msgpack.unpackb)
# StopIteration if no message after 1sec
KafkaConsumer(consumer_timeout_ms=1000)
# Subscribe to a regex topic pattern
consumer = KafkaConsumer()
consumer.subscribe(pattern='^awesome.*')
# Use multiple consumers in parallel w/ 0.9 kafka brokers
# typically you would run each on a different server / process / CPU
consumer1 = KafkaConsumer('my-topic',
                          group_id='my-group',
                          bootstrap_servers='my.server.com')
consumer2 = KafkaConsumer('my-topic',
                          group_id='my-group',
                          bootstrap_servers='my.server.com')
#acks & min.insync.replicas
Producer acks Deep Dive
acks=0 (no acks)
· No response is requested
· If the broker goes offline or an exception happens, we won't know & will lose data
· Useful for data where it's okay to potentially lose messages:
#Metrics collection
#Log collection
acks=1 (leader acks)
· Leader response is requested, but replication is not a guarantee (it happens in the background)
· If an ack is not received, the producer may retry
· If the leader broker goes offline but replicas haven't replicated the data yet, we have data loss
acks=all (replicas acks)
· Leader + replicas acks requested (a short kafka-python sketch of the three settings follows below)
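As a minimal sketch with the kafka-python client used elsewhere in these notes (broker address and topic are illustrative), the three modes map directly onto the producer's acks setting:
# Sketch: the same send with the three acks settings.
from kafka import KafkaProducer

fire_and_forget = KafkaProducer(bootstrap_servers=['localhost:9092'], acks=0)      # possible data loss
leader_ack      = KafkaProducer(bootstrap_servers=['localhost:9092'], acks=1)      # limited data loss
all_replicas    = KafkaProducer(bootstrap_servers=['localhost:9092'], acks='all')  # no data loss (with min.insync.replicas)

for p in (fire_and_forget, leader_ack, all_replicas):
    p.send('first_topic', b'hello')
    p.flush()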
#Retries & max.in.flight.requests.per.connection
Producer retries
· In case of transient failures, developers are expected to handle exceptions, otherwise the data will be lost
>Example of a transient failure:
>NotEnoughReplicasException
· There is a "retries" setting
>defaults to 0
>you can increase it to a high number, e.g. Integer.MAX_VALUE
· In case of retries, by default, there is a chance that messages will be sent out of order (if a batch has failed to be sent)
· If you rely on key-based ordering, that can be an issue.
· For this, you can set the setting that controls how many produce requests can be made in parallel: max.in.flight.requests.per.connection
.Default: 5
.Set it to 1 if you need to ensure ordering (may impact throughput)
· In Kafka >= 1.0.0, there's a better solution!
#Idempotent Producer
*Here's the problem: the producer can introduce duplicate messages in Kafka due to network errors.
*In Kafka >= 0.11 you can define an "idempotent producer" which won't introduce duplicates on network error
*Idempotent producers are great to guarantee a stable & safe pipeline!
*They come with:
#retries = Integer.MAX_VALUE (2^31 - 1 = 2147483647)
#max.in.flight.requests.per.connection = 1 (Kafka >= 0.11 & < 1.1) or
#max.in.flight.requests.per.connection = 5 (Kafka >= 1.1 - higher performance)
#acks = all
#just set:
*producerProps.put("enable.idempotence", true);
Safe producer summary & Demo
Kafka < 0.11
.acks=all (producer level)
.Ensures data is properly replicated before an ack is received
.min.insync.replicas=2 (broker/topic level)
.Ensures at least two brokers in the ISR have the data after an ack
.retries=MAX_INT (producer level)
.Ensures transient errors are retried indefinitely
.max.in.flight.requests.per.connection=1 (producer level)
.Ensures only one request is tried at any time, preventing message re-ordering in case of retries
Kafka >= 0.11
.enable.idempotence=true (producer level) + min.insync.replicas=2 (broker/topic level)
.Implies acks=all, retries=MAX_INT, max.in.flight.requests.per.connection=5 (default)
.while keeping ordering guarantees & improving performance!
.Running a "safe producer" might impact throughput & latency; always test for your use case (a hedged kafka-python sketch follows below).
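A minimal sketch of the pre-0.11 style "safe producer" settings with the kafka-python client (broker address and topic are illustrative). Note that min.insync.replicas is a broker/topic setting rather than a client one, and this client may not expose enable.idempotence, so the explicit settings are shown instead.
# Sketch of a "safe producer" (Kafka < 0.11 style settings); values follow the summary above.
from kafka import KafkaProducer

safe_producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    acks='all',                               # wait for leader + in-sync replicas
    retries=2147483647,                       # retry transient errors (Integer.MAX_VALUE)
    max_in_flight_requests_per_connection=1,  # preserve ordering in case of retries
)
# min.insync.replicas=2 must be set on the broker or topic (e.g. with kafka-configs.sh)
safe_producer.send('first_topic', b'important message')
safe_producer.flush()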
PRODUCER COMPRESSION
Message Compression
*Producers usually send data that is text-based, for example JSON data
*In this case, it is important to apply compression to the producer
*Compression is enabled at the producer level & doesn't require any configuration change in the brokers or in the consumers
*"compression.type" can be 'none' (default), 'gzip', 'lz4', 'snappy'
*Compression is more effective the bigger the batch of messages being sent to Kafka
*The compressed batch has the following advantages:
*Much smaller producer request size (compression ratio up to 4x!)
*Faster to transfer data over the network => less latency
*Better throughput
*Better disk utilisation in Kafka (stored messages on disk are smaller)
Disadvantages (very minor):
*Producers must commit some CPU cycles to compression
*Consumers must commit some CPU cycles to decompression
*Overall:
*Consider testing snappy for an optimal speed/compression ratio
Message Compression Recommendations
*Find a compression algorithm that gives you the best performance for your specific data. Test all of them!
*Always use compression in production & especially if you have high throughput
*Consider tweaking linger.ms & batch.size to have bigger batches & therefore more compression & higher throughput
linger.ms & batch.size
.By default, Kafka tries to send records as soon as possible
>It will have up to 5 requests in flight, meaning up to 5 messages individually sent at the same time
>After this, if more messages have to be sent while others are in flight, Kafka is smart & will start batching them while they wait, to send them all at once
>This smart batching allows Kafka to increase throughput while maintaining very low latency
>Batches have a higher compression ratio, so better efficiency
>So how can we control the batching mechanism?
*linger.ms: number of milliseconds a producer is willing to wait before sending a batch out (default 0)
*By introducing some lag (for example linger.ms=5), we increase the chances of messages being sent together in a batch
*So at the expense of introducing a small delay, we can increase the throughput, compression & efficiency of our producer
*If a batch is full (see batch.size) before the end of the linger.ms period, it will be sent to Kafka right away!
Batch Size
*batch.size: maximum number of bytes that will be included in a batch. The default is 16KB
*Increasing the batch size to something like 32KB or 64KB can help increase the compression, throughput & efficiency of requests
*Any message that is bigger than the batch size will not be batched
*A batch is allocated per partition, so make sure you don't set it to a number that's too high, otherwise you may waste memory!
*(Note: you can monitor the average batch size metric using Kafka Producer Metrics)
High Throughput Producer demo
*We'll add snappy message compression in our producer
*Snappy is very helpful if your messages are text-based, for example log lines or JSON documents
*Snappy has a good balance of CPU / compression ratio
*We'll also increase the batch.size to 32KB & introduce a small delay through linger.ms (20ms)
*(High throughput at the expense of a bit of latency & CPU usage)
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024)); // 32 KB batch size
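For reference, a rough kafka-python equivalent of the Java snippet above (broker address is illustrative; snappy compression in this client requires the python-snappy package to be installed):
# Sketch: high-throughput producer settings, mirroring the Java properties above.
from kafka import KafkaProducer

high_throughput_producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    compression_type='snappy',  # needs the python-snappy package
    linger_ms=20,               # wait up to 20 ms to build bigger batches
    batch_size=32 * 1024,       # 32 KB batch size
)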
Producer Default Partitioner & how keys are hashed
*By default, your keys are hashed using the "murmur2" algorithm
*It is most likely preferred to not override the behavior of the partitioner, but it is possible to do so (partitioner.class)
*The formula is:
targetPartition = Utils.abs(Utils.murmur2(record.key())) % numPartitions;
*This means that the same key will go to the same partition (we already know this), & adding partitions to a topic will completely alter the formula
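A small, hedged way to observe this from the client side (kafka-python; the topic, broker address and key are illustrative): send the same key twice and compare the partition reported in the record metadata.
# Sketch: messages with the same key land in the same partition.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

meta1 = producer.send('first_topic', key=b'truck-42', value=b'position 1').get(timeout=10)
meta2 = producer.send('first_topic', key=b'truck-42', value=b'position 2').get(timeout=10)
# Same key => same partition (as long as the partition count doesn't change)
assert meta1.partition == meta2.partition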
Max.block.ms & buffer.memory
*If the producer produces faster than the broker can take, the records will be buffered in memory
*buffer.memory=33554432 (32MB): the size of the send buffer
*That buffer will fill up over time & drain back down when the throughput to the broker increases
*If that buffer is full (all 32MB), then the .send() method will start to block (won't return right away)
*max.block.ms=60000: the time .send() will block until throwing an exception. Exceptions are basically thrown when:
>The producer has filled up its buffer
>The broker is not accepting any new data
>60 seconds has elapsed
*If you hit that exception, it usually means your brokers are down or overloaded, as they can't respond to requests
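These two settings map onto kafka-python's buffer_memory and max_block_ms parameters; a minimal, hedged sketch (broker address and topic are illustrative):
# Sketch: send() blocks for up to max_block_ms when the 32MB buffer is full, then raises KafkaTimeoutError.
from kafka import KafkaProducer
from kafka.errors import KafkaTimeoutError

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    buffer_memory=33554432,  # 32MB send buffer
    max_block_ms=60000,      # block up to 60s before raising
)

try:
    producer.send('first_topic', b'payload')
except KafkaTimeoutError:
    # buffer full for 60s: brokers are likely down or overloaded
    pass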
Consumer poll Behavior
.Kafka consumers have a "poll" model, while many other messaging buses in enterprises have a "push" model
.This allows consumers to control where in the log they want to consume, how fast, & gives them the ability to replay events
Consumer Poll Behaviour
*fetch.min.bytes (default 1):
>Controls how much data you want to pull at least on each request
>Helps improve throughput & decrease the request number
>At the cost of latency
*max.poll.records (default 500):
>Controls how many records to receive per poll request
>Increase if your messages are very small & you have a lot of available RAM
>Good to monitor how many records are polled per request
*max.partition.fetch.bytes (default 1MB):
>Maximum data returned by the broker per partition
>If you read from 100 partitions, you'll need a lot of memory (RAM)
*fetch.max.bytes (default 50MB):
>Maximum data returned for each fetch request (covers multiple partitions)
*Change these settings only if your consumer already maxes out on throughput (see the sketch below)
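As a hedged sketch, these four settings correspond to kafka-python's fetch_min_bytes, max_poll_records, max_partition_fetch_bytes & fetch_max_bytes parameters (the topic, group and broker names are illustrative; the values shown are the defaults discussed above, not recommendations):
# Sketch: a consumer with explicit fetch/poll tuning.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'first_topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-first-application',
    fetch_min_bytes=1,                      # minimum data per fetch request
    max_poll_records=500,                   # records returned per poll
    max_partition_fetch_bytes=1024 * 1024,  # 1MB per partition
    fetch_max_bytes=50 * 1024 * 1024,       # 50MB per fetch request
)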
Consumer offset commit strategies
*There are two common patterns for committing offsets in a consumer application
*2 strategies:
>(easy) enable.auto.commit=true & synchronous processing of batches
>(medium) enable.auto.commit=false & manual commit of offsets
*enable.auto.commit=true & synchronous processing of batches:
while(true){
    List<Records> batch = consumer.poll(Duration.ofMillis(100));
    doSomethingSynchronous(batch);
}
*With auto-commit, offsets will be committed automatically for you at a regular interval
(auto.commit.interval.ms=5000 by default) every time you call .poll()
*If you don't use synchronous processing, you will be in "at-most-once" behavior because offsets will be committed before your data is processed
*enable.auto.commit=false & synchronous processing of batches:
while(true){
    batch += consumer.poll(Duration.ofMillis(100));
    if(isReady(batch)){
        doSomethingSynchronous(batch);
        consumer.commitSync();
    }
}
*You control when you commit offsets & what the condition for committing them is
*Example: accumulating records into a buffer & then flushing the buffer to a database + committing the offsets then (see the kafka-python sketch below)
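A hedged kafka-python translation of the manual-commit pattern above (the topic, group, broker address and the batch-size threshold are illustrative):
# Sketch: accumulate a batch, process it synchronously, then commit offsets manually.
from kafka import KafkaConsumer

consumer = KafkaConsumer('first_topic',
                         group_id='my-first-application',
                         bootstrap_servers=['localhost:9092'],
                         enable_auto_commit=False)

buffer = []
while True:
    records = consumer.poll(timeout_ms=100)      # dict of TopicPartition -> list of records
    for tp_records in records.values():
        buffer.extend(tp_records)
    if len(buffer) >= 100:                       # the "isReady(batch)" condition, illustrative
        for record in buffer:
            print(record.offset, record.value)   # doSomethingSynchronous(batch)
        consumer.commit()                        # commitSync equivalent
        buffer = []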
Consumer offset Reset Behaviour
*If a consumer has no valid committed offset, its behavior is then to use:
>auto.offset.reset=latest: will read from the end of the log
>auto.offset.reset=earliest: will read from the start of the log
>auto.offset.reset=none: will throw an exception if no offset is found
*Additionally, consumer offsets can be lost:
>If a consumer hasn't read new data in 1 day (Kafka < 2.0)
>If a consumer hasn't read new data in 7 days (Kafka >= 2.0)
*This can be controlled by the broker setting offsets.retention.minutes
*To replay data for a consumer group:
>Take all the consumers from a specific group down
>Use the kafka-consumer-groups command to set the offsets to what you want
>Restart the consumers
Bottom line
*Set a proper data retention period & offset retention period
*Ensure the auto offset reset behaviour is the one you expect/want
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group kafka-demo-elasticsearch --reset-offsets --execute --to-earliest --topic manju1
Consumer Heartbeat Thread
*Heartbeats are sent periodically to the broker
*session.timeout.ms (default 10 seconds):
>If no heartbeat is sent during that period, the consumer is considered dead
>Set it even lower for faster consumer rebalances
*heartbeat.interval.ms (default 3 seconds):
>How often to send heartbeats
>Usually set to 1/3rd of session.timeout.ms
*Take-away: this mechanism is used to detect a consumer application being down
Consumer Poll Thread
*max.poll.interval.ms (default 5 minutes):
>Maximum amount of time between two .poll() calls before declaring the consumer dead
>This is particularly relevant for Big Data frameworks like Spark, in case the processing takes time
*Take-away: this mechanism is used to detect a data processing issue with the consumer (a liveness-settings sketch follows below)
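As a hedged sketch, these liveness settings map onto kafka-python's session_timeout_ms, heartbeat_interval_ms & max_poll_interval_ms parameters (values shown are the defaults discussed above; topic, group and broker names are illustrative):
# Sketch: consumer liveness settings (heartbeat thread + poll thread).
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'first_topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-first-application',
    session_timeout_ms=10000,     # no heartbeat within 10s => consumer considered dead
    heartbeat_interval_ms=3000,   # send a heartbeat every 3s (~1/3 of the session timeout)
    max_poll_interval_ms=300000,  # more than 5 minutes between poll() calls => consumer considered dead
)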