1

I am using pyspark structed streaming and reading data from Kafka topic which is in Json complex format.

I am using Spark Structred Streaming Format as Kafka and code as below -

spark = SparkSession.builder \
        .appName("PythonSparkStreamingKafka") \
        .getOrCreate()

kafkaStreamDF = spark \
            .readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "localhost:9092") \
            .option("subscribe", "main.test.mysql.main.test_bank_data") \
            .option("startingOffsets", "earliest") \
            .load()

kafkaStreamDF1 = kafkaStreamDF.selectExpr("CAST(value AS STRING)")

message_schema = StructType().add("payload",StringType())
kafkaStreamDF2 = kafkaStreamDF1.select(from_json(col("value"),message_schema).alias("message"))

consoleOutput = kafkaStreamDF2.writeStream \
                .outputMode("append") \
                .format("console") \
                .option("truncate", "false") \
                .start()

I have extracted the data from message till Payload part of kafka json message and its output on console like below -

|[{"before":null,"after":{"transaction_id":20,"account_no":409000611074,"transaction_date":18490,"transaction_details":"INDO GIBL Indiaforensic STL12071 ","value_date":18490,"withdrawal_amt":"AMTWoA==","deposit_amt":null,"balance_amt":"K6LiGA=="},"source":{"version":"1.4.0-SNAPSHOT","connector":"mysql","name":"main.test.mysql","ts_ms":0,"snapshot":"true","db":"main","table":"test_bank_data","server_id":0,"gtid":null,"file":"binlog.000584","pos":15484438,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1611582308774,"transaction":null}]|

|[{"before":null,"after":{"transaction_id":21,"account_no":409000611074,"transaction_date":18490,"transaction_details":"INDO GIBL Indiaforensic STL13071 ","value_date":18490,"withdrawal_amt":"AV741A==","deposit_amt":null,"balance_amt":"KkPpRA=="},"source":{"version":"1.4.0-SNAPSHOT","connector":"mysql","name":"main.test.mysql","ts_ms":0,"snapshot":"true","db":"main","table":"test_bank_data","server_id":0,"gtid":null,"file":"binlog.000584","pos":15484438,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1611582308774,"transaction":null}]|

Now I would like to extract the data of after part and read the filed data in dataframe like below -

transaction_id|account_no|transaction_date|transaction_details|value_date|withdrawal_amt|deposit_amt|   balance_amt

20              | 409000611074  |   16/08/2020       |  INDO GIBL Indiaforensic STL12071 |  16/08/2020  |   129000.00      |    (null)      | 7320950.00

21              | 409000611074  |   16/08/2020       |  INDO GIBL Indiaforensic STL13071 |  16/08/2020  |   230013.00      |    (null)      | 7090937.00

Please suggest me how to achieve this expected output in a pyspark dataframe?

Added below the exact value feild of kafka message -

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"transaction_id"},{"type":"int64","optional":false,"field":"account_no"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"transaction_date"},{"type":"string","optional":true,"field":"transaction_details"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"value_date"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"12"},"field":"withdrawal_amt"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"12"},"field":"deposit_amt"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"12"},"field":"balance_amt"}],"optional":true,"name":"main.test.mysql.main.test_bank_data.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"transaction_id"},{"type":"int64","optional":false,"field":"account_no"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"transaction_date"},{"type":"string","optional":true,"field":"transaction_details"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"value_date"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"12"},"field":"withdrawal_amt"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"12"},"field":"deposit_amt"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"12"},"field":"balance_amt"}],"optional":true,"name":"main.test.mysql.main.test_bank_data.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"main.test.mysql.main.test_bank_data.Envelope"},"payload":{"before":null,"after":{"transaction_id":146,"account_no":409000611076,"transaction_date":18652,"transaction_details":"TRF FROM Indiaforensic SERVICES","value_date":18652,"withdrawal_amt":"AA==","deposit_amt":"B6Eg","balance_amt":"B6Eg"},"source":{"version":"1.4.0-SNAPSHOT","connector":"mysql","name":"main.test.mysql","ts_ms":1611587463000,"snapshot":"false","db":"main","table":"test_bank_data","server_id":19105,"gtid":null,"file":"binlog.000584","pos":46195052,"row":0,"thread":1604558,"query":null},"op":"c","ts_ms":1611587463181,"transaction":null}}

