scala - Why does my query fail with AnalysisException? -


I am new to Spark Streaming. I am trying out Structured Streaming on local CSV files, and I am getting the below exception while processing.

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();; FileSource[file:///home/teju/desktop/sparkinputfiles/*.csv]

This is my code.

val df = spark
  .readStream
  .format("csv")
  .option("header", "false")   // use first line of all files as header
  .option("delimiter", ":")    // specifying the delimiter of the input file
  .schema(inputdata_schema)    // specifying the schema of the input file
  .load("file:///home/teju/desktop/sparkinputfiles/*.csv")

val filterop = spark.sql(
  "select tagshortid, timestamp, listenershortid, rootorgid, suborgid, " +
  "first(rssi_weightage(rssi)) as rssi_weight " +
  "from my_table " +
  "where rssi > -127 " +
  "group by tagshortid, timestamp, listenershortid, rootorgid, suborgid " +
  "order by timestamp asc")

val outstream = filterop.writeStream.outputMode("complete").format("console").start()

I have created a cron job that runs every 5 minutes, so I get one input CSV file every 5 minutes, and I am trying to parse it through Spark Streaming. Any help is appreciated.

(This is not a solution but more of a comment, but given its length it ended up here. I'm going to turn it into a proper answer right after I've collected enough information for the investigation.)


My guess is that you're doing something incorrect with df that you have not included in the question.

Since the error message mentions the FileSource with the path below, and a streaming dataset is involved, it must be df that's in play:

FileSource[file:///home/teju/desktop/sparkinputfiles/*.csv]

Given the other lines, my guess is that you register the streaming dataset as a temporary table (i.e. my_table) that you then use in spark.sql to execute SQL and writeStream to the console:

df.createOrReplaceTempView("my_table")

If that's correct, the code you've included in the question is incomplete and does not show the reason for the error.
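For completeness, here is a sketch of how the whole pipeline would fit together under that assumption. It is not a tested solution: `inputdata_schema` and the `rssi_weightage` function are taken from the question and assumed to be defined (and, for the UDF, registered) elsewhere in the asker's code. The two points it illustrates are that the temporary view must be registered on the streaming DataFrame before spark.sql references it, and that the query must be terminated with writeStream...start(), which is exactly what the AnalysisException complains about.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .appName("csv-structured-streaming")
  .master("local[*]")
  .getOrCreate()

// Assumed schema; the real inputdata_schema comes from the question.
val inputdata_schema = new StructType()
  .add("tagshortid", IntegerType)
  .add("timestamp", StringType)
  .add("listenershortid", IntegerType)
  .add("rootorgid", IntegerType)
  .add("suborgid", IntegerType)
  .add("rssi", IntegerType)

// Assumed UDF registration; rssi_weightage is the asker's own function.
spark.udf.register("rssi_weightage", (rssi: Int) => rssi.toDouble)

val df = spark.readStream
  .format("csv")
  .option("header", "false")
  .option("delimiter", ":")
  .schema(inputdata_schema)
  .load("file:///home/teju/desktop/sparkinputfiles/*.csv")

// Register the streaming DataFrame so spark.sql can see it as my_table.
df.createOrReplaceTempView("my_table")

val filterop = spark.sql(
  """select tagshortid, timestamp, listenershortid, rootorgid, suborgid,
    |       first(rssi_weightage(rssi)) as rssi_weight
    |from my_table
    |where rssi > -127
    |group by tagshortid, timestamp, listenershortid, rootorgid, suborgid
    |order by timestamp asc""".stripMargin)

// start() turns the logical plan into a running StreamingQuery; without
// it, any action on the streaming DataFrame throws the AnalysisException
// from the question.
val outstream = filterop.writeStream
  .outputMode("complete")
  .format("console")
  .start()

outstream.awaitTermination()
```

Note that `outputMode("complete")` is what makes the aggregation with `order by` legal here; in append mode a sorted streaming aggregation would be rejected.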

