
I have written Apache Beam Python code which reads data from Pub/Sub and inserts it into a DB. When I create the template file and submit the Dataflow job manually, it works fine, but when doing it through Composer the template file gets created while the Dataflow job is not submitted.

Cloud Composer is in Project A and I want to submit the Dataflow job in Project B. That's why I am using the gcp_conn_id in the DAG file.

I have created the DAG file below and checked things in the Airflow UI. I am able to create the template file but the Dataflow job is not submitted. The task completes without any error. Here is the log:

Running command: /tmp/apache-beam-venvvxk4qxz7/bin/python /home/airflow/gcs/data/SessPubSubDataFlow.py --runner=DataflowRunner --job_name=start-python-jobdf1-84a0645c --project=abc-temp --region=us-east4 --labels=airflow-version=v2-2-5-composer --template_location=gs://abc-tempstreamsdataflow/Redislookup.txt --temp_location="gs://abc-tempstreamsdataflow/Redislookup.txt"    
[2022-11-08, 09:44:02 UTC] {beam.py:131} INFO - Start waiting for Apache Beam process to complete.
[2022-11-08, 09:44:12 UTC] {beam.py:127} INFO - Hello World
[2022-11-08, 09:44:18 UTC] {beam.py:127} WARNING - WARNING:root:Make sure that locally built Python SDK docker image has Python 3.8 interpreter.
[2022-11-08, 09:44:18 UTC] {beam.py:150} INFO - Process exited with return code: 0
[2022-11-08, 09:44:19 UTC] {dataflow.py:425} INFO - Start waiting for done.
[2022-11-08, 09:44:19 UTC] {dataflow.py:390} INFO - Google Cloud DataFlow job not available yet..
[2022-11-08, 09:44:19 UTC] {taskinstance.py:1279} INFO - Marking task as SUCCESS. dag_id=DataFlowPythonJob, task_id=start-python-jobdf1, execution_date=20221108T094330, start_date=20221108T094333, end_date=20221108T094419
[2022-11-08, 09:44:20 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
[2022-11-08, 09:44:20 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
import datetime
from airflow import models
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

default_args = {"start_date": days_ago(1),
    'retries':0,
    'project':'abc-temp'
}

with models.DAG(
    dag_id="DataFlowPythonJob",
    start_date=days_ago(1),
    default_args=default_args,
    schedule_interval="@once"
) as dag:
    start_python_job = BeamRunPythonPipelineOperator(
        task_id="start-python-jobdf1",
        runner="DataflowRunner",
        py_file="/home/airflow/gcs/data/SessPubSubDataFlow.py",
        py_options=[],
        pipeline_options={
            'template_location': "gs://abc-tempstreamsdataflow/Redislookup.txt",
            'temp_location': "gs://abc-tempstreamsdataflow/",
            'project': "abc-temp",
        },
        py_requirements=['apache-beam[gcp]==2.37.0'],
        py_interpreter='python3',
        py_system_site_packages=False,
        dataflow_config={
            'location': 'us-east4',
            'project_id': 'abc-temp',
            'gcp_conn_id': '0-    app',
            "wait_until_finished": False,
            'job_name': '{{task.task_id}}',
        },
    )

Is there any other operator I need to use to submit the Dataflow job?


How do I submit the Dataflow job from Composer?
  • Does your DAG appear in the Airflow UI? Commented Nov 9, 2022 at 14:03

1 Answer


For a Dataflow streaming job, the DAG instantiates the Dataflow job, but you have to trigger the DAG once in order to actually run it.

You used the correct and up-to-date operator to create and launch a Dataflow job with Beam/Dataflow Python.

To execute a Dataflow template with Airflow, you have to use the following operator:

from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator

start_template_job = DataflowTemplatedJobStartOperator(
    task_id="start-template-job",
    template='gs://dataflow-templates/latest/Word_Count',
    parameters={'inputFile': "gs://dataflow-samples/shakespeare/kinglear.txt",
                'output': GCS_OUTPUT},  # GCS_OUTPUT: an output path you define elsewhere
    location='europe-west3',
)
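
For your case, here is a minimal sketch, not a definitive fix, of how the two operators could be chained in one DAG: the first task only stages the template (which is all your current DAG does, since passing --template_location makes Beam build a template rather than start a job), and the second launches a Dataflow job from that template. Bucket, file, project and region names are taken from your question; the connection id gcp_project_b is a placeholder for the Airflow connection that points to Project B.

# A minimal sketch under the assumptions stated above.
from airflow import models
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
from airflow.utils.dates import days_ago

with models.DAG(
    dag_id="DataFlowPythonJob",
    start_date=days_ago(1),
    schedule_interval="@once",
) as dag:
    # Step 1: stage the template on GCS (no Dataflow job is started here,
    # because --template_location only builds the template file).
    create_template = BeamRunPythonPipelineOperator(
        task_id="create-template",
        runner="DataflowRunner",
        py_file="/home/airflow/gcs/data/SessPubSubDataFlow.py",
        pipeline_options={
            "template_location": "gs://abc-tempstreamsdataflow/Redislookup.txt",
            "temp_location": "gs://abc-tempstreamsdataflow/",
            "project": "abc-temp",
        },
        py_requirements=["apache-beam[gcp]==2.37.0"],
        py_interpreter="python3",
        dataflow_config={
            "location": "us-east4",
            "project_id": "abc-temp",
            "gcp_conn_id": "gcp_project_b",  # placeholder connection to Project B
        },
    )

    # Step 2: launch a Dataflow job from the staged template.
    start_template_job = DataflowTemplatedJobStartOperator(
        task_id="start-template-job",
        template="gs://abc-tempstreamsdataflow/Redislookup.txt",
        project_id="abc-temp",
        location="us-east4",
        gcp_conn_id="gcp_project_b",  # placeholder connection to Project B
        wait_until_finished=False,
    )

    create_template >> start_template_job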
  • I can see the DAG name in the Airflow UI. The DAG is also triggered (I triggered it manually as well). The DAG gets executed and the template file gets created, but the Dataflow job is not submitted. Commented Nov 10, 2022 at 4:51
  • In your DAG, do you see your Beam/Dataflow task as expected? You can check the Rendered tab of the task. Commented Nov 10, 2022 at 10:18
  • Yes, I can see the task name as mentioned in the DAG file, start-python-jobdf1. In the logs I can also see the statement: INFO - Marking task as SUCCESS. dag_id=DataFlowPythonJob, task_id=start-python-jobdf1. In short, it is creating the template file but not submitting the Dataflow job. Commented Nov 10, 2022 at 10:53
  • OK, thanks. Normally you can also see the Python command line executed by the operator in the Log tab of your task; could you check it please? Commented Nov 10, 2022 at 11:02
  • Yes, I can see the Python command which creates the template file: /tmp/apache-beam-venvvxk4qxz7/bin/python /home/airflow/gcs/data/SessPubSubDataFlow.py --runner=DataflowRunner --job_name=start-python-jobdf1-84a0645c --project=abc-temp --region=us-east4 --labels=airflow-version=v2-2-5-composer --template_location=gs://abc-tempstreamsdataflow/Redislookup.txt --temp_location="gs://abc-tempstreamsdataflow/Redislookup.txt". I can see the template file in the GCS location as well, but I don't see any Dataflow command in the logs. Commented Nov 10, 2022 at 11:09
