
I'm trying to create a streaming pipeline with Dataflow that reads messages from a Pub/Sub topic and writes them to a BigQuery table. I don't want to use any Dataflow template.

For the moment, I just want to create the pipeline in a Python 3 script executed from a Google Compute Engine VM instance. The pipeline should load and transform every message that arrives from Pub/Sub (parsing the records it contains and adding a new field) and then write the results to a BigQuery table.

Simplifying, my code would be:

#!/usr/bin/env python
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
import apache_beam as beam
import apache_beam.io.gcp.bigquery
import logging
import argparse
import sys
import json
from datetime import datetime, timedelta

def load_pubsub(message):
    """Parse the Pub/Sub message payload and return the list of records it contains."""
    try:
        data = json.loads(message)
        records = data["messages"]
        return records
    except Exception as e:
        raise ValueError("Something went wrong reading data from the Pub/Sub topic") from e

class ParseTransformPubSub(beam.DoFn):
    """Adds the watermark field "E" to every record.

    Note that water_mark is computed once, when the DoFn is constructed,
    not once per element.
    """
    def __init__(self):
        self.water_mark = (datetime.now() + timedelta(hours=1)).strftime("%Y-%m-%d %H:%M:%S.%f")

    def process(self, records):
        for record in records:
            record["E"] = self.water_mark
            yield record

def main():
    table_schema = apache_beam.io.gcp.bigquery.parse_table_schema_from_json(open("TableSchema.json").read())
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_topic')
    parser.add_argument('--output_table')
    known_args, pipeline_args = parser.parse_known_args()
    with beam.Pipeline(argv=pipeline_args) as p:
        ( p | 'ReadDataFromPubSub' >> beam.io.ReadStringsFromPubSub(topic=known_args.input_topic)
            | 'LoadJSON' >> beam.Map(load_pubsub)
            | 'ParseTransform' >> beam.ParDo(ParseTransformPubSub())
            | 'WriteToAvailabilityTable' >> beam.io.WriteToBigQuery(
                                  table=known_args.output_table,
                                  schema=table_schema,
                                  create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                  write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
        )
        # The with-block runs the pipeline and waits for it on exit,
        # so no explicit p.run() / wait_until_finish() is needed here.

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  main()

(For example) The messages published in the Pub/Sub topic usually look like this:

'{"messages":[{"A":"Alpha", "B":"V1", "C":3, "D":12},{"A":"Alpha", "B":"V1", "C":5, "D":14},{"A":"Alpha", "B":"V1", "C":3, "D":22}]}'

Once the field "E" is added to the record, the structure of the record (a dictionary in Python) and the data types of its fields are exactly what the BigQuery table expects.
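For example, TableSchema.json could look something like this (the exact types, e.g. "E" as TIMESTAMP, are just illustrative):

{
  "fields": [
    {"name": "A", "type": "STRING",    "mode": "NULLABLE"},
    {"name": "B", "type": "STRING",    "mode": "NULLABLE"},
    {"name": "C", "type": "INTEGER",   "mode": "NULLABLE"},
    {"name": "D", "type": "INTEGER",   "mode": "NULLABLE"},
    {"name": "E", "type": "TIMESTAMP", "mode": "NULLABLE"}
  ]
}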

The problems that I want to handle are:

  1. If some messages arrive with an unexpected structure, I want to fork the pipeline flow and write them to another BigQuery table.

  2. If some messages arrive with an unexpected data type in one of their fields, the error will only occur at the last stage of the pipeline, when they are written to the table. I want to handle this kind of error by diverting the record to a third table.

I read the documentation on the following pages but found nothing about this: https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline and https://cloud.google.com/dataflow/docs/guides/common-errors

By the way, if I choose the option of configuring the pipeline through the template that reads from a Pub/Sub subscription and writes to BigQuery, I get the following schema, which turns out to be the same one I'm looking for:

Template: Cloud Pub/Sub Subscription to BigQuery

2 Answers


You can't catch the errors that occur in the BigQuery sink. The messages that you write to BigQuery must be well formed.

The best pattern is to perform a transform that checks the structure of your messages and the types of their fields. In case of error, create an error flow and write this error flow to a file, for example, or to a table without a schema, where you write your message as plain text.
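For example, a minimal sketch of such a checking DoFn could look like this (the class name and the expected field list are only illustrative):

import json
import apache_beam as beam
from apache_beam import pvalue

class ValidateRecord(beam.DoFn):
    """Checks the structure and field types of a record and tags bad records as 'error'."""
    EXPECTED_TYPES = {"A": str, "B": str, "C": int, "D": int}

    def process(self, record):
        try:
            if (isinstance(record, dict)
                    and all(isinstance(record.get(k), t) for k, t in self.EXPECTED_TYPES.items())):
                yield record  # correct flow (main output)
            else:
                yield pvalue.TaggedOutput('error', json.dumps(record))  # error flow, as plain text
        except Exception:
            yield pvalue.TaggedOutput('error', str(record))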

  • Thanks a lot, Guillaume! That's just what I imagined. Do you know of any site or repository where I can find an example of such a checking application in Python for Dataflow?
    – JPMC
    Commented Nov 15, 2019 at 10:37
  • The Beam programming guide provides a full example of what you can do. In a DoFn (such as ProcessWords in the guide), perform the checks you need to be sure the flow is correct. For every error found, yield pvalue.TaggedOutput('error_value', element). By applying your ParDo you get two PCollections as output: the correct flow and the error flow. Then apply the sink that you want to each of these PCollections (see the sketch below). Commented Nov 15, 2019 at 10:54
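Putting that comment into code, applying such a DoFn with tagged outputs could look roughly like this (records, known_args and table_schema refer to the pipeline in the question; the dead-letter table name and its single-column schema are only illustrative):

results = records | 'Validate' >> beam.ParDo(ValidateRecord()).with_outputs('error', main='valid')

# Correct flow goes to the expected table.
_ = (results.valid
     | 'WriteValid' >> beam.io.WriteToBigQuery(
           table=known_args.output_table,
           schema=table_schema,
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

# Error flow goes to a plain-text dead-letter table.
_ = (results.error
     | 'WrapError' >> beam.Map(lambda msg: {'raw_message': msg})
     | 'WriteErrors' >> beam.io.WriteToBigQuery(
           table='my_project:my_dataset.pubsub_errors',  # illustrative dead-letter table
           schema='raw_message:STRING',
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))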

We do the following when errors occur at the BigQuery sink (a rough sketch follows after this list):

  • send a message (without stack trace) to GCP Error Reporting, so that developers are notified
  • log the error to Stackdriver
  • stop the pipeline execution (the best place for messages to wait until a developer has fixed the issue is the incoming Pub/Sub subscription)
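As a rough sketch of that approach (assuming the google-cloud-error-reporting client library; the DoFn and its transform method are only placeholders), you can report the error, log it, and re-raise so the messages stay unacknowledged in the incoming subscription:

import logging
import apache_beam as beam
from google.cloud import error_reporting

class ReportAndFail(beam.DoFn):
    """Reports failures to Error Reporting, logs them to Stackdriver, and re-raises."""

    def setup(self):
        # One Error Reporting client per worker.
        self._error_client = error_reporting.Client()

    def process(self, record):
        try:
            yield self.transform(record)  # placeholder for the step that may raise
        except Exception:
            # Short message (no stack trace) so developers get notified.
            self._error_client.report("Failed to write a record to BigQuery")
            # Full details go to Stackdriver Logging.
            logging.exception("Failed to process record: %s", record)
            # Re-raise: the bundle fails and the Pub/Sub messages wait in the subscription.
            raise

    def transform(self, record):
        # Placeholder for the real processing / conversion logic.
        return record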
