1

I have an aiogram bot with a taskiq worker and scheduler (taskiq-redis with taskiq_aiogram). In a message handler I create a dynamic interval-scheduled task, and after some time (30-40 minutes) I get: redis.exceptions.ResponseError: NOGROUP No such key 'taskiq' or consumer group 'taskiq' in XREADGROUP with GROUP option. However, when I run statically scheduled interval tasks (just a decorator on an async function, which starts working after the worker and scheduler run), everything works without errors and the tasks execute at the correct interval.

Here is my broker.py:

from typing import Annotated

from redis.asyncio import Redis
from redis.exceptions import ResponseError
from taskiq import TaskiqScheduler, Context, TaskiqDepends
from taskiq.events import TaskiqEvents
from taskiq_redis import (
    RedisAsyncResultBackend, RedisStreamBroker, RedisScheduleSource
)
import taskiq_aiogram

from bot import redis_client
from bot.config import REDIS_CLIENT_HOST, REDIS_CLIENT_PORT, REDIS_CLIENT_DB

# Single connection string shared by the result backend, broker and
# schedule source, built from the compose-provided environment config.
redis_url = f"redis://{REDIS_CLIENT_HOST}:{REDIS_CLIENT_PORT}/{REDIS_CLIENT_DB}"

# Stores task results in Redis.
# NOTE(review): results appear to be stored without an explicit TTL here —
# consider a result expiry option to avoid unbounded key growth; confirm
# against the installed taskiq-redis version.
result_backend = RedisAsyncResultBackend(
    redis_url=redis_url,
)

# Stream-based broker: tasks are pushed onto a Redis stream and consumed by
# the worker via XREADGROUP on the "taskiq" stream / consumer group.
broker = RedisStreamBroker(
    url=redis_url,
).with_result_backend(result_backend)


# ! run: taskiq worker bot.tasks.broker:broker

taskiq_aiogram.init(
    broker,
    "bot.main:dp",
    "bot.main:bot",
)

# define sources
dynamic_schedule_source = RedisScheduleSource(
    url=redis_url
)

scheduler = TaskiqScheduler(
    broker=broker,
    sources=[
        dynamic_schedule_source        
    ],
)
# ! run:  taskiq scheduler bot.tasks.broker:scheduler --skip-first-run --update-interval 5


# * Attempted mitigation for the NOGROUP error: pre-create the stream and
# consumer group at worker startup. NOTE(review): if NOGROUP still appears
# after 30-40 minutes, check whether the stream key is being deleted later
# (e.g. Redis restart without the stream persisted, or maxmemory eviction) —
# creating the group at startup cannot protect against that.
@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def on_worker_startup(context: Annotated[Context, TaskiqDepends()]):
    """Ensure the broker's "taskiq" stream and consumer group exist.

    XGROUP CREATE with ``mkstream=True`` creates the stream and the group
    atomically; a BUSYGROUP reply only means the group already exists and
    is therefore not treated as an error.
    """
    r = Redis.from_url(redis_url)
    try:
        await r.xgroup_create(
            name="taskiq",
            groupname="taskiq",
            id="$",
            mkstream=True,
        )
    except ResponseError as e:
        # BUSYGROUP = group already exists; anything else is a real failure.
        if "BUSYGROUP" not in str(e):
            raise
    finally:
        # The original version leaked this connection on every startup.
        await r.aclose()

# help funcs
async def delete_schedule(id_: str) -> None:
    """Remove a single dynamic schedule from the Redis schedule source."""
    await dynamic_schedule_source.delete_schedule(id_)

async def delete_all_schedules():
    """Delete every schedule currently stored in the dynamic source."""
    for entry in await dynamic_schedule_source.get_schedules():
        await delete_schedule(entry.schedule_id)

async def get_all_schedules_ids():
    """Return the schedule_id of every stored dynamic schedule."""
    ids = []
    for entry in await dynamic_schedule_source.get_schedules():
        ids.append(entry.schedule_id)
    return ids

async def show_all_schedules():
    """Print every stored dynamic schedule (debug helper)."""
    for entry in await dynamic_schedule_source.get_schedules():
        print(entry)


async def check_schedule(schedule_id: str) -> bool:
    """Return True if the schedule's data key still exists in Redis.

    NOTE(review): this reaches into the private ``_prefix`` attribute and
    hard-codes the "<prefix>:data:<id>" key layout of RedisScheduleSource —
    confirm it matches the installed taskiq-redis version; it may silently
    break on upgrade.
    """
    key = f"{dynamic_schedule_source._prefix}:data:{schedule_id}"
    schedule = await redis_client.client.get(key)
    
    return schedule is not None

tasks.py:

from typing import Annotated, Awaitable, Any
from datetime import datetime, timedelta
from time import perf_counter, time

