I have written a function that takes two pyspark dataframes and creates a diff in line. I am struggling to get it to scale with 100s of columns. I get around the for loop calling .join() hundreds of times. I am stuck, or maybe tired and looking for some suggestions!
from pyspark.sql import functions as sf
def diff_generate(df_one, df_two, pk):
# create df to join compared columns with
diff_df = df_one.select(pk)
# ensure the PK is not in the schema to iterate needed cols
if pk in df_one.schema.names:
df_one.schema.names.remove(pk)
for col in df_one.schema.names:
# ensure that col exists in df2
if col in df_two.schema.names:
df_two_ = df_two.select(pk, col)\
.withColumnRenamed(col, "{}_compare".format(col))
df = df_one.select(pk, col)\
.subtract(df_two_)\
.join(
df_two_,
pk,
"left_outer"
)
# concat df columns to display comparison
concat_df = df\
.withColumn("{}_diff".format(col),
sf.concat(
sf.col(col),
sf.lit("_compare:_"),
sf.col("{}_compare".format(col))))\
.select(pk, "{}_diff".format(col))
# join column 'diff' with initialized df
diff_df = diff_df.join(concat_df, pk, "left_outer")
return diff_df
Sample result:
98723498,match,N_compare:_null,match,match,match,2018-05-15 18:37_compare:_2018-05-15 18:37:12,match,match