summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavel Zhukov <33721692+LeaveMyYard@users.noreply.github.com>2024-03-28 13:04:34 +0200
committerGitHub <noreply@github.com>2024-03-28 13:04:34 +0200
commitd3d964ab66a6f67b87286e8a40c176a948d1fb85 (patch)
treec72ec19f06bca49dc671ea412a515caf84975eab
parent5546aef84aee5c2e290735f9cc658b8461eef4a1 (diff)
parent6058075e6a95516942085172787e4abcfdc99413 (diff)
Merge branch 'main' into show_cluster_name_flag
-rw-r--r--robusta_krr/utils/async_gen_merge.py42
1 files changed, 16 insertions, 26 deletions
diff --git a/robusta_krr/utils/async_gen_merge.py b/robusta_krr/utils/async_gen_merge.py
index 7152895..35c2c86 100644
--- a/robusta_krr/utils/async_gen_merge.py
+++ b/robusta_krr/utils/async_gen_merge.py
@@ -11,39 +11,29 @@ T = TypeVar("T")
def async_gen_merge(*aiters: AsyncIterable[T]) -> AsyncIterable[T]:
- queue = asyncio.Queue(1)
- run_count = len(aiters)
- cancelling = False
+ queue = asyncio.Queue()
+ iters_remaining = set(aiters)
async def drain(aiter):
- nonlocal run_count
try:
async for item in aiter:
- await queue.put((False, item))
- except Exception as e:
- if not cancelling:
- await queue.put((True, e))
- else:
- raise
+ await queue.put(item)
+ except Exception:
+ logger.exception(f"Error in async generator {aiter}")
finally:
- run_count -= 1
+ iters_remaining.discard(aiter)
+ await queue.put(None)
async def merged():
- try:
- while run_count:
- raised, next_item = await queue.get()
- if raised:
- cancel_tasks()
- raise next_item
- yield next_item
- finally:
- cancel_tasks()
+ while iters_remaining or not queue.empty():
+ item = await queue.get()
+
+ if item is None:
+ continue
+
+ yield item
- def cancel_tasks():
- nonlocal cancelling
- cancelling = True
- for t in tasks:
- t.cancel()
+ for aiter in aiters:
+ asyncio.create_task(drain(aiter))
- tasks = [asyncio.create_task(drain(aiter)) for aiter in aiters]
return merged()