from taskiq import TaskiqDepends, Context
from aiogram import Bot

from bot.tasks import broker as tasks_broker
import bot.database as db
from bot.database.engine import session as db_session
from bot import redis_client

from bot.weather_parse import get_future_weather
from bot import texts
from bot.config import ADMIN_ID


@tasks_broker.broker.task
async def send_time_diff(start_seconds: float, user_id: int, context: Annotated[Context, TaskiqDepends()], bot: Bot = TaskiqDepends(), ):
    """Message the user with the minutes elapsed since scheduling and the schedule id."""
    schedule_id = context.message.labels["schedule_id"]
    elapsed_minutes = (time() - start_seconds) / 60
    await bot.send_message(user_id, f"{elapsed_minutes} {schedule_id}", disable_notification=True)

The bot message handler where I create the dynamic interval schedule:

# Baseline timestamp passed into the task so it can report minutes elapsed
# since scheduling.
start_seconds = time()

# Registers a dynamic interval schedule in the Redis source; the scheduler
# process picks it up on a later poll and enqueues send_time_diff every
# WEATHER_TASK_SEND_INTERVAL. The returned object presumably carries the
# schedule_id (read back in the task via context.message.labels) — verify.
task = await tasks.send_time_diff.schedule_by_interval(
    source=dynamic_schedule_source,
    interval=WEATHER_TASK_SEND_INTERVAL,
    start_seconds=start_seconds,
    user_id=user_id
)

To run the bot, taskiq and Redis I use Docker Compose (maybe my mistake is there).

docker-compose.yml:


# NOTE(review): the top-level `version` key is obsolete in Compose v2+ and ignored.
version: '3.8'

services:
  bot:
    build: .
    # NOTE(review): the bot talks to Redis (REDIS_CLIENT_HOST below) but has
    # no depends_on for the redis service — it can start before Redis is up.
    depends_on:
      pg:
        condition: service_healthy
    environment:
      REDIS_CLIENT_HOST: redis
      REDIS_CLIENT_PORT: 6379
      DB_HOST: pg

    command: python -m bot.main

    # NOTE(review): the image's WORKDIR is /app (see Dockerfile) but sources
    # are mounted at /bot — confirm which copy of the code actually runs; a
    # stale image copy under /app could mask local changes.
    volumes: 
      - ./app:/bot

  taskiq-worker:
    build: .
    # NOTE(review): unlike `bot`, this only waits for redis to *start*, not
    # for pg to be healthy, although DB_HOST is configured below.
    command: taskiq worker bot.tasks.broker:broker
    depends_on:
      - redis
    environment:
      REDIS_CLIENT_HOST: redis
      REDIS_CLIENT_PORT: 6379
      DB_HOST: pg

    volumes:
      - ./app:/bot

  taskiq-scheduler:
    build: .
    # Polls the Redis schedule source every 5s and skips immediately-due runs.
    command: taskiq scheduler bot.tasks.broker:scheduler --skip-first-run --update-interval 5
    depends_on:
      - redis
    environment:
      REDIS_CLIENT_HOST: redis
      REDIS_CLIENT_PORT: 6379
      DB_HOST: pg
    volumes:
      - ./app:/bot

  pg:
    image: postgres:16.4
    environment:
      POSTGRES_DB: weather_bot_db
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: root
    volumes:
        - pg_data:/var/lib/postgresql/data/
  
    ports:
      - "5433:5432"

    healthcheck:
      test: [ "CMD", "pg_isready", "-q", "-d", "weather_bot_db", "-U", "postgres" ]
      interval: 10s
      timeout: 5s
      retries: 2

  pgadmin:
    image: dpage/pgadmin4:latest
    environment:
      PGADMIN_DEFAULT_EMAIL: [email protected]
      PGADMIN_DEFAULT_PASSWORD: admin
      PGADMIN_CONFIG_SERVER_MODE: 'False'
      
    ports:
      - "5050:80"
    depends_on:
      pg:
        condition: service_healthy


  redis:
    # NOTE(review): if the NOGROUP error coincides with a redis container
    # restart, the stream/consumer group may be lost between the default RDB
    # snapshots — consider enabling AOF persistence; verify against logs.
    image: "redis:alpine"
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

volumes:
    pg_data: 
    redis_data:

Dockerfile:

FROM python:3.12-alpine

# Don't write .pyc files; flush stdout/stderr immediately (container logs).
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1

WORKDIR /app

# Copy only the dependency manifest first so the pip layer stays cached
# until requirements.txt actually changes (the original copied all sources
# before installing, invalidating the cache on every code edit).
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application source. The original had a redundant second
# `COPY . /app` — WORKDIR is /app, so `COPY . .` already places it there.
COPY . .

0

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.