
I have thousands of files with data in the following format:

a|b|c|clm4=1|clm5=3
a|b|c|clm4=9|clm6=60|clm7=23

I want to read them and convert the data into a DataFrame like this:

clm1|clm2|clm3|clm4|clm5|clm6|clm7
a|b|c|1|3|null|null
a|b|c|9|null|60|23

I have tried the method below:

import glob

files = [f for f in glob.glob(pathToFile + "/**/*.txt.gz", recursive=True)]
df = spark.read.load(files, format='csv', sep='|', header=None)

But it gives me the following result:

clm1, clm2, clm3, clm4, clm5
a, b, c, 1, 3
a, b, c, 9, null

1 Answer


For Spark 2.4+, you can read each line of the files as a single string column and then split it on |. This gives you an array column that you can transform using higher-order functions.
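
A minimal sketch of the read step (not shown in the original answer), assuming the files list built in the question; spark.read.text produces a single column named value, which is renamed here to clm to match the rest of the answer:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read every line of every file as one string column named "clm".
df = spark.read.text(files).withColumnRenamed("value", "clm")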

df.show(truncate=False)

+----------------------------+
|clm                         |
+----------------------------+
|a|b|c|clm4=1|clm5=3         |
|a|b|c|clm4=9|clm6=60|clm7=23|
+----------------------------+

We use the transform function to convert the array of strings obtained from splitting the clm column into an array of (name, value) structs. Each struct takes its name from the part before = if the string contains one; otherwise it is named clm followed by i+1, where i is the element's position in the array.

transform_expr = """
transform(split(clm, '[|]'), (x, i) -> 
                   struct(
                         IF(x like '%=%', substring_index(x, '=', 1), concat('clm', i+1)), 
                         substring_index(x, '=', -1)
                         )
        )
"""

Now use map_from_entries to convert the array of structs into a map. Finally, explode the map and pivot on the keys to get your columns:

df.select("clm", 
          explode(map_from_entries(expr(transform_expr))).alias("col_name", "col_value")
         ) \
  .groupby("clm").pivot('col_name').agg(first('col_value')) \
  .drop("clm") \
  .show(truncate=False)

Gives:

+----+----+----+----+----+----+----+
|clm1|clm2|clm3|clm4|clm5|clm6|clm7|
+----+----+----+----+----+----+----+
|a   |b   |c   |9   |null|60  |23  |
|a   |b   |c   |1   |3   |null|null|
+----+----+----+----+----+----+----+
  • Thanks for the help. Is there any way I can put a condition in above code to select only those columns that are present in an existing list of column names?
    – user0204
    Commented Jan 29, 2020 at 3:00
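
One possible way to handle that follow-up (a sketch, not part of the answer above): pass the allowed names to pivot, which restricts the output to exactly those columns and also spares Spark the extra pass needed to discover the distinct keys. Here wanted_cols is a hypothetical, pre-existing list of column names:

from pyspark.sql.functions import explode, map_from_entries, expr, first

# wanted_cols is assumed to exist already; only these keys become columns.
wanted_cols = ["clm1", "clm2", "clm3", "clm4"]

df.select("clm",
          explode(map_from_entries(expr(transform_expr))).alias("col_name", "col_value")
         ) \
  .groupby("clm").pivot("col_name", wanted_cols).agg(first("col_value")) \
  .drop("clm") \
  .show(truncate=False)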
