Friday 28 September 2018

Here is an example of a custom partitioner

package kafka.examples.manju;

import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class CustomPartitionProducer {
  private static Producer<String, String> producer;

  public CustomPartitionProducer() {
    Properties props = new Properties();

    // Set the broker list for requesting metadata to find the lead broker
    props.put("metadata.broker.list",
          "192.168.146.132:9092, 192.168.146.132:9093, 192.168.146.132:9094");

    // This specifies the serializer class for keys 
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    
 
// Defines the class to be used for determining the partition 
    // in the topic where the message needs to be sent.
    props.put("partitioner.class", "kafka.examples.ch4.SimplePartitioner");
    
    // 1 means the producer receives an acknowledgment once the lead replica 
    // has received the data. This option provides better durability as the 
    // client waits until the server acknowledges the request as successful.
    props.put("request.required.acks", "1");
    
    ProducerConfig config = new ProducerConfig(props);
    producer = new Producer<String, String>(config);
  }

  public static void main(String[] args) {
    int argsCount = args.length;
    if (argsCount == 0 || argsCount == 1)
      throw new IllegalArgumentExcept
"Please provide topic name and Message count as arguments");

    // Topic name and the message count to be published is passed from the
    // command line
    String topic = (String) args[0];
    String count = (String) args[1];
    int messageCount = Integer.parseInt(count);
    
    System.out.println("Topic Name - " + topic);
    System.out.println("Message Count - " + messageCount);

    CustomPartitionProducer simpleProducer = new CustomPartitionProducer();
    simpleProducer.publishMessage(topic, messageCount);
  }

  private void publishMessage(String topic, int messageCount) {
    Random random = new Random();
    for (int mCount = 0; mCount < messageCount; mCount++) {
    
    String clientIP = "192.168.14." + random.nextInt(255);
 String accessTime = new Date().toString();

    String message = accessTime + ",kafka.apache.org," + clientIP; 
      System.out.println(message);
      
      // Creates a KeyedMessage instance
      KeyedMessage<String, String> data = 
        new KeyedMessage<String, String>(topic, clientIP, message);
      
      // Publish the message
      producer.send(data);
    }
    // Close producer connection with broker.
    producer.close();
  }
}




    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.record.InvalidRecordException;
    import org.apache.kafka.common.utils.Utils;
    public class BananaPartitioner implements Partitioner {

            public void configure(Map<String, ?> configs) {}
            public int partition(String topic, Object key, byte[] keyBytes,

                           Object value, byte[] valueBytes,
                             Cluster cluster) {
        List<PartitionInfo> partitions =
          cluster.partitionsForTopic(topic);

int numPartitions = partitions.size();
if ((keyBytes == null) || (!(key instanceOf String)))
 throw new InvalidRecordException("We expect all messages
   to have customer name as key")
if (((String) key).equals("Banana"))
 return numPartitions; // Banana will always go to last
partition
// Other records will get hashed to the rest of the
   partitions
return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))
}
public void close() {}

}

Comparison: Spark DataFrame vs DataSets,



1. Spark Release

DataFrame-  In Spark 1.3 Release, dataframes are introduced.
DataSets-  In Spark 1.6 Release, datasets are introduced.

2. Data Formats

DataFrame-  Dataframes organizes the data in the named column. Basically, dataframes can efficiently process unstructured and structured data. Also, allows the Spark to manage schema.
DataSets-  As similar as dataframes, it also efficiently processes unstructured and structured data. Also, represents data in the form of a collection of row object or JVM objects of row. Through encoders, is represented in tabular forms.

3. Data Representation

DataFrame-  In dataframe data is organized into named columns. Basically,  it is as same as a table in a relational database.
DataSets-  As we know, it is an extension of dataframe API, which provides the functionality of type-safe, object-oriented programming interface of the RDD API. Also, performance benefits of the Catalyst query optimiser.

4. Compile-time type safety

DataFrame- There is a case if we try to access the column which is not on the table. Then, dataframe APIs does not support compile-time error.
DataSets- Datasets offers compile-time type safety.

5. Data Sources API

DataFrame- It allows data processing in different formats, for example, AVRO, CSV, JSON, and storage system HDFS, HIVE tables, MySQL.
DataSets- It also supports data from different sources.

6. Immutability and Interoperability

DataFrame- Once transforming into dataframe, we cannot regenerate a domain object.
DataSets- Datasets overcomes this drawback of dataframe to regenerate the RDD from dataframe. It also allows us to convert our existing RDD and data-frames into datasets.

7. Efficiency/Memory use

