Skip to content

Commit

Permalink
reader streamcut for reading from given streamcut
Browse files Browse the repository at this point in the history
Signed-off-by: Shwetha N <[email protected]>
  • Loading branch information
ShwethaSNayak committed Feb 19, 2024
1 parent 3a825a5 commit 2b706cb
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 15 deletions.
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#[macro_use]
extern crate cfg_if;

use crate::stream_reader_group::StreamReaderGroupConfig;
use crate::stream_reader_group::{ReaderStreamCut, StreamReaderGroupConfig};

mod byte_stream;
mod stream_manager;
Expand Down Expand Up @@ -58,6 +58,7 @@ fn pravega_client(py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<StreamReaderGroup>()?;
m.add_class::<StreamScalingPolicy>()?;
m.add_class::<StreamRetentionPolicy>()?;
m.add_class::<ReaderStreamCut>()?;
m.add_class::<ByteStream>()?;
let txn_exception = py.get_type::<TxnFailedException>();
txn_exception.setattr("__doc__", TXNFAILED_EXCEPTION_DOCSTRING)?;
Expand Down
42 changes: 29 additions & 13 deletions src/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
// http://www.apache.org/licenses/LICENSE-2.0
//

use std::collections::HashMap;
use pravega_client::event::reader_group::{StreamCutV1, StreamCutVersioned};
use crate::stream_reader_group::ReaderStreamCut;
cfg_if! {
if #[cfg(feature = "python_binding")] {
use crate::stream_writer_transactional::StreamTxnWriter;
Expand Down Expand Up @@ -558,32 +561,45 @@ impl StreamManager {
/// event.reader_group=manager.create_reader_group("rg1", "scope", "stream", true)
/// ```
///
#[pyo3(text_signature = "($self, reader_group_name, scope_name, stream_name, read_from_tail)")]
#[pyo3(text_signature = "($self, reader_group_name, scope_name, stream_name, read_from_tail, stream_cut)")]
#[args(read_from_tail = "false")]
pub fn create_reader_group(
&self,
reader_group_name: &str,
scope_name: &str,
stream_name: &str,
read_from_tail: bool,
stream_cut: Option<ReaderStreamCut>,
) -> PyResult<StreamReaderGroup> {
let scope = Scope::from(scope_name.to_string());
let stream = Stream::from(stream_name.to_string());
let scoped_stream = ScopedStream {
scope: scope.clone(),
stream: Stream::from(stream_name.to_string()),
stream: stream.clone(),
};
let handle = self.cf.runtime_handle();
let rg_config = if read_from_tail {
// Create a reader group to read from the current TAIL/end of the Stream.
ReaderGroupConfigBuilder::default()
.read_from_tail_of_stream(scoped_stream)
.build()
} else {
// Create a reader group to read from current HEAD/start of the Stream.
ReaderGroupConfigBuilder::default()
.read_from_head_of_stream(scoped_stream)
.build()
};
let rg_config = if let Some(ref stream_cut) = stream_cut {
let mut positions = HashMap::new();
// Iterate over the (segment, offset) entries of the segment_offset_map
for (segment_val, position) in stream_cut.reader_stream_cut.segment_offset_map.iter() {
let mut scoped_segment = ScopedSegment::new(scope.clone(), stream.clone(), Segment::from(*segment_val));
positions.insert(scoped_segment, *position);
}
let stream_cut_v1 = StreamCutV1::new(scoped_stream.clone(), positions);
// Create a reader group to read from given StreamCut .
ReaderGroupConfigBuilder::default().read_from_stream(scoped_stream.clone(), StreamCutVersioned::V1(stream_cut_v1)).build()
}else if read_from_tail {
// Create a reader group to read from the current TAIL/end of the Stream.
ReaderGroupConfigBuilder::default()
.read_from_tail_of_stream(scoped_stream)
.build()
} else {
// Create a reader group to read from current HEAD/start of the Stream.
ReaderGroupConfigBuilder::default()
.read_from_head_of_stream(scoped_stream)
.build()
};

let rg = handle.block_on(self.cf.create_reader_group_with_config(
reader_group_name.to_string(),
rg_config,
Expand Down
3 changes: 3 additions & 0 deletions src/stream_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ impl EventData {
fn data(&self) -> &[u8] {
self.value.as_slice()
}

/// Returns the offset of this event within its segment (the `offset_in_segment` field).
fn offset(&self) -> i64 { self.offset_in_segment }
/// Returns the string representation.
fn to_str(&self) -> String {
format!("offset {:?} data :{:?}", self.offset_in_segment, self.value)
Expand Down
79 changes: 78 additions & 1 deletion src/stream_reader_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ cfg_if! {
if #[cfg(feature = "python_binding")] {
use pravega_client_shared::ScopedStream;
use pravega_client::event::reader_group::ReaderGroup;
use pravega_client::event::reader_group::StreamCutVersioned;
use pravega_client::event::reader_group::StreamCutV1;
use std::collections::HashMap;
use pyo3::prelude::*;
use pyo3::PyResult;
use pyo3::PyObjectProtocol;
Expand All @@ -22,9 +25,13 @@ cfg_if! {
use crate::stream_reader::StreamReader;
use pravega_client::event::reader_group::{ReaderGroupConfig, ReaderGroupConfigBuilder};
use pravega_client::event::reader_group_state::ReaderGroupStateError;
use pravega_client_shared::{Scope, Stream};
use pravega_client::event::reader_group_state::Offset;
use pravega_client_shared::{Scope, Stream, StreamCut};
use pravega_client_shared::{ScopedSegment, ScopedStream};
use pyo3::types::PyTuple;
use pyo3::exceptions;
use pyo3::prelude::*;
use pyo3::types::PyDict;
}
}

Expand Down Expand Up @@ -99,6 +106,36 @@ impl PyObjectProtocol for StreamReaderGroupConfig {
}
}

///
/// Python-visible wrapper around a `StreamCut`.
/// Instances are produced by `StreamReaderGroup::get_reader_streamcut` and can be passed
/// as the optional `stream_cut` argument of `StreamManager::create_reader_group`.
///
#[cfg(feature = "python_binding")]
#[pyclass]
#[derive(Clone)]
pub(crate) struct ReaderStreamCut {
// The wrapped StreamCut; crate-visible so other binding modules can read it directly.
pub(crate) reader_stream_cut: StreamCut,
}
#[cfg(feature = "python_binding")]
#[pymethods]
impl ReaderStreamCut {
    ///
    /// Returns a copy of the `segment_offset_map` held by the wrapped StreamCut.
    /// The map is cloned, so modifying the returned value does not affect this object.
    ///
    fn get_segment_offset_map(&self) -> HashMap<i64, i64> {
        self.reader_stream_cut.segment_offset_map.clone()
    }

    /// Returns the string representation (the Debug rendering of the wrapped StreamCut).
    fn to_str(&self) -> String {
        format!("ReaderStreamCut: {:?}", self.reader_stream_cut)
    }
}

///
/// Refer https://docs.python.org/3/reference/datamodel.html#basic-customization
/// This function will be called by the repr() built-in function to compute the official
/// string representation of a ReaderStreamCut.
///
#[cfg(feature = "python_binding")]
#[pyproto]
impl PyObjectProtocol for ReaderStreamCut {
    fn __repr__(&self) -> PyResult<String> {
        // `to_str` yields a String; it is embedded with Debug formatting, i.e. quoted.
        let description = self.to_str();
        Ok(format!("ReaderStreamCut({:?})", description))
    }
}

///
/// This represents a Stream reader for a given Stream.
/// Note: A python object of StreamReader cannot be created directly without using the StreamManager.
Expand Down Expand Up @@ -179,6 +216,46 @@ impl StreamReaderGroup {
}
}

/// Return the latest StreamCut for the given reader.
/// Use this StreamCut in the ReaderGroupConfig to initiate reading from this streamcut.
pub fn get_reader_streamcut(&self, reader_name: &str) -> PyResult<ReaderStreamCut> {
info!(
"Get reader streamcut for reader {:?} ", reader_name
);

let streamcut = self
.runtime_handle
.block_on(self.reader_group.get_reader_streamcut(reader_name.to_string()));

match streamcut {
Ok(streamcut_value) => Ok(ReaderStreamCut{
reader_stream_cut: streamcut_value
}),
Err(e) => match e {
ReaderGroupStateError::SyncError { .. } => {
error!(
"Failed to get position for reader {:?} exception {:?} ",
reader_name, e
);
Err(exceptions::PyValueError::new_err(format!(
"Failed to get position for reader {:?} exception {:?} ",
reader_name, e
)))
}
ReaderGroupStateError::ReaderAlreadyOfflineError { .. } => {
error!(
"Reader already offline, failed to get position for reader {:?} exception {:?} ",
reader_name, e
);
Err(exceptions::PyValueError::new_err(format!(
"Reader already offline, failed to get position for reader {:?} exception {:?} ",
reader_name, e
)))
}
},
}
}

/// Returns the string representation.
fn to_str(&self) -> String {
format!(
Expand Down

0 comments on commit 2b706cb

Please sign in to comment.