I am getting the following errors after launching a Dataflow flex template. Code execution starts, and once pipeline graph generation begins, I get this:
"subprocess.CalledProcessError: Command '['/usr/local/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', '/tmp/tmpomr2b4n9/tmp_requirements.txt', '--exists-action', 'i', '--no-deps', '--implementation', 'cp', '--abi', 'cp38', '--platform', 'manylinux2014_x86_64']' returned non-zero exit status 1."}
Pip install failed for package: -r
{"severity":"INFO","time":"2024/05/29 14:53:10.869696","line":"exec.go:66","message":" Output from execution of subprocess: b\"WARNING: Retrying (Retry(total=4, connect=None, read=None, redirect=None, status=None)) after connection broken by 'NewConnectionError('<pip._vendor.urllib3.connection.HTTPSConnection object at 0x7f5f4c7bde50>: Failed to establish a new connection: [Errno 101] Network is unreachable')': /simple/google-cloud-storage/}
{"severity":"INFO","time":"2024/05/29 14:53:11.262883","line":"exec.go:52","message":"python failed with exit status 1"}
{"severity":"ERROR","time":"2024/05/29 14:53:11.263007","line":"launch.go:80","message":"Error: Template launch failed: exit status 1"}
Following is my requirements.txt:
apache-beam[gcp]
google-cloud-storage
google-cloud-bigquery
google-cloud-logging
pandas
pandas-gbq
db-dtypes
Following is my metadata.json:
{ "name": "Getting started Batch Pipeline", "description": "Batch Pipeline flex template for Python.", "parameters": [ { "name": "output", "label": "Output destination", "helpText": "The path and filename prefix for writing output files. Example: gs://your-bucket/your-path", "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ] } ] }
This is the dummy batch_pipeline.py:
import apache_beam as beam
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def write_to_cloud_storage(argv=None):
    # Parse the pipeline options passed into the application.
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option that specifies the Cloud Storage bucket.
        def _add_argparse_args(cls, parser):
            parser.add_argument("--output", required=True)

    wordsList = ["1", "2", "3", "4"]
    options = MyOptions()

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(wordsList)
            | "Write Files" >> WriteToText(options.output, file_name_suffix=".txt")
        )


if __name__ == "__main__":
    write_to_cloud_storage()
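To rule out the pipeline code itself, the same script can be exercised locally with the DirectRunner before templating it (the output path below is just a placeholder):

python batch_pipeline.py --runner=DirectRunner --output=/tmp/output-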
I am building the flex template using the following command:
gcloud dataflow flex-template build gs://<BUCKET_NAME>/batch_pipeline-req-py.json \
  --image-gcr-path "europe-west1-docker.pkg.dev/<PROJECT_ID>/<BUCKET_NAME>/batch-pipeline-python:V1" \
  --sdk-language "PYTHON" \
  --flex-template-base-image "PYTHON3" \
  --metadata-file "metadata.json" \
  --py-path "." \
  --env "FLEX_TEMPLATE_PYTHON_PY_FILE=batch_pipeline.py" \
  --env "FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE=requirements.txt"

The following command is used to run the template on DataflowRunner:

gcloud dataflow flex-template run "batch-pipeline-flex" \
  --template-file-gcs-location "gs://<BUCKET_NAME>/batch_pipeline-req-py.json" \
  --service-account-email "service-dfl-ingest@<PROJECT_ID>.iam.gserviceaccount.com" \
  --subnetwork "https://www.googleapis.com/compute/v1/projects/<PROJECT_ID>/regions/europe-west1/subnetworks/<SUBNETWORK>" \
  --parameters output="gs://<BUCKET_NAME>/output-" \
  --region "europe-west1"

The service account has the following IAM roles:
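Separately, in case the subnetwork simply has no route to PyPI, one workaround I am looking at is a custom launcher image that installs the requirements at build time, so the launcher never needs internet access when the template is launched. A rough sketch, following what I understand from the sample Dockerfile in the Flex Templates docs (base image tag and the /template paths are my assumptions):

FROM gcr.io/dataflow-templates-base/python38-template-launcher-base

# Assumed layout; copy the pipeline code and requirements into the image.
COPY batch_pipeline.py requirements.txt /template/

ENV FLEX_TEMPLATE_PYTHON_PY_FILE=/template/batch_pipeline.py
ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE=/template/requirements.txt

# Install and pre-download the dependencies at image build time so the
# launcher does not have to contact PyPI at launch. The cache directory
# matches the one in the failing pip download command above.
RUN pip install --no-cache-dir -r /template/requirements.txt \
    && pip download --no-cache-dir --dest /tmp/dataflow-requirements-cache \
       -r /template/requirements.txt

If I understand the docs correctly, the template would then be built with gcloud dataflow flex-template build using --image pointing at this custom image, instead of --flex-template-base-image and --py-path.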