I have aiogram bot with taskiq worker and scheduler (taskiq redis with taskiq_aiogram), in message handler I run dynamic interval schduled task, after some time (30-40 minutes), I getting: redis.exceptions.ResponseError: NOGROUP No such key 'taskiq' or consumer group 'taskiq' in XREADGROUP with GROUP option, but when i run static interval scheduled tasks (just decorator for async function, that starts working after worker and schduler run), everyting works without errors and tasks execute at correct interval.
here by brocker.py:
from typing import Annotated
from taskiq_redis import (
RedisAsyncResultBackend, RedisStreamBroker, RedisScheduleSource
)
from taskiq import TaskiqScheduler, Context, TaskiqDepends
from redis.asyncio import Redis
import taskiq_aiogram
from taskiq.events import TaskiqEvents
from bot import redis_client
from bot.config import REDIS_CLIENT_HOST, REDIS_CLIENT_PORT, REDIS_CLIENT_DB
redis_url = f"redis://{REDIS_CLIENT_HOST}:{REDIS_CLIENT_PORT}/{REDIS_CLIENT_DB}"
result_backend = RedisAsyncResultBackend(
redis_url=redis_url,
)
broker = RedisStreamBroker(
url=redis_url,
).with_result_backend(result_backend)
# ! run: taskiq worker bot.tasks.broker:broker
taskiq_aiogram.init(
broker,
"bot.main:dp",
"bot.main:bot",
)
# define sources
dynamic_schedule_source = RedisScheduleSource(
url=redis_url
)
scheduler = TaskiqScheduler(
broker=broker,
sources=[
dynamic_schedule_source
],
)
# ! run: taskiq scheduler bot.tasks.broker:scheduler --skip-first-run --update-interval 5
# * I also tried to fix error with this
@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def on_worker_startup(context: Annotated[Context, TaskiqDepends()]):
r = Redis.from_url(redis_url)
try:
await r.xgroup_create(
name="taskiq",
groupname="taskiq",
id="$",
mkstream=True,
)
except Exception as e:
print(e)
if "BUSYGROUP" not in str(e):
raise
# help funcs
async def delete_schedule(id_: str):
await dynamic_schedule_source.delete_schedule(id_)
async def delete_all_schedules():
schedules = await dynamic_schedule_source.get_schedules()
for schedule in schedules:
await delete_schedule(schedule.schedule_id)
async def get_all_schedules_ids():
schedules = await dynamic_schedule_source.get_schedules()
return [schd.schedule_id for schd in schedules]
async def show_all_schedules():
schedules = await dynamic_schedule_source.get_schedules()
for schedule in schedules:
print(schedule)
async def check_schedule(schedule_id: str) -> bool:
key = f"{dynamic_schedule_source._prefix}:data:{schedule_id}"
schedule = await redis_client.client.get(key)
return schedule is not None
tasks.py:
from typing import Annotated, Awaitable, Any
from datetime import datetime, timedelta
from time import perf_counter, time
from taskiq import TaskiqDepends, Context
from aiogram import Bot
from bot.tasks import broker as tasks_broker
import bot.database as db
from bot.database.engine import session as db_session
from bot import redis_client
from bot.weather_parse import get_future_weather
from bot import texts
from bot.config import ADMIN_ID
@tasks_broker.broker.task
async def send_time_diff(start_seconds: float, user_id: int, context: Annotated[Context, TaskiqDepends()], bot: Bot = TaskiqDepends(), ):
schedule_id = context.message.labels["schedule_id"]
text = f"{(time() - start_seconds) / 60} {schedule_id}"
await bot.send_message(user_id, text, disable_notification=True)
bot message handler where I run dynamic interval schedule:
start_seconds = time()
task = await tasks.send_time_diff.schedule_by_interval(
source=dynamic_schedule_source,
interval=WEATHER_TASK_SEND_INTERVAL,
start_seconds=start_seconds,
user_id=user_id
)
to run bot, taskiq and redis I use docker compose (maybe my mistake there)
docker-compose.yml:
version: '3.8'
services:
bot:
build: .
depends_on:
pg:
condition: service_healthy
environment:
REDIS_CLIENT_HOST: redis
REDIS_CLIENT_PORT: 6379
DB_HOST: pg
command: python -m bot.main
volumes:
- ./app:/bot
taskiq-worker:
build: .
command: taskiq worker bot.tasks.broker:broker
depends_on:
- redis
environment:
REDIS_CLIENT_HOST: redis
REDIS_CLIENT_PORT: 6379
DB_HOST: pg
volumes:
- ./app:/bot
taskiq-scheduler:
build: .
command: taskiq scheduler bot.tasks.broker:scheduler --skip-first-run --update-interval 5
depends_on:
- redis
environment:
REDIS_CLIENT_HOST: redis
REDIS_CLIENT_PORT: 6379
DB_HOST: pg
volumes:
- ./app:/bot
pg:
image: postgres:16.4
environment:
POSTGRES_DB: weather_bot_db
POSTGRES_USER: postgres
POSTGRES_PASSWORD: root
volumes:
- pg_data:/var/lib/postgresql/data/
ports:
- "5433:5432"
healthcheck:
test: [ "CMD", "pg_isready", "-q", "-d", "weather_bot_db", "-U", "postgres" ]
interval: 10s
timeout: 5s
retries: 2
pgadmin:
image: dpage/pgadmin4:latest
environment:
PGADMIN_DEFAULT_EMAIL: [email protected]
PGADMIN_DEFAULT_PASSWORD: admin
PGADMIN_CONFIG_SERVER_MODE: 'False'
ports:
- "5050:80"
depends_on:
pg:
condition: service_healthy
redis:
image: "redis:alpine"
ports:
- "6379:6379"
volumes:
- redis_data:/data
volumes:
pg_data:
redis_data:
Dockerfile:
FROM python:3.12-alpine
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
WORKDIR /app
COPY . .
RUN pip install --no-cache-dir -r requirements.txt
COPY . /app