Sunday 16 September 2018

suscribe

 Map(new TopicPartition(topic, partition) -> 2L)
    val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](topics, kafkaParams,offsets))


override def partition(topic: String, key: Any, keyBytes: Array[Byte], value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {

 var partition = 0

 val keyInt = Integer.parseInt(key.asInstanceOf[String])

 val tripData = value.asInstanceOf[String]

 //Gets the UserType from the message produced

 val userType = tripData.split(",")(12)

 //Assigns the partitions to the messages based on the user types

 if ("Subscriber".equalsIgnoreCase(userType)) {

  partition = 0;

 } else if ("Customer".equalsIgnoreCase(userType)) {

  partition = 1;

 }

 println("Partition for message " + value + " is " + partition)

 partition

}







import kafka.producer._
import kafka.utils._
import java.util._
import java.text._
import java.util.concurrent.atomic._

class KafkaPartitioner(props: VerifiableProperties = null) extends Partitioner {
  
  val counter = new AtomicInteger(0)
  val batch = new AtomicInteger(0)
  val partition = new AtomicInteger(0)
  
  def partition(key: Any, numPartitions: Int): Int = {
    //round robin partitioner to smooth producers' traffic on all partitions
    //change partition every X messages where X corresponds to kafka producer batch message size
    if(batch.incrementAndGet % BATCH_NUM_MESSAGES == 1) {
      partition.set(math.abs(counter.incrementAndGet) % numPartitions)
      batch.set(0)
    }
    partition.get
  }
}



















    import java.util.*;
    import org.apache.kafka.clients.producer.*;
    public class SensorProducer {
                                    
    public static void main(String[] args) throws Exception{
                                    
        String topicName = "SensorTopic";
                                    
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092,localhost:9093");
            props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("partitioner.class", "SensorPartitioner");
            props.put("speed.sensor.name", "TSS");
                                        
            Producer<String, String> producer = new KafkaProducer<>(props);
                                        
            for (inti=0 ;i<10 ; i++)
                producer.send(new ProducerRecord<>(topicName,"SSP"+i,"500"+i));
                                        
            for (inti=0 ;i<10 ; i++)
                producer.send(new ProducerRecord<>(topicName,"TSS","500"+i));
                                        
            producer.close();                                        
            System.out.println("SimpleProducer Completed.");
        }
    }                                               
                            





    import java.util.*;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.*;
    import org.apache.kafka.common.utils.*;
    import org.apache.kafka.common.record.*;
                                    
  public class SensorPartitioner implements Partitioner {
                                    
 private String speedSensorName;
                                    
 public void configure(Map<String, ?> configs) {
speedSensorName=configs.get("speed.sensor.name").toString();                                    
        }
                                        
 public intpartition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
                                    
List<PartitionInfo> partitions =cluster.partitionsForTopic(topic);
  intnumPartitions = partitions.size();
  intsp = (int)Math.abs(numPartitions*0.3);
  int p=0;
                                    
if ( (keyBytes == null) || (!(key instanceof String)) )
 throw new InvalidRecordException("All messages must have sensor name as key");
                                    
if ( ((String)key).equals(speedSensorName) )
               
 p = Utils.toPositive(Utils.murmur2(valueBytes)) % sp;
            else
                
p = Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions-sp) + sp ;
                                    
System.out.println("Key = " + (String)key + " Partition = " + p );
            return p;
        }
        public void close() {}                                    
    }                                           

                            

2 comments: