package kafka.examples.manju;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class CustomPartitionProducer {
private static Producer<String, String> producer;
public CustomPartitionProducer() {
Properties props = new Properties();
// Set the broker list for requesting metadata to find the lead broker
// This specifies the serializer class for keys
props.put("serializer.class", "kafka.serializer.StringEncoder");
// Defines the class to be used for determining the partition
// in the topic where the message needs to be sent.
props.put("partitioner.class", "kafka.examples.ch4.SimplePartitioner");
// 1 means the producer receives an acknowledgment once the lead replica
// has received the data. This option provides better durability as the
// client waits until the server acknowledges the request as successful.
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
producer = new Producer<String, String>(config);
public static void main(String[] args) {
int argsCount = args.length;
if (argsCount == 0 || argsCount == 1)
throw new IllegalArgumentExcept
"Please provide topic name and Message count as arguments");
// Topic name and the message count to be published is passed from the
// command line
String topic = (String) args[0];
String count = (String) args[1];
int messageCount = Integer.parseInt(count);
System.out.println("Topic Name - " + topic);
System.out.println("Message Count - " + messageCount);
CustomPartitionProducer simpleProducer = new CustomPartitionProducer();
simpleProducer.publishMessage(topic, messageCount);
private void publishMessage(String topic, int messageCount) {
Random random = new Random();
for (int mCount = 0; mCount < messageCount; mCount++) {
String clientIP = "192.168.14." + random.nextInt(255);
String accessTime = new Date().toString();
String message = accessTime + ",," + clientIP;
// Creates a KeyedMessage instance
KeyedMessage<String, String> data =
new KeyedMessage<String, String>(topic, clientIP, message);
// Publish the message
// Close producer connection with broker.
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;
public class BananaPartitioner implements Partitioner {
public void configure(Map<String, ?> configs) {}
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes,
Cluster cluster) {
List<PartitionInfo> partitions =
int numPartitions = partitions.size();
if ((keyBytes == null) || (!(key instanceOf String)))
throw new InvalidRecordException("We expect all messages
to have customer name as key")
if (((String) key).equals("Banana"))
return numPartitions; // Banana will always go to last
partition// Other records will get hashed to the rest of the
return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))
public void close() {}