steps:
1) Start the ZooKeeper server.
2) Start the Kafka brokers [ one or more ].
3) Create a topic.
4) Start a console producer [ to write messages into the topic ].
5) Start a console consumer [ to verify that messages are streamed ].
6) Create a Spark streaming context which streams from the Kafka topic.
7) Perform transformations or aggregations.
8) Output operation: direct the results into another Kafka topic.
------------------------------------------
The following code was tested with Spark 1.6.0 and Kafka 0.10.2.0.
Kafka and Spark Streaming
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic spark-topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic spark-topic
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic spark-topic --from-beginning
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils

// 1. Create a streaming context with a 5-second batch interval.
val ssc = new StreamingContext(sc, Seconds(5))

// Receiver-based stream: (zookeeper quorum, consumer group, Map(topic -> threads)).
val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("spark-topic" -> 5))

// Each record is a (key, message) pair; take the message and upper-case it.
val lines = kafkaStream.map(x => x._2.toUpperCase)
// flatMap (not map) so each word becomes a separate element.
val words = lines.flatMap(x => x.split(" "))
val pair = words.map(x => (x, 1))
val wc = pair.reduceByKey(_ + _)
wc.print()
// Use the code below (the foreachRDD block) to write the results into a Kafka topic;
// register it before calling ssc.start.
ssc.start
------------------------------
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic results1
// Writing the results into a Kafka topic.
import java.util.HashMap
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord

wc.foreachRDD(rdd =>
  rdd.foreachPartition(partition =>
    partition.foreach { case (w, cnt) =>
      val x = w + "\t" + cnt
      // Producer configuration: broker list and String serializers for key and value.
      val props = new HashMap[String, Object]()
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      println(x)
      val producer = new KafkaProducer[String, String](props)
      val message = new ProducerRecord[String, String]("results1", null, x)
      producer.send(message)
      producer.close()
    }))
-- Execute the above code before calling ssc.start.
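The snippet above creates a new KafkaProducer for every record, which is expensive. A minimal sketch of an alternative, reusing the imports above and assuming the same results1 topic and broker address, is to build one producer per partition and close it when the partition is done:

wc.foreachRDD(rdd =>
  rdd.foreachPartition { partition =>
    // One producer per partition instead of one per record
    // (assumed optimization, not part of the original post).
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    partition.foreach { case (w, cnt) =>
      producer.send(new ProducerRecord[String, String]("results1", null, w + "\t" + cnt))
    }
    producer.close()
  })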
--------------------------------------------
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic results1 --from-beginning
-------------------
val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("spark-topic" -> 5))

1. KafkaUtils.createStream() needs 4 arguments:
1st ---> streaming context
2nd ---> ZooKeeper connection details
3rd ---> consumer group id
4th ---> topics

Spark Streaming can read from multiple topics.
Each topic is given as a key-value pair of a Map object:
key ---> topic name
value ---> number of consumer threads
To read from multiple topics, the 4th argument should be as follows:
Map("t1" -> 2, "t2" -> 4, "t3" -> 1)
-------------------------
The given number of consumer threads is applied to each partition of the Kafka topic.
Example: the topic has 3 partitions and 5 consumer threads are configured,
so the total number of threads = 15.
But these 15 threads are not executed in parallel;
at a time, only the 5 threads of one partition consume data in parallel.
To make all 15 parallel, create one stream per partition:
val numparts = 3
val kstreams = (1 to numparts).map { x =>
  // Return the stream itself so kstreams is a sequence of DStreams.
  KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("spark-topic" -> 5))
}
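The separate streams then have to be combined before applying the transformations. A minimal sketch, assuming the kstreams sequence above:

// Union the per-partition streams into a single DStream and continue as before.
val unified = ssc.union(kstreams)
val unifiedLines = unified.map(x => x._2.toUpperCase)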