
I am triggering a Dataflow pipeline with the BeamRunPythonPipelineOperator() in Airflow on Cloud Composer (composer-2.9.8-airflow-2.9.3). The job is submitted to Dataflow successfully, but the Airflow task keeps running without logging any job status updates from Dataflow and eventually exits with INFO - Process exited with return code: 0. I want to track the job status in Airflow so that I can trigger subsequent tasks based on it (e.g. JOB_STATE_DONE).

My operator is set up as follows:

from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration

start_dataflow_job = BeamRunPythonPipelineOperator(
    task_id="start_dataflow_job",
    runner="DataflowRunner",
    py_file=GCS_FILE_LOCATION,
    pipeline_options={
        "tempLocation": GCS_BUCKET,
        "stagingLocation": GCS_BUCKET,
        "output_project": PROJECT,
        "service_account_email": GCP_CUSTOM_SERVICE_ACCOUNT,
        "requirements_file": "gs://GCS_CODE_BUCKET/requirements.txt",
        "max_num_workers": "2",
        "region": "us-east1",
        "experiments": [
            "streaming_boot_disk_size_gb=100",
            "workerLogLevelOverrides=com.google.cloud.dataflow#DEBUG",
            "dataflow_service_options=enable_prime",
        ],
    },
    py_options=[],
    py_requirements=["apache-beam[gcp]~=2.60.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config=DataflowConfiguration(
        job_name="{{task.task_id}}",
        project_id=PROJECT,
        location="us-east1",
        wait_until_finished=False,  # do not block the task until the Dataflow job finishes
        gcp_conn_id="google_cloud_default",
    ),
    do_xcom_push=True,  # push the Dataflow job id to XCom for downstream tasks
)

The logs are:

[2024-11-04, 04:31:42 UTC] {beam.py:151} INFO - Start waiting for Apache Beam process to complete.
[2024-11-04, 04:41:09 UTC] {beam.py:172} INFO - Process exited with return code: 0
[2024-11-04, 04:41:11 UTC] {taskinstance.py:441} ▼ Post task execution logs
[2024-11-04, 04:41:11 UTC] {taskinstance.py:1206} INFO - Marking task as SUCCESS. dag_id=test_data_pipeline, task_id=start_dataflow_job, run_id=manual__2024-11-04T04:24:59.026429+00:00, execution_date=20241104T042459, start_date=20241104T042502, end_date=20241104T044111
[2024-11-04, 04:41:12 UTC] {local_task_job_runner.py:243} INFO - Task exited with return code 0
[2024-11-04, 04:41:12 UTC] {taskinstance.py:3506} INFO - 1 downstream tasks scheduled from follow-on schedule check
[2024-11-04, 04:41:12 UTC] {local_task_job_runner.py:222} ▲▲▲ Log group end
  • What kind of task do you want to execute after the Dataflow job terminates? When the Airflow task and the Dataflow job are finished, Airflow executes the following task and operator. Commented Nov 4, 2024 at 21:30
  • @MazlumTosun I have a DataflowJobStatusSensor that should check the job status after submission to confirm that the job completed with JOB_STATE_DONE, followed by a DbtCloudRunJobOperator. I cannot let the BeamRunPythonPipelineOperator wait for completion because the Dataflow job sometimes takes 3 hours to finish and the operator times out after 60 minutes.
    – mgore
    Commented Nov 4, 2024 at 22:43

1 Answer

Airflow uses the standard Python logging framework to write logs, and for the duration of a task the root logger is configured to write to the task's log. So to track the Dataflow pipeline's progress in Airflow, the logging level in your Dataflow pipeline needs to be set to INFO; I had originally set it to ERROR. Once I updated the logging level, the operator submitted the job, obtained the dataflow_job_id in XCom, and marked itself as success shortly after, and the sensor then tracked the job status to completion.

logging.getLogger().setLevel(logging.INFO)
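
For context, here is a minimal sketch of where that call could sit in the Beam pipeline's entry point. The module layout, run() function, and transforms are assumptions for illustration, not the actual pipeline submitted via GCS_FILE_LOCATION:

# my_pipeline.py -- hypothetical layout of the file uploaded to GCS_FILE_LOCATION
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    # Raise the root logger to INFO so the DataflowRunner's submission
    # messages (including the job id) reach the Airflow task log.
    logging.getLogger().setLevel(logging.INFO)

    options = PipelineOptions(argv)
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create" >> beam.Create(["hello", "world"])
            | "Log" >> beam.Map(lambda element: logging.info("element: %s", element) or element)
        )


if __name__ == "__main__":
    run()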

Read more here: Writing to Airflow task logs from your code
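
For reference, the DataflowJobStatusSensor mentioned in the comments can then pick up the job id from XCom. This is only a sketch, assuming the operator pushes the id under the dataflow_job_id key and reusing the project and region from the question:

from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor

# Poll the submitted job until it reaches JOB_STATE_DONE, pulling the
# job id that start_dataflow_job pushed to XCom.
wait_for_dataflow_job = DataflowJobStatusSensor(
    task_id="wait_for_dataflow_job",
    job_id="{{ task_instance.xcom_pull(task_ids='start_dataflow_job', key='dataflow_job_id') }}",
    expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
    project_id=PROJECT,
    location="us-east1",
    gcp_conn_id="google_cloud_default",
    poke_interval=60,
    timeout=4 * 60 * 60,  # allow for the multi-hour jobs mentioned in the comments
)

start_dataflow_job >> wait_for_dataflow_job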
