scala - How to write JDBC Sink for Spark Structured Streaming [SparkException: Task not serializable]?


I need a JDBC sink for my Spark Structured Streaming data frame. At the moment, as far as I know, the DataFrame API lacks a writeStream to JDBC implementation (neither in PySpark nor in Scala, as of the current Spark version 2.2.0). The suggestion I found was to write my own ForeachWriter Scala class, based on this article. So I modified the simple word count example from here by adding a custom ForeachWriter class and tried to writeStream to PostgreSQL. The stream of words is generated manually from the console (using netcat: nc -lk -p 9999) and read by Spark from a socket.

Unfortunately, I'm getting a "Task not serializable" error.

APACHE_SPARK_VERSION=2.1.0, using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)

My Scala code:

// Spark context available as 'sc' (master = local[*], app id = local-1501242382770).
// Spark session available as 'spark'.

import java.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .master("local[*]")
  .appName("StructuredNetworkWordCountToJdbc")
  .config("spark.jars", "/tmp/data/postgresql-42.1.1.jar")
  .getOrCreate()

import spark.implicits._

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

val words = lines.as[String].flatMap(_.split(" "))

val wordCounts = words.groupBy("value").count()

class JDBCSink(url: String, user: String, pwd: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
  val driver = "org.postgresql.Driver"
  var connection: java.sql.Connection = _
  var statement: java.sql.Statement = _

  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = java.sql.DriverManager.getConnection(url, user, pwd)
    statement = connection.createStatement
    true
  }

  def process(value: org.apache.spark.sql.Row): Unit = {
    statement.executeUpdate("INSERT INTO public.test(col1, col2) " +
      "VALUES ('" + value(0) + "'," + value(1) + ");")
  }

  def close(errorOrNull: Throwable): Unit = {
    connection.close
  }
}

val url = "jdbc:postgresql://<mypostgreserver>:<port>/<mydb>"
val user = "<user name>"
val pwd = "<pass>"
val writer = new JDBCSink(url, user, pwd)

import org.apache.spark.sql.streaming.ProcessingTime

val query = wordCounts
  .writeStream
  .foreach(writer)
  .outputMode("complete")
  .trigger(ProcessingTime("25 seconds"))
  .start()

query.awaitTermination()

Error message:

ERROR StreamExecution: Query [id = ef2e7a4c-0d64-4cad-ad4f-91d349f8575b, runId = a86902e6-d168-49d1-b7e7-084ce503ea68] terminated with error
org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:923)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:923)
        at org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:503)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:502)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:255)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)
Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.streaming.StreamExecution
Serialization stack:
        - object not serializable (class: org.apache.spark.sql.execution.streaming.StreamExecution, value: Streaming Query [id = 9b01db99-9120-4047-b779-2e2e0b289f65, runId = e20beefa-146a-4139-96f9-de3d64ce048a] [state = TERMINATED])
        - field (class: $line21.$read$$iw$$iw, name: query, type: interface org.apache.spark.sql.streaming.StreamingQuery)
        - object (class $line21.$read$$iw$$iw, $line21.$read$$iw$$iw@24747e0f)
        - field (class: $line21.$read$$iw, name: $iw, type: class $line21.$read$$iw$$iw)
        - object (class $line21.$read$$iw, $line21.$read$$iw@1814ed19)
        - field (class: $line21.$read, name: $iw, type: class $line21.$read$$iw)
        - object (class $line21.$read, $line21.$read@13e62f5d)
        - field (class: $line25.$read$$iw, name: $line21$read, type: class $line21.$read)
        - object (class $line25.$read$$iw, $line25.$read$$iw@14240e5c)
        - field (class: $line25.$read$$iw$$iw, name: $outer, type: class $line25.$read$$iw)
        - object (class $line25.$read$$iw$$iw, $line25.$read$$iw$$iw@11e4c6f5)
        - field (class: $line25.$read$$iw$$iw$JDBCSink, name: $outer, type: class $line25.$read$$iw$$iw)
        - object (class $line25.$read$$iw$$iw$JDBCSink, $line25.$read$$iw$$iw$JDBCSink@6c096c84)
        - field (class: org.apache.spark.sql.execution.streaming.ForeachSink, name: org$apache$spark$sql$execution$streaming$ForeachSink$$writer, type: class org.apache.spark.sql.ForeachWriter)
        - object (class org.apache.spark.sql.execution.streaming.ForeachSink, org.apache.spark.sql.execution.streaming.ForeachSink@6feccb75)
        - field (class: org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1, name: $outer, type: class org.apache.spark.sql.execution.streaming.ForeachSink)
        - object (class org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1, <function1>)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
        ... 25 more

How can I make this work?

Solution

(Thanks to everyone, and special thanks to @zsxwing for the straightforward solution):

  1. Save the JDBCSink class to a separate file.
  2. In the spark-shell, load that class, e.g. using scala> :load <path_to_a_JDBCSink.scala_file>
  3. Finally, run scala> :paste and paste in the code without the JDBCSink class definition.

In short: define JDBCSink in a separate file rather than as an inner class defined in the REPL session, which may capture the outer reference (and with it non-serializable objects such as the running query).
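For reference, a minimal sketch of what the split might look like. The file name JDBCSink.scala, the path /tmp/data/JDBCSink.scala, and the table public.test(col1, col2) are just the ones from the example above; adapt them to your setup.

// JDBCSink.scala -- kept in its own file so the class is top-level
// and does not drag any spark-shell (REPL) state along via an $outer reference.
class JDBCSink(url: String, user: String, pwd: String)
    extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {

  val driver = "org.postgresql.Driver"
  var connection: java.sql.Connection = _
  var statement: java.sql.Statement = _

  // Called once per partition and epoch; opens the JDBC connection.
  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = java.sql.DriverManager.getConnection(url, user, pwd)
    statement = connection.createStatement
    true
  }

  // Called for every row of the micro-batch.
  def process(value: org.apache.spark.sql.Row): Unit = {
    statement.executeUpdate("INSERT INTO public.test(col1, col2) " +
      "VALUES ('" + value(0) + "'," + value(1) + ");")
  }

  // Called when the partition is done (or failed); releases the connection.
  def close(errorOrNull: Throwable): Unit = {
    connection.close
  }
}

Then, in the spark-shell session:

scala> :load /tmp/data/JDBCSink.scala   // example path to the file above
scala> :paste
// paste the rest of the original code, i.e. everything except the JDBCSink class definition
val writer = new JDBCSink(url, user, pwd)

val query = wordCounts
  .writeStream
  .foreach(writer)
  .outputMode("complete")
  .trigger(org.apache.spark.sql.streaming.ProcessingTime("25 seconds"))
  .start()

query.awaitTermination()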

