I currently have a function that yields results from an external source, which may arrive over a long period of time.
import json
import httpx

async def run_command(data):
    async with httpx.AsyncClient() as client:
        url = "http://hostname.com:8000/"
        async with client.stream("POST", url, json=data, timeout=None) as r:
            async for line in r.aiter_lines():
                yield (json.loads(line), 200)
This currently works fine for one data call, and I can process the result simply like so:
async for result in run_command(data):
    yield result
The result is passed back to the initial calling function which streams it to a GUI using Django channels.
async for result, status in self.call_command(dataSet):
    await self.channel_layer.group_send(
        self.group_name, {
            'type': "transmit",
            'message': result,
            'status': status,
        },
    )
However, I would now like to call the run_command function concurrently for multiple data sets. I'm trying to do it like so:
for future in asyncio.as_completed([run_command(data) for data in dataSet]):
    yield future
I get a TypeError: An asyncio.Future, a coroutine or an awaitable is required.
Replacing the last line with the two below still gives the same error.
    result = await future
    yield result
Changing the for loop into an async for loop doesn't work either; it fails with TypeError: 'async for' requires an object with __aiter__ method, got generator.
So is it possible to yield results as they arrive from multiple futures?
If I understand your requirements correctly, this sounds like a textbook case for a queue producer-consumer setup.
Let me try and abstract this for you. You have some asynchronous generator that you create by supplying some input data. Once it's running, it yields results at irregular intervals.
Now you have multiple sets of data. You want to have multiple generators concurrently crunching away at those and you want to be able to process a result as soon as one is yielded by any of the generators.
The solution I propose is to write two coroutine functions: a queue_producer and a queue_consumer. Both receive the same asyncio.Queue instance as an argument. The producer also receives a single piece of input data. It sets up the aforementioned asynchronous generator and begins iterating through it; as soon as a new item is yielded, it puts that item into the queue.
The consumer is actually itself an asynchronous generator. It receives a timeout argument in addition to the queue. It starts an infinite loop of awaiting the next item it can get from the queue. Once it gets an item, it yields that item. If the waiting takes longer than the specified timeout, it breaks out of the loop and ends.
To demonstrate this, I will use a very silly and simple asynchronous iterator implementation that simply iterates over the characters in a string with a random sleep time between each letter. I'll call it funky_string_iter. Here is a full working example:
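A minimal sketch of that setup, with one producer task per input string, all feeding a single queue that the consumer drains (the sleep range, the input strings, and the 1-second timeout are illustrative choices, not part of the original):

```python
import asyncio
import random


async def funky_string_iter(text):
    # Yield the characters of `text` one by one with random pauses.
    for char in text:
        await asyncio.sleep(random.uniform(0.05, 0.3))
        yield char


async def queue_producer(queue, text):
    # Drain one async generator into the shared queue.
    async for char in funky_string_iter(text):
        await queue.put(char)


async def queue_consumer(queue, timeout):
    # Yield items from the queue until none arrives within `timeout` seconds.
    while True:
        try:
            yield await asyncio.wait_for(queue.get(), timeout)
        except asyncio.TimeoutError:
            break


async def main():
    queue = asyncio.Queue()
    # One producer task per input string, all writing to the same queue.
    producers = [
        asyncio.create_task(queue_producer(queue, text))
        for text in ("abc", "xyz", "123")
    ]
    received = []
    async for item in queue_consumer(queue, timeout=1.0):
        received.append(item)
    await asyncio.gather(*producers)
    return received


received = asyncio.run(main())
print("".join(received))
```

Because the consumer only stops after the queue has been empty for a full timeout interval, every character from every producer is collected before the loop ends.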
The output will obviously be more or less randomly ordered, as the characters from the different strings arrive interleaved depending on the random sleep times.
This demonstrates how the characters are yielded by the queue_consumer as soon as they are available (in the queue), which in turn depends on which queue_producer task yields the next character from its string. You can transfer this to your specific case by substituting your run_command for that funky string iterator. Naturally, adjust the rest as needed.
As for the error you got from the asyncio.as_completed iteration: that function expects awaitables (coroutines, tasks, or futures), each of which resolves to a single result. Calling an async generator function returns an async generator, which is not awaitable, so it is not the right fit for this problem.
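For contrast, here is a minimal sketch of what asyncio.as_completed is actually meant for: awaitables that each produce one value, yielded back in completion order (the coroutine and its sleep times are invented for illustration):

```python
import asyncio


async def one_result(n):
    # Later inputs sleep less, so they finish sooner.
    await asyncio.sleep(0.3 - 0.1 * n)
    return n


async def main():
    results = []
    # as_completed takes awaitables that each resolve to ONE value.
    # Passing async generators instead is what raises the TypeError.
    for future in asyncio.as_completed([one_result(n) for n in range(3)]):
        results.append(await future)
    return results


results = asyncio.run(main())
print(results)  # completion order: [2, 1, 0]
```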