Data science Software Course Training in Ameerpet Hyderabad

Tuesday, 16 May 2017

Spark: Spark Streaming and Kafka Integration

Steps:

 1) Start the ZooKeeper server.
 2) Start one or more Kafka brokers.
 3) Create a topic.
 4) Start a console producer [ to write messages into the topic ].
 5) Start a console consumer [ to test whether messages are streamed ].
 6) Create a Spark streaming context
    which streams from the Kafka topic.
 7) Perform transformations or aggregations.
 8) Output operation: direct the results into another Kafka topic.
------------------------------------------

The following code was tested with Spark 1.6.0 and Kafka 0.10.2.0.

Kafka and Spark Streaming

bin/zookeeper-server-start.sh  config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic spark-topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic spark-topic

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic spark-topic --from-beginning
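
To run the Scala code below in spark-shell, the Kafka integration classes must be on the classpath. One way to launch it (assuming the Scala 2.10 build of Spark 1.6.0; adjust the artifact to your build):

bin/spark-shell --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0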

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
val ssc = new StreamingContext(sc, Seconds(5))
import org.apache.spark.streaming.kafka.KafkaUtils
// 1. create a receiver-based stream from the kafka topic
val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("spark-topic" -> 5))
// each element is a (key, message) pair; keep the message body, upper-cased
val lines = kafkaStream.map(x => x._2.toUpperCase)

// word count: flatMap (not map), so each word becomes its own element
val words = lines.flatMap(x => x.split(" "))
val pair = words.map(x => (x, 1))
val wc = pair.reduceByKey(_ + _)

wc.print()
// use the code below to write the results into another kafka topic
ssc.start
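
In spark-shell, ssc.start is enough to kick the job off. In a standalone application you would typically also block the driver until the job is stopped; a minimal sketch:

ssc.start()
ssc.awaitTermination()  // block the driver until the job is stopped (e.g. Ctrl+C or ssc.stop())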

------------------------------
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic results1

// writing the results into a kafka topic

import org.apache.kafka.clients.producer.ProducerConfig
import java.util.HashMap
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord


wc.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // build one producer per partition (per batch), rather than one per record
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
      "localhost:9092")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    partition.foreach { case (w, cnt) =>
      val x = w + "\t" + cnt
      println(x)
      // null key; value is "word<TAB>count"
      producer.send(new ProducerRecord[String, String]("results1", null, x))
    }
    producer.close()
  }
}

-- register the above output operation before calling ssc.start.
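
Creating a producer for every partition of every batch still has some overhead. A common refinement (a sketch, assuming the helper object is available on the executors, e.g. defined in the same shell session) is to cache one producer per executor JVM and reuse it across batches:

// Hypothetical helper: one KafkaProducer per executor JVM, created lazily on first use.
object ProducerHolder {
  import java.util.HashMap
  import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
  lazy val producer: KafkaProducer[String, String] = {
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    new KafkaProducer[String, String](props)
  }
}
// inside foreachPartition, reuse it: ProducerHolder.producer.send(...)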
--------------------------------------------
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic results1 --from-beginning
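
As a quick check: typing "hello spark" into the spark-topic producer twice within one 5-second batch should eventually make this consumer print lines like the following (illustrative output; the stream upper-cases its input, and counts are per batch since no state is kept across batches):

HELLO	2
SPARK	2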

-------------------
 val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181","spark-streaming-consumer-group", Map("spark-topic" -> 5))

KafkaUtils.createStream() needs 4 arguments:
    1st ---> streaming context
    2nd ---> ZooKeeper details
    3rd ---> consumer group id
    4th ---> topics

Spark Streaming can read from multiple topics.
The topics are given as key-value pairs of a Map object:

key ---> topic name
value ---> number of consumer threads

To read from multiple topics,
the 4th argument should be as follows:
    Map("t1" -> 2, "t2" -> 4, "t3" -> 1)
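
For example, assuming topics t1, t2 and t3 already exist (the names are illustrative), a single stream reading all three would look like this:

val multiStream = KafkaUtils.createStream(ssc, "localhost:2181",
  "spark-streaming-consumer-group", Map("t1" -> 2, "t2" -> 4, "t3" -> 1))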

-------------------------

The given number of consumer threads is applied to each partition of the Kafka topic.

   ex: the topic has 3 partitions,
       consumer threads are 5.
   so, total number of threads = 3 * 5 = 15.

But these 15 threads are not executed in parallel:
at any one time, only the 5 threads for one partition are consuming data in parallel.

To make all 15 parallel, create one stream per partition:

val numParts = 3
val kstreams = (1 to numParts).map { _ =>
  KafkaUtils.createStream(ssc, "localhost:2181",
    "spark-streaming-consumer-group", Map("spark-topic" -> 5))
}
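
The resulting streams can then be combined into one DStream with ssc.union and processed as before; a minimal sketch:

val unioned = ssc.union(kstreams)
unioned.map(_._2).print()   // message bodies from all receivers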