From 2b706cb25ac66796e00dd0c5a96cf6db3396944b Mon Sep 17 00:00:00 2001 From: Shwetha N Date: Mon, 19 Feb 2024 16:02:45 +0530 Subject: [PATCH] reader streamcut for reading from given streamcut Signed-off-by: Shwetha N --- src/lib.rs | 3 +- src/stream_manager.rs | 42 +++++++++++++------- src/stream_reader.rs | 3 ++ src/stream_reader_group.rs | 79 +++++++++++++++++++++++++++++++++++++- 4 files changed, 112 insertions(+), 15 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 7444dc6..b45487a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,7 +12,7 @@ #[macro_use] extern crate cfg_if; -use crate::stream_reader_group::StreamReaderGroupConfig; +use crate::stream_reader_group::{ReaderStreamCut, StreamReaderGroupConfig}; mod byte_stream; mod stream_manager; @@ -58,6 +58,7 @@ fn pravega_client(py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; let txn_exception = py.get_type::(); txn_exception.setattr("__doc__", TXNFAILED_EXCEPTION_DOCSTRING)?; diff --git a/src/stream_manager.rs b/src/stream_manager.rs index 2d39752..180d5bd 100644 --- a/src/stream_manager.rs +++ b/src/stream_manager.rs @@ -8,6 +8,9 @@ // http://www.apache.org/licenses/LICENSE-2.0 // +use std::collections::HashMap; +use pravega_client::event::reader_group::{StreamCutV1, StreamCutVersioned}; +use crate::stream_reader_group::ReaderStreamCut; cfg_if! 
{ if #[cfg(feature = "python_binding")] { use crate::stream_writer_transactional::StreamTxnWriter; @@ -558,7 +561,7 @@ impl StreamManager { /// event.reader_group=manager.create_reader_group("rg1", "scope", "stream", true) /// ``` /// - #[pyo3(text_signature = "($self, reader_group_name, scope_name, stream_name, read_from_tail)")] + #[pyo3(text_signature = "($self, reader_group_name, scope_name, stream_name, read_from_tail, stream_cut)")] #[args(read_from_tail = "false")] pub fn create_reader_group( &self, @@ -566,24 +569,37 @@ impl StreamManager { scope_name: &str, stream_name: &str, read_from_tail: bool, + stream_cut: Option, ) -> PyResult { let scope = Scope::from(scope_name.to_string()); + let stream = Stream::from(stream_name.to_string()); let scoped_stream = ScopedStream { scope: scope.clone(), - stream: Stream::from(stream_name.to_string()), + stream: stream.clone(), }; let handle = self.cf.runtime_handle(); - let rg_config = if read_from_tail { - // Create a reader group to read from the current TAIL/end of the Stream. - ReaderGroupConfigBuilder::default() - .read_from_tail_of_stream(scoped_stream) - .build() - } else { - // Create a reader group to read from current HEAD/start of the Stream. - ReaderGroupConfigBuilder::default() - .read_from_head_of_stream(scoped_stream) - .build() - }; + let rg_config = if let Some(ref stream_cut) = stream_cut { + let mut positions = HashMap::new(); + // Iterate over the keys of the offset_map + for (segment_val, position) in stream_cut.reader_stream_cut.segment_offset_map.iter() { + let mut scoped_segment = ScopedSegment::new(scope.clone(), stream.clone(), Segment::from(*segment_val)); + positions.insert(scoped_segment, *position); + } + let stream_cut_v1 = StreamCutV1::new(scoped_stream.clone(), positions); + // Create a reader group to read from given StreamCut . 
+ ReaderGroupConfigBuilder::default().read_from_stream(scoped_stream.clone(), StreamCutVersioned::V1(stream_cut_v1)).build() + }else if read_from_tail { + // Create a reader group to read from the current TAIL/end of the Stream. + ReaderGroupConfigBuilder::default() + .read_from_tail_of_stream(scoped_stream) + .build() + } else { + // Create a reader group to read from current HEAD/start of the Stream. + ReaderGroupConfigBuilder::default() + .read_from_head_of_stream(scoped_stream) + .build() + }; + let rg = handle.block_on(self.cf.create_reader_group_with_config( reader_group_name.to_string(), rg_config, diff --git a/src/stream_reader.rs b/src/stream_reader.rs index 419c913..a817d04 100644 --- a/src/stream_reader.rs +++ b/src/stream_reader.rs @@ -148,6 +148,9 @@ impl EventData { fn data(&self) -> &[u8] { self.value.as_slice() } + + ///Return the offset + fn offset(&self) -> i64 { self.offset_in_segment } /// Returns the string representation. fn to_str(&self) -> String { format!("offset {:?} data :{:?}", self.offset_in_segment, self.value) diff --git a/src/stream_reader_group.rs b/src/stream_reader_group.rs index a17ca22..f88afa9 100644 --- a/src/stream_reader_group.rs +++ b/src/stream_reader_group.rs @@ -12,6 +12,9 @@ cfg_if! { if #[cfg(feature = "python_binding")] { use pravega_client_shared::ScopedStream; use pravega_client::event::reader_group::ReaderGroup; + use pravega_client::event::reader_group::StreamCutVersioned; + use pravega_client::event::reader_group::StreamCutV1; + use std::collections::HashMap; use pyo3::prelude::*; use pyo3::PyResult; use pyo3::PyObjectProtocol; @@ -22,9 +25,13 @@ cfg_if! 
{ use crate::stream_reader::StreamReader; use pravega_client::event::reader_group::{ReaderGroupConfig, ReaderGroupConfigBuilder}; use pravega_client::event::reader_group_state::ReaderGroupStateError; - use pravega_client_shared::{Scope, Stream}; + use pravega_client::event::reader_group_state::Offset; + use pravega_client_shared::{Scope, Stream, StreamCut}; + use pravega_client_shared::{ScopedSegment, ScopedStream}; use pyo3::types::PyTuple; use pyo3::exceptions; + use pyo3::prelude::*; + use pyo3::types::PyDict; } } @@ -99,6 +106,36 @@ impl PyObjectProtocol for StreamReaderGroupConfig { } } +#[cfg(feature = "python_binding")] +#[pyclass] +#[derive(Clone)] +pub(crate) struct ReaderStreamCut { + pub(crate) reader_stream_cut: StreamCut, +} +#[cfg(feature = "python_binding")] +#[pymethods] +impl ReaderStreamCut { + + /*fn get_scoped_stream(&self) -> ScopedStream { + self.reader_stream_cut.scoped_stream.clone() + }*/ + fn get_segment_offset_map(&self) -> HashMap { + self.reader_stream_cut.segment_offset_map.clone() + } + + fn to_str(&self) -> String { + format!("ReaderStreamCut: {:?}", self.reader_stream_cut) + } +} + +#[cfg(feature = "python_binding")] +#[pyproto] +impl PyObjectProtocol for ReaderStreamCut { + fn __repr__(&self) -> PyResult { + Ok(format!("ReaderStreamCut({:?})", self.to_str())) + } +} + /// /// This represents a Stream reader for a given Stream. /// Note: A python object of StreamReader cannot be created directly without using the StreamManager. @@ -179,6 +216,46 @@ impl StreamReaderGroup { } } + /// Return the latest StreamCut for the given reader. + /// Use this StreamCut in the ReaderGroupConfig to initiate reading from this streamcut. 
+ pub fn get_reader_streamcut(&self, reader_name: &str) -> PyResult { + info!( + "Get reader streamcut for reader {:?} ", reader_name + ); + + let streamcut = self + .runtime_handle + .block_on(self.reader_group.get_reader_streamcut(reader_name.to_string())); + + match streamcut { + Ok(streamcut_value) => Ok(ReaderStreamCut{ + reader_stream_cut: streamcut_value + }), + Err(e) => match e { + ReaderGroupStateError::SyncError { .. } => { + error!( + "Failed to get position for reader {:?} exception {:?} ", + reader_name, e + ); + Err(exceptions::PyValueError::new_err(format!( + "Failed to get position for reader {:?} exception {:?} ", + reader_name, e + ))) + } + ReaderGroupStateError::ReaderAlreadyOfflineError { .. } => { + error!( + "Reader already offline, failed to get position for reader {:?} exception {:?} ", + reader_name, e + ); + Err(exceptions::PyValueError::new_err(format!( + "Reader already offline, failed to get position for reader {:?} exception {:?} ", + reader_name, e + ))) + } + }, + } + } + /// Returns the string representation. fn to_str(&self) -> String { format!(