diff --git a/integration/test_flow_integration.py b/integration/test_flow_integration.py index 283ef31d..7c4a1c4a 100644 --- a/integration/test_flow_integration.py +++ b/integration/test_flow_integration.py @@ -261,6 +261,46 @@ def test_write_to_v3io_stream(assign_stream_teardown_test): asyncio.run(async_test_write_to_v3io_stream(assign_stream_teardown_test)) +async def async_test_write_to_v3io_stream_flow_reuse(stream_path, flow, iteration): + + controller = flow.run() + for i in range(10): + await controller.emit(i) + + await asyncio.sleep(5) + + try: + shard0_data = await GetShardData().get_shard_data(f"{stream_path}/0") + assert shard0_data == [b"0", b"2", b"4", b"6", b"8"] * iteration + shard1_data = await GetShardData().get_shard_data(f"{stream_path}/1") + assert shard1_data == [b"1", b"3", b"5", b"7", b"9"] * iteration + finally: + await controller.terminate() + await controller.await_termination() + + +# ML-5351 +def test_write_to_v3io_stream_flow_reuse(assign_stream_teardown_test): + stream_path = assign_stream_teardown_test + flow = build_flow( + [ + AsyncEmitSource(), + Map(lambda x: str(x)), + StreamTarget( + V3ioDriver(), + stream_path, + sharding_func=lambda event: int(event.body), + batch_size=8, + shards=2, + full_event=False, + ), + ] + ) + + asyncio.run(async_test_write_to_v3io_stream_flow_reuse(stream_path, flow, 1)) + asyncio.run(async_test_write_to_v3io_stream_flow_reuse(stream_path, flow, 2)) + + async def async_test_write_to_v3io_stream_full_event_readback( setup_stream_teardown_test, ):