Problem-1:
+-------+------------+-----+---+
|user_id|episode_name|start|end|
+-------+------------+-----+---+
|    101|    Big Boss|    1| 10|
|    101|    Big Boss|    5| 10|
|    101|    Big Boss|   50| 60|
+-------+------------+-----+---+
output:
+-------+------------+---------+
|user_id|episode_name|time |
+-------+------------+---------+
| 101| Big Boss| 25 |
+-------+------------+---------+
Logic
Time is calculated from the start and end columns:
1st row: 1 to 10 is all new viewing time, so the time counted is 10.
2nd row: the user watches from 5 to 10 again, so the time spent is 10 - 5 = 5.
3rd row: 50 to 60 is all new viewing time, so the time counted is 10.
Output is 10 + 5 + 10 = 25.
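As a quick sanity check of the arithmetic above, here is a small plain-Python sketch that applies the same per-row rule as the PySpark solution below (it is not part of the original solution):

# Plain-Python sanity check of the 10 + 5 + 10 = 25 arithmetic
rows = [(1, 10), (5, 10), (50, 60)]   # (start, end) per viewing session

total = 0
prev_start, prev_end = 0, 0
for start, end in rows:
    if start == 1:                    # first session: count the full span from 0
        counted_from = 0
    elif start < prev_start:          # session starts before the previous one: resume from its end
        counted_from = prev_end
    else:                             # otherwise count from this session's own start
        counted_from = start
    total += end - counted_from
    prev_start, prev_end = start, end

print(total)                          # 25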
import pyspark.sql.functions as F
from pyspark.sql.window import Window
input = [[101,'Big Boss',1,10],[101,'Big Boss',5,10],[101,'Big Boss',50,60]]
df = spark.createDataFrame(input,['user_id','episode_name','start','end'])
df.show()
window_1 = Window.partitionBy("user_id").orderBy("start")
window_2 = Window.partitionBy("user_id").orderBy("end")
# Carry the previous row's start and end forward; default to 0 for the first row
df2 = (df
       .withColumn("previous_start", F.coalesce(F.lag("start", 1).over(window_1), F.lit(0)))
       .withColumn("previous_end", F.coalesce(F.lag("end", 1).over(window_2), F.lit(0))))
df2.show()
+-------+------------+-----+---+--------------+------------+
|user_id|episode_name|start|end|previous_start|previous_end|
+-------+------------+-----+---+--------------+------------+
|    101|    Big Boss|    1| 10|             0|           0|
|    101|    Big Boss|    5| 10|             1|          10|
|    101|    Big Boss|   50| 60|             5|          10|
+-------+------------+-----+---+--------------+------------+
# "time" is the point from which each row's viewing is counted
# (0 when start == 1, previous_end when start < previous_start, else this row's own start)
df3 = (df2
       .withColumn("time", F.when(F.col("start") == 1, 0)
                            .when(F.col("start") < F.col("previous_start"), F.col("previous_end"))
                            .otherwise(F.col("start")))
       .groupBy("user_id", "episode_name")
       .agg(F.sum(F.col("end") - F.col("time"))))
df3.show()
+-------+------------+-----------------+
|user_id|episode_name|sum((end - time))|
+-------+------------+-----------------+
|    101|    Big Boss|               25|
+-------+------------+-----------------+
Problem-2:
Pick the last non-null value for each row (i.e. forward-fill nulls within a group)
I/P:          O/P:
P1  "         P1  "
P1  a         P1  a
P1  null      P1  a
P1  b         P1  b
P1  null      P1  b
>>> from pyspark.sql.window import Window
>>> from pyspark.sql.functions import max
>>> input = [["P1", "\""],["P1","a"],["P1",None],["P1","b"],["P1",None]]
>>> df = spark.createDataFrame(input,["col1","col2"])
>>> df.show()
+----+----+
|col1|col2|
+----+----+
| P1| "|
| P1| a|
| P1|null|
| P1| b|
| P1|null|
+----+----+
>>> windowspec = Window.partitionBy(df.col1).rowsBetween(-1,0)
>>> df = df.withColumn("col3",max(df.col2).over(windowspec))
>>> df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| P1| "| "|
| P1| a| a|
| P1|null| a|
| P1| b| b|
| P1|null| b|
+----+----+----+
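Note that the rowsBetween(-1, 0) frame above only looks back one row, so it relies on nulls never appearing twice in a row, and without an orderBy the row order inside the window is not guaranteed. As a hedged alternative (not the author's solution), a running frame with last(..., ignorenulls=True) carries the most recent non-null value forward; the ordering column row_id is an assumption added here because the data has no explicit ordering column:

# Sketch only: forward-fill using last(..., ignorenulls=True) over a running frame
from pyspark.sql import functions as F
from pyspark.sql.window import Window

df_ordered = df.withColumn("row_id", F.monotonically_increasing_id())  # assumed ordering column
running = (Window.partitionBy("col1")
                 .orderBy("row_id")
                 .rowsBetween(Window.unboundedPreceding, Window.currentRow))
df_filled = df_ordered.withColumn("col3", F.last("col2", ignorenulls=True).over(running))
df_filled.show()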
Problem-3:
Finding a trend in PySpark (Spark SQL): determine whether sales increase or decrease from year to year.
input = [['tenant 1',2014,2000],['tenant 1',2015,5000],['tenant 2',2013,1000],['tenant 2',2014,1500],['tenant 2',2015,800]]
df=spark.createDataFrame(input,["tenant","year","sales"])
df.show()
+--------+----+-----+
|  tenant|year|sales|
+--------+----+-----+
|tenant 1|2014| 2000|
|tenant 1|2015| 5000|
|tenant 2|2013| 1000|
|tenant 2|2014| 1500|
|tenant 2|2015|  800|
+--------+----+-----+
df.createOrReplaceTempView("df")
spark.sql("select tenant, year, sales, round((case when chk<>0 then ((sales-chk)/chk)*100 else 0 end),2)as yoy from(select tenant, year, sales, lag(sales,1,0) over(partition by tenant order by year asc) as chk from df)").show()
+--------+----+-----+------+
|  tenant|year|sales|   yoy|
+--------+----+-----+------+
|tenant 1|2014| 2000|   0.0|
|tenant 1|2015| 5000| 150.0|
|tenant 2|2013| 1000|   0.0|
|tenant 2|2014| 1500|  50.0|
|tenant 2|2015|  800|-46.67|
+--------+----+-----+------+
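For reference, the same lag-based logic can also be written with the DataFrame API; this is a sketch equivalent to the SQL above, not an addition to the original solution:

# DataFrame-API version of the year-over-year calculation
from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("tenant").orderBy("year")
trend = (df.withColumn("chk", F.lag("sales", 1, 0).over(w))
           .withColumn("yoy", F.round(
               F.when(F.col("chk") != 0, (F.col("sales") - F.col("chk")) / F.col("chk") * 100)
                .otherwise(0), 2))
           .drop("chk"))
trend.show()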
Problem-4:
How many customers placed orders every month?
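Problem 4 is left as a question only. As a rough sketch (not the author's solution), and assuming a hypothetical orders table with customer_id and order_date columns registered as a temp view, one way is to compare each customer's distinct order months against the total number of distinct months:

# Sketch only: "orders", "customer_id" and "order_date" are assumed names
spark.sql("""
    with months as (
        select customer_id, date_format(order_date, 'yyyy-MM') as order_month
        from orders
        group by customer_id, date_format(order_date, 'yyyy-MM')
    ),
    total as (
        select count(distinct order_month) as n from months
    )
    select m.customer_id
    from months m cross join total t
    group by m.customer_id, t.n
    having count(*) = t.n
""").show()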