DataFrame- By using off-heap memory for serialization, reduce the overhead.
DataSets-  It allows to perform an operation on serialized data. Also, improves memory usage.

8. Serialization

DataFrame- In dataframe, can serialize data into off-heap storage in binary format. Afterwards, it performs many transformations directly on this off-heap memory.
DataSets-  In Spark, dataset API has the concept of an encoder. Basically, it handles conversion between JVM objects to tabular representation. Moreover, by using spark internal tungsten binary format it stores, tabular representation. Also, allows to perform an operation on serialised data and also improves memory usage.

9. Lazy Evolution

DataFrame- As same as RDD, Spark evaluates dataframe lazily too.
DataSets- As similar to RDD, and Dataset it also evaluates lazily.

10. Optimization

DataFrame- Through spark catalyst optimizer, optimization takes place in dataframe.
DataSets-  For optimising query plan, it offers the concept of data-frame catalyst optimiser.

11. Schema Projection

DataFrame- Through the Hive meta store, it auto-discovers the schema. We do not need to specify the schema manually.
DataSets-  Because of using spark SQL engine, it auto discovers the schema of the files.

12. Programming Language Support

DataFrame-  In 4 languages like Java, Python, Scala, and R dataframes are available.
DataSets- Only available in Scala and Java.

13. Usage of Datasets and Dataframes

DataFrame-
  • If low-level functionality is there.
  • Also, if high-level abstraction is required.
DataSets- 
  • For high-degree safety at runtime.
  • To take advantage of typed JVM objects.
  • Also, take advantage of the catalyst optimizer.
  • To save space.
  • It required faster execution.

Comparison between Spark RDD vs DataFrame



1. Release of DataSets
RDD – Basically, Spark 1.0 release introduced an RDD API.         

DataFrame-  Basically, Spark 1.3 release introduced a preview of the new dataset, that is dataFrame.

2. Data Formats
RDD- Through RDD, we can process structured as well as unstructured data. But, in RDD user need to specify the schema of ingested data, RDD cannot infer its own.

DataFrame- In data frame data is organized into named columns. Through dataframe, we can process structured and unstructured data efficiently. It also allows Spark to manage schema.

3. Data Representations
RDD- It is a distributed collection of data elements. That is spread across many machines over the cluster, they are a set of Scala or Java objects representing data.

DataFrame-  As we discussed above, in a data frame data is organized into named columns. Basically, it is as same as a table in a relational database.

4. Compile- Time Type Safety
RDD-  RDD Supports object-oriented programming style with compile-time type safety.

DataFrame- If we try to access any column which is not present in the table, then an attribute error may occur at runtime. Dataframe will not support compile-time type safety in such case.

5. Immutability and Interoperability
RDD- RDDs are immutable in nature. That means we can not change anything about RDDs. We can create it through some transformation on existing partitions. Due to immutability, all the computations performed are consistent in nature. If RDD is in tabular format, we can move from RDD to dataframe by to() method. We can also do the reverse by the .rdd method.

DataFrame-  One cannot regenerate a domain object, after transforming into dataframe. By using the example, if we generate one test data frame from tested then, we can not recover the original RDD again of the test class.

6. Data Sources API
RDD- From any data source, e.g. text files, a database via JDBC, etc. , an  RDD can come. Also, can easily handle data with no predefined structure.

DataFrame- In different formats, data source API allows data processing, such as AVRO, CSV, JSON, and storage system HDFS, HIVE tables, MySQL.

7. Optimization

RDD-  There was no provision for optimization engine in RDD. On the basis of its attributes, developers optimise each RDD.

DataFrame- By using Catalyst Optimizer, optimization takes place in dataframes. In 4 phases, dataframes use catalyst tree transformation framework

By Analysis
With logical plan optimization
By physical planning
With code generation to compile parts of the query to java bytecode.

8. Serialization
RDD-  Spark uses java serialization, whenever it needs to distribute data over a cluster. Serializing individual Scala and Java objects are expensive. It also requires sending both data and structure between nodes.

DataFrame- In dataframe, we can serialize data into off-heap storage in binary format. Afterwards, it performs transformations on this off-heap memory, as spark understands schema. Moreover, to encode the data, there is no need to use java serialization.

9. Efficiency/Memory use
RDD-  When serialization executes individually on a java and scala object, efficiency decreases. It also takes lots of time.

DataFrame- Use of off-heap memory for serialization reduces the overhead also generates, bytecode. So that, many operations can be performed on that serialized data. Basically, there is no need of deserialization for small operations.

