Spark DataFrame - PySpark window function with filter


I have a dataset with the columns id, timestamp, x, y:

id, timestamp,  x,   y
0,  1443489380, 100, 1
0,  1443489390, 200, 0
0,  1443489400, 300, 0
0,  1443489410, 400, 1

I defined a window spec: w = Window.partitionBy("id").orderBy("timestamp")

This is what I want: create a new column based on the sum of the current row's x and the next row's x.

If the sum is >= 500, set the new column to "big", otherwise to "small".

df = df.withColumn("newcol", when(df.x + lag(df.x, -1).over(w) >= 500, "big").otherwise("small"))

However, I want to filter the data first, without affecting the original df: the code above should apply only to rows where y = 1.

So the code above should be applied to just these 2 rows:

0,  1443489380, 100, 1
0,  1443489410, 400, 1

I have done it this way, but it is bad:

df2 = df.filter(df.y == 1)
df2 = df2.withColumn("newcol", when(df.x + lag(df.x, -1).over(w) >= 500, "big").otherwise("small"))
df = df.join(df2, ["id", "timestamp"], "outer")

What I want is something like the following, but it is not possible, since it causes AttributeError: 'DataFrame' object has no attribute 'when'

df = df.withColumn("newcol", df.filter(df.y == 1).when(df.x + lag(df.x, -1).over(w) >= 500, "big").otherwise("small"))

In conclusion, I just want to temporarily filter the rows with y = 1 before summing x and the next x.

Your code works fine; I think you didn't import the functions module. I tried your code:

>>> from pyspark.sql import functions as F
>>> df2 = df2.withColumn("newcol", F.when((df.x + F.lag(df.x, -1).over(w)) >= 500, "big").otherwise("small"))
>>> df2.show()
+---+----------+---+---+------+
| id| timestamp|  x|  y|newcol|
+---+----------+---+---+------+
|  0|1443489380|100|  1|   big|
|  0|1443489410|400|  1| small|
+---+----------+---+---+------+
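The last row comes out as small because it has no following row: lag with an offset of -1 looks one row ahead, and for the final row in the window that lookup is null, so the comparison is not true and otherwise("small") applies. Here is a rough plain-Python sketch of that behaviour, with no Spark involved. The helper name label_with_next and its threshold parameter are made up for illustration and are not part of any PySpark API.

```python
# Plain-Python illustration of lag(x, -1) over an ordered window.
# `label_with_next` is a hypothetical helper, not a PySpark function.

def label_with_next(xs, threshold=500):
    """Label each x by the sum of itself and the next x in the list."""
    labels = []
    for i, x in enumerate(xs):
        # lag(x, -1) looks one row ahead; the last row has no next row (null).
        nxt = xs[i + 1] if i + 1 < len(xs) else None
        # In Spark, x + null is null and "null >= 500" is not true,
        # so when(...) falls through to otherwise("small").
        is_big = nxt is not None and x + nxt >= threshold
        labels.append("big" if is_big else "small")
    return labels

# The two rows with y == 1, ordered by timestamp.
print(label_with_next([100, 400]))  # ['big', 'small']
```

If you would rather see null than small on the last row, you would need an explicit null check in the when condition.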

Edited: I have tried changing the window partition to be based on the 'id' and 'y' columns:

>>> w = Window.partitionBy("id", "y").orderBy("timestamp")
>>> df.select("*", F.when(df.y == 1, F.when((df.x + F.lag("x", -1).over(w)) >= 500, 'big').otherwise('small')).otherwise(None).alias('new_col')).show()
+---+----------+---+---+-------+
| id| timestamp|  x|  y|new_col|
+---+----------+---+---+-------+
|  0|1443489380|100|  1|    big|
|  0|1443489410|400|  1|  small|
|  0|1443489390|200|  0|   null|
|  0|1443489400|300|  0|   null|
+---+----------+---+---+-------+
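The reason this works without a separate filter and join is that putting y into the partition key means the "next row" for a y = 1 row is always another y = 1 row, and the outer when leaves the y = 0 rows as null. Below is a minimal plain-Python sketch of that idea using the data from the question. It only imitates the window logic and is not how Spark executes it; label_partitioned is a hypothetical name.

```python
from itertools import groupby

rows = [  # (id, timestamp, x, y), same data as in the question
    (0, 1443489380, 100, 1),
    (0, 1443489390, 200, 0),
    (0, 1443489400, 300, 0),
    (0, 1443489410, 400, 1),
]

def label_partitioned(rows, threshold=500):
    """Imitate partitionBy("id", "y").orderBy("timestamp") plus the nested when."""
    result = {}
    # Sort by partition key (id, y), then by timestamp inside each partition.
    ordered = sorted(rows, key=lambda r: (r[0], r[3], r[1]))
    for _, group in groupby(ordered, key=lambda r: (r[0], r[3])):
        part = list(group)
        for i, (id_, ts, x, y) in enumerate(part):
            # Next x within the same (id, y) partition, or None on the last row.
            nxt = part[i + 1][2] if i + 1 < len(part) else None
            if y != 1:
                label = None  # outer when(...).otherwise(None)
            elif nxt is not None and x + nxt >= threshold:
                label = "big"
            else:
                label = "small"
            result[(id_, ts)] = label
    return result

labels = label_partitioned(rows)
for key in sorted(labels):
    print(key, labels[key])
```

The result is keyed by (id, timestamp), so every original row is kept, which is what the join in the question was trying to achieve.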
