PySpark RDD Operations


                                       PySpark RDD Operations on Employee Data:

emp_data = sc.textFile("file:///Users/furqan/emp.txt")

emp_data.collect()

[u'empno,ename,designation,manager,hire_date,sal,deptno', u'7369,SMITH,CLERK,7902,1980-12-17,800.00,20', u'7499,ALLEN,SALESMAN,7698,1981-02-20,1600.00,30', u'7521,WARD,SALESMAN,7698,1981-02-22,1250.00,30', u'7566,JONES,MANAGER,7839,1981-04-02,2975.00,20', u'7654,MARTIN,SALESMAN,7698,1981-09-28,1250.00,30', u'7698,BLAKE,MANAGER,7839,1981-05-01,2850.00,30', u'7782,CLARK,MANAGER,7839,1981-06-09,2450.00,10', u'7788,SCOTT,ANALYST,7566,1982-12-09,3000.00,20', u'7839,KING,PRESIDENT,NULL,1981-11-17,5000.00,10', u'7844,TURNER,SALESMAN,7698,1981-09-08,1500.00,30', u'7876,ADAMS,CLERK,7788,1983-01-12,1100.00,20', u'7900,JAMES,CLERK,7698,1981-12-03,950.00,30', u'7902,FORD,ANALYST,7566,1981-12-03,3000.00,20', u'7934,MILLER,CLERK,7782,1982-01-23,1300.00,10']

emp_header=emp_data.first()

emp_data_without_header = emp_data.filter(lambda x:x!=emp_header)

rdd1 = emp_data_without_header.map(lambda x:x.split(","))

rdd1.collect()

[[u'7369', u'SMITH', u'CLERK', u'7902', u'1980-12-17', u'800.00', u'20'], 
[u'7499', u'ALLEN', u'SALESMAN', u'7698', u'1981-02-20', u'1600.00', u'30'], 
[u'7521', u'WARD', u'SALESMAN', u'7698', u'1981-02-22', u'1250.00', u'30'], 
[u'7566', u'JONES', u'MANAGER', u'7839', u'1981-04-02', u'2975.00', u'20'], 
[u'7654', u'MARTIN', u'SALESMAN', u'7698', u'1981-09-28', u'1250.00', u'30'], 
[u'7698', u'BLAKE', u'MANAGER', u'7839', u'1981-05-01', u'2850.00', u'30'], 
[u'7782', u'CLARK', u'MANAGER', u'7839', u'1981-06-09', u'2450.00', u'10'], 
[u'7788', u'SCOTT', u'ANALYST', u'7566', u'1982-12-09', u'3000.00', u'20'], 
[u'7839', u'KING', u'PRESIDENT', u'NULL', u'1981-11-17', u'5000.00', u'10'], 
[u'7844', u'TURNER', u'SALESMAN', u'7698', u'1981-09-08', u'1500.00', u'30'], 
[u'7876', u'ADAMS', u'CLERK', u'7788', u'1983-01-12', u'1100.00', u'20'], 
[u'7900', u'JAMES', u'CLERK', u'7698', u'1981-12-03', u'950.00', u'30'], 
[u'7902', u'FORD', u'ANALYST', u'7566', u'1981-12-03', u'3000.00', u'20'], 
[u'7934', u'MILLER', u'CLERK', u'7782', u'1982-01-23', u'1300.00', u'10']]
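A side note, not part of the original walkthrough: filtering with x != emp_header compares every record against the header string and would also drop a data row that happened to match it exactly. A minimal alternative sketch, assuming the same emp_data RDD, drops only the first record of the first partition:

from itertools import islice

# Drop only the first record of partition 0, i.e. the header line.
emp_data_without_header = emp_data.mapPartitionsWithIndex(
    lambda idx, it: islice(it, 1, None) if idx == 0 else it)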


1) Get the list of all employees in each department:

rdd2 = rdd1.map(lambda n: (int(n[6]),str(n[1]))).groupByKey().mapValues(list)

for t in rdd2.collect():
    print(t[0], t[1])

(10, ['CLARK', 'KING', 'MILLER'])
(20, ['SMITH', 'JONES', 'SCOTT', 'ADAMS', 'FORD'])
(30, ['ALLEN', 'WARD', 'MARTIN', 'BLAKE', 'TURNER', 'JAMES'])

2) Get the total number of employees in each department:

rdd3 = rdd2.map(lambda x: (x[0], len(x[1])))
rdd3.collect()

[(10, 3), (20, 5), (30, 6)]
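Alternatively, the counts can be taken directly from the (deptno, ename) pairs with countByKey(), which returns a dict of counts on the driver. A small sketch, assuming the same rdd1 of split fields used in step 1:

dept_counts = rdd1.map(lambda n: (int(n[6]), n[1])).countByKey()
print(dict(dept_counts))

# With the data above this should print {10: 3, 20: 5, 30: 6}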


            Sum of Salary by Department:

We use the same employee data for this operation:

>>> def parseLine(line):
...     fields = line.split(",")
...     deptno = int(fields[6])
...     sal = float(fields[5])
...     return (deptno,sal)


emp_header=emp_data.first() 

emp_data_without_header = emp_data.filter(lambda x:x!=emp_header)

rdd1 = emp_data_without_header.map(parseLine)


rdd1.collect()

[(20, 800.0), (30, 1600.0), (30, 1250.0), (20, 2975.0), (30, 1250.0), (30, 2850.0), (10, 2450.0), (20, 3000.0), (10, 5000.0), (30, 1500.0), (20, 1100.0), (30, 950.0), (20, 3000.0), (10, 1300.0)]
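parseLine assumes every line has at least seven fields and numeric values in the salary and deptno columns. A defensive variant (an illustrative sketch, not from the original post) skips malformed lines by returning zero or one records through flatMap:

def parseLineSafe(line):
    fields = line.split(",")
    try:
        return [(int(fields[6]), float(fields[5]))]
    except (IndexError, ValueError):
        return []  # skip rows that cannot be parsed

rdd1_safe = emp_data_without_header.flatMap(parseLineSafe)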



>>> rdd_sum = rdd1.reduceByKey(lambda x,y:x+y)
>>> rdd_sum.collect()

[(10, 8750.0), (20, 10875.0), (30, 9400.0)]
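For reference, the per-department sum and the head count from step 2 can also be computed in a single pass with aggregateByKey. This is a sketch assuming the (deptno, sal) pairs in rdd1 above:

# Accumulator is a (running_sum, running_count) pair per department.
sum_count = rdd1.aggregateByKey((0.0, 0),
                                lambda acc, sal: (acc[0] + sal, acc[1] + 1),
                                lambda a, b: (a[0] + b[0], a[1] + b[1]))

sum_count.collect()

# Expected (order may vary): [(10, (8750.0, 3)), (20, (10875.0, 5)), (30, (9400.0, 6))]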


                 Maximum Salary:

emp_sal_list = emp_data_without_header.map(lambda x:x.split(",")).map(lambda x:float(x[5]))


emp_sal_list.collect()

[800.0, 1600.0, 1250.0, 2975.0, 1250.0, 2850.0, 2450.0, 3000.0, 5000.0, 1500.0, 1100.0, 950.0, 3000.0, 1300.0]


emp_max_sal = emp_sal_list.max()

print(emp_max_sal)

5000.0
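The max() above is the company-wide maximum. For a per-department maximum, reduceByKey with max works on the (deptno, sal) pairs from the previous section. A sketch, not part of the original post:

max_sal_by_dept = rdd1.reduceByKey(lambda x, y: max(x, y))
max_sal_by_dept.collect()

# Expected (order may vary): [(10, 5000.0), (20, 3000.0), (30, 2850.0)]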


