scala - spark-streaming-kafka-10 DSteam is not pulling anything from Kafka -


i trying use spark-streaming-kafka-0.10 pull messages kafka topic(broker version 0.10). have checked messages being produced , used kafkaconsumer pull them successfully. now, when try use spark streaming api, not getting anything. if use kafkautils.createrdd , specify offset ranges manually works. when, try use createdirectstream, rdds empty , when check partition offsets reports partitions 0. here tried:

 val sparkconf = new sparkconf().setappname("kafkastream")  val ssc = new streamingcontext(sparkconf, seconds(3))  val topics = array("my_topic")   val kafkaparams = map[string, object](    "bootstrap.servers" -> "hostname:6667"    "key.deserializer" -> classof[stringdeserializer],    "value.deserializer" -> classof[stringdeserializer],    "group.id" -> "my_group",    "auto.offset.reset" -> "earliest",    "enable.auto.commit" -> (true: java.lang.boolean)  )   val stream = kafkautils.createdirectstream[string, string](    ssc,    preferconsistent,    subscribe[string, string](topics, kafkaparams)  )   stream.foreachrdd { rdd =>    val offsetranges = rdd.asinstanceof[hasoffsetranges].offsetranges    rdd.foreachpartition { iter =>      val o: offsetrange = offsetranges(taskcontext.get.partitionid)      println(s"${o.topic} ${o.partition} ${o.fromoffset} ${o.untiloffset}")    }     val rddcount = rdd.count()    println("rdd count: ", rddcount)     // stream.asinstanceof[cancommitoffsets].commitasync(offsetranges)  }   ssc.start()  ssc.awaittermination() 

all partitions show offset ranges 0 0 , rdds empty. start beginning of partition pick being produced it.

i have found kafka-clients issue. using kafka-clients 0.10.1.1 , had bug, fixed here: https://issues.apache.org/jira/browse/kafka-4547

thus, fix me updating 0.10.2.1. spark-streaming-kafka using compile dependency of 0.10.0.1, overwriting when included own version.


Comments

Popular posts from this blog

php - Vagrant up error - Uncaught Reflection Exception: Class DOMDocument does not exist -

vue.js - Create hooks for automated testing -

Add new key value to json node in java -