0

We are trying to deploy a FastAPI endpoint, which calls a function to dynamically create a prefect deployment (sourced from git) but its failing. Below is the code that is being used async def create_deployment_for_batch_job( batch_job_id: int, project_id: int, user_id: str, query: str, limit: int, priority: str = "low", cron_schedule: str = None, ): """Creates a Prefect deployment for a batch job""" print("start") deployment_name = f"batch_job_{batch_job_id}" print("deployment_name") # Create deployment that will call the endpoint deployment = await flow.from_source( source=git_repo, entrypoint="backend/app/api/v1/endpoints/batch.py:run_batch_pipeline_flow",

)
await deployment.deploy(
    name=deployment_name,
    work_pool_name="my-local-pool",
    parameters={
        "query": query,
        "limit": limit,
        "priority": priority,
        "user_id": user_id,
        "project_id": project_id,
        "batch_job_id": batch_job_id
    },
    # schedule=CronSchedule(cron=cron_schedule, timezone="UTC") if cron_schedule else None,
    tags=["batch-pipeline", "prefect-git-deployment"]
)

this is being called within the below function @router.post("/projects/{project_id}/batch_job_creation/") async def create_batch_job_publication_retrieval( project_id: int, query: str, limit: int , priority: str, user: CurrentUser, cron_job_schedule: Optional[str] = None, ): """ Create a new batch job for publication retrieval.
""" user_id, _ = user batch_service = BatchService() try: match = re.search(r'("(\d{4})"[PDAT] *: *"(\d{4})"[PDAT])', query) if match: start_date = match.group(1) end_date = match.group(2) else: start_date = None end_date = None #started_at = time.time() current_year = datetime.now(timezone.utc).year start_date = start_date or str(current_year - 5) # Default to 5 years ago end_date = end_date or str(current_year) # Default to current year

    batch_job_id = await batch_service.create_batch_job(project_id=project_id, search_query=query, start_date=start_date, end_date=end_date, 
                                                        max_publications=limit, priority=priority, user_id=user_id, cron_job_schedule = cron_job_schedule)
    
    # TODO: This functionality works fine when run using from a standalone script [deployment_test.py in tests] but is failing here. Need to Debug.
    # Create Prefect deployment
    print("passed batch job", batch_job_id)
    await create_deployment_for_batch_job(
         batch_job_id=batch_job_id,
         project_id=project_id,
         user_id=user_id,
         query=query,
         limit=limit,
         priority=priority,
         cron_schedule=cron_job_schedule
    )

The above code gives these errors cannot pickle 'coroutine' object coroutine object does not have a function "deploy" (if i remove await from the flow_from_source function) cannot call asyncio within a running event loop (when i tried to change await to asyncio.run()) Also tried making it a synchronous function (create_deployment_for_batch_job) Also tried using create_task and then awaiting that task. Tried these links https://github.com/PrefectHQ/prefect/issues/15008 https://linen.prefect.io/t/23211429/ulva73b9p-i-d-like-to-deploy-a-flow-using-from-source-from-a https://linen.prefect.io/t/26842108/ulva73b9p-ulva73b9p-i-am-upgrading-to-prefect-3-from-prefect https://linen.prefect.io/t/26884307/ulva73b9p-how-do-i-run-prefect-within-a-docker-container-tha and a few more but to no avail.

0

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.