Window/Analytics Function in PySpark


Analytics functions using a PySpark DataFrame:

from pyspark.sql.functions import dense_rank
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window

df = spark.read.format("csv").option("header","true").load("file:///Users/furqan/data/emp.txt")

df.show()
+-----+------+-----------+-------+----------+-------+------+
|empno| ename|designation|manager| hire_date|    sal|deptno|
+-----+------+-----------+-------+----------+-------+------+
| 7369| SMITH|      CLERK|   7902|1980-12-17| 800.00|    20|
| 7499| ALLEN|   SALESMAN|   7698|1981-02-20|1600.00|    30|
| 7521|  WARD|   SALESMAN|   7698|1981-02-22|1250.00|    30|
| 7566| JONES|    MANAGER|   7839|1981-04-02|2975.00|    20|
| 7654|MARTIN|   SALESMAN|   7698|1981-09-28|1250.00|    30|
| 7698| BLAKE|    MANAGER|   7839|1981-05-01|2850.00|    30|
| 7782| CLARK|    MANAGER|   7839|1981-06-09|2450.00|    10|
| 7788| SCOTT|    ANALYST|   7566|1982-12-09|3000.00|    20|
| 7839|  KING|  PRESIDENT|   NULL|1981-11-17|5000.00|    10|
| 7844|TURNER|   SALESMAN|   7698|1981-09-08|1500.00|    30|
| 7876| ADAMS|      CLERK|   7788|1983-01-12|1100.00|    20|
| 7900| JAMES|      CLERK|   7698|1981-12-03| 950.00|    30|
| 7902|  FORD|    ANALYST|   7566|1981-12-03|3000.00|    20|
| 7934|MILLER|      CLERK|   7782|1982-01-23|1300.00|    10|
+-----+------+-----------+-------+----------+-------+------+


df.printSchema()
root
 |-- empno: string (nullable = true)
 |-- ename: string (nullable = true)
 |-- designation: string (nullable = true)
 |-- manager: string (nullable = true)
 |-- hire_date: string (nullable = true)
 |-- sal: string (nullable = true)
 |-- deptno: string (nullable = true)


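# Every CSV column loads as a string; cast the numeric ones so that ordering by sal is numeric, not lexicographic
df = df.withColumn("sal",df["sal"].cast(IntegerType()))
df = df.withColumn("deptno",df["deptno"].cast(IntegerType()))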

df.printSchema()
root
 |-- empno: string (nullable = true)
 |-- ename: string (nullable = true)
 |-- designation: string (nullable = true)
 |-- manager: string (nullable = true)
 |-- hire_date: string (nullable = true)
 |-- sal: integer (nullable = true)
 |-- deptno: integer (nullable = true)


1) Maximum salary in each department:

Approach-1:

window = Window.partitionBy("deptno").orderBy(df["sal"].desc())

df.withColumn("rn",dense_rank().over(window)).show()
+-----+------+-----------+-------+----------+----+------+---+
|empno| ename|designation|manager| hire_date| sal|deptno| rn|
+-----+------+-----------+-------+----------+----+------+---+
| 7788| SCOTT|    ANALYST|   7566|1982-12-09|3000|    20|  1|
| 7902|  FORD|    ANALYST|   7566|1981-12-03|3000|    20|  1|
| 7566| JONES|    MANAGER|   7839|1981-04-02|2975|    20|  2|
| 7876| ADAMS|      CLERK|   7788|1983-01-12|1100|    20|  3|
| 7369| SMITH|      CLERK|   7902|1980-12-17| 800|    20|  4|
| 7839|  KING|  PRESIDENT|   NULL|1981-11-17|5000|    10|  1|
| 7782| CLARK|    MANAGER|   7839|1981-06-09|2450|    10|  2|
| 7934|MILLER|      CLERK|   7782|1982-01-23|1300|    10|  3|
| 7698| BLAKE|    MANAGER|   7839|1981-05-01|2850|    30|  1|
| 7499| ALLEN|   SALESMAN|   7698|1981-02-20|1600|    30|  2|
| 7844|TURNER|   SALESMAN|   7698|1981-09-08|1500|    30|  3|
| 7521|  WARD|   SALESMAN|   7698|1981-02-22|1250|    30|  4|
| 7654|MARTIN|   SALESMAN|   7698|1981-09-28|1250|    30|  4|
| 7900| JAMES|      CLERK|   7698|1981-12-03| 950|    30|  5|
+-----+------+-----------+-------+----------+----+------+---+

df.withColumn("rn",dense_rank().over(window)).filter("rn==1").drop("rn").show()
+-----+-----+-----------+-------+----------+----+------+
|empno|ename|designation|manager| hire_date| sal|deptno|
+-----+-----+-----------+-------+----------+----+------+
| 7788|SCOTT|    ANALYST|   7566|1982-12-09|3000|    20|
| 7902| FORD|    ANALYST|   7566|1981-12-03|3000|    20|
| 7839| KING|  PRESIDENT|   NULL|1981-11-17|5000|    10|
| 7698|BLAKE|    MANAGER|   7839|1981-05-01|2850|    30|
+-----+-----+-----------+-------+----------+----+------+
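
dense_rank() is used above so that tied salaries (SCOTT and FORD in department 20) share rank 1 and both survive the rn==1 filter. As a rough illustration only, the plain-Python sketch below mimics how the three common ranking rules would number the department 30 salaries from the table. The function names mirror the Spark functions but are re-implemented here purely for explanation; they are not the Spark API, and the list is assumed to be already sorted in descending order.

```python
# Plain-Python illustration of three ranking rules over one ordered partition.
# Salaries of department 30 from the sample data, already sorted descending.
salaries = [2850, 1600, 1500, 1250, 1250, 950]

def row_number(values):
    # Every row gets a unique, consecutive position, even when values tie.
    return list(range(1, len(values) + 1))

def rank(values):
    # Ties share the position of their first occurrence; a gap follows the tie.
    return [values.index(v) + 1 for v in values]

def dense_rank(values):
    # Ties share a rank and the next distinct value gets the next integer.
    distinct = sorted(set(values), reverse=True)
    return [distinct.index(v) + 1 for v in values]

print(row_number(salaries))  # [1, 2, 3, 4, 5, 6]
print(rank(salaries))        # [1, 2, 3, 4, 4, 6]
print(dense_rank(salaries))  # [1, 2, 3, 4, 4, 5]
```

The last line matches the rn column shown for department 30 in the first output table. With row_number-style numbering, only one of two tied top earners would get position 1.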

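Approach-2 (window defined inline in select; filtering on rn==2 returns the second-highest salary in each department):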

df.select(df["*"],dense_rank().over(Window.partitionBy("deptno").orderBy(df["sal"].desc())).alias("rn")).filter("rn==2").drop("rn").show()

+-----+-----+-----------+-------+----------+----+------+
|empno|ename|designation|manager| hire_date| sal|deptno|
+-----+-----+-----------+-------+----------+----+------+
| 7566|JONES|    MANAGER|   7839|1981-04-02|2975|    20|
| 7782|CLARK|    MANAGER|   7839|1981-06-09|2450|    10|
| 7499|ALLEN|   SALESMAN|   7698|1981-02-20|1600|    30|
+-----+-----+-----------+-------+----------+----+------+
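
The rn==1 and rn==2 filters above are two cases of "n-th highest distinct salary per department". The plain-Python sketch below cross-checks both result tables against the sample data without Spark. The helper name nth_highest_per_dept is hypothetical and exists only for this illustration; it assumes n is 1 or greater and works on (ename, sal, deptno) tuples copied from the table at the top.

```python
from collections import defaultdict

# (ename, sal, deptno) triples copied from the sample data above.
employees = [
    ("SMITH", 800, 20), ("ALLEN", 1600, 30), ("WARD", 1250, 30),
    ("JONES", 2975, 20), ("MARTIN", 1250, 30), ("BLAKE", 2850, 30),
    ("CLARK", 2450, 10), ("SCOTT", 3000, 20), ("KING", 5000, 10),
    ("TURNER", 1500, 30), ("ADAMS", 1100, 20), ("JAMES", 950, 30),
    ("FORD", 3000, 20), ("MILLER", 1300, 10),
]

def nth_highest_per_dept(rows, n):
    """Hypothetical helper: names earning the n-th highest distinct salary per department."""
    # Group employees by department (the "partitionBy" step).
    by_dept = defaultdict(list)
    for ename, sal, deptno in rows:
        by_dept[deptno].append((ename, sal))

    result = {}
    for deptno, members in by_dept.items():
        # Distinct salaries in descending order correspond to dense ranks 1, 2, 3, ...
        distinct = sorted({sal for _, sal in members}, reverse=True)
        if 1 <= n <= len(distinct):
            target = distinct[n - 1]
            # Keep everyone tied on the target salary, sorted by name for stable output.
            result[deptno] = sorted(e for e, s in members if s == target)
    return result

print(nth_highest_per_dept(employees, 1))
print(nth_highest_per_dept(employees, 2))
```

For n=1 this yields FORD and SCOTT for department 20, KING for 10 and BLAKE for 30; for n=2 it yields JONES, CLARK and ALLEN, which agrees with the two filtered outputs shown above.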
