
Our team has a set of data pipelines built as DAGs triggered on Composer (Airflow) that run Beam (Dataflow) jobs.

Across these Dataflow pipelines there is a set of common utilities, which I have written, that engineers need to share. My understanding is that one way to achieve this is via a custom SDK container image whose Dockerfile starts from the Beam SDK image:

FROM apache/beam_python3.10_sdk

So I added my dependencies to a simple Dockerfile, built the image, validated that my Python package was importable by running the image locally, and then specified the custom SDK container image when launching my Dataflow pipelines from Airflow.

Dockerfile:

# First stage - Get the Beam SDK
FROM apache/beam_python3.10_sdk:2.60.0 AS beam_python

# Second stage - Build the final image
FROM python:3.10-slim

# Copy Apache Beam files from first stage
COPY --from=beam_python /opt/apache/beam /opt/apache/beam

# Set working directory
WORKDIR /app

# Copy and install your package
COPY docker_build/*.whl /app/
RUN pip install --no-cache-dir /app/*.whl apache-beam[gcp]==2.60.0

# Clean up
RUN rm -rf /app/*.whl

# Set the entrypoint to Apache Beam SDK launcher
ENTRYPOINT ["/opt/apache/beam/boot"]

Now, within a DAG, you should be able to point a Dataflow operator at this custom SDK image like so:

DAG.py

run_dataflow = DataflowCreatePythonJobOperator(
    task_id="run_dataflow",
    py_file=f"gs://{DATAFLOW_BUCKET}/engineer_1_dataflow_script.py",
    job_name="engineer_1_job",
    location=GCP_REGION,
    project_id=GCP_PROJECT_ID,
    gcp_conn_id="google_cloud_default",
    options={
        "staging_location": f"gs://{DATAFLOW_BUCKET}/staging",
        "temp_location": f"gs://{DATAFLOW_BUCKET}/temp",
        "project": GCP_PROJECT_ID,
        "region": GCP_REGION,
        "sdk_container_image": DATAFLOW_SDK_CONTAINER_IMAGE, <- The custom image we just built
        "wait_till_finished": True,
        "poll_sleep": 60,
    },
    dag=dag,
)

However, Airflow will by default execute (and therefore import) the Python file on the Airflow worker before anything ever reaches a Dataflow worker. Since engineer_1_dataflow_script.py references utility functions that live only inside the container, those utilities are by necessity visible only to the Dataflow workers, because only there has the custom image been loaded.

So when Airflow first executes this Python file, it fails to resolve these dependencies: the task dies before the custom image is ever pulled. The container image only comes into play once the Airflow side is satisfied that the Python file is legit and has submitted the job.
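
To make the failure mode concrete, here is roughly what one of these pipeline files looks like (the module name my_team_utils and the transforms are made up for illustration). The top-level import is the line that blows up on the Airflow side, long before sdk_container_image ever matters:

engineer_1_dataflow_script.py (illustrative sketch):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# This is the problem line: my_team_utils only exists inside the custom SDK
# image, but this import runs wherever the file is executed to build and
# submit the pipeline, i.e. on the Airflow worker.
from my_team_utils import parse_record


def run(argv=None):
    options = PipelineOptions(argv)
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Read" >> beam.io.ReadFromText("gs://some-bucket/input.txt")
            | "Parse" >> beam.Map(parse_record)
        )


if __name__ == "__main__":
    run()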

One workaround that does not work, despite what I was told, is placing the imports inside the run function; the reasoning was that they would then be resolved at runtime, which is not true.

Another workaround that does work is placing the import statements inside the DoFns you declare, since those only execute at runtime on the workers. This totally defeats the purpose of an importable utility module, though, since my engineers would now need to re-declare each utility inside a DoFn.
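
For completeness, this is the shape of that workaround, again with my_team_utils as a stand-in name. The import only runs on the Dataflow worker where the custom SDK image (and therefore the package) is present, which is why it sidesteps the Airflow-side failure, and also why it is so awkward to maintain:

import apache_beam as beam


class ParseRecordFn(beam.DoFn):
    def setup(self):
        # Deferred import: this only executes on the Dataflow worker,
        # inside the custom SDK container image.
        from my_team_utils import parse_record
        self._parse_record = parse_record

    def process(self, element):
        yield self._parse_record(element)


# Used in the pipeline as: ... | "Parse" >> beam.ParDo(ParseRecordFn())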

Any ideas? I have also tried:

  1. Creating Dataflow templates (currently stuck here)
  2. Creating a "launcher" Dataflow Python file that references a larger file, since I was told the launcher would be the only file parsed, which turned out not to be true
  3. Using the DockerOperator instead of a Dataflow operator, which also did not work and isn't even recommended anyway
  4. Creating a setup.py file that would be loaded in, which is very poorly documented and for which I found close to no help online (could work tbh; see the sketch after this list)
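
On point 4, for what it's worth, the pattern I have seen for shipping shared code to the workers without a custom image is a setup.py next to the pipeline source, passed via Beam's --setup_file pipeline option ("setup_file" in the operator's options dict). A rough sketch, with the package name and layout invented for illustration; note it does not by itself fix the Airflow-side import, since the pipeline file still has to import cleanly wherever it is executed:

setup.py (rough sketch):

import setuptools

setuptools.setup(
    name="my_team_utils",
    version="0.1.0",
    # Expects a my_team_utils/ package directory next to this file; Beam
    # builds a source distribution at submission time and installs it on
    # the Dataflow workers.
    packages=setuptools.find_packages(),
    install_requires=[],  # any shared runtime dependencies
)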

Comments:
  • Have you tried adding "sdk_location": "container" in your options? We put this in our pipelines to tell Beam to use the container entrypoint, though it's not with Airflow but with Dataflow directly. – unitrium, Jan 14 at 16:28
  • I will try that now. – Jan 14 at 16:31
  • Nope, still broken unfortunately. – Jan 14 at 16:52
  • Is it possible to conditionally import the utility functions depending on the runtime environment? – Jan 17 at 5:49
