scala - Why does my query fail with AnalysisException? -
i new spark streaming. trying structured spark streaming local csv files.i getting below exception while processing.
exception in thread "main" org.apache.spark.sql.analysisexception: queries streaming sources must executed writestream.start();; filesource[file:///home/teju/desktop/sparkinputfiles/*.csv]
this code.
val df = spark .readstream .format("csv") .option("header", "false") // use first line of files header .option("delimiter", ":") // specifying delimiter of input file .schema(inputdata_schema) // specifying schema input file .load("file:///home/teju/desktop/sparkinputfiles/*.csv") val filterop = spark.sql("select tagshortid,timestamp,listenershortid,rootorgid,suborgid,first(rssi_weightage(rssi)) rssi_weight my_table rssi > -127 group tagshortid,timestamp,listenershortid,rootorgid,suborgid order timestamp asc") val outstream = filterop.writestream.outputmode("complete").format("console").start()
i created cron job every 5 mins 1 input csv file.i trying parse through spark streaming.any appreciated.
(this not solution more comment, given length ended here. i'm going make answer right after i've collected enough information investigation).
my guess you're doing incorrect on df
have not included in question.
since error message filesource
path below , streaming dataset must df
that's in play.
filesource[file:///home/teju/desktop/sparkinputfiles/*.csv]
given other lines guess register streaming dataset temporary table (i.e. my_table
) use in spark.sql
execute sql , writestream
console.
df.createorreplacetempview("my_table")
if that's correct, code you've included in question incomplete , not show reason error.
Comments
Post a Comment