I am trying to run the program below using Apache Beam on GCP Dataflow. The program should read a CSV file, apply some transformations (sum, max, and join), and then write the result to a BigQuery table. Up to step 4 I get the expected result, but at step 5 the job fails to write to BigQuery with the error below.

Please help me solve it.

Data:

(image of the input data, not included)

Error:

TypeError: 'tuple' object does not support item assignment [while running 'Write to BQ/StorageWriteToBigQuery/Convert dict to Beam Row/Convert dict to Beam Row']

Code:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import argparse
from pprint import pprint

parser = argparse.ArgumentParser()
parser.add_argument("--in_file", dest="in_file", required=True, help="input GCS file")
parser.add_argument("--out_table", dest="out_table", required=True, help="output BQ Table")
parser.add_argument("--out_ds", dest="out_ds", required=True, help="output BQ DS")
path_args, pipeline_args = parser.parse_known_args()

input_file = path_args.in_file
output_table = path_args.out_table
output_ds = path_args.out_ds
output_schema = {
    "fields": [
        {"name": "team", "type": "string", "mode": "NULLABLE"},
        {"name": "total_goals", "type": "integer", "mode": "NULLABLE"},
        {"name": "key_player", "type": "string", "mode": "NULLABLE"},
        {"name": "player_goals", "type": "integer", "mode": "NULLABLE"},
    ]
}

pipeline_options = PipelineOptions(pipeline_args)

with beam.Pipeline(options=pipeline_options) as pipeline:
    # step 1
    input_stats = (
        pipeline
        | "Read File" >> beam.io.ReadFromText(file_pattern=input_file, skip_header_lines=1)
        | "Split Line" >> beam.Map(lambda line: line.split(","))
    )

    # step 2
    country_tot_goals = (
        input_stats
        | "Country Goals Key Value Pair" >> beam.Map(lambda rec: (rec[1], int(rec[2])))
        | "CombinePerKey Country Total Goals" >> beam.CombinePerKey(sum)
    )

    # step 3
    country_max_goals = (
        input_stats
        | "Country Player Goals Key Value Pairs" >> beam.Map(lambda rec: (rec[1], [rec[0], rec[3]]))
        | "CombinePerKey Country Key Goals" >> beam.GroupByKey()
        | "Get key player with max goals" >> beam.Map(lambda cnt_ply: (cnt_ply[0], max(cnt_ply[1], key=lambda ply: int(ply[1]))))
    )

    # step 4
    def join_lists(items):
        team, data = items
        total_goals = data['country_data'][0]
        player_list = data['player_data'][0]
        key_player = player_list[0]
        player_goals = int(player_list[1])
        return team, total_goals, key_player, player_goals

    team_out = (
        {"country_data": country_tot_goals, "player_data": country_max_goals}
        | "Join based on Key" >> beam.CoGroupByKey()
        | "Map on Key" >> beam.Map(join_lists)
        # | "Print Team Out" >> beam.Map(pprint)
    )

    """
    ('Argentina', 1100, 'Messi', 250)
    ('India', 300, 'Chhetri', 200)
    ('Portugal', 500, 'Ronaldo', 200)
    ('Brazil', 1000, 'Pele', 220)
    """

    # step 5
    write_out = (
        team_out
        | "Write to BQ" >> beam.io.WriteToBigQuery(
            table=output_table,
            dataset=output_ds,
            schema=output_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
        )
    )
  • I'll make a guess: the return value of join_lists is a Python tuple, i.e. an ordered collection of values such as ('India', 300, 'Chhetri', 200). Maybe you need to return a dictionary instead. Rather than return team, total_goals, ..., try returning { "team": team, "total_goals": total_goals ...}
    – Kolban
    Commented Oct 24, 2024 at 21:39
  • Hi @Kolban - Thanks for your reply. Your guess is right. I am looking for a better approach for joining two PCollections on their keys and then loading the result into BigQuery. The code above is what I could come up with; it produces Python tuples, which can't be written to BQ. Is there a better way? Commented Oct 25, 2024 at 8:49

1 Answer

The input format of your data is not what WriteToBigQuery is expecting. Quoting from the documentation:

This transform receives a PCollection of elements to be inserted into BigQuery tables. The elements would come in as Python dictionaries, or as TableRow instances.

You are providing tuples instead, which is why the "Convert dict to Beam Row" step fails. Modify your join_lists function to return a dictionary whose keys match the schema field names, something like this:

def join_lists(items):
    team, data = items
    total_goals = data['country_data'][0]
    player_list = data['player_data'][0]
    key_player = player_list[0]
    player_goals = int(player_list[1])

    return {
        "team": team,
        "total_goals": total_goals,
        "key_player": key_player,
        "player_goals": player_goals,
    }
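
If changing join_lists is not convenient, another option is to keep the tuple and convert it in a separate step just before the write. The sketch below is plain Python, so it runs without Beam. The names tuple_to_row and FIELD_NAMES are hypothetical (they are not part of Beam or of the question's code), and the sample tuples are the ones printed in the question. In the pipeline it would presumably be applied as beam.Map(tuple_to_row) between "Map on Key" and "Write to BQ".

```python
# Column names, in the same order as the tuple returned by join_lists.
# They mirror the "fields" of output_schema in the question.
FIELD_NAMES = ("team", "total_goals", "key_player", "player_goals")


def tuple_to_row(record):
    """Turn a positional tuple into a dict keyed by BigQuery column name."""
    if len(record) != len(FIELD_NAMES):
        raise ValueError(
            f"expected {len(FIELD_NAMES)} values, got {len(record)}"
        )
    return dict(zip(FIELD_NAMES, record))


# Sample tuples copied from the step 4 output shown in the question.
rows = [
    tuple_to_row(rec)
    for rec in [
        ("Argentina", 1100, "Messi", 250),
        ("India", 300, "Chhetri", 200),
    ]
]
print(rows[0])
```

Either way, the point is the same: each element that reaches WriteToBigQuery should be a dictionary whose keys are the column names in the schema.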
