0

I am trying to implement a simple mock api that implements webhooks using flask-smorest.

Basically, I am trying to mimic an api that process a video and sends a post response of succesful/error once the video has finished processing the video to the entered webhook url.

I am using asyncio to achieve this. However, I am not able to call an asynchronous function inside the post route

my routes.py look like this

from flask import abort
from flask.views import MethodView
from flask_smorest import Blueprint
from marshmallow import ValidationError

from server.models import TranslationJob
from server.schema import CreateStatusSchema, StatusResponseSchema
import uuid
from server.services.webhook import WebhookService
from server.models import StatusEnum

# In-memory storage
jobs_store = {}

status_blueprint = Blueprint(
    "status",
    "status",
    url_prefix="/translation",
    description="Status of translation jobs",
)

loop = asyncio.get_event_loop()


@status_blueprint.route("/status", methods=["POST", "GET"])
class TranslationCollection(MethodView):
    @status_blueprint.arguments(CreateStatusSchema, location="json")
    def post(self, job_data):
        try:
            job = TranslationJob(
                duration=job_data["duration"],
                webhook_url=job_data["webhook_url"]
            )
            jobs_store[job.id] = job
            # Start webhook monitoring if URL provided
            asyncio.create_task(start_monitor_status(job))
            return {"message": "Job created successfully", "job_id": str(job.id)}
        except ValidationError as e:
            return {"message": "Validation error", "errors": e.messages}, 400
        except Exception as e:
            return {"message": str(e)}, 500
    
    def get(self):
        jobs = [job.to_dict() for job in jobs_store.values()]
        return {"jobs": jobs}

@status_blueprint.route("/status/<job_id>", methods=["GET"])
class TranslationItem(MethodView):
    @status_blueprint.response(200, StatusResponseSchema)
    def get(self, job_id):
        try:
            job_id = uuid.UUID(job_id)
        except ValueError:
            abort(400, description="Invalid job ID format")

        if job_id not in jobs_store:
            abort(404, description=f"Job with ID {job_id} not found")

        return jobs_store[job_id].to_dict()
    
async def start_monitor_status(job: TranslationJob):
    await _monitor_job_status(job)  # Directly await the monitoring function

async def _monitor_job_status(job: TranslationJob):
    previous_status = job.get_status()
    
    while True:
        current_status = job.get_status()
        
        if current_status != previous_status:
            await WebhookService.send_webhook(job)
            
            if current_status in [StatusEnum.COMPLETED, StatusEnum.ERROR]:
                break
                
        previous_status = current_status
        await asyncio.sleep(1)
    

In here when the user post a video, it generates a TranslationJob that has a duration that just basically say for how long the video is processing and a webhook url to post when the video is finished.

I create the job, add it to the memory store and I want to call a function that constantly calls the status of the job to see if it is completed, however, I want it to be asynchronus. I don't want to wait to wait till the video is complete to return to the user that the job was created succesfully

If I leave the function as is I get

RuntimeWarning: coroutine 'start_monitor_status' was never awaited
  return {"message": str(e)}, 500

however, if I make the post function async I get

The return type must be a string, dict, list, tuple with headers or status, Response instance, or WSGI callable, but it was a coroutine.

how can I modify this for it to work?

my init.py file looks like this

from flask import Flask
from flask_smorest import Api

from server.config import APIConfig
from server.routes import status_blueprint

def create_app() -> Flask:
    app = Flask(__name__)
    # app.config['ASYNC_MODE'] = True
    app.config.from_object(APIConfig)
    app.debug = True
    
    api = Api(app)
    api.register_blueprint(status_blueprint)
    
    return app

and the program entry script looks like this (also trying to use async to make it work) import asyncio

from server import create_app

app = create_app()

async def main():
    await app

if __name__ == "__main__":
    asyncio.run(main())

0

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.