I am trying to implement a simple mock api that implements webhooks using flask-smorest.
Basically, I am trying to mimic an api that process a video and sends a post response of succesful/error once the video has finished processing the video to the entered webhook url.
I am using asyncio to achieve this. However, I am not able to call an asynchronous function inside the post route
my routes.py look like this
from flask import abort
from flask.views import MethodView
from flask_smorest import Blueprint
from marshmallow import ValidationError
from server.models import TranslationJob
from server.schema import CreateStatusSchema, StatusResponseSchema
import uuid
from server.services.webhook import WebhookService
from server.models import StatusEnum
# In-memory storage
jobs_store = {}
status_blueprint = Blueprint(
"status",
"status",
url_prefix="/translation",
description="Status of translation jobs",
)
loop = asyncio.get_event_loop()
@status_blueprint.route("/status", methods=["POST", "GET"])
class TranslationCollection(MethodView):
@status_blueprint.arguments(CreateStatusSchema, location="json")
def post(self, job_data):
try:
job = TranslationJob(
duration=job_data["duration"],
webhook_url=job_data["webhook_url"]
)
jobs_store[job.id] = job
# Start webhook monitoring if URL provided
asyncio.create_task(start_monitor_status(job))
return {"message": "Job created successfully", "job_id": str(job.id)}
except ValidationError as e:
return {"message": "Validation error", "errors": e.messages}, 400
except Exception as e:
return {"message": str(e)}, 500
def get(self):
jobs = [job.to_dict() for job in jobs_store.values()]
return {"jobs": jobs}
@status_blueprint.route("/status/<job_id>", methods=["GET"])
class TranslationItem(MethodView):
@status_blueprint.response(200, StatusResponseSchema)
def get(self, job_id):
try:
job_id = uuid.UUID(job_id)
except ValueError:
abort(400, description="Invalid job ID format")
if job_id not in jobs_store:
abort(404, description=f"Job with ID {job_id} not found")
return jobs_store[job_id].to_dict()
async def start_monitor_status(job: TranslationJob):
await _monitor_job_status(job) # Directly await the monitoring function
async def _monitor_job_status(job: TranslationJob):
previous_status = job.get_status()
while True:
current_status = job.get_status()
if current_status != previous_status:
await WebhookService.send_webhook(job)
if current_status in [StatusEnum.COMPLETED, StatusEnum.ERROR]:
break
previous_status = current_status
await asyncio.sleep(1)
In here when the user post a video, it generates a TranslationJob that has a duration that just basically say for how long the video is processing and a webhook url to post when the video is finished.
I create the job, add it to the memory store and I want to call a function that constantly calls the status of the job to see if it is completed, however, I want it to be asynchronus. I don't want to wait to wait till the video is complete to return to the user that the job was created succesfully
If I leave the function as is I get
RuntimeWarning: coroutine 'start_monitor_status' was never awaited
return {"message": str(e)}, 500
however, if I make the post function async I get
The return type must be a string, dict, list, tuple with headers or status, Response instance, or WSGI callable, but it was a coroutine.
how can I modify this for it to work?
my init.py file looks like this
from flask import Flask
from flask_smorest import Api
from server.config import APIConfig
from server.routes import status_blueprint
def create_app() -> Flask:
app = Flask(__name__)
# app.config['ASYNC_MODE'] = True
app.config.from_object(APIConfig)
app.debug = True
api = Api(app)
api.register_blueprint(status_blueprint)
return app
and the program entry script looks like this (also trying to use async to make it work) import asyncio
from server import create_app
app = create_app()
async def main():
await app
if __name__ == "__main__":
asyncio.run(main())