Skip to content

Commit

Permalink
added RG streamcut and testcase to verify
Browse files Browse the repository at this point in the history
Signed-off-by: Shwetha N <[email protected]>
  • Loading branch information
ShwethaSNayak committed Feb 22, 2024
1 parent 2b706cb commit b73a78e
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 35 deletions.
44 changes: 9 additions & 35 deletions src/stream_reader_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

cfg_if! {
if #[cfg(feature = "python_binding")] {
use pravega_client_shared::ScopedStream;
use pravega_client::event::reader_group::ReaderGroup;
use pravega_client::event::reader_group::StreamCutVersioned;
use pravega_client::event::reader_group::StreamCutV1;
Expand Down Expand Up @@ -216,44 +215,19 @@ impl StreamReaderGroup {
}
}

/// Return the latest StreamCut for the given reader.
/// Return the latest StreamCut from ReaderGroup.
/// Use this StreamCut in the ReaderGroupConfig to initiate reading from this streamcut.
pub fn get_reader_streamcut(&self, reader_name: &str) -> PyResult<ReaderStreamCut> {
info!(
"Get reader streamcut for reader {:?} ", reader_name
);
pub fn get_streamcut(&self) -> PyResult<ReaderStreamCut> {

let streamcut = self
.runtime_handle
.block_on(self.reader_group.get_reader_streamcut(reader_name.to_string()));

match streamcut {
Ok(streamcut_value) => Ok(ReaderStreamCut{
reader_stream_cut: streamcut_value
}),
Err(e) => match e {
ReaderGroupStateError::SyncError { .. } => {
error!(
"Failed to get position for reader {:?} exception {:?} ",
reader_name, e
);
Err(exceptions::PyValueError::new_err(format!(
"Failed to get position for reader {:?} exception {:?} ",
reader_name, e
)))
}
ReaderGroupStateError::ReaderAlreadyOfflineError { .. } => {
error!(
"Reader already offline, failed to get position for reader {:?} exception {:?} ",
reader_name, e
);
Err(exceptions::PyValueError::new_err(format!(
"Reader already offline, failed to get position for reader {:?} exception {:?} ",
reader_name, e
)))
}
},
}
.block_on(self.reader_group.get_streamcut());
info!(
"Got streamcut {:?} ", streamcut
);
Ok(ReaderStreamCut {
reader_stream_cut: streamcut
})
}

/// Returns the string representation.
Expand Down
65 changes: 65 additions & 0 deletions tests/pravega_reader_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#
# http://www.apache.org/licenses/LICENSE-2.0
#
import time

import pravega_client
import random
Expand Down Expand Up @@ -320,3 +321,67 @@ async def test_largeEvents(self):
count+=1
self.assertEqual(b'a'*100000, event.data(), "Invalid event data")
r1.release_segment(segment_slice)

# This test is to verify get_streamcut API
async def test_getReaderStreamcut(self):
suffix = str(random.randint(0, 100))
scope = "testReaderSC"
stream = "testStream" + suffix
print("Creating a Stream Manager, ensure Pravega is running")
stream_manager = pravega_client.StreamManager("tcp://127.0.0.1:9090")
pravega_client.StreamReaderGroupConfig(True, scope, stream)
print("Creating a scope")
scope_result = stream_manager.create_scope(scope)
print(scope_result)
print("Creating a stream ", stream)
stream_result = stream_manager.create_stream(scope, stream, 1)
print(stream_result)

print("Creating a writer for Stream")
w1 = stream_manager.create_writer(scope, stream)

print("Write 4 events")
w1.write_event("test event1")
w1.write_event("test event2")
w1.write_event("test event3")
w1.write_event("test event4")
w1.flush()

# Create a reader Group Configuration to read from HEAD of stream.
rg_config = pravega_client.StreamReaderGroupConfig(False, scope, stream)
reader_group=stream_manager.create_reader_group_with_config("rg" + suffix, scope, rg_config)

r1 = reader_group.create_reader("reader-1")
segment_slice = await r1.get_segment_slice_async()
print(segment_slice)

# consume the segment slice for events. Read single event ie event0 and get ReaderStreamcut.
# Same should be used to construct readerGroup2 and that should start reading from event 2 ie event1
count=0
for event in segment_slice:
count+=1
print(event.data())
print("event offset is ")
print(event.offset())
if count == 2 :
break
self.assertEqual(count, 2, "Two events are expected")
time.sleep(3)
r1.release_segment(segment_slice)
rsm = reader_group.get_streamcut()
print("StreamnCut after reading 2 events from first slice and release segment")
print(rsm)

# create RG with streamcut starts from 2nd event
reader_group2=stream_manager.create_reader_group("rgnew" + suffix, scope, stream, False, rsm)
r2 = reader_group2.create_reader("reader-2")
segment_slice2 = await r2.get_segment_slice_async()
print(segment_slice2)
countnew=1
for event in segment_slice2:
countnew+=1
print(event.data())
print("event offset is ")
print(event.offset())

self.assertEqual(countnew, 2, "Two events are expected")

0 comments on commit b73a78e

Please sign in to comment.