Analytic functions with PySpark DataFrames:
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank

df = spark.read.format("csv").option("header", "true").load("file:///Users/furqan/data/emp.txt")
df.show()
+-----+------+-----------+-------+----------+-------+------+
|empno| ename|designation|manager| hire_date| sal|deptno|
+-----+------+-----------+-------+----------+-------+------+
| 7369| SMITH| CLERK| 7902|1980-12-17| 800.00| 20|
| 7499| ALLEN| SALESMAN| 7698|1981-02-20|1600.00| 30|
| 7521| WARD| SALESMAN| 7698|1981-02-22|1250.00| 30|
| 7566| JONES| MANAGER| 7839|1981-04-02|2975.00| 20|
| 7654|MARTIN| SALESMAN| 7698|1981-09-28|1250.00| 30|
| 7698| BLAKE| MANAGER| 7839|1981-05-01|2850.00| 30|
| 7782| CLARK| MANAGER| 7839|1981-06-09|2450.00| 10|
| 7788| SCOTT| ANALYST| 7566|1982-12-09|3000.00| 20|
| 7839| KING| PRESIDENT| NULL|1981-11-17|5000.00| 10|
| 7844|TURNER| SALESMAN| 7698|1981-09-08|1500.00| 30|
| 7876| ADAMS| CLERK| 7788|1983-01-12|1100.00| 20|
| 7900| JAMES| CLERK| 7698|1981-12-03| 950.00| 30|
| 7902| FORD| ANALYST| 7566|1981-12-03|3000.00| 20|
| 7934|MILLER| CLERK| 7782|1982-01-23|1300.00| 10|
+-----+------+-----------+-------+----------+-------+------+
df.printSchema()
root
|-- empno: string (nullable = true)
|-- ename: string (nullable = true)
|-- designation: string (nullable = true)
|-- manager: string (nullable = true)
|-- hire_date: string (nullable = true)
|-- sal: string (nullable = true)
|-- deptno: string (nullable = true)
# cast numeric columns so ordering and aggregation are numeric, not lexicographic
df = df.withColumn("sal", df["sal"].cast(IntegerType()))
df = df.withColumn("deptno", df["deptno"].cast(IntegerType()))
df.printSchema()
root
|-- empno: string (nullable = true)
|-- ename: string (nullable = true)
|-- designation: string (nullable = true)
|-- manager: string (nullable = true)
|-- hire_date: string (nullable = true)
|-- sal: integer (nullable = true)
|-- deptno: integer (nullable = true)
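The cast matters because string columns sort lexicographically, not numerically. A minimal plain-Python sketch of the difference, using values from the sal column above:

```python
sals = ["800", "1600", "950"]

# lexicographic order: '1' < '8' < '9', so "1600" sorts first
print(sorted(sals))           # ['1600', '800', '950']

# numeric order, as after the cast above
print(sorted(sals, key=int))  # ['800', '950', '1600']
```

Without the cast, ordering the window by sal would rank "950" above "1600".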
1) Maximum salary in each department:
Approach-1: rank the rows within each department by salary (descending) using a window function, then keep rank 1.
window = Window.partitionBy("deptno").orderBy(df["sal"].desc())
df.withColumn("rn",dense_rank().over(window)).show()
+-----+------+-----------+-------+----------+----+------+---+
|empno| ename|designation|manager| hire_date| sal|deptno| rn|
+-----+------+-----------+-------+----------+----+------+---+
| 7788| SCOTT| ANALYST| 7566|1982-12-09|3000| 20| 1|
| 7902| FORD| ANALYST| 7566|1981-12-03|3000| 20| 1|
| 7566| JONES| MANAGER| 7839|1981-04-02|2975| 20| 2|
| 7876| ADAMS| CLERK| 7788|1983-01-12|1100| 20| 3|
| 7369| SMITH| CLERK| 7902|1980-12-17| 800| 20| 4|
| 7839| KING| PRESIDENT| NULL|1981-11-17|5000| 10| 1|
| 7782| CLARK| MANAGER| 7839|1981-06-09|2450| 10| 2|
| 7934|MILLER| CLERK| 7782|1982-01-23|1300| 10| 3|
| 7698| BLAKE| MANAGER| 7839|1981-05-01|2850| 30| 1|
| 7499| ALLEN| SALESMAN| 7698|1981-02-20|1600| 30| 2|
| 7844|TURNER| SALESMAN| 7698|1981-09-08|1500| 30| 3|
| 7521| WARD| SALESMAN| 7698|1981-02-22|1250| 30| 4|
| 7654|MARTIN| SALESMAN| 7698|1981-09-28|1250| 30| 4|
| 7900| JAMES| CLERK| 7698|1981-12-03| 950| 30| 5|
+-----+------+-----------+-------+----------+----+------+---+
df.withColumn("rn",dense_rank().over(window)).filter("rn==1").drop("rn").show()
+-----+-----+-----------+-------+----------+----+------+
|empno|ename|designation|manager| hire_date| sal|deptno|
+-----+-----+-----------+-------+----------+----+------+
| 7788|SCOTT| ANALYST| 7566|1982-12-09|3000| 20|
| 7902| FORD| ANALYST| 7566|1981-12-03|3000| 20|
| 7839| KING| PRESIDENT| NULL|1981-11-17|5000| 10|
| 7698|BLAKE| MANAGER| 7839|1981-05-01|2850| 30|
+-----+-----+-----------+-------+----------+----+------+
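Note in the full ranked output that WARD and MARTIN (both 1250 in dept 30) share rank 4 and JAMES still gets 5: dense_rank leaves no gaps after ties, unlike rank. A plain-Python sketch of that semantics (no Spark needed), fed the dept-30 salaries from the table above:

```python
def dense_rank_desc(sals):
    """Dense rank over salaries in descending order: ties share a rank, no gaps."""
    ranks, prev, r = [], None, 0
    for s in sorted(sals, reverse=True):
        if s != prev:      # new distinct value -> next consecutive rank
            r += 1
            prev = s
        ranks.append(r)    # tied values repeat the current rank
    return ranks

# dept 30 salaries from the output above
print(dense_rank_desc([2850, 1600, 1500, 1250, 1250, 950]))
# [1, 2, 3, 4, 4, 5] -- matches the rn column for deptno 30
```

With rank() instead of dense_rank(), JAMES would get 6 rather than 5; for "maximum per group" both work, since rank 1 is the same either way.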
Approach-2: define the same window inline. Filtering rn == 2 instead of rn == 1 returns the second-highest salary per department, so this pattern generalizes to the Nth-highest:
df.select(df["*"],dense_rank().over(Window.partitionBy("deptno").orderBy(df["sal"].desc())).alias("rn")).filter("rn==2").drop("rn").show()
+-----+-----+-----------+-------+----------+----+------+
|empno|ename|designation|manager| hire_date| sal|deptno|
+-----+-----+-----------+-------+----------+----+------+
| 7566|JONES| MANAGER| 7839|1981-04-02|2975| 20|
| 7782|CLARK| MANAGER| 7839|1981-06-09|2450| 10|
| 7499|ALLEN| SALESMAN| 7698|1981-02-20|1600| 30|
+-----+-----+-----------+-------+----------+----+------+
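What the window-plus-filter computes can be sketched in plain Python: per department, collect the distinct salaries, sort descending, and keep the rows whose salary sits at position n. The (deptno, sal) pairs below are a subset of the table above:

```python
def nth_highest_per_group(rows, n):
    """Rows whose salary has dense rank n within their department."""
    by_dept = {}
    for dept, sal in rows:
        by_dept.setdefault(dept, set()).add(sal)  # distinct salaries per dept
    keep = []
    for dept, sal in rows:
        distinct = sorted(by_dept[dept], reverse=True)
        if n <= len(distinct) and sal == distinct[n - 1]:
            keep.append((dept, sal))
    return keep

# subset of (deptno, sal) pairs from the emp table above
rows = [(20, 3000), (20, 2975), (20, 1100),
        (10, 5000), (10, 2450),
        (30, 2850), (30, 1600)]
print(nth_highest_per_group(rows, 2))
# [(20, 2975), (10, 2450), (30, 1600)] -- matches Approach-2's output
```

Using distinct salaries mirrors dense_rank: if two employees tie at the top, the next distinct salary is still rank 2.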