diff --git a/src/stream_reader_group.rs b/src/stream_reader_group.rs index f88afa9..886189e 100644 --- a/src/stream_reader_group.rs +++ b/src/stream_reader_group.rs @@ -10,7 +10,6 @@ cfg_if! { if #[cfg(feature = "python_binding")] { - use pravega_client_shared::ScopedStream; use pravega_client::event::reader_group::ReaderGroup; use pravega_client::event::reader_group::StreamCutVersioned; use pravega_client::event::reader_group::StreamCutV1; @@ -216,44 +215,19 @@ impl StreamReaderGroup { } } - /// Return the latest StreamCut for the given reader. + /// Return the latest StreamCut from ReaderGroup. /// Use this StreamCut in the ReaderGroupConfig to initiate reading from this streamcut. - pub fn get_reader_streamcut(&self, reader_name: &str) -> PyResult { - info!( - "Get reader streamcut for reader {:?} ", reader_name - ); + pub fn get_streamcut(&self) -> PyResult { let streamcut = self .runtime_handle - .block_on(self.reader_group.get_reader_streamcut(reader_name.to_string())); - - match streamcut { - Ok(streamcut_value) => Ok(ReaderStreamCut{ - reader_stream_cut: streamcut_value - }), - Err(e) => match e { - ReaderGroupStateError::SyncError { .. } => { - error!( - "Failed to get position for reader {:?} exception {:?} ", - reader_name, e - ); - Err(exceptions::PyValueError::new_err(format!( - "Failed to get position for reader {:?} exception {:?} ", - reader_name, e - ))) - } - ReaderGroupStateError::ReaderAlreadyOfflineError { .. } => { - error!( - "Reader already offline, failed to get position for reader {:?} exception {:?} ", - reader_name, e - ); - Err(exceptions::PyValueError::new_err(format!( - "Reader already offline, failed to get position for reader {:?} exception {:?} ", - reader_name, e - ))) - } - }, - } + .block_on(self.reader_group.get_streamcut()); + info!( + "Got streamcut {:?} ", streamcut + ); + Ok(ReaderStreamCut { + reader_stream_cut: streamcut + }) } /// Returns the string representation. diff --git a/tests/pravega_reader_test.py b/tests/pravega_reader_test.py index 7b633c8..3b1ea4e 100644 --- a/tests/pravega_reader_test.py +++ b/tests/pravega_reader_test.py @@ -7,6 +7,7 @@ # # http://www.apache.org/licenses/LICENSE-2.0 # +import time import pravega_client import random @@ -320,3 +321,67 @@ async def test_largeEvents(self): count+=1 self.assertEqual(b'a'*100000, event.data(), "Invalid event data") r1.release_segment(segment_slice) + + # This test is to verify get_streamcut API + async def test_getReaderStreamcut(self): + suffix = str(random.randint(0, 100)) + scope = "testReaderSC" + stream = "testStream" + suffix + print("Creating a Stream Manager, ensure Pravega is running") + stream_manager = pravega_client.StreamManager("tcp://127.0.0.1:9090") + pravega_client.StreamReaderGroupConfig(True, scope, stream) + print("Creating a scope") + scope_result = stream_manager.create_scope(scope) + print(scope_result) + print("Creating a stream ", stream) + stream_result = stream_manager.create_stream(scope, stream, 1) + print(stream_result) + + print("Creating a writer for Stream") + w1 = stream_manager.create_writer(scope, stream) + + print("Write 4 events") + w1.write_event("test event1") + w1.write_event("test event2") + w1.write_event("test event3") + w1.write_event("test event4") + w1.flush() + + # Create a reader Group Configuration to read from HEAD of stream. + rg_config = pravega_client.StreamReaderGroupConfig(False, scope, stream) + reader_group=stream_manager.create_reader_group_with_config("rg" + suffix, scope, rg_config) + + r1 = reader_group.create_reader("reader-1") + segment_slice = await r1.get_segment_slice_async() + print(segment_slice) + + # consume the segment slice for events. Read single event ie event0 and get ReaderStreamcut. + # Same should be used to construct readerGroup2 and that should start reading from event 2 ie event1 + count=0 + for event in segment_slice: + count+=1 + print(event.data()) + print("event offset is ") + print(event.offset()) + if count == 2 : + break + self.assertEqual(count, 2, "Two events are expected") + time.sleep(3) + r1.release_segment(segment_slice) + rsm = reader_group.get_streamcut() + print("StreamnCut after reading 2 events from first slice and release segment") + print(rsm) + + # create RG with streamcut starts from 2nd event + reader_group2=stream_manager.create_reader_group("rgnew" + suffix, scope, stream, False, rsm) + r2 = reader_group2.create_reader("reader-2") + segment_slice2 = await r2.get_segment_slice_async() + print(segment_slice2) + countnew=1 + for event in segment_slice2: + countnew+=1 + print(event.data()) + print("event offset is ") + print(event.offset()) + + self.assertEqual(countnew, 2, "Two events are expected") \ No newline at end of file