Imagine a simple Google Dataflow pipeline. In this pipeline you read from BQ using an Apache Beam function, and depending on the returned PCollection you have to update those rows.

Journeys = (p
            | 'Read from BQ' >> beam.io.Read(
                beam.io.BigQuerySource(query=query, dataset="dataset", use_standard_sql=True)))

Update = (Journeys
          | 'Updating Journey Table' >> beam.Map(UpdateBQ))

Write = (Journeys
         | 'Write transform to BigQuery' >> WriteToBigQuery('table', TABLE_SCHEMA_CANONICAL))

The problem with this pipeline is that UpdateBQ is executed once for each element of the PCollection returned by the read, because beam.Map runs it per element.


What would be a better way to perform an update on a BigQuery table?

I suppose this could be done without beam.Map, executing a single update that processes the whole input PCollection at once.
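One way to get that behaviour inside the pipeline (a sketch, not from the original question; the transform labels are made up): collapse the PCollection to a single element first, for example with beam.combiners.Count.Globally(), so the callback fires exactly once instead of once per row.

Update = (Journeys
          | 'Collapse to single element' >> beam.combiners.Count.Globally()
          | 'Run update once' >> beam.Map(UpdateBQ))

UpdateBQ then receives the row count as its input and can simply ignore it.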


Extra

def UpdateBQ(input):
    from google.cloud import bigquery
    import uuid
    client = bigquery.Client()
    # client.query() uses standard SQL by default, so the "#standardSQL"
    # prefix and the legacy run_async_query()/begin() calls are unnecessary
    QUERY = """UPDATE dataset.table SET Field = 'YYY' WHERE Field2 = 'XXX'"""
    query_job = client.query(QUERY, job_id='temp-query-job_{}'.format(uuid.uuid4()))  # API request; the job starts immediately
    <...>
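    # A sketch of what the elided part might do (not in the original):
    # result() blocks until the UPDATE job finishes, and
    # num_dml_affected_rows reports how many rows the DML statement touched.
    query_job.result()
    print(query_job.num_dml_affected_rows)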

Possible solution

with beam.Pipeline(options=options) as p:
    Journeys = (p
                | 'Read from BQ' >> beam.io.Read(
                    beam.io.BigQuerySource(query=query, dataset="dataset", use_standard_sql=True)))

    Write = (Journeys
             | 'Write transform to BigQuery' >> WriteToBigQuery('table', TABLE_SCHEMA_CANONICAL))

# leaving the "with" block waits for the pipeline to finish,
# so the update runs exactly once, after the write completes
UpdateBQ(None)
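An equivalent explicit form, as a sketch (it reuses query, options and TABLE_SCHEMA_CANONICAL from above; wait_until_finish() is the Beam API for blocking on a pipeline result):

pipeline = beam.Pipeline(options=options)
(pipeline
 | 'Read from BQ' >> beam.io.Read(
     beam.io.BigQuerySource(query=query, dataset="dataset", use_standard_sql=True))
 | 'Write transform to BigQuery' >> WriteToBigQuery('table', TABLE_SCHEMA_CANONICAL))

pipeline.run().wait_until_finish()  # block until WriteToBigQuery is done
UpdateBQ(None)                      # then fire the single UPDATE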

1 Answer


Are you doing any further transformation in the Beam pipeline after reading from BQ? Or is it just the way you showed in the code, i.e. read from BQ and then fire an update command in BQ? In that case you don't need Beam at all. Just use a BQ query to update data in one table using another table; BQ best practices suggest avoiding single-row inserts and updates.
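A sketch of that set-based approach (table and column names are placeholders, not from the question; BigQuery standard SQL supports UPDATE ... FROM):

from google.cloud import bigquery

client = bigquery.Client()
sql = """
UPDATE dataset.target t
SET Field = 'YYY'
FROM dataset.journeys s
WHERE t.Field2 = s.Field2
"""
client.query(sql).result()  # one set-based DML statement updates every matching row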

  • After reading from BQ there is another transformation: WriteToBigQuery. So while this is executing I want to update another table. How could the "updating table" step be done in this pipeline without beam.Map? Thanks!
    – IoT user
    Commented Dec 5, 2018 at 11:42
  • From your case it looks like all the transformations, read and write, happen in BQ. You may not need Beam for this, which could be faster for you. Just create a table from the query that produces the initial Journeys dataset. Then submit two async jobs using the same table: one to update the 2nd table and one to write the result to another table (see the sketch below). Maybe I am missing something here, but based on the scenario you mentioned, using Dataflow seems like overkill to me. Commented Dec 5, 2018 at 20:25
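A sketch of that two-job approach (project, dataset and table ids are placeholders; QueryJobConfig and TableReference are the actual client classes):

from google.cloud import bigquery

client = bigquery.Client()

# 1. Materialize the initial Journeys query into a table
config = bigquery.QueryJobConfig()
config.destination = bigquery.TableReference.from_string('project.dataset.journeys')
client.query(query, job_config=config).result()

# 2. Launch two async jobs against the materialized table
update_job = client.query(
    "UPDATE dataset.other SET Field = 'YYY' "
    "WHERE Field2 IN (SELECT Field2 FROM dataset.journeys)")
copy_config = bigquery.QueryJobConfig()
copy_config.destination = bigquery.TableReference.from_string('project.dataset.result')
write_job = client.query("SELECT * FROM dataset.journeys", job_config=copy_config)

for job in (update_job, write_job):
    job.result()  # wait for both jobs to finish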
