scala - NATS streaming server subscriber rate limiting and exactly once delivery -


i playing bit nats streaming , have problem subscriber rate limiting. when set max in flight 1 , timeout 1 second , have consumer thread.sleep(1000) multiple times same event. thought limiting in flight , using manual ack should not happen. how can exatly once delivery on slow consumers?

  case class eventbus[i, o](inputtopic: string, outputtopic: string, connection: connection, eventprocessor: statefuleventprocessor[i, o]) {     // event bus abstract class while `connection` coulbd injected using di     val substritionoptions: subscriptionoptions = new subscriptionoptions.builder()                                                                          .setmanualacks(true)                                                                          .setdurablename("foo")                                                                          .setmaxinflight(1)                                                                          .setackwait(1, timeunit.seconds)                                                                          .build()      if (!inputtopic.isempty) {       connection.subscribe(inputtopic, new messagehandler() {         override def onmessage(m: message) {           m.ack()           try {             val event = eventprocessor.deserialize(m.getdata)             eventprocessor.onevent(event)           } catch {             case =>               try {                 val command = new string(m.getdata)                 eventprocessor.oncommand(command)               } catch {                 case => println(s"de-serialization error: $any")               }           } {             println("got event")           }         }       }, substritionoptions)     }      if (!outputtopic.isempty) {       eventprocessor.setbus(e => {         try {           connection.publish(outputtopic, eventprocessor.serialize(e))         } catch {           case ex => println(s"serialization error $ex")         }       })     }   }     abstract class statefuleventprocessor[i, o] {     private var bus: option[o => unit] = none     def onevent(event: i): unit     def oncommand(command: string): unit      def serialize(o: o): array[byte] =       serializationutils.serialize(o.asinstanceof[java.io.serializable])      def deserialize(in: array[byte]): =       serializationutils.deserialize[i](in)      def setbus(push: o => unit): unit = {       if (bus.isdefined) {         throw new illegalstateexception("bus set")       } else {         bus = some(push)       }     }      def push(event: o) =       bus.get.apply(event)   }     eventbus("out-1", "out-2", sc, new statefuleventprocessor[string, string] {     override def onevent(event: string): unit = {       thread.sleep(1000)       push("!!!" + event)     }      override def oncommand(command: string): unit = {}   })    (0 until 100).foreach(i => sc.publish("out-1", serializationutils.serialize(s"test-$i"))) 

first, there no once (re)delivery guarantee nats streaming. maxinflight gives you, assurance server not send new messages subscriber until number of unacknowledged messages below number. in case of maxinflight(1), asking server send next new message after receiving ack delivered message. however, not block redelivery of unacknowledged messages.

the server has no guarantee or no knowledge message received subscriber. ack for, let server know message processed subscriber. if server not honor redelivery (even when maxinflight reached), "lost" message stall subscription ever. keep in mind nats streaming server , clients not directly connected each other tcp connection (they both connected nats server, aka gnatsd).


Comments

Popular posts from this blog

php - Vagrant up error - Uncaught Reflection Exception: Class DOMDocument does not exist -

vue.js - Create hooks for automated testing -

Add new key value to json node in java -