Spark consuming records only off of one Kafka topic partition
I have a Spark Streaming app that consumes from Kafka:
```scala
KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Set(kafkaTopic), kafkaParams)
)
```
The Kafka parameters are the defaults from https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html.
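For reference, a minimal sketch of the full setup, following the defaults in the linked integration guide (the app name and batch interval are placeholders; the broker address, group ID, and topic match the values that show up in my logs below):

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val conf = new SparkConf().setAppName("counselor")  // placeholder app name
val ssc  = new StreamingContext(conf, Seconds(5))   // placeholder batch interval

// Defaults as shown in the streaming-kafka-0-10 integration guide
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "kafka:9092",              // broker address from my environment
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "counselor-01",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val kafkaTopic = "processedjobs"
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Set(kafkaTopic), kafkaParams)
)
```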
Everything works fine as long as the Kafka topic has one partition. However, when there are more (2), Spark seems to read from only one of them. I'm seeing this in the logs:
```
17/07/28 12:08:15 INFO kafka010.KafkaRDD: Computing topic processedjobs, partition 0 offsets 20 -> 29
17/07/28 12:08:15 INFO kafka010.KafkaRDD: Beginning offset 0 is the same as ending offset skipping processedjobs 1
17/07/28 12:08:20 INFO kafka010.KafkaRDD: Beginning offset 29 is the same as ending offset skipping processedjobs 0
17/07/28 12:08:20 INFO kafka010.KafkaRDD: Beginning offset 0 is the same as ending offset skipping processedjobs 1
```
The output of

```
kafka-consumer-offset-checker.sh --zookeeper $ZOOKEEPER --topic processedjobs --group counselor-01
```

gives:

```
Group         Topic          Pid  Offset  logSize  Lag  Owner
counselor-01  processedjobs  0    29      29       0    none
counselor-01  processedjobs  1    0       28       28   none
```
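Since the 0.10 direct stream commits offsets to Kafka rather than ZooKeeper, the Kafka-backed tool may be the more reliable cross-check here (my assumption; kafka:9092 is the broker address from my environment):

```
kafka-consumer-groups.sh --new-consumer --bootstrap-server kafka:9092 --describe --group counselor-01
```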
I thought the app had faulty offset committing, since I'm using this pattern:
```scala
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```
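To see which partitions each batch actually covers, the offset ranges can be logged before committing (a debugging sketch; the println target is just illustrative):

```scala
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // One line per topic-partition in this batch; with two partitions,
  // both should appear with advancing untilOffset values.
  offsetRanges.foreach { o =>
    println(s"${o.topic} partition ${o.partition}: ${o.fromOffset} -> ${o.untilOffset}")
  }
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```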
I've changed enable.auto.commit to true and removed the manual committing, but the problem still persists (the lag on one of the partitions keeps growing).
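For completeness, that variant only changes one entry in the params map (a sketch, assuming the map shown above):

```scala
val kafkaParams = Map[String, Object](
  // ...same entries as above...
  "enable.auto.commit" -> (true: java.lang.Boolean)  // let the Kafka consumer commit on its own
)
```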
I'm running Spark 2.1.0 and Kafka 0.10.1.0 in a Docker environment. Any tips on what the issue could be here?
I've seen the topic Spark Structured Stream get messages from only one partition of Kafka, but with Kafka 0.10.0.1 I'm getting

```
WARN clients.NetworkClient: Bootstrap broker kafka:9092 disconnected
```

errors from Spark.