How does one effectively define a schema so that all the workers in Dataflow have access to it? Below is the section of my code that is failing because the schema name cannot be found.
I have deployed/run the script using the instructions here
class BmsSchema(typing.NamedTuple):
    """Beam row schema for one parsed Pub/Sub record."""

    # Declared Optional because the parsing step pre-fills every column with
    # None, so a message that lacks "ident" yields ident=None. A plain `int`
    # field is non-nullable in a Beam schema.
    ident: typing.Optional[int]


# Encode BmsSchema elements with the schema-aware RowCoder.
beam.coders.registry.register_coder(BmsSchema, beam.coders.RowCoder)
class ParsePubSubMessage(beam.DoFn):
    """DoFn that turns a JSON-encoded message into a dict of known columns."""

    def process(self, message):
        """Decode ``message`` as UTF-8 JSON and yield its ``ident`` column.

        Args:
            message: Object exposing ``decode('utf-8')`` whose decoded text is
                JSON (presumably the raw Pub/Sub payload bytes -- confirm).

        Yields:
            A single dict ``{'ident': ...}``; the value is ``None`` when the
            parsed record does not provide an ``ident`` key.
        """
        # Kept function-local, as in the original.
        import json

        # Start from a row in which every expected column is None ...
        column_names = ['ident']
        row = dict.fromkeys(column_names)

        # ... then overlay whatever the JSON payload provides.
        payload = json.loads(message.decode('utf-8'))
        row.update(payload)

        # Only the known columns are emitted; extra payload keys are dropped.
        yield {'ident': row["ident"]}
def run():
    """Build and run the streaming Pub/Sub -> Beam DataFrame pipeline.

    The pipeline reads from a Pub/Sub subscription, parses each message with
    ``ParsePubSubMessage``, converts the resulting dicts to ``BmsSchema`` rows
    and round-trips them through the Beam DataFrame API.

    The main session is pickled and shipped to the workers so that names
    defined at module level in the launching script (``BmsSchema`` in
    particular, which the schema-attaching lambda references) can be resolved
    remotely. Without this, workers fail with
    ``NameError: name 'BmsSchema' is not defined``.
    """
    # Local import so this function does not rely on the file's import block
    # already exposing SetupOptions.
    from apache_beam.options.pipeline_options import SetupOptions

    # Define pipeline options
    options = PipelineOptions(
        project='.....',
        runner='DataflowRunner',
        streaming=True,  # Enable streaming mode
        temp_location='gs://......temp',
        staging_location='gs://.....staging',
        region='europe-west1',
        job_name='.......dataflow-test',
    )
    # Set streaming mode
    options.view_as(StandardOptions).streaming = True
    # Ship the __main__ module state (classes, functions, imports) to the
    # workers. Equivalent to passing --save_main_session on the command line.
    # NOTE(review): if the NameError persists, move BmsSchema into an
    # importable module distributed with --setup_file instead.
    options.view_as(SetupOptions).save_main_session = True

    # Pub/Sub subscription
    input_subscription = 'projects/dwingestion/subscriptions/flespi_data_streaming'

    # BigQuery table schema for the sink; not used in the visible part of
    # the function.
    table_schema = {
        "fields": [
            {"name": "ident", "type": "STRING", "mode": "NULLABLE"}
        ]
    }

    # Create the pipeline
    with beam.Pipeline(options=options) as p:
        # Read from Pub/Sub and parse the messages
        messages = (
            p
            | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
            | 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
            # This lambda runs on the workers and looks BmsSchema up as a
            # global, which is why the main session must be saved.
            | 'Attaching the schema' >> beam.Map(lambda x: BmsSchema(**x)).with_output_types(BmsSchema)
        )
        # Convert the messages to df
        df = to_dataframe(messages)
        transformed_pcol = to_pcollection(df)
        # Write to BigQuery
Below is the full trace of the error:
Error message from worker: generic::unknown: Traceback (most recent call last): File "apache_beam/runners/common.py", line 1501, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 690, in apache_beam.runners.common.SimpleInvoker.invoke_process File "/usr/local/lib/python3.12/dist-packages/apache_beam/transforms/core.py", line 2084, in wrapper = lambda x: [fn(x)] ^^^^^ File "/home/coyugi/bms_test_schema.py", line 66, in except ImportError as exc:
NameError: name 'BmsSchema' is not defined
During handling of the above exception, another exception occurred:
Traceback (most recent call last): File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 311, in _execute response = task() ^^^^^^ File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 386, in lambda: self.create_worker().do_instruction(request), request) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 656, in do_instruction return getattr(self, request_type)( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 694, in process_bundle bundle_processor.process_bundle(instruction_id)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1119, in process_bundle input_op_by_transform_id[element.transform_id].process_encoded( File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 237, in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 567, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 569, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 260, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 950, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1503, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", 
line 1592, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1501, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 689, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1687, in apache_beam.runners.common._OutputHandler.handle_process_outputs File "apache_beam/runners/common.py", line 1800, in apache_beam.runners.common._OutputHandler._write_value_to_tag File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 950, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1503, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1592, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1501, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 689, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1687, in apache_beam.runners.common._OutputHandler.handle_process_outputs File "apache_beam/runners/common.py", line 1800, in apache_beam.runners.common._OutputHandler._write_value_to_tag File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 950, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1503, in apache_beam.runners.common.DoFnRunner.process 
File "apache_beam/runners/common.py", line 1613, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1501, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 690, in apache_beam.runners.common.SimpleInvoker.invoke_process File "/usr/local/lib/python3.12/dist-packages/apache_beam/transforms/core.py", line 2084, in wrapper = lambda x: [fn(x)] ^^^^^ File "/home/coyugi/bms_test_schema.py", line 66, in except ImportError as exc:
NameError: name 'BmsSchema' is not defined [while running 'Attaching the schema-ptransform-67']
passed through: ==> dist_proc/dax/workflow/worker/fnapi_service_impl.cc:1191
Try running the pipeline with the option --save_main_session=True
? beam.apache.org/documentation/sdks/python-pipeline-dependencies/…