Steps:
1) Start the ZooKeeper server.
2) Start the Kafka brokers [ one or more ].
3) Create a topic.
4) Start a console producer [ to write messages into the topic ].
5) Start a console consumer [ to verify that messages are being streamed ].
6) Create a Spark Streaming context that streams from the Kafka topic.
7) Perform transformations or aggregations.
8) Output operation: direct the results into another Kafka topic.
------------------------------------------
Kafka and Spark Streaming
The following code was tested with Spark 1.6.0 and Kafka 0.10.2.0.
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic spark-topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic spark-topic
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic spark-topic --from-beginning
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils

// sc is the SparkContext provided by spark-shell
val ssc = new StreamingContext(sc, Seconds(5))

// receiver-based stream: (streaming context, ZooKeeper quorum, consumer group id, Map(topic -> consumer threads))
val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("spark-topic" -> 5))

// each record is a (key, message) pair; keep the message and upper-case it
val lines = kafkaStream.map(x => x._2.toUpperCase)
// flatMap, so we get a stream of words rather than a stream of arrays
val words = lines.flatMap(x => x.split(" "))
val pairs = words.map(x => (x, 1))
val wc = pairs.reduceByKey(_ + _)
wc.print()

// use the code below to write results into a kafka topic (register it before ssc.start)
ssc.start()
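After ssc.start, type a few lines into the console producer started earlier; every 5-second batch prints the word counts in the spark-shell. A rough illustration (the sample input and output below are only assumed, not from a real run):
// typing "hello world hello" into the console producer should print,
// in the next batch, something like (HELLO,2) and (WORLD,1)
ssc.awaitTermination()   // optional: keeps the driver alive until the job is stopped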
------------------------------
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic results1
// writing the results into a kafka topic
import org.apache.kafka.clients.producer.ProducerConfig
import java.util.HashMap
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
// for each batch, push every (word, count) pair to the "results1" topic
wc.foreachRDD(rdd =>
  rdd.foreachPartition(partition =>
    partition.foreach {
      case (word, count) => {
        val line = word + "\t" + count
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
          "localhost:9092")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        println(line)
        val producer = new KafkaProducer[String, String](props)
        val message = new ProducerRecord[String, String]("results1", null, line)
        producer.send(message)
        producer.close()
      }
    }))
-- register the above foreachRDD output operation before calling ssc.start.
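Note: the block above builds a new KafkaProducer for every record, which is expensive. A minimal sketch of a more efficient variant (same output; just one producer per partition, reused for all records in it):
wc.foreachRDD(rdd =>
  rdd.foreachPartition { partition =>
    // one producer per partition instead of one per record
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    partition.foreach { case (word, count) =>
      producer.send(new ProducerRecord[String, String]("results1", null, word + "\t" + count))
    }
    producer.close()
  })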
--------------------------------------------
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic results1 --from-beginning
-------------------
val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("spark-topic" -> 5))
KafkaUtils.createStream() needs 4 arguments:
1st ---> the streaming context
2nd ---> the ZooKeeper quorum (host:port)
3rd ---> the consumer group id
4th ---> the topics
Spark Streaming can read from multiple topics.
Each topic is given as a key-value pair in a Map:
key ---> topic name
value ---> number of consumer threads
To read from multiple topics, the 4th argument looks like:
Map("t1" -> 2, "t2" -> 4, "t3" -> 1)
-------------------------
The given number of consumer threads is applied to each partition of the kafka topic.
ex: the topic has 3 partitions,
consumer threads are 5,
so, total number of threads = 15.
But these 15 threads are not all executed in parallel;
at a time, only the 5 threads for one partition will be consuming data in parallel.
To make all (15) parallel, create one stream per partition:
val numparts = 3
val kstreams = (1 to numparts).map { x =>
  KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("spark-topic" -> 5))
}
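The per-partition streams can then be unioned into a single DStream and the same word-count pipeline applied to it. A minimal sketch, assuming the kstreams defined above:
// union the receivers into one DStream and run the same transformations on it
val unified = ssc.union(kstreams)
val counts = unified.map(x => x._2.toUpperCase)
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
counts.print()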