scala - Spark consuming records from only one Kafka topic partition


I have a Spark Streaming app that consumes from Kafka:

KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Set(kafkaTopic), kafkaParams)
)

The Kafka parameters are the defaults from https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
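For reference, a minimal sketch of what such a parameter map looks like. The broker address and group id are taken from the log line and the offset checker command quoted elsewhere in this question. The deserializers are given as fully qualified class names so the snippet compiles without the Kafka client on the classpath, whereas the linked guide passes classOf[StringDeserializer]. Treat the remaining values as assumptions rather than my exact configuration.

```scala
// Sketch of a kafkaParams map in the shape the integration guide uses.
// Assumed values: "kafka:9092" and "counselor-01" come from the logs and
// the offset checker command in this question; the rest mirror the guide.
val kafkaParams: Map[String, Object] = Map[String, Object](
  "bootstrap.servers" -> "kafka:9092",
  // Kafka accepts a class name string for class-typed settings.
  "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "group.id" -> "counselor-01",
  "auto.offset.reset" -> "latest",
  // Offsets are committed manually, so auto commit is off.
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
```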

Everything works fine as long as the Kafka topic has 1 partition. However, when it has more (2), Spark seems to be reading from only 1 of them. I'm seeing this in the logs:

17/07/28 12:08:15 INFO kafka010.KafkaRDD: Computing topic processedjobs, partition 0 offsets 20 -> 29
17/07/28 12:08:15 INFO kafka010.KafkaRDD: Beginning offset 0 is the same as ending offset skipping processedjobs 1
17/07/28 12:08:20 INFO kafka010.KafkaRDD: Beginning offset 29 is the same as ending offset skipping processedjobs 0
17/07/28 12:08:20 INFO kafka010.KafkaRDD: Beginning offset 0 is the same as ending offset skipping processedjobs 1

The output of kafka-consumer-offset-checker.sh --zookeeper $zookeeper --topic processedjobs --group counselor-01 gives:

Group           Topic                          Pid Offset          logSize         Lag             Owner
counselor-01    processedjobs                  0   29              29              0               none
counselor-01    processedjobs                  1   0               28              28              none
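To spell out how I read that output: lag is logSize minus the committed offset, so partition 1 holds 28 records and none of them have been consumed. A small sketch of the arithmetic follows, where PartitionStatus and neverConsumed are illustrative names and not part of any Kafka API.

```scala
// One row of the offset checker output (illustrative type, not a Kafka class).
final case class PartitionStatus(partition: Int, offset: Long, logSize: Long) {
  // Lag is the number of records in the log beyond the committed offset.
  def lag: Long = logSize - offset
}

// Values copied from the offset checker output above.
val statuses = Seq(
  PartitionStatus(partition = 0, offset = 29L, logSize = 29L),
  PartitionStatus(partition = 1, offset = 0L, logSize = 28L)
)

// Partitions that contain records but whose committed offset never advanced.
val neverConsumed = statuses.filter(s => s.offset == 0L && s.logSize > 0L)

statuses.foreach(s => println(s"partition ${s.partition}: lag ${s.lag}"))
println(s"never consumed: ${neverConsumed.map(_.partition).mkString(", ")}")
```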

I thought it had to do with faulty offset committing. I'm using this pattern:

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
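One way to check which partitions each batch actually covers is to print the offset ranges before committing them. This is only a sketch: OffsetLogging and logAndCommit are hypothetical names, and it assumes the spark-streaming-kafka-0-10 artifact is on the classpath and that the stream was created with createDirectStream as above.

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

object OffsetLogging {
  // Hypothetical helper: logs the offset range of every topic partition
  // in each batch, then commits those ranges back to Kafka.
  def logAndCommit(stream: InputDStream[ConsumerRecord[String, String]]): Unit =
    stream.foreachRDD { rdd =>
      // The cast is only valid on the RDD produced directly by the Kafka stream.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      offsetRanges.foreach { r =>
        println(
          s"${r.topic} partition ${r.partition}: " +
            s"${r.fromOffset} -> ${r.untilOffset} (${r.count()} records)"
        )
      }

      // Same commit call as in the pattern above.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }
}
```

If a partition shows up in every batch with identical from and until offsets, the driver knows about it but sees no new records for it, which matches the "skipping" lines in my logs.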

I've changed enable.auto.commit to true and removed the manual committing, but the problem still persists (the lag on 1 of the partitions keeps growing).

I'm running Spark 2.1.0 and Kafka 0.10.1.0 in a Docker environment. Any tips on what the issue could be here?

I've seen this topic: "Spark Structured Stream get messages from only one partition of Kafka", but with Kafka 0.10.0.1 I'm getting:

WARN clients.NetworkClient: Bootstrap broker kafka:9092 disconnected

errors in Spark.

