scala - How to write JDBC Sink for Spark Structured Streaming [SparkException: Task not serializable]?
I need a JDBC sink for my Spark Structured Streaming data frame. At the moment, as far as I know, the DataFrame API lacks a writeStream-to-JDBC implementation (neither in PySpark nor in Scala, as of the current Spark version 2.2.0). The suggestion I found is to write my own ForeachWriter Scala class, based on this article. So I've modified the simple word count example from here by adding a custom ForeachWriter class, and tried to writeStream to Postgres. The stream of words is generated manually from the console (using NetCat: nc -lk -p 9999) and read by Spark from a socket.
Unfortunately, I'm getting a "Task not serializable" error.
APACHE_SPARK_VERSION=2.1.0, using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
My Scala code:
//Spark context available as 'sc' (master = local[*], app id = local-1501242382770).
//Spark session available as 'spark'.

import java.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .master("local[*]")
  .appName("StructuredNetworkWordCountToJdbc")
  .config("spark.jars", "/tmp/data/postgresql-42.1.1.jar")
  .getOrCreate()

import spark.implicits._

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

val words = lines.as[String].flatMap(_.split(" "))

val wordCounts = words.groupBy("value").count()

class JDBCSink(url: String, user: String, pwd: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
  val driver = "org.postgresql.Driver"
  var connection: java.sql.Connection = _
  var statement: java.sql.Statement = _

  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = java.sql.DriverManager.getConnection(url, user, pwd)
    statement = connection.createStatement
    true
  }

  def process(value: org.apache.spark.sql.Row): Unit = {
    statement.executeUpdate("INSERT INTO public.test(col1, col2) " +
      "VALUES ('" + value(0) + "'," + value(1) + ");")
  }

  def close(errorOrNull: Throwable): Unit = {
    connection.close
  }
}

val url = "jdbc:postgresql://<mypostgreserver>:<port>/<mydb>"
val user = "<user name>"
val pwd = "<pass>"
val writer = new JDBCSink(url, user, pwd)

import org.apache.spark.sql.streaming.ProcessingTime

val query = wordCounts
  .writeStream
  .foreach(writer)
  .outputMode("complete")
  .trigger(ProcessingTime("25 seconds"))
  .start()

query.awaitTermination()
Error message:
ERROR StreamExecution: Query [id = ef2e7a4c-0d64-4cad-ad4f-91d349f8575b, runId = a86902e6-d168-49d1-b7e7-084ce503ea68] terminated with error
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:923)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:923)
    at org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:503)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:502)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:255)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)
Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.streaming.StreamExecution
Serialization stack:
    - object not serializable (class: org.apache.spark.sql.execution.streaming.StreamExecution, value: Streaming Query [id = 9b01db99-9120-4047-b779-2e2e0b289f65, runId = e20beefa-146a-4139-96f9-de3d64ce048a] [state = TERMINATED])
    - field (class: $line21.$read$$iw$$iw, name: query, type: interface org.apache.spark.sql.streaming.StreamingQuery)
    - object (class $line21.$read$$iw$$iw, $line21.$read$$iw$$iw@24747e0f)
    - field (class: $line21.$read$$iw, name: $iw, type: class $line21.$read$$iw$$iw)
    - object (class $line21.$read$$iw, $line21.$read$$iw@1814ed19)
    - field (class: $line21.$read, name: $iw, type: class $line21.$read$$iw)
    - object (class $line21.$read, $line21.$read@13e62f5d)
    - field (class: $line25.$read$$iw, name: $line21$read, type: class $line21.$read)
    - object (class $line25.$read$$iw, $line25.$read$$iw@14240e5c)
    - field (class: $line25.$read$$iw$$iw, name: $outer, type: class $line25.$read$$iw)
    - object (class $line25.$read$$iw$$iw, $line25.$read$$iw$$iw@11e4c6f5)
    - field (class: $line25.$read$$iw$$iw$JDBCSink, name: $outer, type: class $line25.$read$$iw$$iw)
    - object (class $line25.$read$$iw$$iw$JDBCSink, $line25.$read$$iw$$iw$JDBCSink@6c096c84)
    - field (class: org.apache.spark.sql.execution.streaming.ForeachSink, name: org$apache$spark$sql$execution$streaming$ForeachSink$$writer, type: class org.apache.spark.sql.ForeachWriter)
    - object (class org.apache.spark.sql.execution.streaming.ForeachSink, org.apache.spark.sql.execution.streaming.ForeachSink@6feccb75)
    - field (class: org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1, name: $outer, type: class org.apache.spark.sql.execution.streaming.ForeachSink)
    - object (class org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 25 more
How do I make this work?
solution
(Thanks to all; special thanks to @zsxwing for the straightforward solution):
- Save the JDBCSink class to its own file (a sketch of such a file is given at the end of this answer).
- In spark-shell, load that class, e.g. using
scala> :load <path_to_a_JDBCSink.scala_file>
- Finally, run
scala> :paste
and paste the code without the JDBCSink class definition, for example:
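A minimal sketch of what that pasted block might be: the question's code minus the class (the connection placeholders are the same illustrative ones as above).

// SparkSession setup, readStream, words and wordCounts stay exactly as in the question ...

val url = "jdbc:postgresql://<mypostgreserver>:<port>/<mydb>"
val user = "<user name>"
val pwd = "<pass>"

// JDBCSink is now a top-level class compiled from the :load-ed file
val writer = new JDBCSink(url, user, pwd)

import org.apache.spark.sql.streaming.ProcessingTime

val query = wordCounts
  .writeStream
  .foreach(writer)
  .outputMode("complete")
  .trigger(ProcessingTime("25 seconds"))
  .start()

query.awaitTermination()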
Just define JDBCSink in a separate file rather than as an inner class, which may capture the outer reference. A class defined inside spark-shell is compiled as an inner class of the REPL's wrapper object (the $line25.$read$$iw$$iw in the serialization stack above), so serializing the writer also drags in that wrapper and, through it, the non-serializable StreamingQuery.
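For reference, here is a minimal sketch of what the JDBCSink.scala file could contain. It is essentially the question's class made top-level; the file name and the public.test table are the question's own placeholders.

// JDBCSink.scala: a top-level class, so no enclosing REPL wrapper and no $outer field.
import java.sql._
import org.apache.spark.sql.{ForeachWriter, Row}

class JDBCSink(url: String, user: String, pwd: String) extends ForeachWriter[Row] {
  val driver = "org.postgresql.Driver"
  var connection: Connection = _
  var statement: Statement = _

  // Called on the executor once per partition, after deserialization,
  // so the non-serializable Connection is never shipped over the wire.
  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = DriverManager.getConnection(url, user, pwd)
    statement = connection.createStatement
    true
  }

  def process(value: Row): Unit = {
    statement.executeUpdate("INSERT INTO public.test(col1, col2) " +
      "VALUES ('" + value(0) + "'," + value(1) + ");")
  }

  def close(errorOrNull: Throwable): Unit = {
    connection.close()
  }
}

Since ForeachWriter already extends Serializable and the Connection and Statement fields are only assigned in open() on the executor, this top-level class serializes cleanly.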