
I am getting the following errors after launching a Dataflow flex template. Code execution starts, and once pipeline graph generation begins, it fails with this:

"subprocess.CalledProcessError: Command '['/usr/local/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', '/tmp/tmpomr2b4n9/tmp_requirements.txt', '--exists-action', 'i', '--no-deps', '--implementation', 'cp', '--abi', 'cp38', '--platform', 'manylinux2014_x86_64']' returned non-zero exit status 1."}



Pip install failed for package: -r           "}
{"severity":"INFO","time":"2024/05/29 14:53:10.869696","line":"exec.go:66","message":" Output from execution of subprocess: b\"WARNING: Retrying (Retry(total=4, connect=None, read=None, redirect=None, status=None)) after connection broken by 'NewConnectionError('\u003cpip._vendor.urllib3.connection.HTTPSConnection object at 0x7f5f4c7bde50\u003e: Failed to establish a new connection: [Errno 101] Network is unreachable')': /simple/google-cloud-storage/}
{"severity":"INFO","time":"2024/05/29 14:53:11.262883","line":"exec.go:52","message":"python failed with exit status 1"}
{"severity":"ERROR","time":"2024/05/29 14:53:11.263007","line":"launch.go:80","message":"Error: Template launch failed: exit status 1"}
  • Following is my requirements.txt:

    apache-beam[gcp]
    google-cloud-storage
    google-cloud-bigquery
    google-cloud-logging
    pandas
    pandas-gbq
    db-dtypes

  • Following is my metadata.json:

    {
        "name": "Getting started Batch Pipeline",
        "description": "Batch Pipeline flex template for Python.",
        "parameters": [
          {
            "name": "output",
            "label": "Output destination",
            "helpText": "The path and filename prefix for writing output files. Example: gs://your-bucket/your-path",
            "regexes": [
              "^gs:\\/\\/[^\\n\\r]+$"
            ]
          }
        ]
    }
    
  • This is the dummy batch_pipeline.py:

    import apache_beam as beam
    from apache_beam.io.textio import WriteToText
    from apache_beam.options.pipeline_options import PipelineOptions
    
    
    def write_to_cloud_storage(argv=None):
        # Parse the pipeline options passed into the application.
        class MyOptions(PipelineOptions):
            # Define a custom pipeline option that specifies the Cloud Storage bucket.
            @classmethod
            def _add_argparse_args(cls, parser):
                parser.add_argument("--output", required=True)
    
        wordsList = ["1", "2", "3", "4"]
        options = MyOptions()
    
        with beam.Pipeline(options=options) as pipeline:
            (
                pipeline
                | "Create elements" >> beam.Create(wordsList)
                | "Write Files" >> WriteToText(options.output, file_name_suffix=".txt")
            )
    
    
    if __name__ == "__main__":
        write_to_cloud_storage()
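
    As a sanity check, the dummy pipeline can also be run locally on the DirectRunner (the default when no runner is given) before building the template; a minimal sketch, assuming a local output prefix:

       # Run locally; WriteToText appends shard suffixes to the prefix.
       python batch_pipeline.py --output=/tmp/wordcount-output-

    If shard files such as /tmp/wordcount-output--00000-of-00001.txt appear, the pipeline code itself is fine and the failure is isolated to the flex template launch.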
    

    I am building the flex template using the following command:

       gcloud dataflow flex-template build gs://<BUCKET_NAME>/batch_pipeline-req-py.json \
       --image-gcr-path "europe-west1-docker.pkg.dev/<PROJECT_ID>/<BUCKET_NAME>/batch-pipeline-python:V1"    \
       --sdk-language "PYTHON"  \
       --flex-template-base-image "PYTHON3"  \
       --metadata-file "metadata.json"  \
       --py-path "."  \
       --env "FLEX_TEMPLATE_PYTHON_PY_FILE=batch_pipeline.py"  \
       --env "FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE=requirements.txt"
    
       - The following command is used to run the template on the DataflowRunner:
    
       gcloud dataflow flex-template run "batch-pipeline-flex" \
       --template-file-gcs-location "gs://<BUCKET_NAME>/batch_pipeline-req-py.json" \
       --service-account-email "service-dfl-ingest@<PROJECT_ID>.iam.gserviceaccount.com" \
       --subnetwork "https://www.googleapis.com/compute/v1/projects/<PROJECT_ID>/regions/europe-west1/subnetworks/<SUBNETWORK>"    \
       --parameters output="gs://<BUCKET_NAME>/output-" \
       --region "europe-west1" 
    
       >! Service account has following IAM roles
    

enter image description here

  • Hi @ShubhGurukul, could you please follow the troubleshooting steps mentioned in this documentation and let me know whether your issue is resolved? Commented May 30, 2024 at 9:23
  • Can your Dataflow worker access the internet? It looks like it tries to call pip to install the dependent packages.
    – XQ Hu
    Commented Jun 4, 2024 at 17:19
  • Yes, the Dataflow worker has access to the internet. Commented Jun 7, 2024 at 9:00
