diff --git a/crates/hotfix-web/src/lib.rs b/crates/hotfix-web/src/lib.rs index a0123ed..4d5d62b 100644 --- a/crates/hotfix-web/src/lib.rs +++ b/crates/hotfix-web/src/lib.rs @@ -5,7 +5,7 @@ mod session_controller; use crate::endpoints::build_api_router; use crate::session_controller::{HttpSessionController, SessionController}; use axum::Router; -use hotfix::message::FixMessage; +use hotfix::message::OutboundMessage; use hotfix::session::SessionHandle; #[derive(Clone)] @@ -21,13 +21,13 @@ pub struct RouterConfig { } /// Build a router with default configuration (admin endpoints disabled) -pub fn build_router(session_handle: SessionHandle) -> Router { +pub fn build_router(session_handle: SessionHandle) -> Router { build_router_with_config(session_handle, RouterConfig::default()) } /// Build a router with custom configuration -pub fn build_router_with_config( - session_handle: SessionHandle, +pub fn build_router_with_config( + session_handle: SessionHandle, config: RouterConfig, ) -> Router { let controller = HttpSessionController { session_handle }; diff --git a/crates/hotfix-web/src/session_controller.rs b/crates/hotfix-web/src/session_controller.rs index 32b1dd0..c2d99f2 100644 --- a/crates/hotfix-web/src/session_controller.rs +++ b/crates/hotfix-web/src/session_controller.rs @@ -1,4 +1,4 @@ -use hotfix::message::FixMessage; +use hotfix::message::OutboundMessage; use hotfix::session::{SessionHandle, SessionInfo}; /// Controller for session operations, providing both read access and administrative actions @@ -11,12 +11,12 @@ pub trait SessionController: Clone + Send + Sync { /// HTTP session controller implementation that wraps a SessionHandle #[derive(Clone)] -pub struct HttpSessionController { - pub(crate) session_handle: SessionHandle, +pub struct HttpSessionController { + pub(crate) session_handle: SessionHandle, } #[async_trait::async_trait] -impl SessionController for HttpSessionController { +impl SessionController for HttpSessionController { async fn get_session_info(&self) -> anyhow::Result { self.session_handle.get_session_info().await } @@ -34,7 +34,9 @@ impl SessionController for HttpSessionController { // Note: We can't use a blanket impl due to Rust's orphan rules (can't impl foreign trait for generic type) #[cfg(feature = "ui")] #[async_trait::async_trait] -impl hotfix_web_ui::SessionInfoProvider for HttpSessionController { +impl hotfix_web_ui::SessionInfoProvider + for HttpSessionController +{ async fn get_session_info(&self) -> anyhow::Result { // Reuse the SessionController implementation SessionController::get_session_info(self).await @@ -43,12 +45,12 @@ impl hotfix_web_ui::SessionInfoProvider for HttpSessionController // Allow extracting HttpSessionController from AppState for hotfix-web-ui #[cfg(feature = "ui")] -impl axum::extract::FromRef>> - for HttpSessionController +impl axum::extract::FromRef>> + for HttpSessionController where - M: FixMessage, + Outbound: OutboundMessage, { - fn from_ref(state: &crate::AppState>) -> Self { + fn from_ref(state: &crate::AppState>) -> Self { state.controller.clone() } } diff --git a/crates/hotfix/src/application.rs b/crates/hotfix/src/application.rs index e60bc7a..40f2f3d 100644 --- a/crates/hotfix/src/application.rs +++ b/crates/hotfix/src/application.rs @@ -1,14 +1,14 @@ #[async_trait::async_trait] /// The application users of HotFIX can implement to hook into the engine. -pub trait Application: Send + Sync + 'static { +pub trait Application: Send + Sync + 'static { /// Called when a message is sent to the engine to be sent to the counterparty. /// /// This is invoked before the raw message is persisted in the message store. - async fn on_outbound_message(&self, msg: &M) -> OutboundDecision; + async fn on_outbound_message(&self, msg: &Outbound) -> OutboundDecision; /// Called when a message is received from the counterparty. /// /// This is invoked after the message is verified and parsed into a typed message. - async fn on_inbound_message(&self, msg: M) -> InboundDecision; + async fn on_inbound_message(&self, msg: Inbound) -> InboundDecision; /// Called when the session is logged out. async fn on_logout(&mut self, reason: &str); /// Called when the session is logged on. diff --git a/crates/hotfix/src/initiator.rs b/crates/hotfix/src/initiator.rs index a7b1e76..a9360e7 100644 --- a/crates/hotfix/src/initiator.rs +++ b/crates/hotfix/src/initiator.rs @@ -13,22 +13,22 @@ use tracing::{debug, warn}; use crate::application::Application; use crate::config::SessionConfig; -use crate::message::FixMessage; +use crate::message::{InboundMessage, OutboundMessage}; use crate::session::{InternalSessionRef, SessionHandle}; use crate::store::MessageStore; use crate::transport::connect; #[derive(Clone)] -pub struct Initiator { +pub struct Initiator { pub config: SessionConfig, - session_handle: SessionHandle, + session_handle: SessionHandle, completion_rx: watch::Receiver, } -impl Initiator { - pub async fn start( +impl Initiator { + pub async fn start( config: SessionConfig, - application: impl Application, + application: impl Application, store: impl MessageStore + Send + Sync + 'static, ) -> Self { let session_ref = InternalSessionRef::new(config.clone(), application, store); @@ -47,7 +47,7 @@ impl Initiator { } } - pub async fn send_message(&self, msg: M) -> anyhow::Result<()> { + pub async fn send_message(&self, msg: Outbound) -> anyhow::Result<()> { self.session_handle.send_message(msg).await?; Ok(()) @@ -57,7 +57,7 @@ impl Initiator { self.config.sender_comp_id == sender_comp_id && self.config.target_comp_id == target_comp_id } - pub fn session_handle(&self) -> SessionHandle { + pub fn session_handle(&self) -> SessionHandle { self.session_handle.clone() } @@ -85,9 +85,9 @@ impl Initiator { } } -async fn establish_connection( +async fn establish_connection( config: SessionConfig, - session_ref: InternalSessionRef, + session_ref: InternalSessionRef, completion_tx: watch::Sender, ) { loop { diff --git a/crates/hotfix/src/message.rs b/crates/hotfix/src/message.rs index f512c5e..8395f17 100644 --- a/crates/hotfix/src/message.rs +++ b/crates/hotfix/src/message.rs @@ -18,11 +18,13 @@ pub mod verification; pub use parser::RawFixMessage; pub use resend_request::ResendRequest; -pub trait FixMessage: Clone + Send + 'static { +pub trait OutboundMessage: Clone + Send + 'static { fn write(&self, msg: &mut Message); fn message_type(&self) -> &str; +} +pub trait InboundMessage: Clone + Send + 'static { fn parse(message: &Message) -> Self; } @@ -31,7 +33,7 @@ pub fn generate_message( sender_comp_id: &str, target_comp_id: &str, msg_seq_num: u64, - message: impl FixMessage, + message: impl OutboundMessage, ) -> Result, EncodeError> { let mut msg = Message::new(begin_string, message.message_type()); msg.set(SENDER_COMP_ID, sender_comp_id); @@ -43,9 +45,3 @@ pub fn generate_message( msg.encode(&Config::default()) } - -pub trait WriteMessage { - fn write(&self, msg: &mut Message); - - fn message_type(&self) -> &str; -} diff --git a/crates/hotfix/src/message/heartbeat.rs b/crates/hotfix/src/message/heartbeat.rs index 7c1ec22..9d4c639 100644 --- a/crates/hotfix/src/message/heartbeat.rs +++ b/crates/hotfix/src/message/heartbeat.rs @@ -1,4 +1,4 @@ -use crate::message::FixMessage; +use crate::message::{InboundMessage, OutboundMessage}; use hotfix_message::Part; use hotfix_message::message::Message; use hotfix_message::session_fields::TEST_REQ_ID; @@ -16,7 +16,7 @@ impl Heartbeat { } } -impl FixMessage for Heartbeat { +impl OutboundMessage for Heartbeat { fn write(&self, msg: &mut Message) { if let Some(req_id) = &self.test_req_id { msg.set(TEST_REQ_ID, req_id.as_str()); @@ -26,7 +26,9 @@ impl FixMessage for Heartbeat { fn message_type(&self) -> &str { "0" } +} +impl InboundMessage for Heartbeat { fn parse(_message: &Message) -> Self { // TODO: this needs to be implemented properly when we're implementing Test Requests Heartbeat { test_req_id: None } diff --git a/crates/hotfix/src/message/logon.rs b/crates/hotfix/src/message/logon.rs index 8f34609..8571618 100644 --- a/crates/hotfix/src/message/logon.rs +++ b/crates/hotfix/src/message/logon.rs @@ -1,4 +1,4 @@ -use crate::message::FixMessage; +use crate::message::OutboundMessage; use hotfix_message::message::Message; use hotfix_message::session_fields::{ ENCRYPT_METHOD, HEART_BT_INT, NEXT_EXPECTED_MSG_SEQ_NUM, RESET_SEQ_NUM_FLAG, @@ -33,7 +33,7 @@ impl Logon { } } -impl FixMessage for Logon { +impl OutboundMessage for Logon { fn write(&self, msg: &mut Message) { msg.set(ENCRYPT_METHOD, self.encrypt_method); msg.set(HEART_BT_INT, self.heartbeat_interval); @@ -47,10 +47,6 @@ impl FixMessage for Logon { fn message_type(&self) -> &str { "A" } - - fn parse(_message: &Message) -> Self { - todo!() - } } #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, FieldType)] diff --git a/crates/hotfix/src/message/logout.rs b/crates/hotfix/src/message/logout.rs index 4b4d55f..12141af 100644 --- a/crates/hotfix/src/message/logout.rs +++ b/crates/hotfix/src/message/logout.rs @@ -1,4 +1,4 @@ -use crate::message::FixMessage; +use crate::message::OutboundMessage; use hotfix_message::Part; use hotfix_message::message::Message; use hotfix_message::session_fields::TEXT; @@ -14,7 +14,7 @@ impl Logout { } } -impl FixMessage for Logout { +impl OutboundMessage for Logout { fn write(&self, msg: &mut Message) { if let Some(value) = &self.text { msg.set(TEXT, value.as_str()); @@ -24,8 +24,4 @@ impl FixMessage for Logout { fn message_type(&self) -> &str { "5" } - - fn parse(_message: &Message) -> Self { - unimplemented!() - } } diff --git a/crates/hotfix/src/message/reject.rs b/crates/hotfix/src/message/reject.rs index 7eec3d6..ddf99df 100644 --- a/crates/hotfix/src/message/reject.rs +++ b/crates/hotfix/src/message/reject.rs @@ -1,4 +1,4 @@ -use crate::message::FixMessage; +use crate::message::{InboundMessage, OutboundMessage}; use hotfix_message::Part; use hotfix_message::message::Message; use hotfix_message::session_fields::{ @@ -52,7 +52,7 @@ impl Reject { } } -impl FixMessage for Reject { +impl OutboundMessage for Reject { fn write(&self, msg: &mut Message) { msg.set(REF_SEQ_NUM, self.ref_seq_num); @@ -73,7 +73,9 @@ impl FixMessage for Reject { fn message_type(&self) -> &str { "3" } +} +impl InboundMessage for Reject { fn parse(message: &Message) -> Self { Self { ref_seq_num: message.get(REF_SEQ_NUM).unwrap(), diff --git a/crates/hotfix/src/message/resend_request.rs b/crates/hotfix/src/message/resend_request.rs index 3eddc96..1aca0ce 100644 --- a/crates/hotfix/src/message/resend_request.rs +++ b/crates/hotfix/src/message/resend_request.rs @@ -1,4 +1,4 @@ -use crate::message::FixMessage; +use crate::message::OutboundMessage; use hotfix_message::Part; use hotfix_message::message::Message; use hotfix_message::session_fields::{BEGIN_SEQ_NO, END_SEQ_NO}; @@ -18,7 +18,7 @@ impl ResendRequest { } } -impl FixMessage for ResendRequest { +impl OutboundMessage for ResendRequest { fn write(&self, msg: &mut Message) { msg.set(BEGIN_SEQ_NO, self.begin_seq_no); msg.set(END_SEQ_NO, self.end_seq_no); @@ -27,8 +27,4 @@ impl FixMessage for ResendRequest { fn message_type(&self) -> &str { "2" } - - fn parse(_message: &Message) -> Self { - todo!() - } } diff --git a/crates/hotfix/src/message/sequence_reset.rs b/crates/hotfix/src/message/sequence_reset.rs index 2f4a5d4..152d9ed 100644 --- a/crates/hotfix/src/message/sequence_reset.rs +++ b/crates/hotfix/src/message/sequence_reset.rs @@ -1,4 +1,4 @@ -use crate::message::FixMessage; +use crate::message::OutboundMessage; use hotfix_message::Part; use hotfix_message::field_types::Timestamp; use hotfix_message::message::Message; @@ -12,7 +12,7 @@ pub struct SequenceReset { pub new_seq_no: u64, } -impl FixMessage for SequenceReset { +impl OutboundMessage for SequenceReset { fn write(&self, msg: &mut Message) { msg.set(GAP_FILL_FLAG, self.gap_fill); msg.set(NEW_SEQ_NO, self.new_seq_no); @@ -24,8 +24,4 @@ impl FixMessage for SequenceReset { fn message_type(&self) -> &str { "4" } - - fn parse(_message: &Message) -> Self { - todo!() - } } diff --git a/crates/hotfix/src/message/test_request.rs b/crates/hotfix/src/message/test_request.rs index afb2381..0dac034 100644 --- a/crates/hotfix/src/message/test_request.rs +++ b/crates/hotfix/src/message/test_request.rs @@ -1,4 +1,4 @@ -use crate::message::FixMessage; +use crate::message::OutboundMessage; use hotfix_message::Part; use hotfix_message::message::Message; use hotfix_message::session_fields::TEST_REQ_ID; @@ -14,7 +14,7 @@ impl TestRequest { } } -impl FixMessage for TestRequest { +impl OutboundMessage for TestRequest { fn write(&self, msg: &mut Message) { msg.set(TEST_REQ_ID, self.test_req_id.as_str()); } @@ -22,8 +22,4 @@ impl FixMessage for TestRequest { fn message_type(&self) -> &str { "1" } - - fn parse(_message: &Message) -> Self { - unimplemented!() - } } diff --git a/crates/hotfix/src/session.rs b/crates/hotfix/src/session.rs index adce28f..2b6f77a 100644 --- a/crates/hotfix/src/session.rs +++ b/crates/hotfix/src/session.rs @@ -6,11 +6,11 @@ pub mod session_ref; mod state; use crate::config::SessionConfig; -use crate::message::FixMessage; -use crate::message::generate_message; +use crate::message::OutboundMessage; use crate::message::heartbeat::Heartbeat; use crate::message::logon::{Logon, ResetSeqNumConfig}; use crate::message::parser::RawFixMessage; +use crate::message::{InboundMessage, generate_message}; use crate::store::MessageStore; use crate::transport::writer::WriterRef; use anyhow::{Result, anyhow}; @@ -54,7 +54,7 @@ pub use crate::session::session_handle::SessionHandle; const SCHEDULE_CHECK_INTERVAL: u64 = 1; -struct Session { +struct Session { message_config: MessageConfig, config: SessionConfig, schedule: SessionSchedule, @@ -64,11 +64,21 @@ struct Session { store: S, schedule_check_timer: Pin>, reset_on_next_logon: bool, - _phantom: std::marker::PhantomData M>, + _phantom: std::marker::PhantomData (I, O)>, } -impl, M: FixMessage, S: MessageStore> Session { - fn new(config: SessionConfig, application: A, store: S) -> Session { +impl Session +where + App: Application, + Inbound: InboundMessage, + Outbound: OutboundMessage, + Store: MessageStore, +{ + fn new( + config: SessionConfig, + application: App, + store: Store, + ) -> Session { let schedule_check_timer = sleep(Duration::from_secs(SCHEDULE_CHECK_INTERVAL)); let dictionary = Self::get_data_dictionary(&config); @@ -219,7 +229,7 @@ impl, M: FixMessage, S: MessageStore> Session { async fn process_app_message(&mut self, message: &Message) -> Result<()> { match self.verify_message(message, true) { Ok(_) => { - let parsed_message = M::parse(message); + let parsed_message = Inbound::parse(message); if matches!( self.application.on_inbound_message(parsed_message).await, InboundDecision::TerminateSession @@ -732,7 +742,7 @@ impl, M: FixMessage, S: MessageStore> Session { .reset_peer_timer(self.config.heartbeat_interval, test_request_id); } - async fn send_app_message(&mut self, message: M) { + async fn send_app_message(&mut self, message: Outbound) { match self.application.on_outbound_message(&message).await { OutboundDecision::Send => { self.send_message(message).await; @@ -747,7 +757,7 @@ impl, M: FixMessage, S: MessageStore> Session { } } - async fn send_message(&mut self, message: impl FixMessage) { + async fn send_message(&mut self, message: impl OutboundMessage) { let seq_num = self.store.next_sender_seq_number(); self.store.increment_sender_seq_number().await.unwrap(); @@ -869,7 +879,7 @@ impl, M: FixMessage, S: MessageStore> Session { } } - async fn handle_outbound_message(&mut self, message: M) { + async fn handle_outbound_message(&mut self, message: Outbound) { self.send_app_message(message).await; } @@ -964,15 +974,16 @@ impl, M: FixMessage, S: MessageStore> Session { } } -async fn run_session( - mut session: Session, +async fn run_session( + mut session: Session, mut event_receiver: mpsc::Receiver, - mut outbound_message_receiver: mpsc::Receiver, + mut outbound_message_receiver: mpsc::Receiver, mut admin_request_receiver: mpsc::Receiver, ) where - A: Application, - M: FixMessage, - S: MessageStore + Send + 'static, + App: Application, + Inbound: InboundMessage, + Outbound: OutboundMessage, + Store: MessageStore + Send + 'static, { loop { select! { diff --git a/crates/hotfix/src/session/session_handle.rs b/crates/hotfix/src/session/session_handle.rs index 213ab38..6dfe823 100644 --- a/crates/hotfix/src/session/session_handle.rs +++ b/crates/hotfix/src/session/session_handle.rs @@ -10,12 +10,12 @@ use tokio::sync::{mpsc, oneshot}; /// such as inbound message processing and disconnects, [`SessionHandle`] is public /// and only exposes APIs intended for consumers of the engine. #[derive(Clone, Debug)] -pub struct SessionHandle { - outbound_message_sender: mpsc::Sender, +pub struct SessionHandle { + outbound_message_sender: mpsc::Sender, admin_request_sender: mpsc::Sender, } -impl SessionHandle { +impl SessionHandle { pub async fn get_session_info(&self) -> anyhow::Result { let (sender, receiver) = oneshot::channel::(); self.admin_request_sender @@ -24,7 +24,7 @@ impl SessionHandle { Ok(receiver.await?) } - pub async fn send_message(&self, msg: M) -> anyhow::Result<()> { + pub async fn send_message(&self, msg: Outbound) -> anyhow::Result<()> { self.outbound_message_sender .send(msg) .await diff --git a/crates/hotfix/src/session/session_ref.rs b/crates/hotfix/src/session/session_ref.rs index bc16443..8a449d8 100644 --- a/crates/hotfix/src/session/session_ref.rs +++ b/crates/hotfix/src/session/session_ref.rs @@ -1,5 +1,5 @@ use crate::config::SessionConfig; -use crate::message::{FixMessage, RawFixMessage}; +use crate::message::{InboundMessage, OutboundMessage, RawFixMessage}; use crate::session::Session; use crate::session::admin_request::AdminRequest; use crate::session::event::{AwaitingActiveSessionResponse, SessionEvent}; @@ -10,20 +10,20 @@ use tokio::sync::{mpsc, oneshot}; use tracing::debug; #[derive(Clone)] -pub struct InternalSessionRef { +pub struct InternalSessionRef { pub(crate) event_sender: mpsc::Sender, - pub(crate) outbound_message_sender: mpsc::Sender, + pub(crate) outbound_message_sender: mpsc::Sender, pub(crate) admin_request_sender: mpsc::Sender, } -impl InternalSessionRef { - pub fn new( +impl InternalSessionRef { + pub fn new( config: SessionConfig, - application: impl Application, + application: impl Application, store: impl MessageStore + Send + Sync + 'static, ) -> Self { let (event_sender, event_receiver) = mpsc::channel::(100); - let (outbound_message_sender, outbound_message_receiver) = mpsc::channel::(10); + let (outbound_message_sender, outbound_message_receiver) = mpsc::channel::(10); let (admin_request_sender, admin_request_receiver) = mpsc::channel::(10); let session = Session::new(config, application, store); tokio::spawn(session::run_session( diff --git a/crates/hotfix/src/transport/socket.rs b/crates/hotfix/src/transport/socket.rs index d1c1750..4bcf4b8 100644 --- a/crates/hotfix/src/transport/socket.rs +++ b/crates/hotfix/src/transport/socket.rs @@ -6,10 +6,10 @@ pub mod tls; use std::io; use tokio::io::{AsyncRead, AsyncWrite}; +use crate::message::OutboundMessage; use crate::session::InternalSessionRef; use crate::{ config::SessionConfig, - message::FixMessage, transport::{ FixConnection, socket_reader::spawn_socket_reader, socket_writer::spawn_socket_writer, tcp::create_tcp_connection, tls::create_tcp_over_tls_connection, @@ -19,7 +19,7 @@ use crate::{ /// Connect over TCP/TLS and return a FixConnection pub async fn connect( config: &SessionConfig, - session_ref: InternalSessionRef, + session_ref: InternalSessionRef, ) -> io::Result { let use_tls = config.tls_config.is_some(); @@ -34,12 +34,12 @@ pub async fn connect( Ok(conn) } -async fn _create_io_refs( - session_ref: InternalSessionRef, +async fn _create_io_refs( + session_ref: InternalSessionRef, stream: Stream, ) -> FixConnection where - M: FixMessage, + Outbound: OutboundMessage, Stream: AsyncRead + AsyncWrite + Send + 'static, { let (reader, writer) = tokio::io::split(stream); diff --git a/crates/hotfix/src/transport/socket/socket_reader.rs b/crates/hotfix/src/transport/socket/socket_reader.rs index 407f102..cc28b10 100644 --- a/crates/hotfix/src/transport/socket/socket_reader.rs +++ b/crates/hotfix/src/transport/socket/socket_reader.rs @@ -2,14 +2,14 @@ use tokio::io::{AsyncRead, AsyncReadExt, ReadHalf}; use tokio::sync::oneshot; use tracing::debug; -use crate::message::FixMessage; +use crate::message::OutboundMessage; use crate::message::parser::Parser; use crate::session::InternalSessionRef; use crate::transport::reader::ReaderRef; pub fn spawn_socket_reader( reader: ReadHalf, - session_ref: InternalSessionRef, + session_ref: InternalSessionRef, ) -> ReaderRef { let (dc_sender, dc_receiver) = oneshot::channel(); let actor = ReaderActor::new(reader, session_ref, dc_sender); @@ -38,9 +38,9 @@ impl ReaderActor { } } -async fn run_reader(mut actor: ReaderActor) +async fn run_reader(mut actor: ReaderActor) where - M: FixMessage, + Outbound: OutboundMessage, R: AsyncRead, { let mut parser = Parser::default(); @@ -78,7 +78,7 @@ where #[cfg(test)] mod tests { use super::*; - use crate::message::{FixMessage, Message}; + use crate::message::Message; use crate::session::admin_request::AdminRequest; use crate::session::event::SessionEvent; use tokio::io::{AsyncWriteExt, duplex}; @@ -87,16 +87,12 @@ mod tests { #[derive(Clone, Debug, PartialEq)] struct TestMessage; - impl FixMessage for TestMessage { + impl OutboundMessage for TestMessage { fn write(&self, _msg: &mut Message) {} fn message_type(&self) -> &str { "TEST" } - - fn parse(_message: &Message) -> Self { - TestMessage - } } /// Creates a test InternalSessionRef that captures events for verification diff --git a/crates/hotfix/tests/common/actions.rs b/crates/hotfix/tests/common/actions.rs index 1c3cf73..a862f0b 100644 --- a/crates/hotfix/tests/common/actions.rs +++ b/crates/hotfix/tests/common/actions.rs @@ -1,6 +1,6 @@ use crate::common::fakes::{FakeCounterparty, SessionSpy}; use crate::common::test_messages::TestMessage; -use hotfix::message::FixMessage; +use hotfix::message::OutboundMessage; use std::time::Duration; pub struct When { @@ -26,7 +26,7 @@ impl When<&SessionSpy> { } impl When<&mut FakeCounterparty> { - pub async fn has_previously_sent(&mut self, message: impl FixMessage) { + pub async fn has_previously_sent(&mut self, message: impl OutboundMessage) { self.target.push_previously_sent_message(message).await; } @@ -38,7 +38,7 @@ impl When<&mut FakeCounterparty> { self.target.resend_message(sequence_number, true).await; } - pub async fn sends_message(&mut self, message: impl FixMessage) { + pub async fn sends_message(&mut self, message: impl OutboundMessage) { self.target.send_message(message).await; } diff --git a/crates/hotfix/tests/common/fakes/fake_application.rs b/crates/hotfix/tests/common/fakes/fake_application.rs index 68ab7bb..06c0218 100644 --- a/crates/hotfix/tests/common/fakes/fake_application.rs +++ b/crates/hotfix/tests/common/fakes/fake_application.rs @@ -13,7 +13,7 @@ impl FakeApplication { } #[async_trait::async_trait] -impl Application for FakeApplication { +impl Application for FakeApplication { async fn on_outbound_message(&self, _msg: &TestMessage) -> OutboundDecision { OutboundDecision::Send } diff --git a/crates/hotfix/tests/common/fakes/fake_counterparty.rs b/crates/hotfix/tests/common/fakes/fake_counterparty.rs index 4621592..33cb643 100644 --- a/crates/hotfix/tests/common/fakes/fake_counterparty.rs +++ b/crates/hotfix/tests/common/fakes/fake_counterparty.rs @@ -1,7 +1,7 @@ use hotfix::config::SessionConfig; use hotfix::message::logon::{Logon, ResetSeqNumConfig}; use hotfix::message::sequence_reset::SequenceReset; -use hotfix::message::{FixMessage, RawFixMessage, generate_message}; +use hotfix::message::{OutboundMessage, RawFixMessage, generate_message}; use hotfix::session::InternalSessionRef; use hotfix::transport::FixConnection; use hotfix::transport::reader::ReaderRef; @@ -14,11 +14,11 @@ use std::time::Duration; use tokio::sync::mpsc::Receiver; use tokio::sync::{mpsc, oneshot}; -pub struct FakeCounterparty { +pub struct FakeCounterparty { receiver: Receiver, received_messages: Vec, sent_messages: Vec>, - session_ref: InternalSessionRef, + session_ref: InternalSessionRef, session_config: SessionConfig, message_builder: MessageBuilder, message_config: MessageConfig, @@ -26,11 +26,14 @@ pub struct FakeCounterparty { _dc_sender: oneshot::Sender<()>, } -impl FakeCounterparty +impl FakeCounterparty where - M: FixMessage, + Outbound: OutboundMessage, { - pub async fn start(session_ref: InternalSessionRef, session_config: SessionConfig) -> Self { + pub async fn start( + session_ref: InternalSessionRef, + session_config: SessionConfig, + ) -> Self { let (writer_ref, receiver) = Self::create_writer(); let (reader_ref, dc_sender) = Self::create_reader(); let connection = FixConnection::new(writer_ref, reader_ref); @@ -68,7 +71,7 @@ where } } - pub async fn push_previously_sent_message(&mut self, message: impl FixMessage) { + pub async fn push_previously_sent_message(&mut self, message: impl OutboundMessage) { let raw_message = generate_message( &self.session_config.begin_string, &self.session_config.sender_comp_id, @@ -148,7 +151,7 @@ where self.send_message(logon).await; } - pub async fn send_message(&mut self, message: impl FixMessage) { + pub async fn send_message(&mut self, message: impl OutboundMessage) { let raw_message = generate_message( &self.session_config.begin_string, &self.session_config.sender_comp_id, diff --git a/crates/hotfix/tests/common/test_messages.rs b/crates/hotfix/tests/common/test_messages.rs index b90a917..f17e605 100644 --- a/crates/hotfix/tests/common/test_messages.rs +++ b/crates/hotfix/tests/common/test_messages.rs @@ -1,7 +1,7 @@ use crate::common::setup::{COUNTERPARTY_COMP_ID, OUR_COMP_ID}; use chrono::TimeDelta; use hotfix::Message as HotfixMessage; -use hotfix::message::{FixMessage, generate_message}; +use hotfix::message::{InboundMessage, OutboundMessage, generate_message}; use hotfix_message::dict::{FieldLocation, FixDatatype}; use hotfix_message::field_types::Timestamp; use hotfix_message::message::{Config, Message}; @@ -63,7 +63,7 @@ impl TestMessage { } } -impl FixMessage for TestMessage { +impl OutboundMessage for TestMessage { fn write(&self, msg: &mut HotfixMessage) { match self { TestMessage::ExecutionReport { @@ -109,7 +109,9 @@ impl FixMessage for TestMessage { TestMessage::NewOrderSingle { .. } => "D", } } +} +impl InboundMessage for TestMessage { fn parse(msg: &HotfixMessage) -> Self { let msg_type: &str = msg.header().get(fix44::MSG_TYPE).unwrap(); match msg_type { @@ -188,7 +190,7 @@ impl Default for ExecutionReportWithInvalidField { } } -impl FixMessage for ExecutionReportWithInvalidField { +impl OutboundMessage for ExecutionReportWithInvalidField { fn write(&self, msg: &mut Message) { msg.set(fix44::ORDER_ID, self.order_id.as_str()); msg.set(fix44::EXEC_ID, self.exec_id.as_str()); @@ -206,11 +208,6 @@ impl FixMessage for ExecutionReportWithInvalidField { fn message_type(&self) -> &str { "D" } - - fn parse(_message: &Message) -> Self { - // we never parse this message - unimplemented!() - } } pub const CUSTOM_FIELD: &HardCodedFixFieldDefinition = &HardCodedFixFieldDefinition { diff --git a/crates/hotfix/tests/session_test_cases/business_tests.rs b/crates/hotfix/tests/session_test_cases/business_tests.rs index d4f785b..f480199 100644 --- a/crates/hotfix/tests/session_test_cases/business_tests.rs +++ b/crates/hotfix/tests/session_test_cases/business_tests.rs @@ -3,7 +3,7 @@ use crate::common::assertions::then; use crate::common::cleanup::finally; use crate::common::setup::given_an_active_session; use crate::common::test_messages::TestMessage; -use hotfix::message::FixMessage; +use hotfix::message::{InboundMessage, OutboundMessage}; use hotfix_message::{FieldType, fix44::MsgType}; #[tokio::test] diff --git a/crates/hotfix/tests/session_test_cases/resend_tests.rs b/crates/hotfix/tests/session_test_cases/resend_tests.rs index 865652c..82278cf 100644 --- a/crates/hotfix/tests/session_test_cases/resend_tests.rs +++ b/crates/hotfix/tests/session_test_cases/resend_tests.rs @@ -5,7 +5,7 @@ use crate::common::setup::{HEARTBEAT_INTERVAL, given_an_active_session}; use crate::common::test_messages::{ TestMessage, build_execution_report_with_incorrect_body_length, build_invalid_resend_request, }; -use hotfix::message::{FixMessage, ResendRequest}; +use hotfix::message::{OutboundMessage, ResendRequest}; use hotfix::session::Status; use hotfix_message::fix44::{GAP_FILL_FLAG, MsgType, NEW_SEQ_NO}; use hotfix_message::{FieldType, Part}; diff --git a/examples/load-testing/src/application.rs b/examples/load-testing/src/application.rs index 330d9d3..981c42e 100644 --- a/examples/load-testing/src/application.rs +++ b/examples/load-testing/src/application.rs @@ -1,4 +1,4 @@ -use crate::messages::{ExecutionReport, Message}; +use crate::messages::{ExecutionReport, InboundMsg, OutboundMsg}; use hotfix::Application; use hotfix::application::{InboundDecision, OutboundDecision}; use tokio::sync::mpsc::UnboundedSender; @@ -15,17 +15,14 @@ impl LoadTestingApplication { } #[async_trait::async_trait] -impl Application for LoadTestingApplication { - async fn on_outbound_message(&self, _msg: &Message) -> OutboundDecision { +impl Application for LoadTestingApplication { + async fn on_outbound_message(&self, _msg: &OutboundMsg) -> OutboundDecision { OutboundDecision::Send } - async fn on_inbound_message(&self, msg: Message) -> InboundDecision { + async fn on_inbound_message(&self, msg: InboundMsg) -> InboundDecision { match msg { - Message::NewOrderSingle(_) => { - unimplemented!("we should not receive orders"); - } - Message::Unimplemented(data) => { + InboundMsg::Unimplemented(data) => { let pretty_bytes: Vec = data .iter() .map(|b| if *b == b'\x01' { b'|' } else { *b }) @@ -33,7 +30,7 @@ impl Application for LoadTestingApplication { let s = std::str::from_utf8(&pretty_bytes).unwrap_or("invalid characters"); info!("received message: {:?}", s); } - Message::ExecutionReport(report) => { + InboundMsg::ExecutionReport(report) => { if self.sender.send(report).is_err() { return InboundDecision::TerminateSession; } diff --git a/examples/load-testing/src/main.rs b/examples/load-testing/src/main.rs index 12c8b5f..52001de 100644 --- a/examples/load-testing/src/main.rs +++ b/examples/load-testing/src/main.rs @@ -2,7 +2,7 @@ mod application; mod messages; use crate::application::LoadTestingApplication; -use crate::messages::{ExecutionReport, Message, NewOrderSingle}; +use crate::messages::{ExecutionReport, NewOrderSingle, OutboundMsg}; use clap::{Parser, ValueEnum}; use hotfix::config::SessionConfig; use hotfix::field_types::{Date, Timestamp}; @@ -91,7 +91,7 @@ async fn start_session( session_config: SessionConfig, db_config: Database, app: LoadTestingApplication, -) -> Initiator { +) -> Initiator { match db_config { Database::Memory => { let store = hotfix::store::in_memory::InMemoryMessageStore::default(); @@ -110,16 +110,16 @@ async fn start_session( } } -async fn submit_messages(session_handle: SessionHandle, message_count: u32) { +async fn submit_messages(session_handle: SessionHandle, message_count: u32) { for _ in 0..message_count { submit_message(&session_handle).await; } } -async fn submit_message(session_handle: &SessionHandle) { +async fn submit_message(session_handle: &SessionHandle) { let mut order_id = format!("{}", uuid::Uuid::new_v4()); order_id.truncate(12); - let order = NewOrderSingle { + let order = OutboundMsg::NewOrderSingle(NewOrderSingle { transact_time: Timestamp::utc_now(), symbol: "EUR/USD".to_string(), cl_ord_id: order_id, @@ -131,11 +131,10 @@ async fn submit_message(session_handle: &SessionHandle) { number_of_allocations: 1, allocation_account: "acc1".to_string(), allocation_quantity: 230, - }; - let msg = Message::NewOrderSingle(order); + }); session_handle - .send_message(msg) + .send_message(order) .await .expect("session to accept message"); } diff --git a/examples/load-testing/src/messages.rs b/examples/load-testing/src/messages.rs index d3f2feb..02ad607 100644 --- a/examples/load-testing/src/messages.rs +++ b/examples/load-testing/src/messages.rs @@ -2,7 +2,7 @@ use hotfix::Message as HotfixMessage; use hotfix::field_types::{Date, Timestamp}; use hotfix::fix44; use hotfix::fix44::{OrdStatus, OrdType, Side}; -use hotfix::message::{FixMessage, Part, RepeatingGroup}; +use hotfix::message::{InboundMessage, OutboundMessage, Part, RepeatingGroup}; #[derive(Debug, Clone)] #[allow(dead_code)] @@ -38,13 +38,17 @@ pub struct NewOrderSingle { } #[derive(Debug, Clone)] -pub enum Message { +pub enum InboundMsg { ExecutionReport(ExecutionReport), - NewOrderSingle(NewOrderSingle), Unimplemented(Vec), } -impl Message { +#[derive(Debug, Clone)] +pub enum OutboundMsg { + NewOrderSingle(NewOrderSingle), +} + +impl InboundMsg { fn parse_execution_report_ack(message: &HotfixMessage) -> Self { let report = ExecutionReport { order_id: message.get::<&str>(fix44::ORDER_ID).unwrap().to_string(), @@ -62,10 +66,10 @@ impl Message { } } -impl FixMessage for Message { +impl OutboundMessage for OutboundMsg { fn write(&self, msg: &mut HotfixMessage) { match self { - Self::NewOrderSingle(order) => { + OutboundMsg::NewOrderSingle(order) => { // order details msg.set(fix44::TRANSACT_TIME, order.transact_time.clone()); msg.set(fix44::SYMBOL, order.symbol.as_str()); @@ -83,17 +87,17 @@ impl FixMessage for Message { allocation.set(fix44::ALLOC_QTY, order.allocation_quantity); msg.set_groups(vec![allocation]); } - _ => unimplemented!(), } } fn message_type(&self) -> &str { match self { - Self::NewOrderSingle(_) => "D", - _ => unimplemented!(), + OutboundMsg::NewOrderSingle(_) => "D", } } +} +impl InboundMessage for InboundMsg { fn parse(message: &HotfixMessage) -> Self { let message_type: &str = message.header().get(fix44::MSG_TYPE).unwrap(); if message_type == "8" { diff --git a/examples/simple-new-order/src/application.rs b/examples/simple-new-order/src/application.rs index 3bbf24c..9f0219f 100644 --- a/examples/simple-new-order/src/application.rs +++ b/examples/simple-new-order/src/application.rs @@ -1,4 +1,4 @@ -use crate::messages::Message; +use crate::messages::{InboundMsg, OutboundMsg}; use hotfix::Application; use hotfix::application::{InboundDecision, OutboundDecision}; use tracing::info; @@ -7,17 +7,14 @@ use tracing::info; pub struct TestApplication {} #[async_trait::async_trait] -impl Application for TestApplication { - async fn on_outbound_message(&self, _msg: &Message) -> OutboundDecision { +impl Application for TestApplication { + async fn on_outbound_message(&self, _msg: &OutboundMsg) -> OutboundDecision { OutboundDecision::Send } - async fn on_inbound_message(&self, msg: Message) -> InboundDecision { + async fn on_inbound_message(&self, msg: InboundMsg) -> InboundDecision { match msg { - Message::NewOrderSingle(_) => { - unimplemented!("we should not receive orders"); - } - Message::UnimplementedMessage(data) => { + InboundMsg::Unimplemented(data) => { let pretty_bytes: Vec = data .iter() .map(|b| if *b == b'\x01' { b'|' } else { *b }) diff --git a/examples/simple-new-order/src/main.rs b/examples/simple-new-order/src/main.rs index bd782b5..e09c707 100644 --- a/examples/simple-new-order/src/main.rs +++ b/examples/simple-new-order/src/main.rs @@ -2,7 +2,7 @@ mod application; mod messages; use crate::application::TestApplication; -use crate::messages::{Message, NewOrderSingle}; +use crate::messages::{NewOrderSingle, OutboundMsg}; use clap::{Parser, ValueEnum}; use hotfix::config::Config; use hotfix::field_types::{Date, Timestamp}; @@ -79,7 +79,7 @@ async fn main() { .expect("graceful shutdown to succeed"); } -async fn user_loop(session: &Initiator) { +async fn user_loop(session: &Initiator) { loop { println!("(q) to quit, (s) to send message"); @@ -105,7 +105,7 @@ async fn user_loop(session: &Initiator) { } } -async fn send_message(session: &Initiator) { +async fn send_message(session: &Initiator) { let mut order_id = format!("{}", uuid::Uuid::new_v4()); order_id.truncate(12); let order = NewOrderSingle { @@ -120,7 +120,7 @@ async fn send_message(session: &Initiator) { allocation_account: "acc1".to_string(), allocation_quantity: 230, }; - let msg = Message::NewOrderSingle(order); + let msg = OutboundMsg::NewOrderSingle(order); session.send_message(msg).await.unwrap(); } @@ -129,7 +129,7 @@ async fn start_session( config_path: &str, db_config: &Database, app: TestApplication, -) -> Initiator { +) -> Initiator { let mut config = Config::load_from_path(config_path); let session_config = config.sessions.pop().expect("config to include a session"); @@ -154,7 +154,7 @@ async fn start_session( } async fn start_web_service( - session_handle: SessionHandle, + session_handle: SessionHandle, cancellation_token: CancellationToken, ) { let config = RouterConfig { diff --git a/examples/simple-new-order/src/messages.rs b/examples/simple-new-order/src/messages.rs index 3fa899d..dbee98f 100644 --- a/examples/simple-new-order/src/messages.rs +++ b/examples/simple-new-order/src/messages.rs @@ -1,7 +1,7 @@ use hotfix::Message as HotfixMessage; use hotfix::field_types::{Date, Timestamp}; use hotfix::fix44; -use hotfix::message::{FixMessage, Part, RepeatingGroup}; +use hotfix::message::{InboundMessage, OutboundMessage, Part, RepeatingGroup}; #[derive(Debug, Clone)] pub struct NewOrderSingle { @@ -21,15 +21,19 @@ pub struct NewOrderSingle { } #[derive(Debug, Clone)] -pub enum Message { +pub enum InboundMsg { + Unimplemented(Vec), +} + +#[derive(Debug, Clone)] +pub enum OutboundMsg { NewOrderSingle(NewOrderSingle), - UnimplementedMessage(Vec), } -impl FixMessage for Message { +impl OutboundMessage for OutboundMsg { fn write(&self, msg: &mut HotfixMessage) { match self { - Self::NewOrderSingle(order) => { + OutboundMsg::NewOrderSingle(order) => { // order details msg.set(fix44::TRANSACT_TIME, order.transact_time.clone()); msg.set(fix44::SYMBOL, order.symbol.as_str()); @@ -46,19 +50,19 @@ impl FixMessage for Message { allocation.set(fix44::ALLOC_QTY, order.allocation_quantity); msg.set_groups(vec![allocation]); } - _ => unimplemented!(), } } fn message_type(&self) -> &str { match self { - Self::NewOrderSingle(_) => "D", - _ => unimplemented!(), + OutboundMsg::NewOrderSingle(_) => "D", } } +} +impl InboundMessage for InboundMsg { fn parse(message: &HotfixMessage) -> Self { let message_type: &str = message.header().get(fix44::MSG_TYPE).unwrap(); - Self::UnimplementedMessage(message_type.as_bytes().to_vec()) + Self::Unimplemented(message_type.as_bytes().to_vec()) } }