Pivot/Transpose in Spark/PySpark DataFrame:

A pivot turns the distinct values of one column into separate output columns and fills them with an aggregate of another column. In the DataFrame API this is written as groupBy().pivot().agg() (or a shortcut such as .sum()).

Example-1:

df = spark.read.format("csv").option("header","true").load("file:///Users/furqan/data/emp.txt")
>>> df.show()
+-----+------+-----------+-------+----------+-------+------+
|empno| ename|designation|manager| hire_date|    sal|deptno|
+-----+------+-----------+-------+----------+-------+------+
| 7369| SMITH|      CLERK|   7902|1980-12-17| 800.00|    20|
| 7499| ALLEN|   SALESMAN|   7698|1981-02-20|1600.00|    30|
| 7521|  WARD|   SALESMAN|   7698|1981-02-22|1250.00|    30|
| 7566| JONES|    MANAGER|   7839|1981-04-02|2975.00|    20|
| 7654|MARTIN|   SALESMAN|   7698|1981-09-28|1250.00|    30|
| 7698| BLAKE|    MANAGER|   7839|1981-05-01|2850.00|    30|
| 7782| CLARK|    MANAGER|   7839|1981-06-09|2450.00|    10|
| 7788| SCOTT|    ANALYST|   7566|1982-12-09|3000.00|    20|
| 7839|  KING|  PRESIDENT|   NULL|1981-11-17|5000.00|    10|
| 7844|TURNER|   SALESMAN|   7698|1981-09-08|1500.00|    30|
| 7876| ADAMS|      CLERK|   7788|1983-01-12|1100.00|    20|
| 7900| JAMES|      CLERK|   7698|1981-12-03| 950.00|    30|
| 7902|  FORD|    ANALYST|   7566|1981-12-03|3000.00|    20|
| 7934|MILLER|      CLERK|   7782|1982-01-23|1300.00|    10|
+-----+------+-----------+-------+----------+-------+------+

>>> df=df.withColumn("sal",df["sal"].cast('int'))
>>> df=df.withColumn("deptno",df["deptno"].cast('int'))
>>> df=df.withColumn("empno",df["empno"].cast('int'))

>>> df.groupBy("deptno").pivot("deptno").sum("sal").show()
+------+----+-----+----+
|deptno|  10|   20|  30|
+------+----+-----+----+
|    20|null|10875|null|
|    10|8750| null|null|
|    30|null| null|9400|
+------+----+-----+----+
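Here deptno serves as both the grouping key and the pivot column, so each department's salary total lands on its own diagonal cell (dept 10 = 8750, dept 20 = 10875, dept 30 = 9400). When the pivot values are known up front, they can be passed explicitly so Spark skips the extra job it otherwise runs to discover them; a sketch on the same DataFrame:

>>> df.groupBy("deptno").pivot("deptno", [10, 20, 30]).sum("sal").show()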

Example-2:

df = spark.read.format("csv").option("header","true").load("file:///Users/furqan/data/pivot.csv")

df.show()
+---+---+-----+---+
|  A|  B|    C|  D|
+---+---+-----+---+
|foo|one|small|  1|
|foo|one|large|  2|
|foo|one|large|  2|
|foo|two|small|  3|
|foo|two|small|  3|
|bar|one|large|  4|
|bar|one|small|  5|
|bar|two|small|  6|
|bar|two|large|  7|
+---+---+-----+---+

from pyspark.sql.types import IntegerType
df=df.withColumn("D",df["D"].cast(IntegerType()))
df.printSchema()
root
 |-- A: string (nullable = true)
 |-- B: string (nullable = true)
 |-- C: string (nullable = true)
 |-- D: integer (nullable = true)


>>> df.groupBy("A","B").pivot("C").sum("D").show()
+---+---+-----+-----+
|  A|  B|large|small|
+---+---+-----+-----+
|foo|one|    4|    1|
|foo|two| null|    6|
|bar|two|    7|    6|
|bar|one|    4|    5|
+---+---+-----+-----+
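Combinations that never occur in the data (foo/two has no large rows) show up as null. If zeros are preferred, the pivoted result can be filled afterwards; a minimal sketch on the same DataFrame:

>>> df.groupBy("A", "B").pivot("C").sum("D").na.fill(0).show()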

Alternatively, the same pivot can be expressed through .agg(), passing the aggregation as a dict:

>>> df.groupBy("A", "B").pivot("C").agg({'D':'sum'}).show()
