Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 29 additions & 33 deletions crates/ingress-rpc/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use tips_core::{
use tokio::sync::{broadcast, mpsc};
use tokio::time::{Duration, Instant, timeout};
use tracing::{debug, info, warn};
use uuid::Uuid;

use crate::metrics::{Metrics, record_histogram};
use crate::queue::{BundleQueuePublisher, MessageQueue, UserOpQueuePublisher};
Expand Down Expand Up @@ -86,7 +85,7 @@ pub struct IngressService<Q: MessageQueue, M: Mempool> {
builder_backrun_tx: broadcast::Sender<AcceptedBundle>,
max_backrun_txs: usize,
max_backrun_gas_limit: u64,
bundle_cache: Cache<Uuid, ()>,
bundle_cache: Cache<B256, ()>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is OK since uuid::new_v5 takes in a bundle_hash

}

impl<Q: MessageQueue, M: Mempool> IngressService<Q, M> {
Expand Down Expand Up @@ -308,43 +307,40 @@ impl<Q: MessageQueue + 'static, M: Mempool + 'static> IngressApiServer for Ingre

let bundle_hash = &parsed_bundle.bundle_hash();

self.metrics.bundles_parsed.increment(1);

let meter_bundle_response = match self.meter_bundle(&bundle, bundle_hash).await {
Ok(response) => {
info!(message = "Metering succeeded for raw transaction", bundle_hash = %bundle_hash, response = ?response);
Some(response)
}
Err(e) => {
warn!(
bundle_hash = %bundle_hash,
error = %e,
"Metering failed for raw transaction"
);
None
}
};

if let Some(meter_info) = meter_bundle_response.as_ref() {
self.metrics.successful_simulations.increment(1);
_ = self.builder_tx.send(meter_info.clone());
} else {
self.metrics.failed_simulations.increment(1);
}

let accepted_bundle =
AcceptedBundle::new(parsed_bundle, meter_bundle_response.unwrap_or_default());

let bundle_id = *accepted_bundle.uuid();
if self.bundle_cache.get(&bundle_id).await.is_some() {
if self.bundle_cache.get(bundle_hash).await.is_some() {
debug!(
message = "Duplicate bundle detected, skipping Kafka publish",
bundle_id = %bundle_id,
bundle_hash = %bundle_hash,
transaction_hash = %transaction.tx_hash(),
);
} else {
self.bundle_cache.insert(bundle_id, ()).await;
self.bundle_cache.insert(*bundle_hash, ()).await;
self.metrics.bundles_parsed.increment(1);

let meter_bundle_response = match self.meter_bundle(&bundle, bundle_hash).await {
Ok(response) => {
info!(message = "Metering succeeded for raw transaction", bundle_hash = %bundle_hash, response = ?response);
Some(response)
}
Err(e) => {
warn!(
bundle_hash = %bundle_hash,
error = %e,
"Metering failed for raw transaction"
);
None
}
};

if let Some(meter_info) = meter_bundle_response.as_ref() {
self.metrics.successful_simulations.increment(1);
_ = self.builder_tx.send(meter_info.clone());
} else {
self.metrics.failed_simulations.increment(1);
}

let accepted_bundle =
AcceptedBundle::new(parsed_bundle, meter_bundle_response.unwrap_or_default());

if send_to_kafka {
if let Err(e) = self
Expand Down