Kafka - Scala - processing multiple messages


Is it possible to send an array of strings to a Kafka producer object? I want to take messages from 'topic1' (lines of text), split them into single words, and send each word to another topic.

    object KafkaConsumer extends App {

      implicit val actorSystem = ActorSystem("test-actor-system")
      implicit val streamMaterializer = ActorMaterializer()
      implicit val executionContext = actorSystem.dispatcher
      val log = actorSystem.log

      // producer config
      val producerSettings = ProducerSettings(
        actorSystem,
        new ByteArraySerializer,
        new StringSerializer)
        .withBootstrapServers("localhost:9092")
        .withProperty("auto.create.topics.enable", "true")

      // consumer config
      val consumerSettings = ConsumerSettings(
        system = actorSystem,
        keyDeserializer = new ByteArrayDeserializer,
        valueDeserializer = new StringDeserializer)
        .withBootstrapServers("localhost:9092")
        .withGroupId("kafka-sample")
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

      // route of the app: read from topic1 and forward each message to topic2
      Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
        .map { msg =>
          println(s"topic1 -> topic2: $msg")
          ProducerMessage.Message(
            new ProducerRecord[Array[Byte], String]("topic2", msg.record.value),
            msg.committableOffset)
        }
        .runWith(Producer.commitableSink(producerSettings))
    }

The Akka Streams sample above creates a simple stream that reads one message at a time, uses a sink that produces to Kafka, and commits the offset of the consumed message. If you need to read one or more messages and produce as many words as exist in the consumed set, you should play more with the Akka Streams Graph API.
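That said, for the plain word-splitting case you can stay with the simple source/sink pipeline. Below is a minimal sketch (not part of the original answer) that uses mapConcat to turn each consumed line into one ProducerMessage per word; every word carries the same committable offset, so the sink commits that offset once per word, which is redundant but harmless since it is always the same offset:

    // Hedged alternative sketch, reusing the settings from the sample above:
    // split each consumed line into words and emit one message per word.
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("topic1"))
      .mapConcat { msg =>
        msg.record.value().split(" ").toList.map { word =>
          ProducerMessage.Message(
            new ProducerRecord[Array[Byte], String]("topic2", word),
            msg.committableOffset)
        }
      }
      .runWith(Producer.commitableSink(producerSettings))

The graph approach that follows avoids the repeated commit by committing exactly once per consumed batch.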

This example uses graphs: it builds one Source from Kafka and uses groupedWithin to read a bunch of messages at a time and extract the words they contain.
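As a toy illustration of what groupedWithin does (not part of the sample, and reusing the implicit system and materializer defined below), a batch is emitted when either the element count or the time window is reached, whichever happens first:

    import scala.concurrent.duration._

    // emits Seq(1..10), Seq(11..20), Seq(21..25); the final, smaller batch
    // is flushed when the source completes instead of waiting 30 seconds
    Source(1 to 25)
      .groupedWithin(10, 30.seconds)
      .runForeach(batch => println(s"batch of ${batch.size}: $batch"))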

Two simple flows are created, one obtaining the last offset and the other the words. A Source stage is built that broadcasts each batch of consumed Kafka messages to both flows and zips the results into a tuple (CommittableOffset, Array[String]). The messages are produced inside the runForeach function. Note that the messages aren't produced in order, because Future.sequence runs the publish futures in parallel.
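To see the broadcast-and-zip shape in isolation, here is a toy graph over plain Ints (an illustrative assumption, not Kafka messages, reusing the imports from the sample below): one branch keeps the last element of each batch, the other aggregates the batch, and Zip pairs the results, mirroring the offset/words split:

    // toy Broadcast + Zip graph mirroring the Kafka graph in the sample
    val toySource = Source.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val in = Source(1 to 9).grouped(3) // Seq(1,2,3), Seq(4,5,6), Seq(7,8,9)
      val br = b.add(Broadcast[Seq[Int]](2))
      val zip = b.add(Zip[Int, Int]())
      in ~> br ~> Flow[Seq[Int]].map(_.last) ~> zip.in0 // the "last offset" branch
            br ~> Flow[Seq[Int]].map(_.sum)  ~> zip.in1 // the "payload" branch
      SourceShape(zip.out)
    })
    // toySource.runForeach(println) // prints (3,6), (6,15), (9,24)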

Although the sample may look long, it compiles and works using "com.typesafe.akka" %% "akka-stream-kafka" % "0.14".
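For completeness, a build.sbt sketch that pulls in that dependency (the Scala version is an assumption; any 2.11 or 2.12 release contemporary with 0.14 should work):

    // build.sbt (everything except the akka-stream-kafka line is an assumption)
    scalaVersion := "2.11.8"
    libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.14"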

    import java.util.Properties

    import akka.actor.ActorSystem
    import akka.kafka.ConsumerMessage.{CommittableMessage, CommittableOffset}
    import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
    import akka.kafka.scaladsl.Consumer
    import akka.stream.{ActorMaterializer, SourceShape}
    import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Source, Zip}

    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.kafka.common.serialization.{
      ByteArrayDeserializer,
      ByteArraySerializer,
      StringDeserializer,
      StringSerializer
    }

    import scala.concurrent.Future
    import scala.util.Success
    import scala.concurrent.duration._

    object SplitSource extends App {

      implicit val actorSystem = ActorSystem("test-actor-system")
      implicit val streamMaterializer = ActorMaterializer()
      implicit val executionContext = actorSystem.dispatcher
      val log = actorSystem.log

      // producer config
      val producerSettings = ProducerSettings(actorSystem,
                                              new ByteArraySerializer,
                                              new StringSerializer)
        .withBootstrapServers("localhost:9092")
        .withProperty("auto.create.topics.enable", "true")

      // consumer config
      val consumerSettings =
        ConsumerSettings(system = actorSystem,
                         keyDeserializer = new ByteArrayDeserializer,
                         valueDeserializer = new StringDeserializer)
          .withBootstrapServers("localhost:9092")
          .withGroupId("kafka-sample4")
          .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

      implicit val producerConfig = {
        val props = new Properties()
        props.setProperty("bootstrap.servers", "localhost:9092")
        props.setProperty("key.serializer", classOf[StringSerializer].getName)
        props.setProperty("value.serializer", classOf[StringSerializer].getName)
        props
      }

      lazy val kafkaProducer = new KafkaProducer[String, String](producerConfig)

      // wrap the blocking Java producer call in a Scala Future
      private def publishToKafka(id: String, data: String) = {
        Future {
          kafkaProducer
            .send(new ProducerRecord("outTopic", id, data))
            .get()
        }
      }

      def getKafkaSource =
        Consumer
          .committableSource(consumerSettings, Subscriptions.topics("inTopic"))
          // consumes 10 messages or waits 30 seconds to push downstream
          .groupedWithin(10, 30.seconds)

      val getStreamSource = GraphDSL.create() { implicit b =>
        import GraphDSL.Implicits._

        val in = getKafkaSource

        // broadcast to 2 flows: one to obtain the last offset to commit
        // and the other to return the Seq of words to publish
        val br = b.add(Broadcast[Seq[CommittableMessage[Array[Byte], String]]](2))
        val zipResult = b.add(Zip[CommittableOffset, Array[String]]())
        val flowCommit =
          Flow[Seq[CommittableMessage[Array[Byte], String]]].map(_.last.committableOffset)

        // flow that creates the list of words in the consumed messages
        val flowWords =
          Flow[Seq[CommittableMessage[Array[Byte], String]]].map(input => {
            input.map(_.record.value()).mkString(" ").split(" ")
          })

        // build the stage
        in ~> br ~> flowCommit ~> zipResult.in0
              br ~> flowWords ~> zipResult.in1

        SourceShape(zipResult.out)
      }

      Source.fromGraph(getStreamSource).runForeach { msgs =>
        // publish the words and, when the futures complete, commit the last Kafka offset
        val futures = msgs._2.map(publishToKafka("outTopic", _)).toList

        // produces in parallel!! Use flatMap to produce in order
        Future.sequence(futures).onComplete {
          case Success(_) =>
            // once the futures are done, commit the last consumed message
            msgs._1.commitScaladsl()
        }
      }
    }
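The comment above points at flatMap as the way to publish in order. A hedged sketch of that sequential variant (publishInOrder is a hypothetical helper built on the sample's publishToKafka) chains each publish onto the previous future instead of firing them all at once:

    // sequential alternative to Future.sequence: each publish starts only
    // after the previous one completes, preserving word order in outTopic
    def publishInOrder(words: Seq[String]): Future[Unit] =
      words.foldLeft(Future.successful(())) { (acc, word) =>
        acc.flatMap(_ => publishToKafka("outTopic", word).map(_ => ()))
      }

The trade-off is throughput: the parallel version in the sample keeps the producer busy, while this one serializes every send.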

The Akka Streams API allows you to create awesome processing pipelines.

