Saturday 21 March 2020

Easy to Understand Kafka Importance

KAFKA DOCUMENTATION

Apache Kafka is a community distributed event streaming platform capable of handling trillions of events a day. Initially conceived as a messaging queue, Kafka is based on an abstraction of a distributed commit log. Since being created and open-sourced by LinkedIn in 2011, Kafka has quickly evolved from messaging queue to a full-fledged event streaming platform.

TOPIC: 
A particular stream of data

* Similar to a table in a database (without all the constraints)
* You can have as many topics as you want
* A topic is identified by its name
* Topics are split into partitions
* Each partition is ordered
* Each message within a partition gets an incremental ID, called an offset

Example:
*Say you have a fleet of trucks; each truck reports its GPS position to Kafka
*You can have a topic truck-GPS that contains the position of all trucks
*Each truck will send a message to Kafka every 20 seconds; each message will contain the truck ID & the truck position (latitude & longitude)
*We choose to create that topic with 10 partitions (arbitrary number)
*Offsets only have a meaning for a specific partition, e.g. offset 3 in partition 0 doesn't represent the same data as offset 3 in partition 1
*Order is guaranteed only within a partition (not across partitions)
*Data is kept only for a limited time (default is one week)
*Once the data is written to a partition, it can't be changed (immutability)
*Data is assigned randomly to a partition unless a key is provided (more on this later)
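
The partition and offset rules above can be sketched with a toy in-memory model. This is only an illustration, not how Kafka stores data: the Topic class, its append method and the GPS values are made up for the example.

```python
class Topic:
    """Toy in-memory model of a topic: a fixed set of append-only partitions."""

    def __init__(self, name, num_partitions):
        self.name = name
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, partition, message):
        """Append a message to one partition and return its offset."""
        log = self.partitions[partition]
        log.append(message)
        return len(log) - 1  # offsets start at 0 and grow by 1 per message


truck_gps = Topic("truck-GPS", num_partitions=10)

# Two messages go to partition 0, one message goes to partition 1.
first = truck_gps.append(0, {"truck_id": "truck-1", "lat": 12.97, "lon": 77.59})
second = truck_gps.append(0, {"truck_id": "truck-1", "lat": 12.98, "lon": 77.60})
other = truck_gps.append(1, {"truck_id": "truck-2", "lat": 28.61, "lon": 77.20})

print(first, second, other)  # offsets are counted separately in each partition
```

Offset 0 exists in both partitions but points at different messages, which is why an offset only has meaning together with its partition.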

BROKERS:
*A Kafka cluster is composed of multiple brokers (servers)
*Each broker is identified by its ID (an integer)
*Each broker contains certain topic partitions
*After connecting to any broker (called a bootstrap broker), you will be connected to the entire cluster
*A good number to get started is 3 brokers, but some big clusters have over 100 brokers
*In these examples we choose to number brokers starting at 100 (arbitrary)

[Diagram: Broker 101, Broker 102, Broker 103]

BROKER & TOPICS
* Example of Topic-A with 3 Partitions
* Example of Topic-B with 2 Partitions

*NOTE: Data is distributed & Broker 103 doesn't have any Topic-B data.

Topic Replication Factor
* Topics should have a replication factor > 1 (usually between 2 & 3)
*This way if a broker is down, another broker can serve the data
*Example: Topic-A with 2 partitions & a replication factor of 2
*Example: we lose Broker 102
*Result: Brokers 101 & 103 can still serve the data
*At any time only ONE broker can be the leader for a given partition
*Only that leader can receive & serve data for the partition
*The other brokers will synchronize the data
*Therefore each partition has one leader & multiple ISRs (in-sync replicas)
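
A minimal sketch of the "we lose Broker 102" example follows. The replica placement and the rule "first live replica becomes leader" are assumptions made for the illustration; real leader election in Kafka is more involved.

```python
def elect_leaders(replicas, live_brokers):
    """For each partition, pick the first live replica as leader (toy rule)."""
    leaders = {}
    for partition, brokers in replicas.items():
        alive = [broker for broker in brokers if broker in live_brokers]
        leaders[partition] = alive[0] if alive else None
    return leaders


# Hypothetical placement: 2 partitions, replication factor 2.
replicas = {"Topic-A-0": [101, 102], "Topic-A-1": [102, 103]}

all_up = elect_leaders(replicas, live_brokers={101, 102, 103})
broker_102_down = elect_leaders(replicas, live_brokers={101, 103})

print(all_up)
print(broker_102_down)
```

With Broker 102 gone, every partition still has a live replica, so brokers 101 and 103 keep serving the data.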

PRODUCERS
*Producers write data to topics (which are made of partitions)
*Producers automatically know which broker & partition to write to
*In case of broker failures, producers will automatically recover
*Producers can choose to receive acknowledgment of data writes:
*acks=0: the producer won't wait for an acknowledgment (possible data loss)
*acks=1: the producer will wait for the leader's acknowledgment (limited data loss)
*acks=all: leader + replicas acknowledgment (no data loss)

Producers: Message keys
*Producers can choose to send a key with the message (string, number, etc.)
*If key=null, data is sent round-robin (broker 101, then 102, then 103...)
*If a key is sent, then all messages for that key will always go to the same partition
*A key is basically sent if you need message ordering for a specific field (e.g. truck_id)
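
The two cases (no key vs. key) can be sketched as below. This is a simplified stand-in, not Kafka's partitioner: make_partitioner is a made-up helper and zlib.crc32 replaces the hash Kafka really uses.

```python
import itertools
import zlib


def make_partitioner(num_partitions):
    """Return a function that picks a partition for an optional key."""
    round_robin = itertools.cycle(range(num_partitions))

    def choose(key=None):
        if key is None:
            return next(round_robin)  # no key: spread messages evenly
        # Stand-in hash for the illustration; Kafka uses murmur2 by default.
        return zlib.crc32(key) % num_partitions

    return choose


choose = make_partitioner(3)
unkeyed = [choose() for _ in range(6)]
keyed = [choose(b"truck-42") for _ in range(4)]

print(unkeyed)
print(keyed)
```

Messages without a key rotate over the partitions, while every message with the same key lands on one partition, which is what gives per-key ordering.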

CONSUMERS & CONSUMER GROUPS

CONSUMERS
*Consumers read data from a topic (identified by name)
*Consumers know which broker to read from
*In case of broker failures, consumers know how to recover
*Data is read in order within each partition

CONSUMER GROUP
*Consumers read data in consumer groups
*Each Consumer within a group reads from exclusive Partitions.
*If you have more consumers than Partitions, some consumers will be inactive
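
A small sketch of why extra consumers stay inactive. The round-robin rule used here is an assumption chosen for simplicity; Kafka ships several assignment strategies and this is not a copy of any of them.

```python
def assign_partitions(partitions, consumers):
    """Spread partitions over consumers so that each partition has one owner."""
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(partitions):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


three_partitions = [0, 1, 2]
two_consumers = assign_partitions(three_partitions, ["c1", "c2"])
four_consumers = assign_partitions(three_partitions, ["c1", "c2", "c3", "c4"])

print(two_consumers)
print(four_consumers)
```

With four consumers and three partitions, the fourth consumer receives an empty list: it is part of the group but has nothing to read.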

CONSUMER OFFSETS
*Kafka stores the offsets at which a consumer group has been reading.
*The committed offsets live in a Kafka topic named __consumer_offsets
*When a consumer in a group has processed data received from Kafka, it should commit the offsets
*If a consumer dies, it will be able to read back from where it left off thanks to the committed consumer offsets

Delivery Semantics for Consumers
*Consumers choose when to commit offsets
*There are 3 delivery semantics:
            *At most once:
            >Offsets are committed as soon as the message is received.
            >If the processing goes wrong, the message will be lost (it won't be read again).
            *At least once (usually preferred):
            >Offsets are committed after the message is processed.
            >If the processing goes wrong, the message will be read again.
            >This can result in duplicate processing of messages, so make sure your processing is idempotent (i.e. processing the messages again won't impact your systems).
            *Exactly once:
            >Can be achieved for Kafka => Kafka workflows using the Kafka Streams API.
            >For Kafka => external system workflows, use an idempotent consumer.
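
The difference between the first two semantics can be sketched with a toy consumer that crashes once in the middle of a batch. Everything here (run_consumer, the batch size, the crash point) is hypothetical and only models when the offset is committed relative to processing.

```python
def run_consumer(messages, batch_size, commit_first, crash_at):
    """Simulate a consumer that crashes once, just before processing `crash_at`."""
    committed = 0      # offset the group would resume from after a restart
    processed = []     # every message whose processing completed
    has_crashed = False
    while committed < len(messages):
        start = committed
        batch = messages[start:start + batch_size]
        if commit_first:
            committed = start + len(batch)   # at most once: commit on receipt
        crashed_now = False
        for message in batch:
            if message == crash_at and not has_crashed:
                has_crashed = crashed_now = True
                break
            processed.append(message)
        if crashed_now:
            continue                         # restart from `committed`
        if not commit_first:
            committed = start + len(batch)   # at least once: commit after work
    return processed


messages = ["m0", "m1", "m2", "m3"]
at_most_once = run_consumer(messages, batch_size=2, commit_first=True, crash_at="m1")
at_least_once = run_consumer(messages, batch_size=2, commit_first=False, crash_at="m1")

print(at_most_once)
print(at_least_once)
```

Committing first loses m1, while committing after processing replays the batch and handles m0 twice, which is why the processing step should be idempotent.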

 Kafka Broker Discovery
·     Every Kafka broker is also called a "bootstrap server"
·     That means that you only need to connect to one broker, and you will be connected to the entire cluster
·     Each broker knows about all brokers, topics & partitions (metadata)




ZOOKEEPER:
·     Zookeeper manages brokers (keeps a list of them)
·     Zookeeper helps in performing leader election for partitions
·     Zookeeper sends notifications to Kafka in case of changes (e.g. new topic, broker dies, broker comes up, delete topic, etc.)
·     Kafka can't work without Zookeeper (in the Kafka versions covered by these notes)
·     Zookeeper by design operates with an odd number of servers (3, 5, 7)
·     Zookeeper has a leader (handles writes); the rest of the servers are followers
·     Zookeeper does not store consumer offsets with Kafka > v0.10

KAFKA GUARANTEES:
·     Messages are appended to a topic-partition in the order they are sent.
·     Consumers read messages in the order stored in a topic-partition.
·     With a replication factor of N, producers & consumers can tolerate up to N-1 brokers being down.
·     This is why a replication factor of 3 is a good idea:
·     It allows for one broker to be taken down for maintenance.
·     It allows for another broker to be taken down unexpectedly.
·     As long as the number of partitions remains constant for a topic (no new partitions), the same key will always go to the same partition.

COMMANDS TO START KAFKA
#Start Zookeeper server
zookeeper-server-start.sh config/zookeeper.properties

#Start Kafka server
kafka-server-start.sh config/server.properties

#Create Topic
kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic third-topic --create --partitions 3 --replication-factor 1

#To check whether the Kafka topic was created
kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

#To check the details of a topic
kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic first_topic --describe

#Delete topic
kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic second_topic --delete

#Kafka console producer CLI (run without arguments, it lists the available options)
$kafka-console-producer.sh

#Create messages in the producer
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic

>Hello
>How are You


#Setting a producer property

kafka-console-producer --broker-list 127.0.0.1:9092 --topic first_topic --producer-property acks=all

#Producing to a topic name that does not exist automatically creates a new topic with that name

kafka-console-producer --broker-list 127.0.0.1:9092 --topic new_topic
>hey this topic does not exist!
    WARN[producer clientID = console-producer
>another message


#check the list of kafka topics
kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

#To set the default number of partitions (3 or more), edit num.partitions in config/server.properties
nano config/server.properties

#kafka console consumer CLI

kafka-console-consumer.sh

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning

#Kafka consumers in a group (from IDE)
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --group my-first-application

$kafka-consumer-groups.sh

$kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

my-first-application
console-consumer-10824
my-second-application
console-consumer-1052

#Resetting offsets
Reset options: --to-datetime, --by-period, --to-earliest, --to-latest, --shift-by, --from-file, --to-current

$kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --to-earliest --execute --topic first_topic

$kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-first-application

#Shift-by (a negative value moves the offsets back by that many messages)

kafka-consumer-groups --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --shift-by -2 --execute --topic first_topic


Kafka Producer:

import json
import logging

import msgpack
from kafka import KafkaProducer
from kafka.errors import KafkaError

log = logging.getLogger(__name__)

producer = KafkaProducer(bootstrap_servers=['broker1:1234'])

# Asynchronous by default
future = producer.send('my-topic', b'raw_bytes')

# Block for 'synchronous' sends
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if produce request failed...
    log.exception('produce request failed')

# Successful result returns assigned partition and offset
print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)

# produce keyed messages to enable hashed partitioning
producer.send('my-topic', key=b'foo', value=b'bar')

# encode objects via msgpack
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})

# produce json messages
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
producer.send('json-topic', {'key': 'value'})

# produce asynchronously
for _ in range(100):
    producer.send('my-topic', b'msg')

def on_send_success(record_metadata):
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)

def on_send_error(excp):
    log.error('I am an errback', exc_info=excp)
    # handle exception

# produce asynchronously with callbacks
producer.send('my-topic', b'raw_bytes').add_callback(on_send_success).add_errback(on_send_error)

# block until all async messages are sent
producer.flush()

# configure multiple retries
producer = KafkaProducer(retries=5)


Kafka Consumer:

import json

import msgpack
from kafka import KafkaConsumer

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
   # message value and key are raw bytes -- decode if necessary!
   # e.g., for unicode: `message.value.decode('utf-8')`
   print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

# consume earliest available messages, don't commit offsets
KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)

# consume json messages
KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))

# consume msgpack
KafkaConsumer(value_deserializer=msgpack.unpackb)

# StopIteration if no message after 1sec
KafkaConsumer(consumer_timeout_ms=1000)

# Subscribe to a regex topic pattern
consumer = KafkaConsumer()
consumer.subscribe(pattern='^awesome.*')

# Use multiple consumers in parallel w/ 0.9 kafka brokers
# typically you would run each on a different server / process / CPU
consumer1 = KafkaConsumer('my-topic',
                          group_id='my-group',
                          bootstrap_servers='my.server.com')
consumer2 = KafkaConsumer('my-topic',
                          group_id='my-group',
                          bootstrap_servers='my.server.com')


#acks & min.insync.replicas

Producer acks deep dive

acks=0 (no acks)
·     No response is requested
·     If the broker goes offline or an exception happens, we won't know & will lose data
·     Useful for data where it's okay to potentially lose messages:
            #Metrics collection
            #Log collection

acks=1 (leader acks)
·     Leader response is requested, but replication is not a guarantee (happens in the background)
·     If an ack is not received, the producer may retry
·     If the leader broker goes offline but replicas haven't replicated the data yet, we have a data loss

acks=all (replicas acks)
·     Leader + replicas ack requested

#Retries & max.in.flight.requests.per.connection

Producer retries
·     In case of transient failures, developers are expected to handle exceptions, otherwise the data will be lost
            >Example of a transient failure:
            >NotEnoughReplicasException
·     There is a "retries" setting
            >defaults to 0 (Kafka <= 2.0; the default is Integer.MAX_VALUE from Kafka 2.1)
            >you can increase it to a high number, e.g. Integer.MAX_VALUE
·     In case of retries, by default, there is a chance that messages will be sent out of order (if a batch has failed to be sent)
·     If you rely on key-based ordering, that can be an issue
·     For this, you can set the setting which controls how many produce requests can be made in parallel: max.in.flight.requests.per.connection
            .Default: 5
            .Set it to 1 if you need to ensure ordering (may impact throughput)
·     In Kafka >= 1.0.0, there is a better solution: the idempotent producer
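
The reordering risk can be sketched with a very simplified model of in-flight requests. send_with_retries and its arguments are made up for this illustration; a real producer pipelines requests over time rather than in fixed rounds.

```python
def send_with_retries(batches, max_in_flight, fails_once):
    """Return the order in which batches end up in the partition log."""
    log = []
    pending = list(batches)
    already_failed = set()
    while pending:
        in_flight = pending[:max_in_flight]
        pending = pending[max_in_flight:]
        retry = []
        for batch in in_flight:
            if batch in fails_once and batch not in already_failed:
                already_failed.add(batch)
                retry.append(batch)      # transient error: send again later
            else:
                log.append(batch)        # broker accepted the batch
        pending = retry + pending
    return log


batches = ["b1", "b2", "b3"]
parallel = send_with_retries(batches, max_in_flight=2, fails_once={"b1"})
one_at_a_time = send_with_retries(batches, max_in_flight=1, fails_once={"b1"})

print(parallel)
print(one_at_a_time)
```

With two requests in flight, b2 is written while b1 is being retried, so the log order changes; with one request in flight the order is preserved at the cost of throughput.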

#Idempotent Producer
*Here's the problem: the producer can introduce duplicate messages in Kafka due to network errors.

*In Kafka >= 0.11 you can define an "idempotent producer" which won't introduce duplicates on network errors

*Idempotent producers are great to guarantee a stable & safe pipeline!

*They come with:
            #retries = Integer.MAX_VALUE (2^31-1 = 2147483647)
            #max.in.flight.requests.per.connection=1 (Kafka >= 0.11 & < 1.1) or
            #max.in.flight.requests.per.connection=5 (Kafka >= 1.1, higher performance)
            #acks=all
            #just set:
                        *producerProps.put("enable.idempotence", true);
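
A rough sketch of how duplicates from retries can be detected on the broker side. It assumes each record carries a producer ID and a sequence number, which is the general idea behind the idempotent producer; the PartitionLog class itself is a made-up, heavily simplified model.

```python
class PartitionLog:
    """Toy partition that drops retried records it has already stored."""

    def __init__(self):
        self.records = []
        self.last_sequence = {}   # producer id -> highest sequence number stored

    def append(self, producer_id, sequence, value):
        """Store the record unless this (producer, sequence) was already seen."""
        if sequence <= self.last_sequence.get(producer_id, -1):
            return False          # duplicate caused by a retry: ignore it
        self.records.append(value)
        self.last_sequence[producer_id] = sequence
        return True


partition = PartitionLog()
partition.append(producer_id=7, sequence=0, value="order-created")
# The ack is lost on the network, so the producer retries the same record.
retry_stored = partition.append(producer_id=7, sequence=0, value="order-created")
partition.append(producer_id=7, sequence=1, value="order-paid")

print(partition.records)
```

The retried record is acknowledged again but not written a second time, so the log holds each record once.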

Safe producer summary & demo

Kafka < 0.11
            .acks=all (producer level)
            .Ensures data is properly replicated before an ack is received
            .min.insync.replicas=2 (broker/topic level)
            .Ensures at least two brokers in the ISR have the data after an ack
            .retries=MAX_INT (producer level)
            .Ensures transient errors are retried indefinitely
            .max.in.flight.requests.per.connection=1 (producer level)
            .Ensures only one request is tried at any time, preventing message re-ordering in case of retries

Kafka >= 0.11
            .enable.idempotence=true (producer level) + min.insync.replicas=2 (broker/topic level)
            .Implies acks=all, retries=MAX_INT, max.in.flight.requests.per.connection=5 (default)
            .while keeping ordering guarantees & improving performance!
            .Running a "safe producer" might impact throughput & latency; always test for your use case.

PRODUCER COMPRESSION
Message Compression
*Producers usually send data that is text-based, for example JSON data.
*In this case, it is important to apply compression to the producer
*Compression is enabled at the producer level & doesn't require any configuration change in the brokers or in the consumers
*"compression.type" can be 'none' (default), 'gzip', 'snappy'
*Compression is more effective the bigger the batch of messages being sent to Kafka
*The compressed batch has the following advantages:
*Much smaller producer request size (compression ratio up to 4x!)
*Faster to transfer data over the network => less latency
*Better throughput
*Better disk utilisation in Kafka (stored messages on disk are smaller)

Disadvantages(very minor):
*Producers must commit some CPU cycles to compression
*Consumers must commit some CPU cycles to decompression

*Overall:
            *Consider testing snappy for optimal speed/compression ratio
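
To see why bigger batches compress better, here is a small experiment. It uses zlib from the Python standard library as a stand-in codec (snappy is not in the standard library), and the JSON payload is invented, so the exact ratios will differ from what a real producer achieves.

```python
import json
import zlib


def compression_ratio(num_messages):
    """Compress a batch of similar JSON messages; return original/compressed size."""
    batch = b"".join(
        json.dumps({"truck_id": i % 50, "lat": 12.97, "lon": 77.59}).encode("utf-8")
        for i in range(num_messages)
    )
    return len(batch) / len(zlib.compress(batch))


for size in (1, 10, 100, 1000):
    print(size, round(compression_ratio(size), 2))
```

A single small message barely compresses, while a batch of similar messages shares a lot of repeated text that the codec can exploit.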

Message Compression Recommendations
*Find a compression algorithm that gives you the best performance for your specific data. Test all of them!
*Always use compression in production & especially if you have high throughput
*Consider tweaking linger.ms & batch.size to have bigger batches & therefore more compression & higher throughput


linger.ms & batch.size
.By default, Kafka tries to send records as soon as possible
            >It will have up to 5 requests in flight, meaning up to 5 messages individually sent at the same time.
            >After this, if more messages have to be sent while others are in flight, Kafka is smart & will start batching them while they wait, to send them all at once.
            >This smart batching allows Kafka to increase throughput while maintaining very low latency
            >Batches have a higher compression ratio, so better efficiency
            >So how can we control the batching mechanism?
*linger.ms: number of milliseconds a producer is willing to wait before sending a batch out (default 0)
*By introducing some lag (for example linger.ms=5), we increase the chances of messages being sent together in a batch
*So at the expense of introducing a small delay, we can increase the throughput, compression & efficiency of our producer
*If a batch is full (see batch.size) before the end of the linger.ms period, it will be sent to Kafka right away!

Batch Size
*batch.size: maximum number of bytes that will be included in a batch. The default is 16 KB
*Increasing the batch size to something like 32 KB or 64 KB can help to increase the compression, throughput & efficiency of requests
*Any message that is bigger than the batch size will not be batched
*A batch is allocated per partition, so make sure that you don't set it to a number that's too high, otherwise you'll waste memory!
*(Note: you can monitor the average batch size metric using Kafka Producer Metrics)
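
How linger.ms and batch.size interact can be sketched with a simulated clock. batch_records is a hypothetical helper: it closes a batch when the next record arrives too late or would overflow the batch, whereas a real producer uses a background sender thread, so treat this as a rough model only.

```python
def batch_records(records, batch_size, linger_ms):
    """Group (arrival_ms, payload) records the way a simplified producer would.

    A batch is sent when adding the next record would exceed `batch_size`
    bytes, or when more than `linger_ms` has passed since its first record.
    """
    batches, current, current_bytes, opened_at = [], [], 0, None
    for arrival_ms, payload in records:
        expired = bool(current) and arrival_ms - opened_at > linger_ms
        too_big = bool(current) and current_bytes + len(payload) > batch_size
        if expired or too_big:
            batches.append(current)
            current, current_bytes = [], 0
        if not current:
            opened_at = arrival_ms
        current.append(payload)
        current_bytes += len(payload)
    if current:
        batches.append(current)
    return batches


# Five 10-byte records arriving at 0, 1, 2, 30 and 31 ms.
records = [(t, b"x" * 10) for t in (0, 1, 2, 30, 31)]
no_linger = batch_records(records, batch_size=1000, linger_ms=0)
with_linger = batch_records(records, batch_size=1000, linger_ms=5)
small_batches = batch_records(records, batch_size=20, linger_ms=5)

print([len(b) for b in no_linger])
print([len(b) for b in with_linger])
print([len(b) for b in small_batches])
```

A small linger groups records that arrive close together, and a small batch.size forces an early send even when the linger period has not expired.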

High Throughput Producer demo 
*We'll add snappy message compression in our producer
*Snappy is very helpful if your messages are text-based, for example log lines or JSON documents
*Snappy has a good balance of CPU / compression ratio
*We'll also increase the batch.size to 32 KB & introduce a small delay through linger.ms (20 ms)

High throughput at the expense of a bit of latency & CPU usage
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024)); // 32 KB batch size

Producer default partitioner & how keys are hashed
*By default, your keys are hashed using the "murmur2" algorithm.
*It is most likely preferred to not override the behavior of the partitioner, but it is possible to do so (partitioner.class)
*The formula is:
targetPartition = Utils.abs(Utils.murmur2(record.key())) % numPartitions;
*This means that the same key will go to the same partition (we already know this), & adding partitions to a topic will completely alter the result of the formula.
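
The effect of adding partitions can be checked with a quick experiment. The sketch keeps the shape of the formula above but swaps murmur2 for zlib.crc32 (an assumption made so that it runs with the standard library only); the key names are invented.

```python
import zlib


def target_partition(key, num_partitions):
    """Same shape as the formula above, with crc32 standing in for murmur2."""
    return zlib.crc32(key) % num_partitions


keys = [f"truck-{i}".encode("utf-8") for i in range(100)]
before = {key: target_partition(key, 3) for key in keys}
after = {key: target_partition(key, 4) for key in keys}
moved = [key for key in keys if before[key] != after[key]]

print(f"{len(moved)} of {len(keys)} keys changed partition")
```

As long as the partition count stays at 3 every key keeps its partition, but going from 3 to 4 partitions sends many keys somewhere else, which breaks per-key ordering across the change.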

max.block.ms & buffer.memory
*If the producer produces faster than the broker can take, the records will be buffered in memory
*buffer.memory=33554432 (32 MB): the size of the send buffer
*That buffer will fill up over time & fill back down when the throughput to the broker increases
*If that buffer is full (all 32 MB), then the .send() method will start to block (won't return right away)
*max.block.ms=60000: the time .send() will block until throwing an exception. Exceptions are basically thrown when:
            >The producer has filled up its buffer
            >The broker is not accepting any new data
            >60 seconds have elapsed
*If you hit an exception, that usually means your brokers are down or overloaded, as they can't respond to requests.
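
The blocking behavior can be sketched with a bounded queue. This is a toy model under stated assumptions: BoundedSendBuffer is made up, capacity is counted in records instead of bytes, and the timeout is a fraction of a second so the example finishes quickly.

```python
import queue


class BoundedSendBuffer:
    """Toy send buffer: send() blocks when full and gives up after max_block_s."""

    def __init__(self, capacity, max_block_s):
        self._queue = queue.Queue(maxsize=capacity)
        self._max_block_s = max_block_s

    def send(self, record):
        try:
            # Blocks while the buffer is full, like send() waiting on buffer.memory.
            self._queue.put(record, timeout=self._max_block_s)
        except queue.Full:
            raise TimeoutError("buffer still full after max.block.ms") from None


send_buffer = BoundedSendBuffer(capacity=2, max_block_s=0.05)
send_buffer.send(b"a")
send_buffer.send(b"b")
try:
    send_buffer.send(b"c")   # nothing drains the buffer, so this call times out
    timed_out = False
except TimeoutError:
    timed_out = True

print("timed out:", timed_out)
```

In this model nothing ever drains the buffer, which mirrors the case where the brokers are down or overloaded.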



Consumer poll Behavior     
.Kafka consumers have a "poll" model, while many other messaging buses in enterprises have a "push" model
.This allows consumers to control where in the log they want to consume, how fast, & gives them the ability to replay events.

Consumer poll Behaviour
*fetch.min.bytes (default 1):
            >Controls how much data you want to pull at least on each request
            >Helps improve throughput & decrease the number of requests
            >At the cost of latency
*max.poll.records (default 500):
            >Controls how many records to receive per poll request
            >Increase if your messages are very small & you have a lot of available RAM
            >Good to monitor how many records are polled per request
*max.partition.fetch.bytes (default 1 MB):
            >Maximum data returned by the broker per partition
            >If you read from 100 partitions, you'll need a lot of memory (RAM)
*fetch.max.bytes (default 50 MB):
            >Maximum data returned for each fetch request (covers multiple partitions)
*Change these settings only if your consumer maxes out on throughput already

Consumer offset commits strategies
*There are two common patterns for committing offsets in a consumer application
*2 strategies:
            >(easy) enable.auto.commit=true & synchronous processing of batches
            >(medium) enable.auto.commit=false & manual commit of offsets

*enable.auto.commit = true & synchronous processing of batches

while (true) {
            ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(100));
            doSomethingSynchronous(batch);
}

*With auto-commit, offsets will be committed automatically for you at a regular interval (auto.commit.interval.ms=5000 by default) every time you call .poll()
*If you don't use synchronous processing, you will be in "at-most-once" behavior because offsets will be committed before your data is processed.

*enable.auto.commit = false & synchronous processing of batches

// pseudocode: accumulate records until the batch is ready, then process & commit
while (true) {
            batch += consumer.poll(Duration.ofMillis(100));
            if (isReady(batch)) {
                        doSomethingSynchronous(batch);
                        consumer.commitSync();
            }
}

*You control when you commit offsets & what the condition for committing them is.
*Example: accumulating records into a buffer & then flushing the buffer to a database + committing offsets then.

Consumer offset Reset Behaviour
*When no valid committed offset is found, the consumer then uses:
            >auto.offset.reset = latest: will read from the end of the log
            >auto.offset.reset = earliest: will read from the start of the log
            >auto.offset.reset = none: will throw an exception if no offset is found
*Additionally, consumer offsets can be lost:
            >If a consumer hasn't read new data in 1 day (Kafka < 2.0)
            >If a consumer hasn't read new data in 7 days (Kafka >= 2.0)
*This can be controlled by the broker setting offsets.retention.minutes
*To replay data for a consumer group:
            >Take all the consumers from a specific group down
            >Use the kafka-consumer-groups command to set the offsets to what you want
            >Restart the consumers
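
The three auto.offset.reset values can be summarised as a small decision function. starting_offset and its parameters are hypothetical names for this sketch; it only models the choice of a starting position for one partition.

```python
def starting_offset(committed, log_start, log_end, auto_offset_reset):
    """Pick where a consumer starts reading one partition (toy model)."""
    if committed is not None and log_start <= committed <= log_end:
        return committed                 # a valid committed offset wins
    if auto_offset_reset == "earliest":
        return log_start                 # read from the start of the log
    if auto_offset_reset == "latest":
        return log_end                   # only read new messages
    raise LookupError("no valid committed offset and auto.offset.reset=none")


print(starting_offset(42, log_start=0, log_end=100, auto_offset_reset="latest"))
print(starting_offset(None, log_start=10, log_end=100, auto_offset_reset="earliest"))
print(starting_offset(None, log_start=10, log_end=100, auto_offset_reset="latest"))
```

The reset policy only matters when there is no usable committed offset, for example for a brand-new group or after the committed offsets have expired.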

Bottom line
*Set a proper data retention period & offset retention period.
*Ensure the auto offset reset behavior is the one you expect/want

kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --group kafka-demo-elasticsearch --reset-offsets --execute --to-earliest --topic manju1

Consumer Heartbeat Thread
*session.timeout.ms (default 10 seconds in the Kafka versions covered here):
            >Heartbeats are sent periodically to the broker
            >If no heartbeat is sent during that period, the consumer is considered dead
            >Set even lower for faster consumer rebalances
*heartbeat.interval.ms (default 3 seconds):
            >How often to send heartbeats
            >Usually set to 1/3rd of session.timeout.ms
*Take-away: This mechanism is used to detect a consumer application being down

Consumer Poll Thread
*max.poll.interval.ms (default 5 minutes):
            *Maximum amount of time between two .poll() calls before declaring the consumer dead.
            *This is particularly relevant for Big Data frameworks like Spark, in case the processing takes time
*Take-away: This mechanism is used to detect a data processing issue with the consumer.
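
The two liveness checks (heartbeat and poll) can be put side by side in a short sketch. consumer_status is a made-up function; the default timeouts are assumptions (10 seconds for the session timeout, 5 minutes for the poll interval) and may differ between Kafka versions.

```python
def consumer_status(now_ms, last_heartbeat_ms, last_poll_ms,
                    session_timeout_ms=10_000, max_poll_interval_ms=300_000):
    """Classify a consumer the way the two liveness checks would (toy model)."""
    if now_ms - last_heartbeat_ms > session_timeout_ms:
        return "dead: no heartbeat"      # application or network is down
    if now_ms - last_poll_ms > max_poll_interval_ms:
        return "dead: poll too slow"     # processing is stuck or too slow
    return "alive"


print(consumer_status(now_ms=20_000, last_heartbeat_ms=18_000, last_poll_ms=19_000))
print(consumer_status(now_ms=20_000, last_heartbeat_ms=5_000, last_poll_ms=19_000))
print(consumer_status(now_ms=400_000, last_heartbeat_ms=399_000, last_poll_ms=50_000))
```

A consumer can keep sending heartbeats and still be declared dead if it stops calling .poll(), which is how a stuck processing loop is detected.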