PySpark RDD Operations on Employee Data:
emp_data = sc.textFile("file:///Users/furqan/emp.txt")
emp_data.collect()
[u'empno,ename,designation,manager,hire_date,sal,deptno', u'7369,SMITH,CLERK,7902,1980-12-17,800.00,20', u'7499,ALLEN,SALESMAN,7698,1981-02-20,1600.00,30', u'7521,WARD,SALESMAN,7698,1981-02-22,1250.00,30', u'7566,JONES,MANAGER,7839,1981-04-02,2975.00,20', u'7654,MARTIN,SALESMAN,7698,1981-09-28,1250.00,30', u'7698,BLAKE,MANAGER,7839,1981-05-01,2850.00,30', u'7782,CLARK,MANAGER,7839,1981-06-09,2450.00,10', u'7788,SCOTT,ANALYST,7566,1982-12-09,3000.00,20', u'7839,KING,PRESIDENT,NULL,1981-11-17,5000.00,10', u'7844,TURNER,SALESMAN,7698,1981-09-08,1500.00,30', u'7876,ADAMS,CLERK,7788,1983-01-12,1100.00,20', u'7900,JAMES,CLERK,7698,1981-12-03,950.00,30', u'7902,FORD,ANALYST,7566,1981-12-03,3000.00,20', u'7934,MILLER,CLERK,7782,1982-01-23,1300.00,10']
emp_header=emp_data.first()
emp_data_without_header = emp_data.filter(lambda x:x!=emp_header)
rdd1 = emp_data_without_header.map(lambda x:x.split(","))
rdd1.collect()
[[u'7369', u'SMITH', u'CLERK', u'7902', u'1980-12-17', u'800.00', u'20'],
[u'7499', u'ALLEN', u'SALESMAN', u'7698', u'1981-02-20', u'1600.00', u'30'],
[u'7521', u'WARD', u'SALESMAN', u'7698', u'1981-02-22', u'1250.00', u'30'],
[u'7566', u'JONES', u'MANAGER', u'7839', u'1981-04-02', u'2975.00', u'20'],
[u'7654', u'MARTIN', u'SALESMAN', u'7698', u'1981-09-28', u'1250.00', u'30'],
[u'7698', u'BLAKE', u'MANAGER', u'7839', u'1981-05-01', u'2850.00', u'30'],
[u'7782', u'CLARK', u'MANAGER', u'7839', u'1981-06-09', u'2450.00', u'10'],
[u'7788', u'SCOTT', u'ANALYST', u'7566', u'1982-12-09', u'3000.00', u'20'],
[u'7839', u'KING', u'PRESIDENT', u'NULL', u'1981-11-17', u'5000.00', u'10'],
[u'7844', u'TURNER', u'SALESMAN', u'7698', u'1981-09-08', u'1500.00', u'30'],
[u'7876', u'ADAMS', u'CLERK', u'7788', u'1983-01-12', u'1100.00', u'20'],
[u'7900', u'JAMES', u'CLERK', u'7698', u'1981-12-03', u'950.00', u'30'],
[u'7902', u'FORD', u'ANALYST', u'7566', u'1981-12-03', u'3000.00', u'20'],
[u'7934', u'MILLER', u'CLERK', u'7782', u'1982-01-23', u'1300.00', u'10']]
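The header-removal and split steps can be mirrored in plain Python to check what rdd1 should contain without starting Spark. SAMPLE is a hypothetical three-row excerpt of emp.txt, and drop_header_and_split is an illustrative helper, not part of PySpark. Like the RDD filter, it removes every line equal to the header, which is harmless here because no data line matches it.

```python
# Plain-Python mirror of: first() -> filter(x != header) -> map(split(","))
# SAMPLE is a hypothetical excerpt of emp.txt above (illustration only).
SAMPLE = """empno,ename,designation,manager,hire_date,sal,deptno
7369,SMITH,CLERK,7902,1980-12-17,800.00,20
7839,KING,PRESIDENT,NULL,1981-11-17,5000.00,10
7934,MILLER,CLERK,7782,1982-01-23,1300.00,10"""


def drop_header_and_split(text):
    lines = text.splitlines()
    header = lines[0]  # same role as emp_data.first()
    # Like the RDD filter, this drops *every* line equal to the header,
    # not only the first one.
    return [line.split(",") for line in lines if line != header]


rows = drop_header_and_split(SAMPLE)
for row in rows:
    print(row)
```

Every field is still a string, including 'NULL' in KING's manager column, which is why the later steps convert deptno and sal with int() and float().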
1) Get the list of all employees in each department:
rdd2 = rdd1.map(lambda n: (int(n[6]),str(n[1]))).groupByKey().mapValues(list)
for t in rdd2.collect():
    print(t)
(10, ['CLARK', 'KING', 'MILLER'])
(20, ['SMITH', 'JONES', 'SCOTT', 'ADAMS', 'FORD'])
(30, ['ALLEN', 'WARD', 'MARTIN', 'BLAKE', 'TURNER', 'JAMES'])
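groupByKey() gathers all values that share a key, and mapValues(list) turns each group into a plain list. The sketch below reproduces that result in ordinary Python with collections.defaultdict; group_by_key is a hypothetical helper used only for illustration. Sorting by department keeps the printout stable, whereas Spark does not promise any particular order of keys, or of names within a group, after the shuffle.

```python
from collections import defaultdict

# (ename, deptno) pairs from emp.txt above, in file order
PAIRS = [
    ("SMITH", 20), ("ALLEN", 30), ("WARD", 30), ("JONES", 20),
    ("MARTIN", 30), ("BLAKE", 30), ("CLARK", 10), ("SCOTT", 20),
    ("KING", 10), ("TURNER", 30), ("ADAMS", 20), ("JAMES", 30),
    ("FORD", 20), ("MILLER", 10),
]


def group_by_key(pairs):
    """Collect every value for a key into one list, like groupByKey().mapValues(list)."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


# Key by deptno, as the Spark code does with (int(n[6]), str(n[1]))
by_dept = group_by_key((dept, name) for name, dept in PAIRS)
for dept in sorted(by_dept):
    print((dept, by_dept[dept]))
```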
2) Get the total number of employees in each department:
rdd3 = rdd2.map(lambda x:(x[0],len(x[1])))
rdd3.collect()
[(10, 3), (20, 5), (30, 6)]
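Counting through rdd2 builds every department's name list first. A common alternative is to map each employee to (deptno, 1) and add the ones per key, for example rdd1.map(lambda n: (int(n[6]), 1)).reduceByKey(lambda a, b: a + b), or to call countByKey(), which returns the counts to the driver as a dictionary. The plain-Python sketch below shows the (key, 1)-then-sum pattern on the department numbers from emp.txt; count_by_key is a hypothetical helper, not a Spark function.

```python
# deptno of each employee in emp.txt above, in file order
DEPTNOS = [20, 30, 30, 20, 30, 30, 10, 20, 10, 30, 20, 30, 20, 10]


def count_by_key(keys):
    pairs = [(key, 1) for key in keys]  # map step: deptno -> (deptno, 1)
    counts = {}
    for key, one in pairs:  # reduce step: add up the 1s per deptno
        counts[key] = counts.get(key, 0) + one
    return counts


counts = count_by_key(DEPTNOS)
print(sorted(counts.items()))  # [(10, 3), (20, 5), (30, 6)]
```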
Sum of Salaries by Department:
This operation uses the same employee data as above:
def parseLine(line):
    # turn one CSV line into a (deptno, sal) pair
    fields = line.split(",")
    deptno = int(fields[6])
    sal = float(fields[5])
    return (deptno, sal)
emp_data_without_header = emp_data.filter(lambda x:x!=emp_header)
rdd1 = emp_data_without_header.map(parseLine)
rdd1.collect()
[(20, 800.0), (30, 1600.0), (30, 1250.0), (20, 2975.0), (30, 1250.0), (30, 2850.0), (10, 2450.0), (20, 3000.0), (10, 5000.0), (30, 1500.0), (20, 1100.0), (30, 950.0), (20, 3000.0), (10, 1300.0)]
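Because parseLine is an ordinary Python function, it can be tried on a single line in a plain interpreter before it is passed to map. Doing so also shows why the header has to be filtered out first: int('deptno') raises ValueError. In Spark that error would only appear once an action such as collect() runs, because map is evaluated lazily.

```python
def parseLine(line):
    """Turn one CSV line of emp.txt into a (deptno, sal) pair."""
    fields = line.split(",")
    deptno = int(fields[6])
    sal = float(fields[5])
    return (deptno, sal)


# One data line copied from emp.txt above
print(parseLine("7369,SMITH,CLERK,7902,1980-12-17,800.00,20"))  # (20, 800.0)

# The header line cannot be parsed: int("deptno") raises ValueError
try:
    parseLine("empno,ename,designation,manager,hire_date,sal,deptno")
except ValueError as err:
    print("header rejected:", err)
```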
rdd_sum = rdd1.reduceByKey(lambda x, y: x + y)
rdd_sum.collect()
[(10, 8750.0), (20, 10875.0), (30, 9400.0)]
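reduceByKey merges the values for each key with the supplied function. Spark applies it first inside each partition and then again when merging results from different partitions, so the function should be associative and commutative; addition qualifies. The plain-Python sketch below imitates those two stages on the (deptno, sal) pairs above. The two-partition split and the helper names reduce_by_key and merge_into are assumptions chosen for illustration; Spark decides the real partitioning.

```python
# Hypothetical split of the 14 (deptno, sal) pairs above into two partitions
PARTITIONS = [
    [(20, 800.0), (30, 1600.0), (30, 1250.0), (20, 2975.0),
     (30, 1250.0), (30, 2850.0), (10, 2450.0)],
    [(20, 3000.0), (10, 5000.0), (30, 1500.0), (20, 1100.0),
     (30, 950.0), (20, 3000.0), (10, 1300.0)],
]


def merge_into(acc, key, value, func):
    """Fold one value into the running result for its key."""
    acc[key] = func(acc[key], value) if key in acc else value


def reduce_by_key(partitions, func):
    # Stage 1: combine values per key inside each partition
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            merge_into(local, key, value, func)
        partials.append(local)
    # Stage 2: merge the per-partition results across partitions
    result = {}
    for local in partials:
        for key, value in local.items():
            merge_into(result, key, value, func)
    return result


result = reduce_by_key(PARTITIONS, lambda x, y: x + y)
print(sorted(result.items()))  # [(10, 8750.0), (20, 10875.0), (30, 9400.0)]
```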
Maximum Salary:
emp_sal_list = emp_data_without_header.map(lambda x:x.split(",")).map(lambda x:float(x[5]))
emp_sal_list.collect()
[800.0, 1600.0, 1250.0, 2975.0, 1250.0, 2850.0, 2450.0, 3000.0, 5000.0, 1500.0, 1100.0, 950.0, 3000.0, 1300.0]
emp_max_sal = emp_sal_list.max()
print(emp_max_sal)
5000.0
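emp_sal_list.max() returns only the number. PySpark's RDD.max also accepts a key function, so emp_data_without_header.map(lambda x: x.split(",")).max(key=lambda r: float(r[5])) should return the full record of the highest-paid employee. The plain-Python sketch below uses the built-in max with the same key on a hypothetical four-row excerpt of emp.txt, and also shows why converting sal to float matters.

```python
# Hypothetical excerpt of emp.txt above, split exactly as split(",") would
ROWS = [
    "7369,SMITH,CLERK,7902,1980-12-17,800.00,20".split(","),
    "7788,SCOTT,ANALYST,7566,1982-12-09,3000.00,20".split(","),
    "7839,KING,PRESIDENT,NULL,1981-11-17,5000.00,10".split(","),
    "7934,MILLER,CLERK,7782,1982-01-23,1300.00,10".split(","),
]

# Highest salary as a number, like emp_sal_list.max()
max_sal = max(float(r[5]) for r in ROWS)
# Whole record of the top earner, like max(key=...) on an RDD of split rows
top = max(ROWS, key=lambda r: float(r[5]))
# Without float(), strings compare character by character: "800.00" > "5000.00"
wrong = max(r[5] for r in ROWS)

print(max_sal)  # 5000.0
print(top[1])   # KING
print(wrong)    # 800.00
```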