I am triggering a Dataflow pipeline using the BeamRunPythonPipelineOperator() in Airflow on Cloud Composer (composer-2.9.8-airflow-2.9.3). The job is submitted to Dataflow successfully; however, the Airflow task keeps running without logging any job status updates from Dataflow, and eventually exits with INFO - Process exited with return code: 0. I want to track the job status in Airflow so that I can trigger subsequent tasks based on it (e.g. JOB_STATE_DONE).
My operator is set up as follows:
start_dataflow_job = BeamRunPythonPipelineOperator(
    task_id="start_dataflow_job",
    runner="DataflowRunner",
    py_file=GCS_FILE_LOCATION,
    pipeline_options={
        "tempLocation": GCS_BUCKET,
        "stagingLocation": GCS_BUCKET,
        "output_project": PROJECT,
        "service_account_email": GCP_CUSTOM_SERVICE_ACCOUNT,
        "requirements_file": "gs://GCS_CODE_BUCKET/requirements.txt",
        "max_num_workers": "2",
        "region": "us-east1",
        "experiments": [
            "streaming_boot_disk_size_gb=100",
            "workerLogLevelOverrides=com.google.cloud.dataflow#DEBUG",
            "dataflow_service_options=enable_prime",
        ],
    },
    py_options=[],
    py_requirements=["apache-beam[gcp]~=2.60.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config=DataflowConfiguration(
        job_name="{{task.task_id}}",
        project_id=PROJECT,
        location="us-east1",
        wait_until_finished=False,
        gcp_conn_id="google_cloud_default",
    ),
    do_xcom_push=True,
)
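With wait_until_finished=False the operator returns right after submission, and with do_xcom_push=True it should, as far as I can tell from my provider version, push the Dataflow job ID to XCom, which I plan to pull downstream like this:

# Jinja-templated pull of the job ID pushed by start_dataflow_job.
# The XCom key "dataflow_job_id" is an assumption based on my provider
# version; it may differ in other releases.
job_id = "{{ task_instance.xcom_pull(task_ids='start_dataflow_job', key='dataflow_job_id') }}"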
The logs are:
[2024-11-04, 04:31:42 UTC] {beam.py:151} INFO - Start waiting for Apache Beam process to complete.
[2024-11-04, 04:41:09 UTC] {beam.py:172} INFO - Process exited with return code: 0
[2024-11-04, 04:41:11 UTC] {taskinstance.py:1206} INFO - Marking task as SUCCESS. dag_id=test_data_pipeline, task_id=start_dataflow_job, run_id=manual__2024-11-04T04:24:59.026429+00:00, execution_date=20241104T042459, start_date=20241104T042502, end_date=20241104T044111
[2024-11-04, 04:41:12 UTC] {local_task_job_runner.py:243} INFO - Task exited with return code 0
[2024-11-04, 04:41:12 UTC] {taskinstance.py:3506} INFO - 1 downstream tasks scheduled from follow-on schedule check
Downstream, I have a DataflowJobStatusSensor, which should check the job status after submission and confirm that the job finished with JOB_STATE_DONE, followed by a DbtCloudRunJobOperator. I cannot let the BeamRunPythonPipelineOperator wait for completion, because the Dataflow job sometimes takes 3 hours to finish and the operator times out after 60 minutes.
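Here is a minimal sketch of the sensor setup I have in mind; the XCom key "dataflow_job_id" is the same assumption as above, and the poke_interval/timeout values are placeholders:

from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor

wait_for_dataflow_job = DataflowJobStatusSensor(
    task_id="wait_for_dataflow_job",
    # Pull the job ID pushed by the submit task; key name is an assumption.
    job_id="{{ task_instance.xcom_pull(task_ids='start_dataflow_job', key='dataflow_job_id') }}",
    expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
    project_id=PROJECT,
    location="us-east1",
    # Reschedule mode frees the worker slot between pokes, so a job that
    # runs for 3 hours does not hold a worker or hit a 60-minute limit.
    mode="reschedule",
    poke_interval=300,
    timeout=4 * 60 * 60,
)

start_dataflow_job >> wait_for_dataflow_job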