From f64ffed7209210b2d9b4ce9d14939561966c825f Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Mon, 19 Jan 2026 10:47:59 -0800 Subject: [PATCH 1/3] feat: add metadata (system/user) propagation Signed-off-by: Vigith Maurice --- packages/pynumaflow-lite/src/map/mod.rs | 221 +++++++++++++++++- packages/pynumaflow-lite/src/sink/mod.rs | 157 ++++++++++++- packages/pynumaflow-lite/src/source/mod.rs | 112 ++++++++- .../src/sourcetransform/mod.rs | 217 ++++++++++++++++- 4 files changed, 679 insertions(+), 28 deletions(-) diff --git a/packages/pynumaflow-lite/src/map/mod.rs b/packages/pynumaflow-lite/src/map/mod.rs index 5dd2e638..4609d1d4 100644 --- a/packages/pynumaflow-lite/src/map/mod.rs +++ b/packages/pynumaflow-lite/src/map/mod.rs @@ -11,6 +11,174 @@ pub mod server; use pyo3::prelude::*; use std::sync::Mutex; +/// SystemMetadata wraps system-generated metadata groups per message. +/// It is read-only to UDFs. +#[pyclass(module = "pynumaflow_lite.mapper")] +#[derive(Clone, Default, Debug)] +pub struct SystemMetadata { + data: HashMap>>, +} + +#[pymethods] +impl SystemMetadata { + #[new] + #[pyo3(signature = () -> "SystemMetadata")] + fn new() -> Self { + Self::default() + } + + /// Returns the groups of the system metadata. + /// If there are no groups, it returns an empty list. + #[pyo3(signature = () -> "list[str]")] + fn groups(&self) -> Vec { + self.data.keys().cloned().collect() + } + + /// Returns the keys of the system metadata for the given group. + /// If there are no keys or the group is not present, it returns an empty list. + #[pyo3(signature = (group: "str") -> "list[str]")] + fn keys(&self, group: &str) -> Vec { + self.data + .get(group) + .map(|kv| kv.keys().cloned().collect()) + .unwrap_or_default() + } + + /// Returns the value of the system metadata for the given group and key. + /// If there is no value or the group or key is not present, it returns an empty bytes. + #[pyo3(signature = (group: "str", key: "str") -> "bytes")] + fn value(&self, group: &str, key: &str) -> Vec { + self.data + .get(group) + .and_then(|kv| kv.get(key)) + .cloned() + .unwrap_or_default() + } + + fn __repr__(&self) -> String { + format!("SystemMetadata(groups={:?})", self.groups()) + } +} + +impl From for SystemMetadata { + fn from(value: map::SystemMetadata) -> Self { + let mut data = HashMap::new(); + for group in value.groups() { + let mut kv = HashMap::new(); + for key in value.keys(&group) { + kv.insert(key.clone(), value.value(&group, &key)); + } + data.insert(group, kv); + } + Self { data } + } +} + +/// UserMetadata wraps user-defined metadata groups per message. +/// Users can read and write to this metadata. +#[pyclass(module = "pynumaflow_lite.mapper")] +#[derive(Clone, Default, Debug)] +pub struct UserMetadata { + data: HashMap>>, +} + +#[pymethods] +impl UserMetadata { + #[new] + #[pyo3(signature = () -> "UserMetadata")] + fn new() -> Self { + Self::default() + } + + /// Returns the groups of the user metadata. + /// If there are no groups, it returns an empty list. + #[pyo3(signature = () -> "list[str]")] + fn groups(&self) -> Vec { + self.data.keys().cloned().collect() + } + + /// Returns the keys of the user metadata for the given group. + /// If there are no keys or the group is not present, it returns an empty list. + #[pyo3(signature = (group: "str") -> "list[str]")] + fn keys(&self, group: &str) -> Vec { + self.data + .get(group) + .map(|kv| kv.keys().cloned().collect()) + .unwrap_or_default() + } + + /// Returns the value of the user metadata for the given group and key. + /// If there is no value or the group or key is not present, it returns an empty bytes. + #[pyo3(signature = (group: "str", key: "str") -> "bytes")] + fn value(&self, group: &str, key: &str) -> Vec { + self.data + .get(group) + .and_then(|kv| kv.get(key)) + .cloned() + .unwrap_or_default() + } + + /// Creates a new group in the user metadata. + /// If the group already exists, this is a no-op. + #[pyo3(signature = (group: "str"))] + fn create_group(&mut self, group: String) { + self.data.entry(group).or_default(); + } + + /// Adds a key-value pair to the user metadata. + /// If the group is not present, it creates a new group. + #[pyo3(signature = (group: "str", key: "str", value: "bytes"))] + fn add_kv(&mut self, group: String, key: String, value: Vec) { + self.data.entry(group).or_default().insert(key, value); + } + + /// Removes a key from a group in the user metadata. + /// If the key or group is not present, it's a no-op. + #[pyo3(signature = (group: "str", key: "str"))] + fn remove_key(&mut self, group: &str, key: &str) { + if let Some(kv) = self.data.get_mut(group) { + kv.remove(key); + } + } + + /// Removes a group from the user metadata. + /// If the group is not present, it's a no-op. + #[pyo3(signature = (group: "str"))] + fn remove_group(&mut self, group: &str) { + self.data.remove(group); + } + + fn __repr__(&self) -> String { + format!("UserMetadata(groups={:?})", self.groups()) + } +} + +impl From for UserMetadata { + fn from(value: map::UserMetadata) -> Self { + let mut data = HashMap::new(); + for group in value.groups() { + let mut kv = HashMap::new(); + for key in value.keys(&group) { + kv.insert(key.clone(), value.value(&group, &key)); + } + data.insert(group, kv); + } + Self { data } + } +} + +impl From for map::UserMetadata { + fn from(value: UserMetadata) -> Self { + let mut user_metadata = map::UserMetadata::new(); + for (group, kv_map) in value.data { + for (key, val) in kv_map { + user_metadata.add_kv(group.clone(), key, val); + } + } + user_metadata + } +} + /// A collection of [Message]s. #[pyclass(module = "pynumaflow_lite.mapper")] #[derive(Clone, Debug)] @@ -52,16 +220,28 @@ pub struct Message { pub value: Vec, /// Tags are used for [conditional forwarding](https://numaflow.numaproj.io/user-guide/reference/conditional-forwarding/). pub tags: Option>, + /// User metadata for the message. + pub user_metadata: Option, } #[pymethods] impl Message { - /// Create a new [Message] with the given value, keys, and tags. + /// Create a new [Message] with the given value, keys, tags, and user_metadata. #[new] - #[pyo3(signature = (value: "bytes", keys: "list[str] | None"=None, tags: "list[str] | None"=None) -> "Message" + #[pyo3(signature = (value: "bytes", keys: "list[str] | None"=None, tags: "list[str] | None"=None, user_metadata: "UserMetadata | None"=None) -> "Message" )] - fn new(value: Vec, keys: Option>, tags: Option>) -> Self { - Self { keys, value, tags } + fn new( + value: Vec, + keys: Option>, + tags: Option>, + user_metadata: Option, + ) -> Self { + Self { + keys, + value, + tags, + user_metadata, + } } /// Drop a [Message], do not forward to the next vertex. @@ -72,6 +252,7 @@ impl Message { keys: None, value: vec![], tags: Some(vec![numaflow::shared::DROP.to_string()]), + user_metadata: None, } } } @@ -82,7 +263,7 @@ impl From for map::Message { keys: value.keys, value: value.value, tags: value.tags, - user_metadata: None, + user_metadata: value.user_metadata.map(|m| m.into()), } } } @@ -106,6 +287,12 @@ pub struct Datum { /// Headers for the message. #[pyo3(get)] pub headers: HashMap, + /// User metadata for the message. + #[pyo3(get)] + pub user_metadata: UserMetadata, + /// System metadata for the message. + #[pyo3(get)] + pub system_metadata: SystemMetadata, } impl Datum { @@ -115,6 +302,8 @@ impl Datum { watermark: DateTime, eventtime: DateTime, headers: HashMap, + user_metadata: UserMetadata, + system_metadata: SystemMetadata, ) -> Self { Self { keys, @@ -122,24 +311,34 @@ impl Datum { watermark, eventtime, headers, + user_metadata, + system_metadata, } } fn __repr__(&self) -> String { format!( - "Datum(keys={:?}, value={:?}, watermark={}, eventtime={}, headers={:?})", - self.keys, self.value, self.watermark, self.eventtime, self.headers + "Datum(keys={:?}, value={:?}, watermark={}, eventtime={}, headers={:?}, user_metadata={:?}, system_metadata={:?})", + self.keys, + self.value, + self.watermark, + self.eventtime, + self.headers, + self.user_metadata, + self.system_metadata ) } fn __str__(&self) -> String { format!( - "Datum(keys={:?}, value={:?}, watermark={}, eventtime={}, headers={:?})", + "Datum(keys={:?}, value={:?}, watermark={}, eventtime={}, headers={:?}, user_metadata={:?}, system_metadata={:?})", self.keys, String::from_utf8_lossy(&self.value), self.watermark, self.eventtime, - self.headers + self.headers, + self.user_metadata, + self.system_metadata ) } } @@ -152,6 +351,8 @@ impl From for Datum { value.watermark, value.eventtime, value.headers, + value.user_metadata.into(), + value.system_metadata.into(), ) } } @@ -208,6 +409,8 @@ impl MapAsyncServer { /// Helper to populate a PyModule with map types/functions. pub(crate) fn populate_py_module(m: &Bound) -> PyResult<()> { + m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/packages/pynumaflow-lite/src/sink/mod.rs b/packages/pynumaflow-lite/src/sink/mod.rs index 15087090..6ccf5fb0 100644 --- a/packages/pynumaflow-lite/src/sink/mod.rs +++ b/packages/pynumaflow-lite/src/sink/mod.rs @@ -13,6 +13,132 @@ use tokio::sync::mpsc; use pyo3::prelude::*; use std::sync::Mutex; +/// SystemMetadata wraps system-generated metadata groups per message. +/// Since sink is the last vertex in the pipeline, only GET methods are available. +#[pyclass(module = "pynumaflow_lite.sinker")] +#[derive(Clone, Default, Debug)] +pub struct SystemMetadata { + data: HashMap>>, +} + +#[pymethods] +impl SystemMetadata { + #[new] + #[pyo3(signature = () -> "SystemMetadata")] + fn new() -> Self { + Self::default() + } + + /// Returns the groups of the system metadata. + /// If there are no groups, it returns an empty list. + #[pyo3(signature = () -> "list[str]")] + fn groups(&self) -> Vec { + self.data.keys().cloned().collect() + } + + /// Returns the keys of the system metadata for the given group. + /// If there are no keys or the group is not present, it returns an empty list. + #[pyo3(signature = (group: "str") -> "list[str]")] + fn keys(&self, group: &str) -> Vec { + self.data + .get(group) + .map(|kv| kv.keys().cloned().collect()) + .unwrap_or_default() + } + + /// Returns the value of the system metadata for the given group and key. + /// If there is no value or the group or key is not present, it returns an empty bytes. + #[pyo3(signature = (group: "str", key: "str") -> "bytes")] + fn value(&self, group: &str, key: &str) -> Vec { + self.data + .get(group) + .and_then(|kv| kv.get(key)) + .cloned() + .unwrap_or_default() + } + + fn __repr__(&self) -> String { + format!("SystemMetadata(groups={:?})", self.groups()) + } +} + +impl From for SystemMetadata { + fn from(value: sink::SystemMetadata) -> Self { + let mut data = HashMap::new(); + for group in value.groups() { + let mut kv = HashMap::new(); + for key in value.keys(&group) { + kv.insert(key.clone(), value.value(&group, &key)); + } + data.insert(group, kv); + } + Self { data } + } +} + +/// UserMetadata wraps user-defined metadata groups per message. +/// Since sink is the last vertex in the pipeline, only GET methods are available. +#[pyclass(module = "pynumaflow_lite.sinker")] +#[derive(Clone, Default, Debug)] +pub struct UserMetadata { + data: HashMap>>, +} + +#[pymethods] +impl UserMetadata { + #[new] + #[pyo3(signature = () -> "UserMetadata")] + fn new() -> Self { + Self::default() + } + + /// Returns the groups of the user metadata. + /// If there are no groups, it returns an empty list. + #[pyo3(signature = () -> "list[str]")] + fn groups(&self) -> Vec { + self.data.keys().cloned().collect() + } + + /// Returns the keys of the user metadata for the given group. + /// If there are no keys or the group is not present, it returns an empty list. + #[pyo3(signature = (group: "str") -> "list[str]")] + fn keys(&self, group: &str) -> Vec { + self.data + .get(group) + .map(|kv| kv.keys().cloned().collect()) + .unwrap_or_default() + } + + /// Returns the value of the user metadata for the given group and key. + /// If there is no value or the group or key is not present, it returns an empty bytes. + #[pyo3(signature = (group: "str", key: "str") -> "bytes")] + fn value(&self, group: &str, key: &str) -> Vec { + self.data + .get(group) + .and_then(|kv| kv.get(key)) + .cloned() + .unwrap_or_default() + } + + fn __repr__(&self) -> String { + format!("UserMetadata(groups={:?})", self.groups()) + } +} + +impl From for UserMetadata { + fn from(value: sink::UserMetadata) -> Self { + let mut data = HashMap::new(); + for group in value.groups() { + let mut kv = HashMap::new(); + for key in value.keys(&group) { + kv.insert(key.clone(), value.value(&group, &key)); + } + data.insert(group, kv); + } + Self { data } + } +} + /// KeyValueGroup represents a group of key-value pairs for user metadata. #[pyclass(module = "pynumaflow_lite.sinker")] #[derive(Clone, Default, Debug)] @@ -248,6 +374,12 @@ pub struct Datum { /// Headers for the message. #[pyo3(get)] pub headers: HashMap, + /// User metadata for the message. + #[pyo3(get)] + pub user_metadata: UserMetadata, + /// System metadata for the message. + #[pyo3(get)] + pub system_metadata: SystemMetadata, } impl Datum { @@ -258,6 +390,8 @@ impl Datum { eventtime: DateTime, id: String, headers: HashMap, + user_metadata: UserMetadata, + system_metadata: SystemMetadata, ) -> Self { Self { keys, @@ -266,25 +400,36 @@ impl Datum { eventtime, id, headers, + user_metadata, + system_metadata, } } fn __repr__(&self) -> String { format!( - "Datum(keys={:?}, value={:?}, watermark={}, eventtime={}, id={}, headers={:?})", - self.keys, self.value, self.watermark, self.eventtime, self.id, self.headers + "Datum(keys={:?}, value={:?}, watermark={}, eventtime={}, id={}, headers={:?}, user_metadata={:?}, system_metadata={:?})", + self.keys, + self.value, + self.watermark, + self.eventtime, + self.id, + self.headers, + self.user_metadata, + self.system_metadata ) } fn __str__(&self) -> String { format!( - "Datum(keys={:?}, value={:?}, watermark={}, eventtime={}, id={}, headers={:?})", + "Datum(keys={:?}, value={:?}, watermark={}, eventtime={}, id={}, headers={:?}, user_metadata={:?}, system_metadata={:?})", self.keys, String::from_utf8_lossy(&self.value), self.watermark, self.eventtime, self.id, - self.headers + self.headers, + self.user_metadata, + self.system_metadata ) } } @@ -298,6 +443,8 @@ impl From for Datum { value.event_time, value.id, value.headers, + value.user_metadata.into(), + value.system_metadata.into(), ) } } @@ -388,6 +535,8 @@ impl SinkAsyncServer { /// Helper to populate a PyModule with sink types/functions. pub(crate) fn populate_py_module(m: &Bound) -> PyResult<()> { + m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/packages/pynumaflow-lite/src/source/mod.rs b/packages/pynumaflow-lite/src/source/mod.rs index a7da35f7..38188ac0 100644 --- a/packages/pynumaflow-lite/src/source/mod.rs +++ b/packages/pynumaflow-lite/src/source/mod.rs @@ -9,6 +9,98 @@ pub mod server; use pyo3::prelude::*; use std::sync::Mutex; +/// UserMetadata wraps user-defined metadata groups per message. +/// Source is the origin or the first vertex in the pipeline. +/// Here, for the first time, the user metadata can be set by the user. +#[pyclass(module = "pynumaflow_lite.sourcer")] +#[derive(Clone, Default, Debug)] +pub struct UserMetadata { + data: HashMap>>, +} + +#[pymethods] +impl UserMetadata { + #[new] + #[pyo3(signature = () -> "UserMetadata")] + fn new() -> Self { + Self::default() + } + + /// Returns the groups of the user metadata. + /// If there are no groups, it returns an empty list. + #[pyo3(signature = () -> "list[str]")] + fn groups(&self) -> Vec { + self.data.keys().cloned().collect() + } + + /// Returns the keys of the user metadata for the given group. + /// If there are no keys or the group is not present, it returns an empty list. + #[pyo3(signature = (group: "str") -> "list[str]")] + fn keys(&self, group: &str) -> Vec { + self.data + .get(group) + .map(|kv| kv.keys().cloned().collect()) + .unwrap_or_default() + } + + /// Returns the value of the user metadata for the given group and key. + /// If there is no value or the group or key is not present, it returns an empty bytes. + #[pyo3(signature = (group: "str", key: "str") -> "bytes")] + fn value(&self, group: &str, key: &str) -> Vec { + self.data + .get(group) + .and_then(|kv| kv.get(key)) + .cloned() + .unwrap_or_default() + } + + /// Creates a new group in the user metadata. + /// If the group already exists, this is a no-op. + #[pyo3(signature = (group: "str"))] + fn create_group(&mut self, group: String) { + self.data.entry(group).or_default(); + } + + /// Adds a key-value pair to the user metadata. + /// If the group is not present, it creates a new group. + #[pyo3(signature = (group: "str", key: "str", value: "bytes"))] + fn add_kv(&mut self, group: String, key: String, value: Vec) { + self.data.entry(group).or_default().insert(key, value); + } + + /// Removes a key from a group in the user metadata. + /// If the key or group is not present, it's a no-op. + #[pyo3(signature = (group: "str", key: "str"))] + fn remove_key(&mut self, group: &str, key: &str) { + if let Some(kv) = self.data.get_mut(group) { + kv.remove(key); + } + } + + /// Removes a group from the user metadata. + /// If the group is not present, it's a no-op. + #[pyo3(signature = (group: "str"))] + fn remove_group(&mut self, group: &str) { + self.data.remove(group); + } + + fn __repr__(&self) -> String { + format!("UserMetadata(groups={:?})", self.groups()) + } +} + +impl From for numaflow::source::UserMetadata { + fn from(value: UserMetadata) -> Self { + let mut user_metadata = numaflow::source::UserMetadata::new(); + for (group, kv_map) in value.data { + for (key, val) in kv_map { + user_metadata.add_kv(group.clone(), key, val); + } + } + user_metadata + } +} + /// A message to be sent from the source. #[pyclass(module = "pynumaflow_lite.sourcer")] #[derive(Clone, Debug)] @@ -27,13 +119,15 @@ pub struct Message { /// Headers of the message. #[pyo3(get)] pub headers: HashMap, + /// User metadata for the message. + pub user_metadata: Option, } #[pymethods] impl Message { - /// Create a new [Message] with the given payload, offset, event_time, keys, and headers. + /// Create a new [Message] with the given payload, offset, event_time, keys, headers, and user_metadata. #[new] - #[pyo3(signature = (payload: "bytes", offset: "Offset", event_time: "datetime", keys: "list[str] | None"=None, headers: "dict[str, str] | None"=None) -> "Message" + #[pyo3(signature = (payload: "bytes", offset: "Offset", event_time: "datetime", keys: "list[str] | None"=None, headers: "dict[str, str] | None"=None, user_metadata: "UserMetadata | None"=None) -> "Message" )] fn new( payload: Vec, @@ -41,6 +135,7 @@ impl Message { event_time: DateTime, keys: Option>, headers: Option>, + user_metadata: Option, ) -> Self { Self { payload, @@ -48,6 +143,7 @@ impl Message { event_time, keys: keys.unwrap_or_default(), headers: headers.unwrap_or_default(), + user_metadata, } } @@ -59,19 +155,20 @@ impl Message { fn __repr__(&self) -> String { format!( - "Message(payload={:?}, offset={:?}, event_time={}, keys={:?}, headers={:?})", - self.payload, self.offset, self.event_time, self.keys, self.headers + "Message(payload={:?}, offset={:?}, event_time={}, keys={:?}, headers={:?}, user_metadata={:?})", + self.payload, self.offset, self.event_time, self.keys, self.headers, self.user_metadata ) } fn __str__(&self) -> String { format!( - "Message(payload={:?}, offset={:?}, event_time={}, keys={:?}, headers={:?})", + "Message(payload={:?}, offset={:?}, event_time={}, keys={:?}, headers={:?}, user_metadata={:?})", String::from_utf8_lossy(&self.payload), self.offset, self.event_time, self.keys, - self.headers + self.headers, + self.user_metadata ) } } @@ -84,7 +181,7 @@ impl From for numaflow::source::Message { event_time: value.event_time, keys: value.keys, headers: value.headers, - user_metadata: None, + user_metadata: value.user_metadata.map(|m| m.into()), } } } @@ -332,6 +429,7 @@ impl SourceAsyncServer { /// Helper to populate a PyModule with source types/functions. pub(crate) fn populate_py_module(m: &Bound) -> PyResult<()> { + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/packages/pynumaflow-lite/src/sourcetransform/mod.rs b/packages/pynumaflow-lite/src/sourcetransform/mod.rs index 9730ee8f..d1154578 100644 --- a/packages/pynumaflow-lite/src/sourcetransform/mod.rs +++ b/packages/pynumaflow-lite/src/sourcetransform/mod.rs @@ -11,6 +11,174 @@ pub mod server; use pyo3::prelude::*; use std::sync::Mutex; +/// SystemMetadata wraps system-generated metadata groups per message. +/// It is read-only to UDFs. +#[pyclass(module = "pynumaflow_lite.sourcetransformer")] +#[derive(Clone, Default, Debug)] +pub struct SystemMetadata { + data: HashMap>>, +} + +#[pymethods] +impl SystemMetadata { + #[new] + #[pyo3(signature = () -> "SystemMetadata")] + fn new() -> Self { + Self::default() + } + + /// Returns the groups of the system metadata. + /// If there are no groups, it returns an empty list. + #[pyo3(signature = () -> "list[str]")] + fn groups(&self) -> Vec { + self.data.keys().cloned().collect() + } + + /// Returns the keys of the system metadata for the given group. + /// If there are no keys or the group is not present, it returns an empty list. + #[pyo3(signature = (group: "str") -> "list[str]")] + fn keys(&self, group: &str) -> Vec { + self.data + .get(group) + .map(|kv| kv.keys().cloned().collect()) + .unwrap_or_default() + } + + /// Returns the value of the system metadata for the given group and key. + /// If there is no value or the group or key is not present, it returns an empty bytes. + #[pyo3(signature = (group: "str", key: "str") -> "bytes")] + fn value(&self, group: &str, key: &str) -> Vec { + self.data + .get(group) + .and_then(|kv| kv.get(key)) + .cloned() + .unwrap_or_default() + } + + fn __repr__(&self) -> String { + format!("SystemMetadata(groups={:?})", self.groups()) + } +} + +impl From for SystemMetadata { + fn from(value: sourcetransform::SystemMetadata) -> Self { + let mut data = HashMap::new(); + for group in value.groups() { + let mut kv = HashMap::new(); + for key in value.keys(&group) { + kv.insert(key.clone(), value.value(&group, &key)); + } + data.insert(group, kv); + } + Self { data } + } +} + +/// UserMetadata wraps user-defined metadata groups per message. +/// Users can read and write to this metadata. +#[pyclass(module = "pynumaflow_lite.sourcetransformer")] +#[derive(Clone, Default, Debug)] +pub struct UserMetadata { + data: HashMap>>, +} + +#[pymethods] +impl UserMetadata { + #[new] + #[pyo3(signature = () -> "UserMetadata")] + fn new() -> Self { + Self::default() + } + + /// Returns the groups of the user metadata. + /// If there are no groups, it returns an empty list. + #[pyo3(signature = () -> "list[str]")] + fn groups(&self) -> Vec { + self.data.keys().cloned().collect() + } + + /// Returns the keys of the user metadata for the given group. + /// If there are no keys or the group is not present, it returns an empty list. + #[pyo3(signature = (group: "str") -> "list[str]")] + fn keys(&self, group: &str) -> Vec { + self.data + .get(group) + .map(|kv| kv.keys().cloned().collect()) + .unwrap_or_default() + } + + /// Returns the value of the user metadata for the given group and key. + /// If there is no value or the group or key is not present, it returns an empty bytes. + #[pyo3(signature = (group: "str", key: "str") -> "bytes")] + fn value(&self, group: &str, key: &str) -> Vec { + self.data + .get(group) + .and_then(|kv| kv.get(key)) + .cloned() + .unwrap_or_default() + } + + /// Creates a new group in the user metadata. + /// If the group already exists, this is a no-op. + #[pyo3(signature = (group: "str"))] + fn create_group(&mut self, group: String) { + self.data.entry(group).or_default(); + } + + /// Adds a key-value pair to the user metadata. + /// If the group is not present, it creates a new group. + #[pyo3(signature = (group: "str", key: "str", value: "bytes"))] + fn add_kv(&mut self, group: String, key: String, value: Vec) { + self.data.entry(group).or_default().insert(key, value); + } + + /// Removes a key from a group in the user metadata. + /// If the key or group is not present, it's a no-op. + #[pyo3(signature = (group: "str", key: "str"))] + fn remove_key(&mut self, group: &str, key: &str) { + if let Some(kv) = self.data.get_mut(group) { + kv.remove(key); + } + } + + /// Removes a group from the user metadata. + /// If the group is not present, it's a no-op. + #[pyo3(signature = (group: "str"))] + fn remove_group(&mut self, group: &str) { + self.data.remove(group); + } + + fn __repr__(&self) -> String { + format!("UserMetadata(groups={:?})", self.groups()) + } +} + +impl From for UserMetadata { + fn from(value: sourcetransform::UserMetadata) -> Self { + let mut data = HashMap::new(); + for group in value.groups() { + let mut kv = HashMap::new(); + for key in value.keys(&group) { + kv.insert(key.clone(), value.value(&group, &key)); + } + data.insert(group, kv); + } + Self { data } + } +} + +impl From for sourcetransform::UserMetadata { + fn from(value: UserMetadata) -> Self { + let mut user_metadata = sourcetransform::UserMetadata::new(); + for (group, kv_map) in value.data { + for (key, val) in kv_map { + user_metadata.add_kv(group.clone(), key, val); + } + } + user_metadata + } +} + /// A collection of [Message]s. #[pyclass(module = "pynumaflow_lite.sourcetransformer")] #[derive(Clone, Debug)] @@ -54,25 +222,29 @@ pub struct Message { pub event_time: DateTime, /// Tags are used for conditional forwarding. pub tags: Option>, + /// User metadata for the message. + pub user_metadata: Option, } #[pymethods] impl Message { - /// Create a new [Message] with the given value, event_time, keys, and tags. + /// Create a new [Message] with the given value, event_time, keys, tags, and user_metadata. #[new] - #[pyo3(signature = (value: "bytes", event_time: "datetime.datetime", keys: "list[str] | None"=None, tags: "list[str] | None"=None) -> "Message" + #[pyo3(signature = (value: "bytes", event_time: "datetime.datetime", keys: "list[str] | None"=None, tags: "list[str] | None"=None, user_metadata: "UserMetadata | None"=None) -> "Message" )] fn new( value: Vec, event_time: DateTime, keys: Option>, tags: Option>, + user_metadata: Option, ) -> Self { Self { keys, value, event_time, tags, + user_metadata, } } @@ -87,15 +259,22 @@ impl Message { value: vec![], event_time, tags: Some(vec![numaflow::shared::DROP.to_string()]), + user_metadata: None, } } } impl From for sourcetransform::Message { fn from(value: Message) -> Self { - Self::new(value.value, value.event_time) + let mut msg = Self::new(value.value, value.event_time) .with_keys(value.keys.unwrap_or_default()) - .with_tags(value.tags.unwrap_or_default()) + .with_tags(value.tags.unwrap_or_default()); + + if let Some(user_metadata) = value.user_metadata { + msg = msg.with_user_metadata(user_metadata.into()); + } + + msg } } @@ -117,6 +296,12 @@ pub struct Datum { /// Headers for the message. #[pyo3(get)] pub headers: HashMap, + /// User metadata for the message. + #[pyo3(get)] + pub user_metadata: UserMetadata, + /// System metadata for the message. + #[pyo3(get)] + pub system_metadata: SystemMetadata, } impl Datum { @@ -126,6 +311,8 @@ impl Datum { watermark: DateTime, event_time: DateTime, headers: HashMap, + user_metadata: UserMetadata, + system_metadata: SystemMetadata, ) -> Self { Self { keys, @@ -133,24 +320,34 @@ impl Datum { watermark, event_time, headers, + user_metadata, + system_metadata, } } fn __repr__(&self) -> String { format!( - "Datum(keys={:?}, value={:?}, watermark={}, event_time={}, headers={:?})", - self.keys, self.value, self.watermark, self.event_time, self.headers + "Datum(keys={:?}, value={:?}, watermark={}, event_time={}, headers={:?}, user_metadata={:?}, system_metadata={:?})", + self.keys, + self.value, + self.watermark, + self.event_time, + self.headers, + self.user_metadata, + self.system_metadata ) } fn __str__(&self) -> String { format!( - "Datum(keys={:?}, value={:?}, watermark={}, event_time={}, headers={:?})", + "Datum(keys={:?}, value={:?}, watermark={}, event_time={}, headers={:?}, user_metadata={:?}, system_metadata={:?})", self.keys, String::from_utf8_lossy(&self.value), self.watermark, self.event_time, - self.headers + self.headers, + self.user_metadata, + self.system_metadata ) } } @@ -163,6 +360,8 @@ impl From for Datum { value.watermark, value.eventtime, value.headers, + value.user_metadata.into(), + value.system_metadata.into(), ) } } @@ -219,6 +418,8 @@ impl SourceTransformAsyncServer { /// Helper to populate a PyModule with sourcetransform types/functions. pub(crate) fn populate_py_module(m: &Bound) -> PyResult<()> { + m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; From 20f21136143dc6e71252ec38c16c68df11c2eeb0 Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Mon, 19 Jan 2026 10:48:30 -0800 Subject: [PATCH 2/3] feat: type hints for metadata Signed-off-by: Vigith Maurice --- .../pynumaflow_lite/mapper.pyi | 62 +++++++++++++++++++ .../pynumaflow_lite/sinker.pyi | 44 +++++++++++++ .../pynumaflow_lite/sourcer.pyi | 39 ++++++++++++ .../pynumaflow_lite/sourcetransformer.pyi | 62 +++++++++++++++++++ 4 files changed, 207 insertions(+) diff --git a/packages/pynumaflow-lite/pynumaflow_lite/mapper.pyi b/packages/pynumaflow-lite/pynumaflow_lite/mapper.pyi index 3d9e2977..aa5635e9 100644 --- a/packages/pynumaflow-lite/pynumaflow_lite/mapper.pyi +++ b/packages/pynumaflow-lite/pynumaflow_lite/mapper.pyi @@ -7,6 +7,62 @@ import datetime as _dt from ._map_dtypes import Mapper as Mapper +class SystemMetadata: + """System-generated metadata groups per message (read-only).""" + + def __init__(self) -> None: ... + + def groups(self) -> List[str]: + """Returns the groups of the system metadata.""" + ... + + def keys(self, group: str) -> List[str]: + """Returns the keys of the system metadata for the given group.""" + ... + + def value(self, group: str, key: str) -> bytes: + """Returns the value of the system metadata for the given group and key.""" + ... + + def __repr__(self) -> str: ... + + +class UserMetadata: + """User-defined metadata groups per message (read-write).""" + + def __init__(self) -> None: ... + + def groups(self) -> List[str]: + """Returns the groups of the user metadata.""" + ... + + def keys(self, group: str) -> List[str]: + """Returns the keys of the user metadata for the given group.""" + ... + + def value(self, group: str, key: str) -> bytes: + """Returns the value of the user metadata for the given group and key.""" + ... + + def create_group(self, group: str) -> None: + """Creates a new group in the user metadata.""" + ... + + def add_kv(self, group: str, key: str, value: bytes) -> None: + """Adds a key-value pair to the user metadata.""" + ... + + def remove_key(self, group: str, key: str) -> None: + """Removes a key from a group in the user metadata.""" + ... + + def remove_group(self, group: str) -> None: + """Removes a group from the user metadata.""" + ... + + def __repr__(self) -> str: ... + + class Messages: def __init__(self) -> None: ... @@ -21,12 +77,14 @@ class Message: keys: Optional[List[str]] value: bytes tags: Optional[List[str]] + user_metadata: Optional[UserMetadata] def __init__( self, value: bytes, keys: Optional[List[str]] = ..., tags: Optional[List[str]] = ..., + user_metadata: Optional[UserMetadata] = ..., ) -> None: ... @staticmethod @@ -40,6 +98,8 @@ class Datum: watermark: _dt.datetime eventtime: _dt.datetime headers: Dict[str, str] + user_metadata: UserMetadata + system_metadata: SystemMetadata def __repr__(self) -> str: ... @@ -62,6 +122,8 @@ class MapAsyncServer: __all__ = [ + "SystemMetadata", + "UserMetadata", "Messages", "Message", "Datum", diff --git a/packages/pynumaflow-lite/pynumaflow_lite/sinker.pyi b/packages/pynumaflow-lite/pynumaflow_lite/sinker.pyi index 1bb5b9d9..2a866337 100644 --- a/packages/pynumaflow-lite/pynumaflow_lite/sinker.pyi +++ b/packages/pynumaflow-lite/pynumaflow_lite/sinker.pyi @@ -4,6 +4,46 @@ from typing import Optional, List, Dict, Callable, Awaitable, Any, AsyncIterator import datetime as _dt +class SystemMetadata: + """System-generated metadata groups per message (read-only for sink).""" + + def __init__(self) -> None: ... + + def groups(self) -> List[str]: + """Returns the groups of the system metadata.""" + ... + + def keys(self, group: str) -> List[str]: + """Returns the keys of the system metadata for the given group.""" + ... + + def value(self, group: str, key: str) -> bytes: + """Returns the value of the system metadata for the given group and key.""" + ... + + def __repr__(self) -> str: ... + + +class UserMetadata: + """User-defined metadata groups per message (read-only for sink).""" + + def __init__(self) -> None: ... + + def groups(self) -> List[str]: + """Returns the groups of the user metadata.""" + ... + + def keys(self, group: str) -> List[str]: + """Returns the keys of the user metadata for the given group.""" + ... + + def value(self, group: str, key: str) -> bytes: + """Returns the value of the user metadata for the given group and key.""" + ... + + def __repr__(self) -> str: ... + + class KeyValueGroup: key_value: Dict[str, bytes] @@ -58,6 +98,8 @@ class Datum: eventtime: _dt.datetime id: str headers: Dict[str, str] + user_metadata: UserMetadata + system_metadata: SystemMetadata def __repr__(self) -> str: ... @@ -81,6 +123,8 @@ class Sinker: __all__ = [ + "SystemMetadata", + "UserMetadata", "KeyValueGroup", "Message", "Response", diff --git a/packages/pynumaflow-lite/pynumaflow_lite/sourcer.pyi b/packages/pynumaflow-lite/pynumaflow_lite/sourcer.pyi index 1b0b1e84..f1f89b4e 100644 --- a/packages/pynumaflow-lite/pynumaflow_lite/sourcer.pyi +++ b/packages/pynumaflow-lite/pynumaflow_lite/sourcer.pyi @@ -7,6 +7,42 @@ import datetime as _dt from ._source_dtypes import Sourcer as Sourcer +class UserMetadata: + """User-defined metadata groups per message (read-write for source).""" + + def __init__(self) -> None: ... + + def groups(self) -> List[str]: + """Returns the groups of the user metadata.""" + ... + + def keys(self, group: str) -> List[str]: + """Returns the keys of the user metadata for the given group.""" + ... + + def value(self, group: str, key: str) -> bytes: + """Returns the value of the user metadata for the given group and key.""" + ... + + def create_group(self, group: str) -> None: + """Creates a new group in the user metadata.""" + ... + + def add_kv(self, group: str, key: str, value: bytes) -> None: + """Adds a key-value pair to the user metadata.""" + ... + + def remove_key(self, group: str, key: str) -> None: + """Removes a key from a group in the user metadata.""" + ... + + def remove_group(self, group: str) -> None: + """Removes a group from the user metadata.""" + ... + + def __repr__(self) -> str: ... + + class Message: """A message to be sent from the source.""" payload: bytes @@ -14,6 +50,7 @@ class Message: event_time: _dt.datetime keys: List[str] headers: Dict[str, str] + user_metadata: Optional[UserMetadata] def __init__( self, @@ -22,6 +59,7 @@ class Message: event_time: _dt.datetime, keys: Optional[List[str]] = ..., headers: Optional[Dict[str, str]] = ..., + user_metadata: Optional[UserMetadata] = ..., ) -> None: ... def __repr__(self) -> str: ... @@ -122,6 +160,7 @@ class SourceAsyncServer: __all__ = [ + "UserMetadata", "Message", "Offset", "ReadRequest", diff --git a/packages/pynumaflow-lite/pynumaflow_lite/sourcetransformer.pyi b/packages/pynumaflow-lite/pynumaflow_lite/sourcetransformer.pyi index 9d7d368c..70804128 100644 --- a/packages/pynumaflow-lite/pynumaflow_lite/sourcetransformer.pyi +++ b/packages/pynumaflow-lite/pynumaflow_lite/sourcetransformer.pyi @@ -7,6 +7,62 @@ import datetime as _dt from ._sourcetransformer_dtypes import SourceTransformer as SourceTransformer +class SystemMetadata: + """System-generated metadata groups per message (read-only).""" + + def __init__(self) -> None: ... + + def groups(self) -> List[str]: + """Returns the groups of the system metadata.""" + ... + + def keys(self, group: str) -> List[str]: + """Returns the keys of the system metadata for the given group.""" + ... + + def value(self, group: str, key: str) -> bytes: + """Returns the value of the system metadata for the given group and key.""" + ... + + def __repr__(self) -> str: ... + + +class UserMetadata: + """User-defined metadata groups per message (read-write).""" + + def __init__(self) -> None: ... + + def groups(self) -> List[str]: + """Returns the groups of the user metadata.""" + ... + + def keys(self, group: str) -> List[str]: + """Returns the keys of the user metadata for the given group.""" + ... + + def value(self, group: str, key: str) -> bytes: + """Returns the value of the user metadata for the given group and key.""" + ... + + def create_group(self, group: str) -> None: + """Creates a new group in the user metadata.""" + ... + + def add_kv(self, group: str, key: str, value: bytes) -> None: + """Adds a key-value pair to the user metadata.""" + ... + + def remove_key(self, group: str, key: str) -> None: + """Removes a key from a group in the user metadata.""" + ... + + def remove_group(self, group: str) -> None: + """Removes a group from the user metadata.""" + ... + + def __repr__(self) -> str: ... + + class Messages: def __init__(self) -> None: ... @@ -22,6 +78,7 @@ class Message: value: bytes event_time: _dt.datetime tags: Optional[List[str]] + user_metadata: Optional[UserMetadata] def __init__( self, @@ -29,6 +86,7 @@ class Message: event_time: _dt.datetime, keys: Optional[List[str]] = ..., tags: Optional[List[str]] = ..., + user_metadata: Optional[UserMetadata] = ..., ) -> None: ... @staticmethod @@ -42,6 +100,8 @@ class Datum: watermark: _dt.datetime event_time: _dt.datetime headers: Dict[str, str] + user_metadata: UserMetadata + system_metadata: SystemMetadata def __repr__(self) -> str: ... @@ -61,6 +121,8 @@ class SourceTransformAsyncServer: __all__ = [ + "SystemMetadata", + "UserMetadata", "Messages", "Message", "Datum", From 02706c26e68dd25fdf3a334b80ffc2584e482bb1 Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Mon, 19 Jan 2026 10:48:35 -0800 Subject: [PATCH 3/3] feat(example): add metadata in examples Signed-off-by: Vigith Maurice --- .../pynumaflow-lite/tests/examples/map_cat.py | 22 +++++++++- .../tests/examples/map_cat_class.py | 22 +++++++++- .../tests/examples/sink_log.py | 16 ++++++++ .../tests/examples/sink_log_class.py | 16 ++++++++ .../tests/examples/source_simple.py | 23 +++++++---- .../examples/sourcetransform_event_filter.py | 40 +++++++++++++++++-- 6 files changed, 126 insertions(+), 13 deletions(-) diff --git a/packages/pynumaflow-lite/tests/examples/map_cat.py b/packages/pynumaflow-lite/tests/examples/map_cat.py index 4d4463a3..939a9043 100644 --- a/packages/pynumaflow-lite/tests/examples/map_cat.py +++ b/packages/pynumaflow-lite/tests/examples/map_cat.py @@ -9,10 +9,30 @@ async def async_handler( ) -> mapper.Messages: messages = mapper.Messages() + # Read system metadata (read-only) + print(f"System metadata groups: {payload.system_metadata.groups()}") + for group in payload.system_metadata.groups(): + for key in payload.system_metadata.keys(group): + value = payload.system_metadata.value(group, key) + print(f" System[{group}][{key}] = {value}") + + # Read user metadata (read-only from input) + print(f"User metadata groups: {payload.user_metadata.groups()}") + for group in payload.user_metadata.groups(): + for key in payload.user_metadata.keys(group): + value = payload.user_metadata.value(group, key) + print(f" User[{group}][{key}] = {value}") + if payload.value == b"bad world": messages.append(mapper.Message.message_to_drop()) else: - messages.append(mapper.Message(payload.value, keys)) + # Create user metadata for the outgoing message + user_metadata = mapper.UserMetadata() + user_metadata.create_group("processing") + user_metadata.add_kv("processing", "handler", b"map_cat") + user_metadata.add_kv("processing", "msg_length", str(len(payload.value)).encode()) + + messages.append(mapper.Message(payload.value, keys, user_metadata=user_metadata)) return messages diff --git a/packages/pynumaflow-lite/tests/examples/map_cat_class.py b/packages/pynumaflow-lite/tests/examples/map_cat_class.py index 2d96e086..962a3f79 100644 --- a/packages/pynumaflow-lite/tests/examples/map_cat_class.py +++ b/packages/pynumaflow-lite/tests/examples/map_cat_class.py @@ -10,10 +10,30 @@ async def handler( messages = mapper.Messages() + # Read system metadata (read-only) + print(f"System metadata groups: {payload.system_metadata.groups()}") + for group in payload.system_metadata.groups(): + for key in payload.system_metadata.keys(group): + value = payload.system_metadata.value(group, key) + print(f" System[{group}][{key}] = {value}") + + # Read user metadata (read-only from input) + print(f"User metadata groups: {payload.user_metadata.groups()}") + for group in payload.user_metadata.groups(): + for key in payload.user_metadata.keys(group): + value = payload.user_metadata.value(group, key) + print(f" User[{group}][{key}] = {value}") + if payload.value == b"bad world": messages.append(mapper.Message.message_to_drop()) else: - messages.append(mapper.Message(payload.value, keys)) + # Create user metadata for the outgoing message + user_metadata = mapper.UserMetadata() + user_metadata.create_group("processing") + user_metadata.add_kv("processing", "handler", b"map_cat_class") + user_metadata.add_kv("processing", "msg_length", str(len(payload.value)).encode()) + + messages.append(mapper.Message(payload.value, keys, user_metadata=user_metadata)) return messages diff --git a/packages/pynumaflow-lite/tests/examples/sink_log.py b/packages/pynumaflow-lite/tests/examples/sink_log.py index 6d11382a..b60fb14e 100644 --- a/packages/pynumaflow-lite/tests/examples/sink_log.py +++ b/packages/pynumaflow-lite/tests/examples/sink_log.py @@ -13,10 +13,26 @@ async def async_handler(datums: collections.abc.AsyncIterator[sinker.Datum]) -> sinker.Responses: """ Simple log sink that logs each message and returns success responses. + Also demonstrates reading metadata (read-only for sink). """ responses = sinker.Responses() async for msg in datums: _LOGGER.info("User Defined Sink %s", msg.value.decode("utf-8")) + + # Read system metadata (read-only) + _LOGGER.info("System metadata groups: %s", msg.system_metadata.groups()) + for group in msg.system_metadata.groups(): + for key in msg.system_metadata.keys(group): + value = msg.system_metadata.value(group, key) + _LOGGER.info(" System[%s][%s] = %s", group, key, value) + + # Read user metadata (read-only) + _LOGGER.info("User metadata groups: %s", msg.user_metadata.groups()) + for group in msg.user_metadata.groups(): + for key in msg.user_metadata.keys(group): + value = msg.user_metadata.value(group, key) + _LOGGER.info(" User[%s][%s] = %s", group, key, value) + responses.append(sinker.Response.as_success(msg.id)) # if we are not able to write to sink and if we have a fallback sink configured # we can use Response.as_fallback(msg.id) to write the message to fallback sink diff --git a/packages/pynumaflow-lite/tests/examples/sink_log_class.py b/packages/pynumaflow-lite/tests/examples/sink_log_class.py index 6df2c4d6..ade74e73 100644 --- a/packages/pynumaflow-lite/tests/examples/sink_log_class.py +++ b/packages/pynumaflow-lite/tests/examples/sink_log_class.py @@ -15,12 +15,28 @@ class SimpleLogSink(Sinker): """ Simple log sink that logs each message and returns success responses. This is the class-based approach matching the user's example. + Also demonstrates reading metadata (read-only for sink). """ async def handler(self, datums: AsyncIterable[sinker.Datum]) -> sinker.Responses: responses = sinker.Responses() async for msg in datums: _LOGGER.info("User Defined Sink %s", msg.value.decode("utf-8")) + + # Read system metadata (read-only) + _LOGGER.info("System metadata groups: %s", msg.system_metadata.groups()) + for group in msg.system_metadata.groups(): + for key in msg.system_metadata.keys(group): + value = msg.system_metadata.value(group, key) + _LOGGER.info(" System[%s][%s] = %s", group, key, value) + + # Read user metadata (read-only) + _LOGGER.info("User metadata groups: %s", msg.user_metadata.groups()) + for group in msg.user_metadata.groups(): + for key in msg.user_metadata.keys(group): + value = msg.user_metadata.value(group, key) + _LOGGER.info(" User[%s][%s] = %s", group, key, value) + responses.append(sinker.Response.as_success(msg.id)) # if we are not able to write to sink and if we have a fallback sink configured # we can use Response.as_fallback(msg.id) to write the message to fallback sink diff --git a/packages/pynumaflow-lite/tests/examples/source_simple.py b/packages/pynumaflow-lite/tests/examples/source_simple.py index 2517e6a0..ca90ec03 100644 --- a/packages/pynumaflow-lite/tests/examples/source_simple.py +++ b/packages/pynumaflow-lite/tests/examples/source_simple.py @@ -25,34 +25,43 @@ def __init__(self): async def read_handler(self, datum: sourcer.ReadRequest) -> AsyncIterator[sourcer.Message]: """ The simple source generates messages with incrementing numbers. + Also demonstrates creating user metadata (source is origin, so only user metadata). """ _LOGGER.info(f"Read request: num_records={datum.num_records}, timeout_ms={datum.timeout_ms}") - + # Generate the requested number of messages for i in range(datum.num_records): # Create message payload payload = f"message-{self.counter}".encode("utf-8") - + # Create offset offset = sourcer.Offset( offset=str(self.counter).encode("utf-8"), partition_id=self.partition_idx ) - + + # Create user metadata for the message + user_metadata = sourcer.UserMetadata() + user_metadata.create_group("source_info") + user_metadata.add_kv("source_info", "source_name", b"simple_source") + user_metadata.add_kv("source_info", "message_id", str(self.counter).encode()) + user_metadata.add_kv("source_info", "partition", str(self.partition_idx).encode()) + # Create message message = sourcer.Message( payload=payload, offset=offset, event_time=datetime.now(timezone.utc), keys=["key1"], - headers={"source": "simple"} + headers={"source": "simple"}, + user_metadata=user_metadata ) - + _LOGGER.info(f"Generated message: {self.counter}") self.counter += 1 - + yield message - + # Small delay to simulate real source await asyncio.sleep(0.1) diff --git a/packages/pynumaflow-lite/tests/examples/sourcetransform_event_filter.py b/packages/pynumaflow-lite/tests/examples/sourcetransform_event_filter.py index e39366f0..03647b06 100644 --- a/packages/pynumaflow-lite/tests/examples/sourcetransform_event_filter.py +++ b/packages/pynumaflow-lite/tests/examples/sourcetransform_event_filter.py @@ -11,12 +11,14 @@ class EventFilter(sourcetransformer.SourceTransformer): """ A source transformer that filters and routes messages based on event time. - + - Messages before 2022 are dropped - Messages within 2022 are tagged with "within_year_2022" - Messages after 2022 are tagged with "after_year_2022" + + Also demonstrates reading and creating metadata. """ - + async def handler( self, keys: list[str], datum: sourcetransformer.Datum ) -> sourcetransformer.Messages: @@ -24,27 +26,57 @@ async def handler( event_time = datum.event_time messages = sourcetransformer.Messages() + # Read system metadata (read-only) + print(f"System metadata groups: {datum.system_metadata.groups()}") + for group in datum.system_metadata.groups(): + for key in datum.system_metadata.keys(group): + value = datum.system_metadata.value(group, key) + print(f" System[{group}][{key}] = {value}") + + # Read user metadata (read-only from input) + print(f"User metadata groups: {datum.user_metadata.groups()}") + for group in datum.user_metadata.groups(): + for key in datum.user_metadata.keys(group): + value = datum.user_metadata.value(group, key) + print(f" User[{group}][{key}] = {value}") + if event_time < january_first_2022: print(f"Got event time: {event_time}, it is before 2022, so dropping") messages.append(sourcetransformer.Message.message_to_drop(event_time)) elif event_time < january_first_2023: print(f"Got event time: {event_time}, it is within year 2022, so forwarding to within_year_2022") + + # Create user metadata for the outgoing message + user_metadata = sourcetransformer.UserMetadata() + user_metadata.create_group("filter_info") + user_metadata.add_kv("filter_info", "filter_result", b"within_year_2022") + user_metadata.add_kv("filter_info", "original_event_time", str(event_time).encode()) + messages.append( sourcetransformer.Message( value=val, event_time=january_first_2022, keys=keys, - tags=["within_year_2022"] + tags=["within_year_2022"], + user_metadata=user_metadata ) ) else: print(f"Got event time: {event_time}, it is after year 2022, so forwarding to after_year_2022") + + # Create user metadata for the outgoing message + user_metadata = sourcetransformer.UserMetadata() + user_metadata.create_group("filter_info") + user_metadata.add_kv("filter_info", "filter_result", b"after_year_2022") + user_metadata.add_kv("filter_info", "original_event_time", str(event_time).encode()) + messages.append( sourcetransformer.Message( value=val, event_time=january_first_2023, keys=keys, - tags=["after_year_2022"] + tags=["after_year_2022"], + user_metadata=user_metadata ) )