Skip to content

Commit

Permalink
chore: introduce enum for sink response type (#115)
Browse files — browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Mar 10, 2025
1 parent c68963a commit 5925071
Show file tree
Hide file tree
Showing 30 changed files with 174 additions and 171 deletions.
1 change: 1 addition & 0 deletions examples/batchmap-cat/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
2 changes: 1 addition & 1 deletion examples/batchmap-flatmap/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl batchmap::BatchMapper for Flatmap {
// Split the string by ","
let parts: Vec<&str> = s.split(',').collect();

// return the resulting parts
// return the resulting parts as a Vec<Message>
for part in parts {
response.append(Message::new(Vec::from(part)).with_keys(datum.keys.clone()));
}
Expand Down
1 change: 1 addition & 0 deletions examples/flatmap-stream/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
9 changes: 6 additions & 3 deletions examples/flatmap-stream/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ struct Cat;
#[tonic::async_trait]
impl mapstream::MapStreamer for Cat {
async fn map_stream(&self, input: mapstream::MapStreamRequest, tx: Sender<Message>) {
for i in 0..2 {
let message = Message::new(input.value.clone())
.with_keys(vec![format!("key-{}", i)])
let payload_str = String::from_utf8(input.value).unwrap_or_default();
let splits: Vec<&str> = payload_str.split(',').collect();

for split in splits {
let message = Message::new(split.as_bytes().to_vec())
.with_keys(input.keys.clone())
.with_tags(vec![]);
if tx.send(message).await.is_err() {
break;
Expand Down
1 change: 1 addition & 0 deletions examples/map-cat/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
1 change: 1 addition & 0 deletions examples/map-tickgen-serde/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
1 change: 1 addition & 0 deletions examples/map-tickgen-serde/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
serde = { version = "1.0.103", features = ["derive"] }
serde_json = "1.0.103"
numaflow = { path = "../../numaflow" }
chrono = "0.4.26"
8 changes: 5 additions & 3 deletions examples/map-tickgen-serde/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use chrono::{SecondsFormat, TimeZone, Utc};
use numaflow::map;
use numaflow::map::Message;
use serde::Serialize;
use std::time::UNIX_EPOCH;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
Expand Down Expand Up @@ -35,11 +35,13 @@ impl map::Mapper for TickGen {
let Ok(payload) = serde_json::from_slice::<Payload>(&input.value) else {
return vec![];
};
let ts = UNIX_EPOCH + std::time::Duration::from_nanos(payload.created_ts as u64);
let ts = Utc
.timestamp_nanos(payload.created_ts)
.to_rfc3339_opts(SecondsFormat::Nanos, true);
let message = map::Message::new(
serde_json::to_vec(&ResultPayload {
value: payload.data.value,
time: format!("{:?}", ts),
time: ts,
})
.unwrap_or_default(),
)
Expand Down
2 changes: 1 addition & 1 deletion examples/mapt-event-time-filter/.dockerignore
Original file line number Diff line number Diff line change
@@ -1 +1 @@
target/
target/
1 change: 1 addition & 0 deletions examples/mapt-event-time-filter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ edition = "2021"
numaflow = { path = "../../numaflow" }
tokio = "1.38.0"
tonic = "0.12.0"
chrono = "0.4.38"
23 changes: 11 additions & 12 deletions examples/mapt-event-time-filter/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ impl sourcetransform::SourceTransformer for EventTimeFilter {
}

mod filter_impl {
use chrono::{TimeZone, Utc};
use numaflow::sourcetransform::{Message, SourceTransformRequest};
use std::time::{Duration, UNIX_EPOCH};

/// Filters messages based on their event time.
/// Returns different types of messages depending on the event time comparison.
pub fn filter_event_time(input: SourceTransformRequest) -> Vec<Message> {
let jan_first_2022 = UNIX_EPOCH + Duration::new(1_640_995_200, 0); // 2022-01-01 00:00:00 UTC
let jan_first_2023 = UNIX_EPOCH + Duration::new(1_672_348_800, 0); // 2023-01-01 00:00:00 UTC
let jan_first_2022 = Utc.with_ymd_and_hms(2022, 1, 1, 0, 0, 0).unwrap();
let jan_first_2023 = Utc.with_ymd_and_hms(2023, 1, 1, 0, 0, 0).unwrap();

if input.eventtime < jan_first_2022 {
vec![Message::message_to_drop(input.eventtime)]
Expand All @@ -43,19 +43,18 @@ mod filter_impl {

#[cfg(test)]
mod tests {
use numaflow::sourcetransform::SourceTransformRequest;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use crate::filter_impl::filter_event_time;
use chrono::{TimeZone, Utc};
use numaflow::sourcetransform::SourceTransformRequest;

/// Tests that events from 2022 are tagged as within the year 2022.
#[test]
fn test_filter_event_time_should_return_within_year_2022() {
let time = UNIX_EPOCH + Duration::new(1_656_732_000, 0); // 2022-07-02 02:00:00 UTC
let time = Utc.with_ymd_and_hms(2022, 7, 2, 2, 0, 0).unwrap();
let source_request = SourceTransformRequest {
keys: vec![],
value: vec![],
watermark: SystemTime::now(),
watermark: Default::default(),
eventtime: time,
headers: Default::default(),
};
Expand All @@ -69,11 +68,11 @@ mod tests {
/// Tests that events from 2023 are tagged as after the year 2022.
#[test]
fn test_filter_event_time_should_return_after_year_2022() {
let time = UNIX_EPOCH + Duration::new(1_682_348_800, 0); // 2023-07-02 02:00:00 UTC
let time = Utc.with_ymd_and_hms(2023, 7, 2, 2, 0, 0).unwrap();
let source_request = SourceTransformRequest {
keys: vec![],
value: vec![],
watermark: SystemTime::now(),
watermark: Default::default(),
eventtime: time,
headers: Default::default(),
};
Expand All @@ -87,11 +86,11 @@ mod tests {
/// Tests that events before 2022 are dropped.
#[test]
fn test_filter_event_time_should_drop() {
let time = UNIX_EPOCH + Duration::new(1_594_732_000, 0); // 2021-07-02 02:00:00 UTC
let time = Utc.with_ymd_and_hms(2021, 7, 2, 2, 0, 0).unwrap();
let source_request = SourceTransformRequest {
keys: vec![],
value: vec![],
watermark: SystemTime::now(),
watermark: Default::default(),
eventtime: time,
headers: Default::default(),
};
Expand Down
1 change: 1 addition & 0 deletions examples/reduce-counter/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
1 change: 1 addition & 0 deletions examples/sideinput/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
1 change: 1 addition & 0 deletions examples/simple-source/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
1 change: 1 addition & 0 deletions examples/simple-source/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ edition = "2021"
tonic = "0.12.0"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
numaflow = { path = "../../numaflow" }
chrono = "0.4.38"
10 changes: 3 additions & 7 deletions examples/simple-source/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
}

pub(crate) mod simple_source {
use chrono::Utc;
use numaflow::source::{Message, Offset, SourceReadRequest, Sourcer};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
use std::{collections::HashSet, sync::RwLock};
use tokio::sync::mpsc::Sender;

Expand Down Expand Up @@ -38,14 +38,10 @@ pub(crate) mod simple_source {
return;
}

let event_time = SystemTime::now();
let event_time = Utc::now();
let mut message_offsets = Vec::with_capacity(request.count);
for i in 0..request.count {
let offset = format!(
"{}-{}",
event_time.duration_since(UNIX_EPOCH).unwrap().as_nanos(),
i
);
let offset = format!("{}-{}", event_time.timestamp_nanos_opt().unwrap(), i);
let payload = self.counter.fetch_add(1, Ordering::Relaxed).to_string();
transmitter
.send(Message {
Expand Down
1 change: 1 addition & 0 deletions examples/sink-log/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
1 change: 1 addition & 0 deletions examples/source-transformer-now/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
1 change: 1 addition & 0 deletions examples/source-transformer-now/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ edition = "2021"
tonic = "0.12.0"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
numaflow = { path = "../../numaflow" }
chrono = "0.4.40"
3 changes: 1 addition & 2 deletions examples/source-transformer-now/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use numaflow::sourcetransform;
use std::time::SystemTime;

/// A simple source transformer which assigns event time to the current time in utc.
Expand All @@ -17,7 +16,7 @@ impl sourcetransform::SourceTransformer for NowCat {
input: sourcetransform::SourceTransformRequest,
) -> Vec<sourcetransform::Message> {
vec![
sourcetransform::Message::new(input.value, SystemTime::now())
sourcetransform::Message::new(input.value, chrono::offset::Utc::now())
.with_keys(input.keys.clone()),
]
}
Expand Down
1 change: 1 addition & 0 deletions numaflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ tracing = "0.1.40"
uuid = { version = "1.10.0", features = ["v4"] }
thiserror = "2.0.12"
hyper-util = "0.1.7"
chrono = "0.4.38"

[build-dependencies]
tonic-build = "0.12.3"
Expand Down
14 changes: 5 additions & 9 deletions numaflow/src/batchmap.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::mpsc::channel;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -83,9 +83,9 @@ pub struct Datum {
/// The value in the (key, value) terminology of map/reduce paradigm.
pub value: Vec<u8>,
/// [watermark](https://numaflow.numaproj.io/core-concepts/watermarks/) represented by time is a guarantee that we will not see an element older than this time.
pub watermark: SystemTime,
pub watermark: DateTime<Utc>,
/// Time of the element as seen at source or aligned after a reduce operation.
pub event_time: SystemTime,
pub event_time: DateTime<Utc>,
/// ID is the unique id of the message to be sent to the Batch Map.
pub id: String,
/// Headers for the message.
Expand All @@ -103,12 +103,8 @@ impl TryFrom<MapRequest> for Datum {
Ok(Self {
keys: request.keys,
value: request.value,
watermark: shared::prost_timestamp_to_system_time(
request.watermark.unwrap_or_default(),
),
event_time: shared::prost_timestamp_to_system_time(
request.event_time.unwrap_or_default(),
),
watermark: shared::utc_from_timestamp(request.watermark),
event_time: shared::utc_from_timestamp(request.event_time),
id: sr.id,
headers: request.headers,
})
Expand Down
10 changes: 5 additions & 5 deletions numaflow/src/map.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReceiverStream;
Expand Down Expand Up @@ -163,9 +163,9 @@ pub struct MapRequest {
/// The value in the (key, value) terminology of map/reduce paradigm.
pub value: Vec<u8>,
/// [watermark](https://numaflow.numaproj.io/core-concepts/watermarks/) represented by time is a guarantee that we will not see an element older than this time.
pub watermark: SystemTime,
pub watermark: DateTime<Utc>,
/// Time of the element as seen at source or aligned after a reduce operation.
pub eventtime: SystemTime,
pub eventtime: DateTime<Utc>,
/// Headers for the message.
pub headers: HashMap<String, String>,
}
Expand All @@ -175,8 +175,8 @@ impl From<proto::map_request::Request> for MapRequest {
Self {
keys: value.keys,
value: value.value,
watermark: shared::prost_timestamp_to_system_time(value.watermark.unwrap_or_default()),
eventtime: shared::prost_timestamp_to_system_time(value.event_time.unwrap_or_default()),
watermark: shared::utc_from_timestamp(value.watermark),
eventtime: shared::utc_from_timestamp(value.event_time),
headers: value.headers,
}
}
Expand Down
10 changes: 5 additions & 5 deletions numaflow/src/mapstream.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::mpsc::Sender;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -167,9 +167,9 @@ pub struct MapStreamRequest {
/// The value in the (key, value) terminology of map/reduce paradigm.
pub value: Vec<u8>,
/// [watermark](https://numaflow.numaproj.io/core-concepts/watermarks/) represented by time is a guarantee that we will not see an element older than this time.
pub watermark: SystemTime,
pub watermark: DateTime<Utc>,
/// Time of the element as seen at source or aligned after a reduce operation.
pub eventtime: SystemTime,
pub eventtime: DateTime<Utc>,
/// Headers for the message.
pub headers: HashMap<String, String>,
}
Expand All @@ -179,8 +179,8 @@ impl From<proto::map_request::Request> for MapStreamRequest {
Self {
keys: value.keys,
value: value.value,
watermark: shared::prost_timestamp_to_system_time(value.watermark.unwrap_or_default()),
eventtime: shared::prost_timestamp_to_system_time(value.event_time.unwrap_or_default()),
watermark: shared::utc_from_timestamp(value.watermark),
eventtime: shared::utc_from_timestamp(value.event_time),
headers: value.headers,
}
}
Expand Down
Loading

0 comments on commit 5925071

Please sign in to comment.