import hashlib
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, row_number
import pyspark.sql.functions as func
from pyspark.sql.window import Window
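###Note: the snippets below assume a running `spark` session and `sc` context, as
###provided by the pyspark shell. For a standalone script, a minimal setup sketch
###would be (the app name is just an example):
spark = SparkSession.builder.appName("scd2-demo").getOrCreate()
sc = spark.sparkContext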
####Existing Target-HIST Table
df = sc.parallelize([(1,100,"Jhoney","Bengaluru","2018-09-25","2099-12-31","Y"),
(2,200,"Stuart","Hyderbad","2018-09-25","2099-12-31","Y"),
(3,300,"Moxie","Pune","2018-09-25","2099-12-31","Y"),
(4,400,"Jeff","Mumbai","2018-09-25","2099-12-31","Y")]).toDF(["SK","ID","NAME","CITY","LOAD_DT","END_DT","ACTIVE_FLAG"])
+---+---+------+---------+----------+----------+-----------+
| SK| ID| NAME| CITY| LOAD_DT| END_DT|ACTIVE_FLAG|
+---+---+------+---------+----------+----------+-----------+
| 1|100|Jhoney|Bengaluru|2018-09-25|2099-12-31| Y|
| 2|200|Stuart| Hyderbad|2018-09-25|2099-12-31| Y|
| 3|300| Moxie| Pune|2018-09-25|2099-12-31| Y|
| 4|400| Jeff| Mumbai|2018-09-25|2099-12-31| Y|
+---+---+------+---------+----------+----------+-----------+
####Delta Source-Daily Load
src_df1 = sc.parallelize([(500,"Alex","Chennai"),(600,"Peter","Delhi"),
                          (400,"Jeff","Kolkata")]).toDF(["ID","NAME","CITY"])
+---+-----+-------+
| ID| NAME| CITY|
+---+-----+-------+
|500| Alex|Chennai|
|600|Peter| Delhi|
|400| Jeff|Kolkata|
+---+-----+-------+
####Maximum surrogate key
max_sk_id = df.agg({"SK": "max"}).collect()[0][0]
print("max_sk_id:", max_sk_id)
max_sk_id: 4
###Creating a UDF to generate hash values (SHA-256) for change detection
hash_udf = func.udf(lambda x: hashlib.sha256(str(x).encode("utf-8")).hexdigest().upper())
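###Note: Spark also ships a built-in sha2() function, which avoids the Python UDF
###serialization overhead. A sketch of the equivalent column expression:
city_hash = func.upper(func.sha2(col("CITY").cast("string"), 256))  # 256 = bit length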
###Filtering only active records from Target for lookup against the delta source, and generating the hash key for the comparison columns
tgt_df = (df.where(df.ACTIVE_FLAG == "Y")
            .select(col("SK").alias("TGT_SK"), col("ID").alias("TGT_ID"), col("NAME").alias("TGT_NAME"),
                    col("CITY").alias("TGT_CITY"), col("LOAD_DT").alias("TGT_LOAD_DT"),
                    col("END_DT").alias("TGT_END_DT"), col("ACTIVE_FLAG").alias("TGT_ACTIVE_FLAG"))
            .withColumn("TGT_CITY_hashkey", hash_udf(col("TGT_CITY"))))
+------+------+--------+---------+-----------+----------+---------------+--------------------+
|TGT_SK|TGT_ID|TGT_NAME| TGT_CITY|TGT_LOAD_DT|TGT_END_DT|TGT_ACTIVE_FLAG|    TGT_CITY_hashkey|
+------+------+--------+---------+-----------+----------+---------------+--------------------+
| 1| 100| Jhoney|Bengaluru| 2018-09-25|2099-12-31| Y|282F96B099630502F...|
| 2| 200| Stuart| Hyderbad| 2018-09-25|2099-12-31| Y|2709AAB323A83EDB0...|
| 3| 300| Moxie| Pune| 2018-09-25|2099-12-31| Y|E7C90B789FE2A7D16...|
| 4| 400| Jeff| Mumbai| 2018-09-25|2099-12-31| Y|9F2BA6981E36EA0FC...|
+------+------+--------+---------+-----------+----------+---------------+--------------------+
###Generating the hash key for the comparison columns on the source side
src_df = src_df1.withColumn("SRC_CITY_hashkey", hash_udf(src_df1.CITY))
+---+-----+-------+--------------------+
| ID| NAME| CITY|    SRC_CITY_hashkey|
+---+-----+-------+--------------------+
|500| Alex|Chennai|7B7F15F9FFBEB905C...|
|600|Peter| Delhi|7D1550B844FF586A6...|
|400| Jeff|Kolkata|CDE7276E7BB8FE013...|
+---+-----+-------+--------------------+
###Joining the delta source with the Target DF
###Extracting the new records (source IDs with no match in the target)
new_df = (src_df.join(tgt_df, src_df.ID == tgt_df.TGT_ID, "left_outer")
                .where(tgt_df["TGT_SK"].isNull())
                .select(src_df["*"])
                .drop("SRC_CITY_hashkey"))
##print("Total new records retrived :",new_df.count())
+---+-----+-------+
| ID| NAME| CITY|
+---+-----+-------+
|600|Peter| Delhi|
|500| Alex|Chennai|
+---+-----+-------+
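###Note: a left_anti join expresses the same "no match in target" filter more
###directly; a sketch (new_df_alt is just an illustrative name):
new_df_alt = (src_df.join(tgt_df, src_df.ID == tgt_df.TGT_ID, "left_anti")
                    .drop("SRC_CITY_hashkey"))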
###Extracting the changed records (same ID, but different comparison-column hash)
change_df = (src_df.join(tgt_df, src_df.ID == tgt_df.TGT_ID, "left_outer")
                   .where(src_df["SRC_CITY_hashkey"] != tgt_df["TGT_CITY_hashkey"]))
+---+----+-------+--------------------+------+------+--------+--------+-----------+----------+---------------+--------------------+
| ID|NAME| CITY|    SRC_CITY_hashkey|TGT_SK|TGT_ID|TGT_NAME|TGT_CITY|TGT_LOAD_DT|TGT_END_DT|TGT_ACTIVE_FLAG|    TGT_CITY_hashkey|
+---+----+-------+--------------------+------+------+--------+--------+-----------+----------+---------------+--------------------+
|400|Jeff|Kolkata|CDE7276E7BB8FE013...| 4| 400| Jeff| Mumbai| 2018-09-25|2099-12-31| Y|9F2BA6981E36EA0FC...|
+---+----+-------+--------------------+------+------+--------+--------+-----------+----------+---------------+--------------------+
change_df_src = change_df.drop("SRC_CITY_hashkey", "TGT_SK", "TGT_ID", "TGT_NAME", "TGT_CITY",
                               "TGT_CITY_hashkey", "TGT_LOAD_DT", "TGT_END_DT", "TGT_ACTIVE_FLAG")
##print("Total existing records retrived :",change_df.count())
+---+----+-------+
| ID|NAME| CITY|
+---+----+-------+
|400|Jeff|Kolkata|
+---+----+-------+
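###Note: instead of enumerating every target column to drop, selecting the source
###columns back out (as done for new_df above) reads a bit cleaner; a sketch:
change_df_src_alt = change_df.select(src_df["*"]).drop("SRC_CITY_hashkey")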
###Combining both new and changed records (records matching on all columns have already been rejected)
delta_df = new_df.union(change_df_src)
+---+-----+-------+
| ID| NAME| CITY|
+---+-----+-------+
|600|Peter| Delhi|
|500| Alex|Chennai|
|400| Jeff|Kolkata|
+---+-----+-------+
#####1st part - new and changed records from the delta source, which are for INSERT, with the required SCD columns added
delta_load_I = (delta_df
    .withColumn("SK", lit(max_sk_id) + row_number().over(Window.partitionBy(lit(1)).orderBy(lit(1))))
    .withColumn("LOAD_DT", lit(datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
    .withColumn("END_DT", lit("2099-12-31"))
    .withColumn("ACTIVE_FLAG", lit("Y")))
+---+-----+-------+---+-------------------+----------+-----------+
| ID| NAME| CITY| SK| LOAD_DT| END_DT|ACTIVE_FLAG|
+---+-----+-------+---+-------------------+----------+-----------+
|600|Peter| Delhi| 5|2018-12-19 13:52:44|2099-12-31| Y|
|500| Alex|Chennai| 6|2018-12-19 13:52:44|2099-12-31| Y|
|400| Jeff|Kolkata| 7|2018-12-19 13:52:44|2099-12-31| Y|
+---+-----+-------+---+-------------------+----------+-----------+
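###Caveat: a window partitioned on a constant pulls every row into a single task,
###which is fine for a small delta but will not scale. For large deltas,
###monotonically_increasing_id() gives unique (though non-consecutive) surrogate
###keys without the shuffle; a sketch:
delta_load_I_alt = delta_df.withColumn(
    "SK", lit(max_sk_id) + func.monotonically_increasing_id() + 1)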
#####2nd part - records from the Hist/Tgt table that need to be flagged/updated as Inactive (N)
hist_tgt_df_U = (change_df
    .select("TGT_ID", "TGT_NAME", "TGT_CITY", "TGT_SK", "TGT_LOAD_DT")
    .withColumn("END_DT", lit(datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
    .withColumn("ACTIVE_FLAG", lit("N")))
+------+--------+--------+------+-----------+-------------------+-----------+
|TGT_ID|TGT_NAME|TGT_CITY|TGT_SK|TGT_LOAD_DT| END_DT|ACTIVE_FLAG|
+------+--------+--------+------+-----------+-------------------+-----------+
| 400| Jeff| Mumbai| 4| 2018-09-25|2018-12-19 14:24:17| N|
+------+--------+--------+------+-----------+-------------------+-----------+
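###Note: datetime.now() bakes in the driver-side time when the plan is built. If you
###prefer the cluster clock at execution time, the built-in current_timestamp() works
###as well; a sketch of an equivalent END_DT expression:
end_dt_expr = func.date_format(func.current_timestamp(), "yyyy-MM-dd HH:mm:ss")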
#####3rd part - remaining (unchanged) records from the Hist/TGT table
hist_tgt_df_R = (df.join(hist_tgt_df_U, df.ID == hist_tgt_df_U.TGT_ID, "left_outer")
                   .where(hist_tgt_df_U["TGT_ID"].isNull())
                   .select("ID", "NAME", "CITY", "SK", "LOAD_DT", df["END_DT"], df["ACTIVE_FLAG"]))
+---+------+---------+---+----------+----------+-----------+
| ID| NAME| CITY| SK| LOAD_DT| END_DT|ACTIVE_FLAG|
+---+------+---------+---+----------+----------+-----------+
|100|Jhoney|Bengaluru| 1|2018-09-25|2099-12-31| Y|
|200|Stuart| Hyderbad| 2|2018-09-25|2099-12-31| Y|
|300| Moxie| Pune| 3|2018-09-25|2099-12-31| Y|
+---+------+---------+---+----------+----------+-----------+
####Final dataframe for loading
final_hist_tgt = delta_load_I.union(hist_tgt_df_U).union(hist_tgt_df_R)
+---+------+---------+---+-------------------+-------------------+-----------+
| ID| NAME| CITY| SK| LOAD_DT| END_DT|ACTIVE_FLAG|
+---+------+---------+---+-------------------+-------------------+-----------+
|600| Peter| Delhi| 5|2018-12-19 13:52:44| 2099-12-31| Y|
|500| Alex| Chennai| 6|2018-12-19 13:52:44| 2099-12-31| Y|
|400| Jeff| Kolkata| 7|2018-12-19 13:52:44| 2099-12-31| Y|
|400| Jeff| Mumbai| 4| 2018-09-25|2018-12-19 14:24:17| N|
|100|Jhoney|Bengaluru| 1| 2018-09-25| 2099-12-31| Y|
|200|Stuart| Hyderbad| 2| 2018-09-25| 2099-12-31| Y|
|300| Moxie| Pune| 3| 2018-09-25| 2099-12-31| Y|
+---+------+---------+---+-------------------+-------------------+-----------+
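###Note: union() matches columns by position, not by name, which is why the select
###order in the 2nd and 3rd parts lines up with the 1st. On Spark 2.3+ there is also
###unionByName(); a sketch, after renaming the TGT_* columns back:
part2 = (hist_tgt_df_U.withColumnRenamed("TGT_ID", "ID").withColumnRenamed("TGT_NAME", "NAME")
         .withColumnRenamed("TGT_CITY", "CITY").withColumnRenamed("TGT_SK", "SK")
         .withColumnRenamed("TGT_LOAD_DT", "LOAD_DT"))
final_alt = delta_load_I.unionByName(part2).unionByName(hist_tgt_df_R)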
##final_hist_tgt.select("SK","ID","NAME","CITY","LOAD_DT","END_DT","ACTIVE_FLAG").orderBy("SK").show()
final_hist_tgt.orderBy("SK").show()
+---+------+---------+---+-------------------+-------------------+-----------+
| ID| NAME| CITY| SK| LOAD_DT| END_DT|ACTIVE_FLAG|
+---+------+---------+---+-------------------+-------------------+-----------+
|100|Jhoney|Bengaluru| 1| 2018-09-25| 2099-12-31| Y|
|200|Stuart| Hyderbad| 2| 2018-09-25| 2099-12-31| Y|
|300| Moxie| Pune| 3| 2018-09-25| 2099-12-31| Y|
|400| Jeff| Mumbai| 4| 2018-09-25|2018-12-19 14:24:17| N|
|600| Peter| Delhi| 5|2018-12-19 13:52:44| 2099-12-31| Y|
|500| Alex| Chennai| 6|2018-12-19 13:52:44| 2099-12-31| Y|
|400| Jeff| Kolkata| 7|2018-12-19 13:52:44| 2099-12-31| Y|
+---+------+---------+---+-------------------+-------------------+-----------+
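###To persist the result, write.csv() needs a target path; a sketch with a
###placeholder path and typical options:
(final_hist_tgt.orderBy("SK")
               .write.mode("overwrite")
               .option("header", "true")
               .csv("/path/to/target_hist"))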