10. Lazy Evaluation
RDD-  Spark does not compute their result right away, it evaluates RDDs lazily. Apart from it, Spark memorizes the transformation applied to some base data set. Moreover, When an action needs, a result sent to driver program for computation.

DataFrame- Similarly, computation happens only when action appears as Spark evaluates dataframe lazily.

11. Language Support
RDD- APIs for RDD is available in 4 languages, such as Java, Scala, Python, and R. As a result, this feature provides flexibility to the developers.

DataFrame- As similar as RDD, it also has APIs in same 4 languages, such as Java, Scala, Python, and R.

12. Schema Projection
RDD- Since RDD APIs, use schema projection explicitly. Therefore, a user needs to define the schema manually.

DataFrame- In dataframe, there is no need to specify a schema. Generally, it discovers schema automatically.

13. Aggregation
RDD- While performing simple grouping and aggregation operations RDD API is slower.

DataFrame- In performing exploratory analysis, creating aggregated statistics on data, dataframes are faster.

14. Usage
RDD-  When you want low-level transformation and actions, we use RDDs. Also, when we need high-level abstractions we use RDDs.

DataFrame-  We use dataframe when we need a high level of abstraction and for unstructured data, such as media streams or streams of text.

Sunday 16 September 2018

suscribe

 Map(new TopicPartition(topic, partition) -> 2L)
    val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](topics, kafkaParams,offsets))


override def partition(topic: String, key: Any, keyBytes: Array[Byte], value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {

 var partition = 0

 val keyInt = Integer.parseInt(key.asInstanceOf[String])

 val tripData = value.asInstanceOf[String]

 //Gets the UserType from the message produced

 val userType = tripData.split(",")(12)

 //Assigns the partitions to the messages based on the user types

 if ("Subscriber".equalsIgnoreCase(userType)) {

  partition = 0;

 } else if ("Customer".equalsIgnoreCase(userType)) {

  partition = 1;

 }

 println("Partition for message " + value + " is " + partition)

 partition

}







import kafka.producer._
import kafka.utils._
import java.util._
import java.text._
import java.util.concurrent.atomic._

class KafkaPartitioner(props: VerifiableProperties = null) extends Partitioner {
  
  val counter = new AtomicInteger(0)
  val batch = new AtomicInteger(0)
  val partition = new AtomicInteger(0)
  
  def partition(key: Any, numPartitions: Int): Int = {
    //round robin partitioner to smooth producers' traffic on all partitions
    //change partition every X messages where X corresponds to kafka producer batch message size
    if(batch.incrementAndGet % BATCH_NUM_MESSAGES == 1) {
      partition.set(math.abs(counter.incrementAndGet) % numPartitions)
      batch.set(0)
    }
    partition.get
  }
}



















    import java.util.*;
    import org.apache.kafka.clients.producer.*;
    public class SensorProducer {
                                    
    public static void main(String[] args) throws Exception{
                                    
        String topicName = "SensorTopic";
                                    
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092,localhost:9093");
            props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("partitioner.class", "SensorPartitioner");
            props.put("speed.sensor.name", "TSS");
                                        
            Producer<String, String> producer = new KafkaProducer<>(props);
                                        
            for (inti=0 ;i<10 ; i++)
                producer.send(new ProducerRecord<>(topicName,"SSP"+i,"500"+i));
                                        
            for (inti=0 ;i<10 ; i++)
                producer.send(new ProducerRecord<>(topicName,"TSS","500"+i));
                                        
            producer.close();                                        
            System.out.println("SimpleProducer Completed.");
        }
    }                                               
                            





    import java.util.*;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.*;
    import org.apache.kafka.common.utils.*;
    import org.apache.kafka.common.record.*;
                                    
  public class SensorPartitioner implements Partitioner {
                                    
 private String speedSensorName;
                                    
 public void configure(Map<String, ?> configs) {
speedSensorName=configs.get("speed.sensor.name").toString();                                    
        }
                                        
 public intpartition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
                                    
List<PartitionInfo> partitions =cluster.partitionsForTopic(topic);
  intnumPartitions = partitions.size();
  intsp = (int)Math.abs(numPartitions*0.3);
  int p=0;
                                    
if ( (keyBytes == null) || (!(key instanceof String)) )
 throw new InvalidRecordException("All messages must have sensor name as key");
                                    
if ( ((String)key).equals(speedSensorName) )
               
 p = Utils.toPositive(Utils.murmur2(valueBytes)) % sp;
            else
                
p = Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions-sp) + sp ;
                                    
System.out.println("Key = " + (String)key + " Partition = " + p );
            return p;
        }
        public void close() {}                                    
    }