0

I am trying to use Cloud Function (Create/Finalize) trigger for GCS Bucket to start a Data flow pipeline. I am trying to figure out how to give the csv file path in GCS Bucket to custom dataflow pipeline when triggered.

Please let me know if you came across similar issue and what's your solution in python?

Thanks.

2
  • Please provide enough code so others can better understand or reproduce the problem.
    – Community Bot
    Commented Mar 31, 2022 at 23:32
  • Your query is an important one. But, Please do elaborate your question with examples. It helps to answer much faster as the problem at hand is easy to understand. Commented Apr 1, 2022 at 14:12

1 Answer 1

0

You need to create flex template for dataflow. https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates In this, you can pass parameters dynamically to the dataflow at runtime. Once done, add dataflow trigger code in your cloud function similar to this:

def startDataflow(project, flex_template_path, jobName, bq_dataset, raw_table, prc_table, start_date):

# Defining JSON request for trigger flex DF Job

parameters = {
    "gcp_project": project,
    "bq_dataset": bq_dataset,
    "raw_table": raw_table,
    "prc_table": prc_table,
    "start_date": start_date
}
environment = {
    "stagingLocation": DATAFLOW_STAGING_LOCATION,
    "additionalExperiments": DATAFLOW_ADDITIONAL_EXPERIMENTS,
    "maxWorkers": DATAFLOW_MAX_WORKER_COUNT,
    "machineType": DATAFLOW_MACHINE_TYPE,
    "serviceAccountEmail": DATAFLOW_SERVICE_ACCOUNT_EMAIL,
    "network": DATAFLOW_NETWORK,
    "subnetwork": DATAFLOW_SUBNETWORK,
    "ipConfiguration": DATAFLOW_IP_CONFIGURATION
}
body = {
    "launchParameter": {
        "jobName": jobName,
        "parameters": parameters,
        "environment": environment,
        "containerSpecGcsPath": flex_template_path,
    }
}
service = build("dataflow", "v1b3", cache_discovery=False)
# Creating request to trigger the Flex DF Pipeline
request = (
    service.projects().locations().flexTemplates().launch(
        projectId=DATAFLOW_RUN_LOCATION,
        location=DATAFLOW_RUN_PROJECT_REGION,
        body=body
    )
)
try:
    response = request.execute()
except Exception as e:
    logging.exception(
        "There was an exception while triggering the dataflow pipeline with the job name: {}. The exception is: {}".format(
            jobName, e))

return response

In the above code, the parameters dictionary is the place where you pass in parameters to the dataflow pipeline at run time. Now, as your cloud function is triggered by gcs event, you can access the file name from GCS and send it to your dataflow pipeline when you invoke it.

Does this answer?

0

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.