Kafka - Scala - processing multiple messages
Is it possible to send an array of Strings to a Kafka producer object? I want to take the messages from 'topic1' - lines of text - split them into single words and send them to another topic.
object KafkaConsumer extends App {

  implicit val actorSystem = ActorSystem("test-actor-system")
  implicit val streamMaterializer = ActorMaterializer()
  implicit val executionContext = actorSystem.dispatcher
  val log = actorSystem.log

  // PRODUCER config
  val producerSettings = ProducerSettings(
    actorSystem,
    new ByteArraySerializer,
    new StringSerializer)
    .withBootstrapServers("localhost:9092")
    .withProperty("auto.create.topics.enable", "true")

  // CONSUMER config
  val consumerSettings = ConsumerSettings(
    system = actorSystem,
    keyDeserializer = new ByteArrayDeserializer,
    valueDeserializer = new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("kafka-sample")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  // -----------------------------------------------------------------------//
  // Route of the app: consume from topic1, produce each message to topic2
  // and commit the offset of the consumed message through the sink
  Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
    .map { msg =>
      println(s"topic1 -> topic2: $msg")
      ProducerMessage.Message(
        new ProducerRecord[Array[Byte], String]("topic2", msg.record.value),
        msg.committableOffset)
    }
    .runWith(Producer.commitableSink(producerSettings))
}
The Akka Streams sample creates a simple stream that reads one message, uses a Sink that produces to Kafka and commits the offset of the consumed message. If you need to read one or more messages and produce as many words as exist in the consumed set, you should play more with the Akka Streams graph API.
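For reference, the word splitting on its own can be sketched roughly like this, reusing the consumer and producer settings above (a sketch only; it uses plainSource/plainSink, so it does not commit offsets per batch, which is exactly what the graph version below adds):

Consumer.plainSource(consumerSettings, Subscriptions.topics("topic1"))
  // split every consumed line into single words
  .mapConcat(record => record.value().split(" ").toList)
  // wrap each word in a ProducerRecord for the target topic
  .map(word => new ProducerRecord[Array[Byte], String]("topic2", word))
  .runWith(Producer.plainSink(producerSettings))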
This example uses the graph API: it builds one Source from Kafka and uses groupedWithin to read a bunch of messages at once and extract the existing words.
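If groupedWithin is new to you, here is a tiny standalone sketch of its behaviour (assuming the Source and duration imports and the implicit materializer from the full example below): it pushes a Seq downstream as soon as 10 elements have arrived or 30 seconds have passed, whichever comes first.

Source(1 to 25)
  // batches of at most 10 elements, flushed after 30 seconds at the latest
  .groupedWithin(10, 30.seconds)
  .runForeach(batch => println(s"batch of ${batch.size}: $batch"))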
Two simple Flows are created: one gets the last offset to commit and the other the words. A Source stage is created by broadcasting the consumed Kafka messages to both Flows and zipping the results into a tuple (CommittableOffset, Array[String]). In the runForeach function the messages are produced. Note that the messages aren't produced in order because of Future.sequence.
Although the sample may look long, it compiles and works using "com.typesafe.akka" %% "akka-stream-kafka" % "0.14".
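The corresponding build.sbt line is simply the following (a sketch; use the Scala version of your own project):

libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.14"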
import java.util.Properties
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.{CommittableMessage, CommittableOffset}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.{ActorMaterializer, SourceShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Source, Zip}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.{
  ByteArrayDeserializer,
  ByteArraySerializer,
  StringDeserializer,
  StringSerializer
}
import scala.concurrent.Future
import scala.util.{Failure, Success}
import scala.concurrent.duration._

object SplitSource extends App {

  implicit val actorSystem = ActorSystem("test-actor-system")
  implicit val streamMaterializer = ActorMaterializer()
  implicit val executionContext = actorSystem.dispatcher
  val log = actorSystem.log

  // PRODUCER config
  val producerSettings = ProducerSettings(
    actorSystem,
    new ByteArraySerializer,
    new StringSerializer)
    .withBootstrapServers("localhost:9092")
    .withProperty("auto.create.topics.enable", "true")

  // CONSUMER config
  val consumerSettings = ConsumerSettings(
    system = actorSystem,
    keyDeserializer = new ByteArrayDeserializer,
    valueDeserializer = new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("kafka-sample4")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  implicit val producerConfig = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("key.serializer", classOf[StringSerializer].getName)
    props.setProperty("value.serializer", classOf[StringSerializer].getName)
    props
  }

  lazy val kafkaProducer = new KafkaProducer[String, String](producerConfig)

  // Create a Scala Future from the Java producer's send
  private def publishToKafka(id: String, data: String) = {
    Future {
      kafkaProducer
        .send(new ProducerRecord("outTopic", id, data))
        .get()
    }
  }

  def getKafkaSource =
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("inTopic"))
      // It consumes 10 messages or waits 30 seconds to push downstream
      .groupedWithin(10, 30.seconds)

  val getStreamSource = GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val in = getKafkaSource

    // Broadcast to two flows: one obtains the last offset to commit,
    // the other returns the Seq of words to publish
    val br = b.add(Broadcast[Seq[CommittableMessage[Array[Byte], String]]](2))
    val zipResult = b.add(Zip[CommittableOffset, Array[String]]())

    val flowCommit =
      Flow[Seq[CommittableMessage[Array[Byte], String]]].map(_.last.committableOffset)

    // Flow that creates the list of all words in the consumed messages
    val _flowWords =
      Flow[Seq[CommittableMessage[Array[Byte], String]]].map(input => {
        input.map(_.record.value()).mkString(" ").split(" ")
      })

    // Build the stage
    in ~> br ~> flowCommit ~> zipResult.in0
          br ~> _flowWords ~> zipResult.in1

    SourceShape(zipResult.out)
  }

  Source.fromGraph(getStreamSource).runForeach { msgs =>
    // Publish all the words and, when all futures complete, commit the last Kafka offset
    val futures = msgs._2.map(publishToKafka("outTopic", _)).toList

    // Produces in parallel!! Use flatMap to make it in order
    Future.sequence(futures).onComplete {
      case Success(_) =>
        // Once all futures are done, commit the offset of the last consumed message
        msgs._1.commitScaladsl()
      case Failure(ex) =>
        log.error(ex, "Failed to publish the words, offset not committed")
    }
  }
}
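As the comment in runForeach says, Future.sequence publishes in parallel. A hedged sketch of the in-order alternative, using flatMap as suggested (it would live in the same object, since it reuses the publishToKafka helper and the implicit execution context):

  // Chain the publishes so each send starts only after the previous one finished
  def publishInOrder(words: Seq[String]): Future[Unit] =
    words.foldLeft(Future.successful(())) { (acc, word) =>
      acc.flatMap(_ => publishToKafka("outTopic", word).map(_ => ()))
    }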
The Akka Streams API allows you to create awesome processing pipelines.