We are trying to deploy a FastAPI endpoint, which calls a function to dynamically create a prefect deployment (sourced from git) but its failing. Below is the code that is being used async def create_deployment_for_batch_job( batch_job_id: int, project_id: int, user_id: str, query: str, limit: int, priority: str = "low", cron_schedule: str = None, ): """Creates a Prefect deployment for a batch job""" print("start") deployment_name = f"batch_job_{batch_job_id}" print("deployment_name") # Create deployment that will call the endpoint deployment = await flow.from_source( source=git_repo, entrypoint="backend/app/api/v1/endpoints/batch.py:run_batch_pipeline_flow",
)
await deployment.deploy(
name=deployment_name,
work_pool_name="my-local-pool",
parameters={
"query": query,
"limit": limit,
"priority": priority,
"user_id": user_id,
"project_id": project_id,
"batch_job_id": batch_job_id
},
# schedule=CronSchedule(cron=cron_schedule, timezone="UTC") if cron_schedule else None,
tags=["batch-pipeline", "prefect-git-deployment"]
)
This function is called from within the endpoint below:
@router.post("/projects/{project_id}/batch_job_creation/")
async def create_batch_job_publication_retrieval(
project_id: int,
query: str,
limit: int ,
priority: str,
user: CurrentUser,
cron_job_schedule: Optional[str] = None,
):
"""
Create a new batch job for publication retrieval.
"""
user_id, _ = user
batch_service = BatchService()
try:
match = re.search(r'("(\d{4})"[PDAT] *: *"(\d{4})"[PDAT])', query)
if match:
start_date = match.group(1)
end_date = match.group(2)
else:
start_date = None
end_date = None
#started_at = time.time()
current_year = datetime.now(timezone.utc).year
start_date = start_date or str(current_year - 5) # Default to 5 years ago
end_date = end_date or str(current_year) # Default to current year
batch_job_id = await batch_service.create_batch_job(project_id=project_id, search_query=query, start_date=start_date, end_date=end_date,
max_publications=limit, priority=priority, user_id=user_id, cron_job_schedule = cron_job_schedule)
# TODO: This functionality works fine when run using from a standalone script [deployment_test.py in tests] but is failing here. Need to Debug.
# Create Prefect deployment
print("passed batch job", batch_job_id)
await create_deployment_for_batch_job(
batch_job_id=batch_job_id,
project_id=project_id,
user_id=user_id,
query=query,
limit=limit,
priority=priority,
cron_schedule=cron_job_schedule
)
The above code gives these errors: "cannot pickle 'coroutine' object"; "coroutine object does not have a function 'deploy'" (if I remove `await` from the `flow.from_source` call); and "cannot call asyncio within a running event loop" (when I tried changing `await` to `asyncio.run()`). I also tried making `create_deployment_for_batch_job` a synchronous function, and I tried using `create_task` and then awaiting that task. I tried these links: https://github.com/PrefectHQ/prefect/issues/15008 https://linen.prefect.io/t/23211429/ulva73b9p-i-d-like-to-deploy-a-flow-using-from-source-from-a https://linen.prefect.io/t/26842108/ulva73b9p-ulva73b9p-i-am-upgrading-to-prefect-3-from-prefect https://linen.prefect.io/t/26884307/ulva73b9p-how-do-i-run-prefect-within-a-docker-container-tha and a few more, but to no avail.