Skip to content

Commit

Permalink
Expose Reader streamcut to read from given Stream Cut (#22)
Browse files Browse the repository at this point in the history
If a client application crashes and the user wants to continue reading from the stream where it left off, there is currently no method to obtain StreamCuts from the ReaderGroup.
A StreamCut is a set of segment/offset pairs for a single stream that represents a consistent position in the stream.

Signed-off-by: Shwetha N <[email protected]>
  • Loading branch information
ShwethaSNayak authored Feb 24, 2024
1 parent 3a825a5 commit 14a02ff
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 16 deletions.
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#[macro_use]
extern crate cfg_if;

use crate::stream_reader_group::StreamReaderGroupConfig;
use crate::stream_reader_group::{StreamCuts, StreamReaderGroupConfig};

mod byte_stream;
mod stream_manager;
Expand Down Expand Up @@ -58,6 +58,7 @@ fn pravega_client(py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<StreamReaderGroup>()?;
m.add_class::<StreamScalingPolicy>()?;
m.add_class::<StreamRetentionPolicy>()?;
m.add_class::<StreamCuts>()?;
m.add_class::<ByteStream>()?;
let txn_exception = py.get_type::<TxnFailedException>();
txn_exception.setattr("__doc__", TXNFAILED_EXCEPTION_DOCSTRING)?;
Expand Down
42 changes: 29 additions & 13 deletions src/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
// http://www.apache.org/licenses/LICENSE-2.0
//

use std::collections::HashMap;
use pravega_client::event::reader_group::{StreamCutV1, StreamCutVersioned};
use crate::stream_reader_group::StreamCuts;
cfg_if! {
if #[cfg(feature = "python_binding")] {
use crate::stream_writer_transactional::StreamTxnWriter;
Expand Down Expand Up @@ -558,32 +561,45 @@ impl StreamManager {
/// event.reader_group=manager.create_reader_group("rg1", "scope", "stream", true)
/// ```
///
#[pyo3(text_signature = "($self, reader_group_name, scope_name, stream_name, read_from_tail)")]
#[pyo3(text_signature = "($self, reader_group_name, scope_name, stream_name, read_from_tail, stream_cut)")]
#[args(read_from_tail = "false")]
pub fn create_reader_group(
&self,
reader_group_name: &str,
scope_name: &str,
stream_name: &str,
read_from_tail: bool,
stream_cut: Option<StreamCuts>,
) -> PyResult<StreamReaderGroup> {
let scope = Scope::from(scope_name.to_string());
let stream = Stream::from(stream_name.to_string());
let scoped_stream = ScopedStream {
scope: scope.clone(),
stream: Stream::from(stream_name.to_string()),
stream: stream.clone(),
};
let handle = self.cf.runtime_handle();
let rg_config = if read_from_tail {
// Create a reader group to read from the current TAIL/end of the Stream.
ReaderGroupConfigBuilder::default()
.read_from_tail_of_stream(scoped_stream)
.build()
} else {
// Create a reader group to read from current HEAD/start of the Stream.
ReaderGroupConfigBuilder::default()
.read_from_head_of_stream(scoped_stream)
.build()
};
let rg_config = if let Some(ref stream_cut) = stream_cut {
let mut positions = HashMap::new();
// Convert every (segment, offset) entry of the stream cut's segment_offset_map into a ScopedSegment -> offset position.
for (segment_val, position) in stream_cut.stream_cuts.segment_offset_map.iter() {
let scoped_segment = ScopedSegment::new(scope.clone(), stream.clone(), Segment::from(*segment_val));
positions.insert(scoped_segment, *position);
}
let stream_cut_v1 = StreamCutV1::new(scoped_stream.clone(), positions);
// Create a reader group to read from given StreamCut .
ReaderGroupConfigBuilder::default().read_from_stream(scoped_stream.clone(), StreamCutVersioned::V1(stream_cut_v1)).build()
}else if read_from_tail {
// Create a reader group to read from the current TAIL/end of the Stream.
ReaderGroupConfigBuilder::default()
.read_from_tail_of_stream(scoped_stream)
.build()
} else {
// Create a reader group to read from current HEAD/start of the Stream.
ReaderGroupConfigBuilder::default()
.read_from_head_of_stream(scoped_stream)
.build()
};

let rg = handle.block_on(self.cf.create_reader_group_with_config(
reader_group_name.to_string(),
rg_config,
Expand Down
3 changes: 3 additions & 0 deletions src/stream_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ impl EventData {
fn data(&self) -> &[u8] {
// Hand out the event payload as a borrowed byte slice of `self.value`.
self.value.as_slice()
}

/// Return the offset of this event, as stored in the `offset_in_segment` field.
fn offset(&self) -> i64 {
    self.offset_in_segment
}
/// Returns the string representation.
fn to_str(&self) -> String {
format!("offset {:?} data :{:?}", self.offset_in_segment, self.value)
Expand Down
47 changes: 45 additions & 2 deletions src/stream_reader_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

cfg_if! {
if #[cfg(feature = "python_binding")] {
use pravega_client_shared::ScopedStream;
use pravega_client::event::reader_group::ReaderGroup;
use std::collections::HashMap;
use pyo3::prelude::*;
use pyo3::PyResult;
use pyo3::PyObjectProtocol;
Expand All @@ -22,7 +22,8 @@ cfg_if! {
use crate::stream_reader::StreamReader;
use pravega_client::event::reader_group::{ReaderGroupConfig, ReaderGroupConfigBuilder};
use pravega_client::event::reader_group_state::ReaderGroupStateError;
use pravega_client_shared::{Scope, Stream};
use pravega_client_shared::{Scope, Stream, StreamCut};
use pravega_client_shared::ScopedStream;
use pyo3::types::PyTuple;
use pyo3::exceptions;
}
Expand Down Expand Up @@ -99,6 +100,33 @@ impl PyObjectProtocol for StreamReaderGroupConfig {
}
}

///
/// Python-visible wrapper around a `pravega_client_shared::StreamCut`.
/// Instances are returned by `StreamReaderGroup::get_streamcut` and are accepted by
/// `StreamManager::create_reader_group` through its optional `stream_cut` argument.
///
#[cfg(feature = "python_binding")]
#[pyclass]
#[derive(Clone)]
pub(crate) struct StreamCuts {
// The wrapped StreamCut; its `segment_offset_map` holds the segment -> offset entries.
pub(crate) stream_cuts: StreamCut,
}
#[cfg(feature = "python_binding")]
#[pymethods]
impl StreamCuts {
    /// Return a copy of the segment -> offset map held by the wrapped StreamCut.
    /// The map is cloned, so modifying the returned value does not affect this object.
    fn get_segment_offset_map(&self) -> HashMap<i64, i64> {
        let offsets = &self.stream_cuts.segment_offset_map;
        offsets.clone()
    }

    /// Returns the string representation.
    fn to_str(&self) -> String {
        let cut = &self.stream_cuts;
        format!("StreamCuts: {:?}", cut)
    }
}

#[cfg(feature = "python_binding")]
#[pyproto]
impl PyObjectProtocol for StreamCuts {
    // Python `repr()` support: wraps the `to_str()` text in `StreamCuts(...)`.
    fn __repr__(&self) -> PyResult<String> {
        let description = self.to_str();
        // `{:?}` applies Debug formatting to the String, so the text appears quoted.
        let repr = format!("StreamCuts({:?})", description);
        Ok(repr)
    }
}

///
/// This represents a Stream reader for a given Stream.
/// Note: A python object of StreamReader cannot be created directly without using the StreamManager.
Expand Down Expand Up @@ -179,6 +207,21 @@ impl StreamReaderGroup {
}
}

/// Fetch the latest StreamCut from this ReaderGroup.
///
/// The call blocks on the runtime handle until the reader group has produced its StreamCut.
/// The returned `StreamCuts` object can be supplied as the `stream_cut` argument of
/// `StreamManager.create_reader_group` to start reading from this position.
pub fn get_streamcut(&self) -> PyResult<StreamCuts> {
    let pending = self.reader_group.get_streamcut();
    let latest_cut = self.runtime_handle.block_on(pending);
    info!("Got streamcut {:?} ", latest_cut);
    Ok(StreamCuts {
        stream_cuts: latest_cut,
    })
}

/// Returns the string representation.
fn to_str(&self) -> String {
format!(
Expand Down
1 change: 1 addition & 0 deletions tests/pravega_reader_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#
# http://www.apache.org/licenses/LICENSE-2.0
#
import time

import pravega_client
import random
Expand Down

0 comments on commit 14a02ff

Please sign in to comment.