
We have a requirement to load a JSON array into Azure Storage from Databricks. After that, the stored JSON array needs to be read and written back to an Azure SQL DB from ADF.

Below is my sample JSON produced by Azure Databricks. How do I convert it into a DataFrame and write it back to storage?

[{'Details': {'Input': {'id': '1', 'name': 'asdsdasd', 'a1': None, 'a2': None, 'c': None, 's': None, 'c1': None, 'z': None}, 'Output': '{"msg":"some error"}'}, 'Failure': '{"msg":"error"}', 's': 'f'}, {'Details': {'Input': {'id': '2', 'name': 'sadsadsad', 'a1': 'adsadsad', 'a2': 'sssssss', 'c': 'cccc', 's': 'test', 'c1': 'ind', 'z': '22222'}, 'Output': '{"s":"2"}'}, 'Failure': '', 's': 's'}]

The above JSON needs to be loaded into storage in a proper format such as Parquet or Delta.

Then we have to read this data from ADF to load it into the SQL DB.

Sample structure and expected table details:

adf_log_id | adf_id | e_name  | e_desc                          | status  | failure_msg
-----------|--------|---------|---------------------------------|---------|------------
1          | 1      | pipenam | {input and output details JSON} | success | Failure
  • What will be the ADF log id, adf id, and e_name from the above JSON, and the status message?
    – Pratik Lad
    Commented Sep 15, 2023 at 4:37

1 Answer


To convert the JSON into a DataFrame and write it back to the storage account in Parquet or Delta format:

To convert the JSON into a DataFrame, you need to define the JSON schema explicitly and pass it to createDataFrame.

from pyspark.sql.types import StructType, StructField, StringType

# Schema for the JSON array: Input and Output are kept as plain strings
schema = StructType([
    StructField("Details", StructType([
        StructField("Input", StringType(), nullable=True),
        StructField("Output", StringType(), nullable=True)
    ])),
    StructField("Failure", StringType(), nullable=True),
    StructField("s", StringType(), nullable=True)
])

import json

newJson = [{'Details': {'Input': {'id': '1', 'name': 'asdsdasd', 'a1': None, 'a2': None, 'c': None, 's': None, 'c1': None, 'z': None}, 'Output': '{"msg":"some error"}'}, 'Failure': '{"msg":"error"}', 's': 'f'}, {'Details': {'Input': {'id': '2', 'name': 'sadsadsad', 'a1': 'adsadsad', 'a2': 'sssssss', 'c': 'cccc', 's': 'test', 'c1': 'ind', 'z': '22222'}, 'Output': '{"s":"2"}'}, 'Failure': '', 's': 's'}]

# Serialize the nested Input dict to a JSON string so it matches the StringType field in the schema
for row in newJson:
    row['Details']['Input'] = json.dumps(row['Details']['Input'])

df = spark.createDataFrame(data=newJson, schema=schema)
df.show(truncate=False)

Output: (screenshot of the resulting DataFrame)

To load this DataFrame into storage in a proper format such as Parquet, use the code below:

storage_account_name = "xxxxxx"
storage_account_access_key = "access key"
spark.conf.set("fs.azure.account.key." + storage_account_name + ".blob.core.windows.net", storage_account_access_key)
output_path = "wasbs://<container-name>@" + storage_account_name + ".blob.core.windows.net/dataframe.parquet"

# Write the DataFrame to Azure Storage as Parquet
df.write.parquet(output_path, mode="overwrite")
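
The question also mentions Delta as an option. A minimal sketch for writing the same DataFrame as a Delta table, assuming the Databricks runtime's built-in Delta Lake support and a hypothetical output path on the same container:

# Hypothetical Delta output path (placeholder container name)
delta_output_path = "wasbs://<container-name>@" + storage_account_name + ".blob.core.windows.net/dataframe_delta"

# Write the DataFrame to Azure Storage in Delta format
df.write.format("delta").mode("overwrite").save(delta_output_path)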

After this, read the Parquet file from the storage account and write it into the SQL database using an ADF copy activity.

  • Hi, I have tried writing to storage. After that I am trying to use an ADF copy activity to insert into my SQL DB, but I am getting an error like "Parquet file contained column, which is of a non-primitive..." Commented Sep 15, 2023 at 14:24
  • Can you please advise? Commented Sep 15, 2023 at 14:24
  • The Details column is taking the charArray data type, which we are converting while applying the schema; that type is non-primitive and not supported for Parquet files. So you can rebuild your Parquet file with primitive data types.
    – Pratik Lad
    Commented Sep 27, 2023 at 10:12
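
Following up on the comments above: if the ADF copy activity rejects the Parquet file because of the non-primitive Details column, one option (a minimal sketch, with column names borrowed from the expected table in the question; adf_log_id, adf_id and e_name are not present in the JSON and would have to come from elsewhere) is to flatten the struct into plain string columns before writing:

from pyspark.sql.functions import col, to_json

# Flatten the Details struct into primitive string columns so the Parquet file
# contains only primitive types that the ADF copy activity can map to SQL columns.
flat_df = df.select(
    to_json(col("Details")).alias("e_desc"),   # whole Details struct as one JSON string
    col("s").alias("status"),
    col("Failure").alias("failure_msg")
)

flat_df.write.parquet(output_path, mode="overwrite")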
