7,921 questions
0 votes, 1 answer, 28 views
asyncio.server: how to filter on remote IP-address
I have a TCP socket server class that must run in both CPython and MicroPython. It does, no problem there. However, I would like to extend its functionality with an IP filter. From an earlier version that ...
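One possible approach, sketched here for CPython's `asyncio.start_server`, is to read the peer address with `writer.get_extra_info("peername")` inside the connection callback and close the connection when the address is not on an allow-list. The names `ALLOWED_IPS`, `is_allowed`, `handle_client` and `serve` are hypothetical and not taken from the question, and whether the same call behaves identically on MicroPython is an assumption to verify.

```python
import asyncio

# Hypothetical allow-list; replace with the addresses you trust.
ALLOWED_IPS = {"127.0.0.1"}

def is_allowed(peername, allowed=ALLOWED_IPS) -> bool:
    """Return True if the peer's IP (first element of peername) is allowed."""
    return peername is not None and peername[0] in allowed

async def handle_client(reader, writer):
    # For TCP sockets, "peername" is a tuple whose first item is the remote IP.
    peername = writer.get_extra_info("peername")
    if not is_allowed(peername):
        # Reject the connection before reading anything from it.
        writer.close()
        await writer.wait_closed()
        return
    # Placeholder behavior: echo one line back to an allowed client.
    data = await reader.readline()
    writer.write(data)
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def serve(host="127.0.0.1", port=8888):
    # Host and port are placeholder values for this sketch.
    server = await asyncio.start_server(handle_client, host, port)
    async with server:
        await server.serve_forever()
```

Calling `asyncio.run(serve())` would start the server. The filter lives in a plain function so it can be tested without opening a socket.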
-1 votes, 0 answers, 65 views
Python Async loop freezes when checking task.done()
I am running multiple asyncio.Task instances to execute user requests faster on my FastAPI WebSocket server. However, occasionally (about once per week), my whole async loop freezes on the line where I check if one of my ...
0 votes, 1 answer, 23 views
Asyncio: read new data from process stdout without blocking monitoring task
I am writing an asynchronous process manager, and so far I have come up with this:
import asyncio
class AsyncLocalProcessManager:
    def __init__(self):
        self.process = None
    async def submit(...
2 votes, 0 answers, 29 views
asyncio httpio boto3 how to find memory leaks
I've been struggling for a while to determine where my memory leaks are coming from.
I'm using asyncio, httpio, boto3 and so on. The basic idea of my app is:
getting URLs of the newest documents
...
0 votes, 3 answers, 134 views
Converting Python socket server to Asyncio
I have already converted a multi-threaded HTTP proxy run by a socket server into a single-threaded proxy run by asyncio, and it works well for HTTP. However, when I try to reproduce the same with SSL ...
2 votes, 1 answer, 49 views
VSCode Jupyter Notebook - Stop Cell Execution
I have an issue with properly terminating async Python code running in a Jupyter notebook cell. I would like to use the stop button on a Jupyter notebook cell to cleanly exit the code running in the ...
1 vote, 2 answers, 44 views
task.result() throws InvalidStateError instead of CancelledError on cancelled task
I built the following construct which is supposed to call a bunch of asynchronous operations and return as soon as any one of them completes or the timeout expires.
import asyncio
async def ...
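One common way to see this behavior (not necessarily the cause in the truncated code above) is calling `task.result()` right after `task.cancel()`. `cancel()` only requests cancellation, so the task is not done until the event loop has run again, and `result()` on a task that is not done raises `InvalidStateError`. The sketch below uses a hypothetical `demo` coroutine to show the order of exceptions.

```python
import asyncio

async def demo() -> list:
    seen = []
    task = asyncio.create_task(asyncio.sleep(10))
    await asyncio.sleep(0)      # give the task a chance to start
    task.cancel()               # requests cancellation; task is not done yet
    try:
        task.result()
    except asyncio.InvalidStateError:
        seen.append("InvalidStateError")
    try:
        await task              # lets the loop deliver the cancellation
    except asyncio.CancelledError:
        seen.append("CancelledError")
    try:
        task.result()           # now the task is done and marked cancelled
    except asyncio.CancelledError:
        seen.append("CancelledError from result()")
    return seen

seen = asyncio.run(demo())
print(seen)
```

Checking `task.done()` or `task.cancelled()` before calling `result()`, or awaiting the task first, avoids the `InvalidStateError`.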
2 votes, 2 answers, 74 views
Why is my asyncio task not executing immediately after `asyncio.create_task()`?
import asyncio
async def worker():
    print("worker start")
    await asyncio.sleep(1)
    print("worker done")
async def main():
    # Schedule the coroutine
    asyncio....
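As a minimal sketch of the scheduling behavior the title asks about: `asyncio.create_task()` only schedules the coroutine, and the new task gets to run once the current coroutine yields to the event loop at an `await`. The `events` list is a hypothetical log added here to make the ordering visible, and the `worker` and `main` bodies are simplified stand-ins, not the truncated code above.

```python
import asyncio

events = []  # hypothetical log used only to record ordering

async def worker():
    events.append("worker start")
    await asyncio.sleep(0)
    events.append("worker done")

async def main():
    task = asyncio.create_task(worker())  # scheduled, but not running yet
    events.append("after create_task")    # runs before the worker starts
    await asyncio.sleep(0)                # yield control to the event loop
    await task                            # wait for the worker to finish
    events.append("main done")

asyncio.run(main())
print(events)
```

The worker's first line is recorded only after `main` reaches its first `await`, which is why the task appears not to start immediately.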
2 votes, 2 answers, 70 views
Exceptions being hidden by asyncio Queue.join()
I am using an API client supplied by a vendor (Okta) that has very poor/old examples of running with async - for example (the Python documentation says not to use get_event_loop()):
from okta.client ...
2 votes, 1 answer, 45 views
Is a lock needed when multiple tasks push into the same asyncio Queue?
Consider this example where I have 3 worker tasks that push results into a queue and a task that deals with the pushed data.
async def worker1(queue: asyncio.Queue):
    while True:
        res = ...
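A minimal sketch of the setup described, assuming all tasks run in the same event loop: tasks are scheduled cooperatively on one thread and switch only at `await` points, so several tasks can call `queue.put()` on the same `asyncio.Queue` without an extra lock. Note that `asyncio.Queue` is not thread-safe, so this does not carry over to multiple threads. The names `producer`, `consumer` and `run_demo` are hypothetical.

```python
import asyncio

async def producer(name: str, queue: asyncio.Queue, count: int) -> None:
    for i in range(count):
        await queue.put((name, i))  # no lock around put()
        await asyncio.sleep(0)      # let other tasks interleave

async def consumer(queue: asyncio.Queue, received: list) -> None:
    while True:
        item = await queue.get()
        received.append(item)
        queue.task_done()

async def run_demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    received: list = []
    consumer_task = asyncio.create_task(consumer(queue, received))
    # Three producers push into the same queue concurrently.
    await asyncio.gather(*(producer(f"worker{n}", queue, 3) for n in (1, 2, 3)))
    await queue.join()              # wait until every item has been processed
    consumer_task.cancel()
    try:
        await consumer_task
    except asyncio.CancelledError:
        pass
    return received

received = asyncio.run(run_demo())
print(len(received))
```

Every item pushed by the three producers arrives exactly once, with no locking in the producers.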
0 votes, 1 answer, 26 views
Running WebSocket server and separate while loop simultaneously
The following code is for an RC flight controller based around a Raspberry Pi (Zero 2W). The purpose of the code is to receive input controls from my iPhone while also managing a few PID controllers ...
-1 votes, 1 answer, 33 views
Sending a discord message without needing to have any input in discord
I'm making an AI thing that uses voice-to-text and text-to-speech to communicate, and I thought it would be nice to give it access to Discord, just for fun, but I'm only finding how to send messages ...
1 vote, 1 answer, 151 views
Properly typing a Python decorator with ParamSpec and Concatenate that allows for arbitrary argument positioning?
I have an existing Python decorator that ensures that a method is given a psycopg AsyncConnection instance. I'm trying to update the typing to use ParamSpec and Concatenate as the current ...
0 votes, 1 answer, 88 views
How to set an end time for a LiveKit room and send a warning message 1 minute before disconnecting using Python?
I'm working with LiveKit and a multimodal agent in Python for a voice-based interview session. I'd like to implement the following:
Automatically disconnect the agent at a specific end time.
Send a ...
1 vote, 1 answer, 55 views
Mock asyncio.sleep to be faster in unittest
I want to mock asyncio.sleep to shorten the delay, e.g. by a factor of 10, to speed up my tests while also trying to surface any possible race conditions or other bugs as a crude sanity check. However ...