scala - spark-streaming-kafka-0-10 DStream is not pulling anything from Kafka
I am trying to use spark-streaming-kafka-0.10 to pull messages from a Kafka topic (broker version 0.10). I have checked that messages are being produced, and used a KafkaConsumer to pull them successfully. Now, when I try to use the Spark Streaming API, I am not getting anything. If I use KafkaUtils.createRDD and specify the offset ranges manually, it works. But when I try to use createDirectStream, the RDDs are empty, and when I check the partition offsets it reports that all partitions are at 0. Here is what I tried:
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.{SparkConf, TaskContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    val sparkConf = new SparkConf().setAppName("KafkaStream")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val topics = Array("my_topic")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hostname:6667",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "my_group",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { iter =>
        // Print the offset range this task is responsible for
        val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
      val rddCount = rdd.count()
      println(s"rdd count: $rddCount")
      // stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
All partitions show offset ranges of 0 to 0 and the RDDs are empty. I want to start at the beginning of each partition and also pick up everything being produced to it.
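For comparison, here is a minimal sketch of the createRDD path that does work when offsets are given manually; the offset range values (partition 0, offsets 0 to 100) are placeholders, not my actual topic layout:

    // Sketch only: the offset range below is a placeholder -- adjust the
    // partition and from/until offsets to match your topic.
    import scala.collection.JavaConverters._
    import org.apache.spark.streaming.kafka010.{KafkaUtils, OffsetRange}
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    val ranges = Array(OffsetRange("my_topic", 0, 0L, 100L))
    val rdd = KafkaUtils.createRDD[String, String](
      ssc.sparkContext,
      kafkaParams.asJava,   // createRDD takes a java.util.Map
      ranges,
      PreferConsistent
    )
    println(s"createRDD count: ${rdd.count()}")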
I have found the problem: it was a kafka-clients issue. I was using kafka-clients 0.10.1.1, which had a bug that was fixed here: https://issues.apache.org/jira/browse/kafka-4547
Thus, the fix for me was updating kafka-clients to 0.10.2.1. spark-streaming-kafka uses a compile dependency of kafka-clients 0.10.0.1, which I was overwriting when I included my own version.
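For reference, a minimal sbt sketch of that override; the Spark version (2.1.0 here) is an assumption, so match it to your cluster:

    // build.sbt -- sketch only; 2.1.0 is an assumed Spark version.
    // kafka-clients is pinned explicitly so it wins over the 0.10.0.1
    // version pulled in transitively by spark-streaming-kafka-0-10.
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-streaming" % "2.1.0" % "provided",
      "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0",
      "org.apache.kafka" % "kafka-clients" % "0.10.2.1"
    )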