You need to create a Flex Template for your Dataflow pipeline:
https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates
With a Flex Template you can pass parameters to the Dataflow job dynamically at runtime. Once the template is built, add trigger code like the following to your Cloud Function:
import logging

from googleapiclient.discovery import build

# The DATAFLOW_* names below are module-level configuration constants.

def startDataflow(project, flex_template_path, jobName, bq_dataset, raw_table, prc_table, start_date):
    # Runtime parameters forwarded to the Flex Template pipeline
    parameters = {
        "gcp_project": project,
        "bq_dataset": bq_dataset,
        "raw_table": raw_table,
        "prc_table": prc_table,
        "start_date": start_date
    }
    # Execution environment for the Dataflow job
    environment = {
        "stagingLocation": DATAFLOW_STAGING_LOCATION,
        "additionalExperiments": DATAFLOW_ADDITIONAL_EXPERIMENTS,
        "maxWorkers": DATAFLOW_MAX_WORKER_COUNT,
        "machineType": DATAFLOW_MACHINE_TYPE,
        "serviceAccountEmail": DATAFLOW_SERVICE_ACCOUNT_EMAIL,
        "network": DATAFLOW_NETWORK,
        "subnetwork": DATAFLOW_SUBNETWORK,
        "ipConfiguration": DATAFLOW_IP_CONFIGURATION
    }
    # JSON request body for launching the Flex Template job
    body = {
        "launchParameter": {
            "jobName": jobName,
            "parameters": parameters,
            "environment": environment,
            "containerSpecGcsPath": flex_template_path,
        }
    }
    service = build("dataflow", "v1b3", cache_discovery=False)
    # Create the request that triggers the Flex Template pipeline
    request = (
        service.projects().locations().flexTemplates().launch(
            projectId=project,
            location=DATAFLOW_RUN_PROJECT_REGION,
            body=body
        )
    )
    try:
        response = request.execute()
    except Exception as e:
        logging.exception(
            "There was an exception while triggering the Dataflow pipeline with the job name: {}. "
            "The exception is: {}".format(jobName, e))
        return None
    return response
In the code above, the parameters dictionary is where you pass runtime parameters to the Dataflow pipeline at launch time.
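On the pipeline side, these runtime parameters can be read as custom pipeline options. A minimal sketch, assuming an Apache Beam Python pipeline and a hypothetical IngestOptions class whose option names mirror the parameters dict above:

from apache_beam.options.pipeline_options import PipelineOptions

class IngestOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Each key in the Cloud Function's "parameters" dict is handed to the
        # Flex Template container as a command-line option with the same name.
        parser.add_argument("--gcp_project", required=True)
        parser.add_argument("--bq_dataset", required=True)
        parser.add_argument("--raw_table", required=True)
        parser.add_argument("--prc_table", required=True)
        parser.add_argument("--start_date", required=True)

When constructing the pipeline you would then read them with options.view_as(IngestOptions).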
Now, since your Cloud Function is triggered by a GCS event, you can read the file name from the event payload and pass it to your Dataflow pipeline when you launch it, as sketched below.
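For example, a background Cloud Function subscribed to the google.storage.object.finalize event receives the bucket and object name in its event payload. A hedged sketch, assuming your configuration lives in environment variables such as FLEX_TEMPLATE_PATH and that you forward the file path by adding a key (for example "input_file") to the parameters dict in startDataflow:

import os

def gcs_trigger(event, context):
    # The GCS event payload carries the bucket and object name of the uploaded file.
    input_file = "gs://{}/{}".format(event["bucket"], event["name"])

    # Forward input_file by adding an "input_file" entry (hypothetical key)
    # to the parameters dict built inside startDataflow.
    startDataflow(
        project=os.environ["GCP_PROJECT"],                     # assumed env var
        flex_template_path=os.environ["FLEX_TEMPLATE_PATH"],   # assumed env var
        jobName="ingest-{}".format(context.event_id),          # unique job name per event
        bq_dataset=os.environ["BQ_DATASET"],
        raw_table=os.environ["RAW_TABLE"],
        prc_table=os.environ["PRC_TABLE"],
        start_date="2024-01-01",                               # example value
    )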
Does this answer your question?