0

How can I define a schema so that all the workers in Dataflow have access to it? Below is the section of my code that fails because the schema name cannot be found.

I have deployed/run the script using the instructions here

 class BmsSchema(typing.NamedTuple):
    """Row schema attached to each parsed Pub/Sub message.

    Registered with ``beam.coders.RowCoder`` and used as the declared output
    type of the 'Attaching the schema' step in ``run``.
    """

    # Optional because ParsePubSubMessage yields None for ``ident`` when the
    # incoming JSON does not supply that key; a bare ``int`` would not
    # describe that value.
    # NOTE(review): the BigQuery table schema in run() declares this column
    # as STRING - confirm which type is intended.
    ident: typing.Optional[int]


# Register RowCoder as the coder Beam should use for BmsSchema elements.
beam.coders.registry.register_coder(BmsSchema, beam.coders.RowCoder)


class ParsePubSubMessage(beam.DoFn):
    """DoFn that turns a JSON-encoded message into a single-key dict."""

    def process(self, message):
        """Decode ``message`` as UTF-8 JSON and yield its ``ident`` value.

        Yields one dict of the form ``{'ident': ...}``. The value is None
        when the decoded JSON does not provide an ``ident`` entry.
        """
        import json

        # Decode and parse the raw payload.
        payload = json.loads(message.decode('utf-8'))

        # Start from a row where every expected column is None, then overlay
        # whatever the message actually supplied.
        expected_columns = ['ident']
        row = dict.fromkeys(expected_columns)
        row.update(payload)

        yield {'ident': row["ident"]}

def run():
    """Build and run the streaming Pub/Sub pipeline on Dataflow.

    Reads messages from the Pub/Sub subscription, parses each one with
    ``ParsePubSubMessage``, wraps the result in ``BmsSchema`` and round-trips
    the collection through the Beam DataFrame API.
    """
    # Define pipeline options
    options = PipelineOptions(
        project='.....',
        runner='DataflowRunner',
        streaming=True,  # Enable streaming mode
        temp_location='gs://......temp',
        staging_location='gs://.....staging',
        region='europe-west1',
        job_name='.......dataflow-test',
        # Pickle the main module's global namespace and restore it on the
        # workers. Without this, names defined at module level in this script
        # (such as BmsSchema, referenced by the lambda below) are not defined
        # on the workers and the step fails with a NameError.
        save_main_session=True,
    )

    # Set streaming mode
    options.view_as(StandardOptions).streaming = True

    # Pub/Sub subscription
    input_subscription = 'projects/dwingestion/subscriptions/flespi_data_streaming'

    # Schema for the BigQuery destination table.
    table_schema = {
        "fields": [
            {"name": "ident", "type": "STRING", "mode": "NULLABLE"}
        ]
    }

    # Create the pipeline
    with beam.Pipeline(options=options) as p:
        # Read from Pub/Sub and parse the messages
        messages = (p
                    | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
                    | 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
                    | 'Attaching the schema' >> beam.Map(lambda x: BmsSchema(**x)).with_output_types(BmsSchema)
                    )

        # Convert the messages to df
        df = to_dataframe(messages)
        transformed_pcol = to_pcollection(df)
        # Write to BigQuery 

Below is the full trace of the error

Error message from worker: generic::unknown: Traceback (most recent call last): File "apache_beam/runners/common.py", line 1501, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 690, in apache_beam.runners.common.SimpleInvoker.invoke_process File "/usr/local/lib/python3.12/dist-packages/apache_beam/transforms/core.py", line 2084, in wrapper = lambda x: [fn(x)] ^^^^^ File "/home/coyugi/bms_test_schema.py", line 66, in except ImportError as exc:

NameError: name 'BmsSchema' is not defined

During handling of the above exception, another exception occurred:

Traceback (most recent call last): File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 311, in _execute response = task() ^^^^^^ File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 386, in lambda: self.create_worker().do_instruction(request), request) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 656, in do_instruction return getattr(self, request_type)( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 694, in process_bundle bundle_processor.process_bundle(instruction_id)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1119, in process_bundle input_op_by_transform_id[element.transform_id].process_encoded( File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 237, in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 567, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 569, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 260, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 950, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1503, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", 
line 1592, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1501, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 689, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1687, in apache_beam.runners.common._OutputHandler.handle_process_outputs File "apache_beam/runners/common.py", line 1800, in apache_beam.runners.common._OutputHandler._write_value_to_tag File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 950, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1503, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1592, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1501, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 689, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1687, in apache_beam.runners.common._OutputHandler.handle_process_outputs File "apache_beam/runners/common.py", line 1800, in apache_beam.runners.common._OutputHandler._write_value_to_tag File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 950, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1503, in apache_beam.runners.common.DoFnRunner.process 
File "apache_beam/runners/common.py", line 1613, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1501, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 690, in apache_beam.runners.common.SimpleInvoker.invoke_process File "/usr/local/lib/python3.12/dist-packages/apache_beam/transforms/core.py", line 2084, in wrapper = lambda x: [fn(x)] ^^^^^ File "/home/coyugi/bms_test_schema.py", line 66, in except ImportError as exc:

NameError: name 'BmsSchema' is not defined [while running 'Attaching the schema-ptransform-67']

passed through: ==> dist_proc/dax/workflow/worker/fnapi_service_impl.cc:1191

3

0

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.