SCD Type 2 (slowly changing dimension) implementation using PySpark.


from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lit, row_number
import pyspark.sql.functions as func
from datetime import datetime
import hashlib
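
The snippets below assume the interactive PySpark shell, where spark and sc already exist. Outside the shell, a minimal sketch for creating them (the app name "scd2_demo" is just a placeholder):

# Minimal sketch, only needed outside the PySpark shell; "scd2_demo" is a placeholder app name
spark = SparkSession.builder.appName("scd2_demo").getOrCreate()
sc = spark.sparkContext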

####Existing Target-HIST Table

df = sc.parallelize([(1,100,"Jhoney","Bengaluru","2018-09-25","2099-12-31","Y"),
(2,200,"Stuart","Hyderbad","2018-09-25","2099-12-31","Y"),
(3,300,"Moxie","Pune","2018-09-25","2099-12-31","Y"),
(4,400,"Jeff","Mumbai","2018-09-25","2099-12-31","Y")]).toDF(["SK","ID","NAME","CITY","LOAD_DT","END_DT","ACTIVE_FLAG"])

+---+---+------+---------+----------+----------+-----------+
| SK| ID|  NAME|     CITY|   LOAD_DT|    END_DT|ACTIVE_FLAG|
+---+---+------+---------+----------+----------+-----------+
|  1|100|Jhoney|Bengaluru|2018-09-25|2099-12-31|          Y|
|  2|200|Stuart| Hyderbad|2018-09-25|2099-12-31|          Y|
|  3|300| Moxie|     Pune|2018-09-25|2099-12-31|          Y|
|  4|400|  Jeff|   Mumbai|2018-09-25|2099-12-31|          Y|
+---+---+------+---------+----------+----------+-----------+
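
The same table can also be built directly with spark.createDataFrame, skipping the RDD API; a minimal sketch with one of the rows above (df_alt is an illustrative name):

# Sketch: equivalent construction without going through sc.parallelize().toDF()
df_alt = spark.createDataFrame(
    [(1, 100, "Jhoney", "Bengaluru", "2018-09-25", "2099-12-31", "Y")],
    ["SK", "ID", "NAME", "CITY", "LOAD_DT", "END_DT", "ACTIVE_FLAG"])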


####Delta Source-Daily Load

src_df1 = sc.parallelize([(500,"Alex","Chennai"),(600,"Peter","Delhi"),
    (400,"Jeff","Kolkata")]).toDF(["ID","NAME","CITY"])

+---+-----+-------+
| ID| NAME|   CITY|
+---+-----+-------+
|500| Alex|Chennai|
|600|Peter|  Delhi|
|400| Jeff|Kolkata|
+---+-----+-------+

####Maximum surrogate key from the target table

# Fetch the current maximum surrogate key from the target table
rdd1 = df.agg({"SK": "max"}).rdd.map(list)
for i in rdd1.collect():
    max_sk_id = i[0]
    print("max_sk_id: ", max_sk_id)


('max_sk_id: ', 4)
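
If you prefer to stay in the DataFrame API, the same value can be fetched without the RDD round trip; a sketch equivalent to the loop above:

# Sketch: equivalent one-liner using the DataFrame API
max_sk_id = df.agg(func.max("SK")).collect()[0][0]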

###Creating a UDF to generate the hash values (SHA-256)

hash_udf = func.udf(lambda x: hashlib.sha256(str(x).encode("utf-8")).hexdigest().upper())
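
A Python UDF is not strictly needed here: Spark ships a built-in sha2 function that produces the same SHA-256 hex digest without the Python serialization overhead. A sketch of the same hash key using the built-in function (tgt_df_builtin is an illustrative name; upper() matches the UDF's upper-cased output):

# Sketch: built-in SHA-256 instead of a Python UDF
tgt_df_builtin = df.withColumn("CITY_hashkey", func.upper(func.sha2(col("CITY").cast("string"), 256)))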

###Filtering only the active records from the target for lookup against the delta source, and generating the hash key for the comparison columns

tgt_df = df.where(df.ACTIVE_FLAG == "Y").select(
    col("SK").alias("TGT_SK"), col("ID").alias("TGT_ID"), col("NAME").alias("TGT_NAME"),
    col("CITY").alias("TGT_CITY"), col("LOAD_DT").alias("TGT_LOAD_DT"),
    col("END_DT").alias("TGT_END_DT"), col("ACTIVE_FLAG").alias("TGT_ACTIVE_FLAG")
).withColumn("TGT_CITY_haskey", hash_udf(col("TGT_CITY")))

+------+------+--------+---------+-----------+----------+---------------+--------------------+
|TGT_SK|TGT_ID|TGT_NAME| TGT_CITY|TGT_LOAD_DT|TGT_END_DT|TGT_ACTIVE_FLAG|     TGT_CITY_haskey|
+------+------+--------+---------+-----------+----------+---------------+--------------------+
|     1|   100|  Jhoney|Bengaluru| 2018-09-25|2099-12-31|              Y|282F96B099630502F...|
|     2|   200|  Stuart| Hyderbad| 2018-09-25|2099-12-31|              Y|2709AAB323A83EDB0...|
|     3|   300|   Moxie|     Pune| 2018-09-25|2099-12-31|              Y|E7C90B789FE2A7D16...|
|     4|   400|    Jeff|   Mumbai| 2018-09-25|2099-12-31|              Y|9F2BA6981E36EA0FC...|
+------+------+--------+---------+-----------+----------+---------------+--------------------+

###Generating the hash key for the comparison columns on the delta source

src_df = src_df1.withColumn("SRC_CITY_haskey", hash_udf(src_df1.CITY))

+---+-----+-------+--------------------+
| ID| NAME|   CITY|     SRC_CITY_haskey|
+---+-----+-------+--------------------+
|500| Alex|Chennai|7B7F15F9FFBEB905C...|
|600|Peter|  Delhi|7D1550B844FF586A6...|
|400| Jeff|Kolkata|CDE7276E7BB8FE013...|
+---+-----+-------+--------------------+


###Joining delta source with Target DF
###Extracting the new records.

new_df = src_df.join(tgt_df, (src_df.ID == tgt_df.TGT_ID), "left_outer") \
    .where(tgt_df["TGT_SK"].isNull()).select(src_df["*"]).drop("SRC_CITY_haskey")
##print("Total new records retrived :",new_df.count())

+---+-----+-------+
| ID| NAME|   CITY|
+---+-----+-------+
|600|Peter|  Delhi|
|500| Alex|Chennai|
+---+-----+-------+
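
The same new-record extraction can be expressed with a left_anti join, which keeps only source rows whose ID has no match in the target and returns only source columns, so the isNull filter and select are not needed; a sketch (new_df_anti is an illustrative name):

# Sketch: left_anti join returns only the source-side columns
new_df_anti = src_df.join(tgt_df, src_df.ID == tgt_df.TGT_ID, "left_anti").drop("SRC_CITY_haskey")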

###Extracting the changed records.

change_df = src_df.join(tgt_df, (src_df.ID == tgt_df.TGT_ID), "left_outer") \
    .where(src_df["SRC_CITY_haskey"] != tgt_df["TGT_CITY_haskey"])

+---+----+-------+--------------------+------+------+--------+--------+-----------+----------+---------------+--------------------+
| ID|NAME|   CITY|     SRC_CITY_haskey|TGT_SK|TGT_ID|TGT_NAME|TGT_CITY|TGT_LOAD_DT|TGT_END_DT|TGT_ACTIVE_FLAG|     TGT_CITY_haskey|
+---+----+-------+--------------------+------+------+--------+--------+-----------+----------+---------------+--------------------+
|400|Jeff|Kolkata|CDE7276E7BB8FE013...|     4|   400|    Jeff|  Mumbai| 2018-09-25|2099-12-31|              Y|9F2BA6981E36EA0FC...|
+---+----+-------+--------------------+------+------+--------+--------+-----------+----------+---------------+--------------------+


change_df_src = change_df.drop("SRC_CITY_haskey", "TGT_SK", "TGT_ID", "TGT_NAME", "TGT_CITY",
    "TGT_CITY_haskey", "TGT_LOAD_DT", "TGT_END_DT", "TGT_ACTIVE_FLAG")
##print("Total changed records retrieved :", change_df.count())

+---+----+-------+
| ID|NAME|   CITY|
+---+----+-------+
|400|Jeff|Kolkata|
+---+----+-------+
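
Since rows without a target match have a NULL TGT_CITY_haskey and NULL comparisons are filtered out anyway, the left_outer join behaves like an inner join here; writing it as an inner join states that intent directly. A sketch (change_df_inner is an illustrative name):

# Sketch: inner join makes it explicit that only IDs present on both sides are compared
change_df_inner = src_df.join(tgt_df, src_df.ID == tgt_df.TGT_ID, "inner") \
    .where(src_df["SRC_CITY_haskey"] != tgt_df["TGT_CITY_haskey"])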

###Combining both new and changed records (records matching on all columns are rejected)

delta_df = new_df.union(change_df_src)

+---+-----+-------+                                                           
| ID| NAME|   CITY|
+---+-----+-------+
|600|Peter|  Delhi|
|500| Alex|Chennai|
|400| Jeff|Kolkata|
+---+-----+-------+

#####1st part - new and changed records from the delta source, to be INSERTed, with the required audit columns added.

delta_load_I = delta_df \
    .withColumn("SK", lit(max_sk_id) + row_number().over(Window.partitionBy(lit(1)).orderBy(lit(1)))) \
    .withColumn("LOAD_DT", lit(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))) \
    .withColumn("END_DT", lit("2099-12-31")) \
    .withColumn("ACTIVE_FLAG", lit("Y"))

+---+-----+-------+---+-------------------+----------+-----------+           
| ID| NAME|   CITY| SK|            LOAD_DT|    END_DT|ACTIVE_FLAG|
+---+-----+-------+---+-------------------+----------+-----------+
|600|Peter|  Delhi|  5|2018-12-19 13:52:44|2099-12-31|          Y|
|500| Alex|Chennai|  6|2018-12-19 13:52:44|2099-12-31|          Y|
|400| Jeff|Kolkata|  7|2018-12-19 13:52:44|2099-12-31|          Y|
+---+-----+-------+---+-------------------+----------+-----------+
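
Window.partitionBy(lit(1)) pulls the whole delta into a single partition so that row_number is global, which is fine for a small daily delta. For larger deltas, one possible alternative is zipWithIndex on the underlying RDD, which assigns contiguous indexes without the single-partition shuffle; a sketch (delta_with_sk is an illustrative name; LOAD_DT, END_DT and ACTIVE_FLAG would still be added afterwards as above):

# Sketch: contiguous surrogate keys via zipWithIndex, avoiding the single-partition window
delta_with_sk = delta_df.rdd.zipWithIndex() \
    .map(lambda r: r[0] + (max_sk_id + r[1] + 1,)) \
    .toDF(delta_df.columns + ["SK"])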

#####2nd part - records from the HIST/target table that need to be flagged/updated as inactive (N)

hist_tgt_df_U = change_df.select("TGT_ID", "TGT_NAME", "TGT_CITY", "TGT_SK", "TGT_LOAD_DT") \
    .withColumn("END_DT", lit(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))) \
    .withColumn("ACTIVE_FLAG", lit("N"))

+------+--------+--------+------+-----------+-------------------+-----------+
|TGT_ID|TGT_NAME|TGT_CITY|TGT_SK|TGT_LOAD_DT|             END_DT|ACTIVE_FLAG|
+------+--------+--------+------+-----------+-------------------+-----------+
|   400|    Jeff|  Mumbai|     4| 2018-09-25|2018-12-19 14:24:17|          N|
+------+--------+--------+------+-----------+-------------------+-----------+


#####3rd part - remaining (unchanged) records from the HIST/target table

hist_tgt_df_R = df.join(hist_tgt_df_U, (df.ID == hist_tgt_df_U.TGT_ID), "left_outer") \
    .where(hist_tgt_df_U["TGT_ID"].isNull()) \
    .select("ID", "NAME", "CITY", "SK", "LOAD_DT", df["END_DT"], df["ACTIVE_FLAG"])

+---+------+---------+---+----------+----------+-----------+
| ID|  NAME|     CITY| SK|   LOAD_DT|    END_DT|ACTIVE_FLAG|
+---+------+---------+---+----------+----------+-----------+
|100|Jhoney|Bengaluru|  1|2018-09-25|2099-12-31|          Y|
|200|Stuart| Hyderbad|  2|2018-09-25|2099-12-31|          Y|
|300| Moxie|     Pune|  3|2018-09-25|2099-12-31|          Y|
+---+------+---------+---+----------+----------+-----------+
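
The anti-join shorthand works here as well: a left_anti join keeps exactly the target rows whose ID is not in the update set; a sketch (hist_tgt_df_R_anti is an illustrative name, with the columns reordered to match the union below):

# Sketch: left_anti join keeps target rows whose ID is not being updated
hist_tgt_df_R_anti = df.join(hist_tgt_df_U, df.ID == hist_tgt_df_U.TGT_ID, "left_anti") \
    .select("ID", "NAME", "CITY", "SK", "LOAD_DT", "END_DT", "ACTIVE_FLAG")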


####Final dataframe for loading

# union/unionAll match columns by position, so the TGT_* columns line up with ID/NAME/CITY/SK/LOAD_DT
final_hist_tgt = delta_load_I.unionAll(hist_tgt_df_U).unionAll(hist_tgt_df_R)

+---+------+---------+---+-------------------+-------------------+-----------+
| ID|  NAME|     CITY| SK|            LOAD_DT|             END_DT|ACTIVE_FLAG|
+---+------+---------+---+-------------------+-------------------+-----------+
|600| Peter|    Delhi|  5|2018-12-19 13:52:44|         2099-12-31|          Y|
|500|  Alex|  Chennai|  6|2018-12-19 13:52:44|         2099-12-31|          Y|
|400|  Jeff|  Kolkata|  7|2018-12-19 13:52:44|         2099-12-31|          Y|
|400|  Jeff|   Mumbai|  4|         2018-09-25|2018-12-19 14:24:17|          N|
|100|Jhoney|Bengaluru|  1|         2018-09-25|         2099-12-31|          Y|
|200|Stuart| Hyderbad|  2|         2018-09-25|         2099-12-31|          Y|
|300| Moxie|     Pune|  3|         2018-09-25|         2099-12-31|          Y|
+---+------+---------+---+-------------------+-------------------+-----------+
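
Before loading, a quick sanity check that the SCD2 invariant holds, i.e. no business ID ends up with more than one active record; a sketch (dup_active is an illustrative name, and the result should be empty):

# Sketch: any ID with more than one active row indicates a problem in the merge logic
dup_active = final_hist_tgt.where(col("ACTIVE_FLAG") == "Y").groupBy("ID").count().where(col("count") > 1)
dup_active.show()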


##final_hist_tgt.select("SK","ID","NAME","CITY","LOAD_DT","END_DT","ACTIVE_FLAG").orderBy("SK").show()

# The output path below is a placeholder - replace it with your target location
final_hist_tgt.orderBy("SK").write.mode("overwrite").csv("/path/to/scd2_output", header=True)

+---+------+---------+---+-------------------+-------------------+-----------+
| ID|  NAME|     CITY| SK|            LOAD_DT|             END_DT|ACTIVE_FLAG|
+---+------+---------+---+-------------------+-------------------+-----------+
|100|Jhoney|Bengaluru|  1|         2018-09-25|         2099-12-31|          Y|
|200|Stuart| Hyderbad|  2|         2018-09-25|         2099-12-31|          Y|
|300| Moxie|     Pune|  3|         2018-09-25|         2099-12-31|          Y|
|400|  Jeff|   Mumbai|  4|         2018-09-25|2018-12-19 14:24:17|          N|
|600| Peter|    Delhi|  5|2018-12-19 13:52:44|         2099-12-31|          Y|
|500|  Alex|  Chennai|  6|2018-12-19 13:52:44|         2099-12-31|          Y|
|400|  Jeff|  Kolkata|  7|2018-12-19 13:52:44|         2099-12-31|          Y|
+---+------+---------+---+-------------------+-------------------+-----------+



