
I'm trying to write my own awaitable function that can be used in an asyncio loop, like asyncio.sleep() or the other built-in awaitables.

Here is what I've done so far:

import asyncio

def coro1():
    for i in range(1, 10):
        yield i

def coro2():
    for i in range(1, 10):
        yield i*10

class Coro:  # Not used.
    def __await__(self):
        for i in range(1, 10):
            yield i * 100

@asyncio.coroutine
def wrapper1():
    return (yield from coro1())

@asyncio.coroutine
def wrapper2():
    return (yield from coro2())

for i in wrapper1():
    print(i)

print("Above result was obvious which I can iterate around a couroutine.".center(80, "#"))

async def async_wrapper():
    await wrapper1()
    await wrapper2()

asyncio.run(async_wrapper())

What I got as a result:

1
2
3
4
5
6
7
8
9
#######Above result was obvious which I can iterate around a couroutine.#########
Traceback (most recent call last):
  File "stack-coroutine.py", line 36, in <module>
    result = loop.run_until_complete(asyncio.gather(*futures))
  File "/usr/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
    return future.result()
  File "stack-coroutine.py", line 30, in async_wrapper
    await wrapper1()
  File "stack-coroutine.py", line 18, in wrapper1
    return (yield from coro1())
  File "stack-coroutine.py", line 5, in coro1
    yield i
RuntimeError: Task got bad yield: 1

What I expect as a result:

1
10
2
20
3
30
.
.
.

[NOTE]:

  • I'm not looking for a multithread or multiprocess method.
  • This question is similar to an earlier question of mine that has not been resolved yet.
  • Can you show us working asyncio code, a part of which you'd like to reimplement manually? Your async_wrapper runs the two coroutines in sequence, not in parallel. Commented Oct 7, 2019 at 13:22
  • Also, asyncio doesn't use generators to produce values to calling coroutines, it uses them to request suspension to the event loop. The yielded value is only visible to the event loop, not to intermediate coroutines, and in asyncio it contains a future object. Commented Oct 7, 2019 at 13:29
  • @user4815162342 Thanks for the response. I'm looking for a way to implement an awaitable function like the many built-in awaitables (e.g. asyncio.sleep()). This code snippet is my attempt at that. I want to get the result mentioned in my question with my own awaitable function. Commented Oct 7, 2019 at 13:35
  • You can take a look at the asyncio source code to see how basic coroutines like asyncio.sleep() are implemented. I can also recommend this lecture that shows how the event loop works by building one from scratch, live. The code in the question shows misconceptions about how async/await works in Python, so the question doesn't appear answerable at this point, at least not within the format of a normal StackOverflow answer. Commented Oct 7, 2019 at 14:18
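
As a rough sketch of the point made in the comments above (whatever reaches the event loop must be something it understands, not an arbitrary value), a custom awaitable can delegate its suspension to an existing awaitable instead of yielding plain numbers. The Countdown class and the events list are hypothetical names introduced only for illustration; this is one possible shape, not how asyncio implements its own awaitables:

```python
import asyncio

events = []  # hypothetical log, used here instead of print() so the order can be checked


class Countdown:
    """Hypothetical awaitable that hands control back to the loop after every step."""

    def __init__(self, name, steps):
        self.name = name
        self.steps = steps

    def __await__(self):
        for i in range(1, self.steps + 1):
            events.append((self.name, i))
            # Delegate the suspension to an awaitable the event loop understands.
            # A bare `yield i` at this point is what triggers "Task got bad yield".
            yield from asyncio.sleep(0).__await__()
        return self.steps  # becomes the value of `await Countdown(...)`


async def main():
    # gather() wraps each awaitable in a task, so both make progress concurrently.
    return await asyncio.gather(Countdown("a", 3), Countdown("b", 3))


results = asyncio.run(main())
print(events)
print(results)
```

Each `await` on a Countdown object runs `__await__`, and the two instances take turns because every step suspends through asyncio.sleep(0).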

2 Answers


I found a concurrent/asynchronous approach using generators. However, it's not an asyncio approach:

from collections import deque

def coro1():
    for i in range(1, 5):
        yield i

def coro2():
    for i in range(1, 5):
        yield i*10

print('Async behaviour using default list with O(n)'.center(60, '#'))
tasks = list()
tasks.extend([coro1(), coro2()])

while tasks:
    task = tasks.pop(0)
    try:
        print(next(task))
        tasks.append(task)
    except StopIteration:
        pass

print('Async behaviour using deque with O(1)'.center(60, '#'))
tasks = deque()
tasks.extend([coro1(), coro2()])

while tasks:
    task = tasks.popleft()  # select and remove a task (coro1/coro2).
    try:
        print(next(task))
        tasks.append(task)  # add the removed task (coro1/coro2) for permutation.
    except StopIteration:
        pass

Out:

########Async behaviour using default list with O(n)########
1
10
2
20
3
30
4
40
###########Async behaviour using deque with O(1)############
1
10
2
20
3
30
4
40

[UPDATE]:

Finally, I've solved this through asyncio syntax:

import asyncio

async def coro1():
    for i in range(1, 6):
        print(i)
        await asyncio.sleep(0)  # switches task every one iteration.

async def coro2():
    for i in range(1, 6):
        print(i * 10)
        await asyncio.sleep(0)  # switches task every one iteration.

async def main():
    futures = [
        asyncio.create_task(coro1()),
        asyncio.create_task(coro2())
    ]
    await asyncio.gather(*futures)

asyncio.run(main())

Out:

1
10
2
20
3
30
4
40
5
50

Here is another concurrent-coroutine approach that uses async/await with a hand-written event-loop manager built on the heap queue algorithm (heapq), without the asyncio library, its event loop, or asyncio.sleep():

import heapq
from time import sleep
from datetime import datetime, timedelta

class Sleep:
    def __init__(self, seconds):
        self.sleep_until = datetime.now() + timedelta(seconds=seconds)

    def __await__(self):
        yield self.sleep_until

async def coro1():
    for i in range(1, 6):
        await Sleep(0)
        print(i)

async def coro2():
    for i in range(1, 6):
        await Sleep(0)
        print(i * 10)

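def coro_manager(*coros):
    # Heap entries are (wake-up time, sequence number, coroutine). The sequence
    # number breaks ties, so two coroutine objects are never compared.
    heap = [(datetime.now(), seq, coro) for seq, coro in enumerate(coros)]
    heapq.heapify(heap)
    seq = len(heap)
    while heap:
        exec_at, _, coro = heapq.heappop(heap)
        delay = (exec_at - datetime.now()).total_seconds()
        if delay > 0:
            sleep(delay)
        try:
            wake_at = coro.send(None)  # run until the next `await Sleep(...)`
        except StopIteration:
            continue  # finished; it was already popped, so nothing to remove
        heapq.heappush(heap, (wake_at, seq, coro))
        seq += 1

coro_manager(coro1(), coro2())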

Out:

1
10
2
20
3
30
4
40
5
50

Usually you don't need to write low-level coroutines; writing an async def function and awaiting other awaitables inside it is the common way to achieve your goal.

However, if you are interested in the implementation details, here's the source code of asyncio.sleep().

Like many other low-level asyncio functions, it relies on three main things to implement a coroutine:

  • asyncio.Future() - "a bridge" between the callback world and the coroutine world
  • the event loop's loop.call_later() method - one of several event loop methods that tell the event loop directly when to do something
  • async def and await - essentially syntactic sugar for @asyncio.coroutine and yield from, which turn a function into a generator (so it can be executed "one step at a time")

Here's my rough implementation of sleep that shows the idea:

import asyncio


# @asyncio.coroutine - for better tracebacks and edge cases, we can avoid it here
def my_sleep(delay):
    fut = asyncio.Future()

    loop = asyncio.get_event_loop()
    loop.call_later(
        delay,
        lambda *_: fut.set_result(True)
    )

    res = yield from fut
    return res


# Test:
@asyncio.coroutine
def main():
    yield from my_sleep(3)
    print('ok')


asyncio.run(main())
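
Note that @asyncio.coroutine was deprecated in Python 3.8 and removed in Python 3.11, so the snippet above only runs on older interpreters. Below is a minimal sketch of the same idea written with async def and await. The helper name _set_unless_done is hypothetical, and the timing code in main() exists only to show that the delay happens; treat it as an illustration, not as the actual asyncio.sleep() source:

```python
import asyncio
import time


def _set_unless_done(fut):
    # Hypothetical helper: avoid InvalidStateError if the future was cancelled.
    if not fut.done():
        fut.set_result(True)


async def my_sleep(delay):
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # Ask the event loop to resolve the future after `delay` seconds.
    handle = loop.call_later(delay, _set_unless_done, fut)
    try:
        return await fut  # suspends this coroutine until the future has a result
    finally:
        handle.cancel()  # harmless if the timer has already fired


async def main():
    start = time.monotonic()
    result = await my_sleep(0.1)
    return result, time.monotonic() - start


result, elapsed = asyncio.run(main())
print(result, f"{elapsed:.2f}s")
```

The structure is unchanged: a Future is the thing being awaited, and loop.call_later() is what eventually completes it.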

If you want to go lower than this, you'll have to understand how generators (or coroutines) are managed by the event loop. The video mentioned by user4815162342 is a good place to start.

But again, all of the above are implementation details. You don't have to think about any of this unless you write something very low-level.

  • Thanks for the response. I see many libraries add an async version of their code using asyncio (e.g. pymodbus, pysnmp). So I decided to write my own async method like them. I knew that the main job of async/await in a method is to suspend it so another task can run in the meantime, so I thought I should use yield for the suspension. I'm not eager to write low-level code or reimplement what already exists. Commented Oct 8, 2019 at 5:06
  • @BenyaminJafari if you want to write an async version of some library, the easiest way is to run the blocking code in an executor with loop.run_in_executor() and await the result. The second way is to use the sync library's callbacks to set the result of an asyncio.Future that can be awaited in async code. But that's more complicated, and not every sync library provides callbacks. That should suffice; there's no need to go into lower-level details. Commented Oct 8, 2019 at 16:08
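
The two approaches described in the last comment could look roughly like this. blocking_read and read_with_callback are hypothetical stand-ins for a synchronous library (they are not part of pymodbus, pysnmp, or any real package); only the asyncio calls are real:

```python
import asyncio
import threading
import time


def blocking_read(register):
    """Hypothetical blocking library call."""
    time.sleep(0.05)
    return register * 2


def read_with_callback(register, on_done):
    """Hypothetical callback-style API; on_done is invoked from a worker thread."""
    threading.Thread(target=lambda: on_done(blocking_read(register))).start()


async def read_in_executor(register):
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default ThreadPoolExecutor.
    return await loop.run_in_executor(None, blocking_read, register)


async def read_via_future(register):
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # The callback fires in another thread, so hand the result to the loop thread-safely.
    read_with_callback(
        register,
        lambda value: loop.call_soon_threadsafe(fut.set_result, value),
    )
    return await fut


async def main():
    return await asyncio.gather(read_in_executor(21), read_via_future(4))


results = asyncio.run(main())
print(results)
```

In both variants the event loop stays free while the blocking work runs elsewhere, and the coroutine simply awaits the outcome.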