From here have i have converted into string on DF1 and taken the part of Payload into DF2.

-- Final working condition comments -- Added after transform the SMT in Debezium MySQL connector in Kafka connect side I am getting the message value in PySpark structred streaming with Kafaka as below -

Value = 
{"transaction_id":21,"account_no":409000611074,"transaction_date":"2020-08- 
229","transaction_details":"INDO GIBL Indiaforensic STL13071 
","value_date":"2020-08-22","withdrawal_amt":"230013.00","deposit_amt":null,"balance_amt":"7090937.00"}

message_schema = StructType([
StructField('transaction_id', IntegerType(), True),
StructField('account_no', LongType(), True),
StructField('transaction_date', StringType(), True),
StructField('transaction_details', StringType(), True),
StructField('value_date', StringType(), True),
StructField('withdrawal_amt', StringType(), True),
StructField('deposit_amt', StringType(), True),
StructField('balance_amt', StringType(), True)   
]
)

1 Answer 1

1

You can pass the schema of the string JSON messages to the from_json function.

Having your messages like this:

#+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#|value                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
#+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#|[{"before":null,"after":{"transaction_id":20,"account_no":409000611074,"transaction_date":18490,"transaction_details":"INDO GIBL Indiaforensic STL12071 ","value_date":18490,"withdrawal_amt":"AMTWoA==","deposit_amt":null,"balance_amt":"K6LiGA=="},"source":{"version":"1.4.0-SNAPSHOT","connector":"mysql","name":"main.test.mysql","ts_ms":0,"snapshot":"true","db":"main","table":"test_bank_data","server_id":0,"gtid":null,"file":"binlog.000584","pos":15484438,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1611582308774,"transaction":null}]|
#|[{"before":null,"after":{"transaction_id":21,"account_no":409000611074,"transaction_date":18490,"transaction_details":"INDO GIBL Indiaforensic STL13071 ","value_date":18490,"withdrawal_amt":"AV741A==","deposit_amt":null,"balance_amt":"KkPpRA=="},"source":{"version":"1.4.0-SNAPSHOT","connector":"mysql","name":"main.test.mysql","ts_ms":0,"snapshot":"true","db":"main","table":"test_bank_data","server_id":0,"gtid":null,"file":"binlog.000584","pos":15484438,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1611582308774,"transaction":null}]|
#+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

You can modify your code to parse the after field in your json into a MapType then you select the keys you want as columns:

message_schema = StructType([
     StructField('before', MapType(StringType(), StringType(), True), True),
     StructField('after', MapType(StringType(), StringType(), True), True),
     StructField('source', MapType(StringType(), StringType(), True), True),
     StructField('op', StringType(), True),
     StructField('ts_ms', StringType(), True),
     StructField('transaction', StringType(), True)
     ]
)

after_fields = [
    "account_no", "balance_amt", "deposit_amt", "transaction_date",
    "transaction_details", "transaction_id", "value_date", "withdrawal_amt"
]

# parse json strings using from_json and select message.after.*
 kafkaStreamDF.withColumn(
     "message",
     F.from_json(F.col("value"), message_schema)
 ).select(
     *[F.col("message.after").getItem(f).alias(f) for f in after_fields]
 ).writeStream \
  .outputMode("append") \
  .format("console") \
  .option("truncate", "false") \
  .start() \
  .awaitTermination()   
Sign up to request clarification or add additional context in comments.

8 Comments

Hi @blacbishop Thanks for your reply.But when I am trying to use this logic I am getting error that - File "/home/ubuntu/python-scripts/pyspark-sql-test.py", line 24, in <module> json_message_rdd = kafkaStreamDF.select("value").limit(1).rdd.map(lambda row: row.value) pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;kafka
@Nicolas Can you try with the updated answer?
@blackbishop looks like OP is just copy-pasting code, soyou may already add awaitTermination as well ;-)
@mike thank you for pointing it out. I'm not really using spark streaming but answered it as it's pure SQL transformations :)
@blackbishop : I have chcked the schema and tried some logic on debezium connector side to drop unwanted data from debezium connector and just to produce the after part of message into kafka.after then I tried with your provided logic and created new message_schema as below and its working fine and showing the data as expacted in dataframe.Thank a lot for your help and suggestion :)
|

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.