Skip to content

Commit 55bbd32

Browse files
Better replacement latency (#908)
## 📝 Summary Increased ORDERS_CONSUMED_PER_BATCH from 1024 to 8192. Implemented cancellation of orders queued for simulation. ## ✅ I have completed the following steps: * [X] Run `make lint` * [X] Run `make test` * [ ] Added tests (if applicable) --------- Co-authored-by: claude[bot] <209825114+claude[bot]@users.noreply.github.com>
1 parent 80ebfc8 commit 55bbd32

File tree

5 files changed

+133
-90
lines changed

5 files changed

+133
-90
lines changed

crates/rbuilder/src/building/builders/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ use tracing::{error, info, warn};
4141

4242
use super::{simulated_order_command_to_sink, OrderPriority, PrioritizedOrderStore};
4343

44+
/// Orders that blocking_consume_next_commands will consume.
45+
/// A slow algorithm would check approx every 200ms, to fill this batch size it would take
46+
/// 8192 / 0.2 s = 40960 orders/sec, which is even more than what we see in the whole slot for a busy block.
47+
const ORDERS_CONSUMED_PER_BATCH: usize = 8192;
48+
4449
/// Block we built
4550
#[derive(Debug, Clone)]
4651
pub struct Block {
@@ -137,6 +142,7 @@ impl OrderConsumer {
137142
/// New commands are accumulated in self.new_commands
138143
/// Call apply_new_commands to easily consume them.
139144
/// This method will block until the first command is received
145+
/// @Pending: This method consumes up to a fixed number (ORDERS_CONSUMED_PER_BATCH) of pending orders. Should we instead do something that depends on the age of the orders?
140146
pub fn blocking_consume_next_commands(
141147
&mut self,
142148
) -> eyre::Result<Option<JournalSequenceNumber>> {
@@ -149,7 +155,7 @@ impl OrderConsumer {
149155
warn!(msg, "Builder thread lagging on sim orders channel");
150156
}
151157
}
152-
for _ in 0..1024 {
158+
for _ in 0..ORDERS_CONSUMED_PER_BATCH {
153159
match self.orders.try_recv() {
154160
Ok(order) => self.add_command(order),
155161
Err(TryRecvError::Empty) => {

crates/rbuilder/src/building/sim.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use reth_provider::StateProvider;
2222
use std::{
2323
cmp::{max, min, Ordering},
2424
collections::hash_map::Entry,
25-
sync::Arc,
25+
sync::{atomic::AtomicBool, Arc},
2626
time::{Duration, Instant},
2727
};
2828
use tracing::{error, trace};
@@ -55,13 +55,36 @@ struct PendingOrder {
5555

5656
pub type SimulationId = u64;
5757

58-
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
58+
#[derive(Debug, Clone)]
5959
pub struct SimulationRequest {
6060
pub id: SimulationId,
6161
pub order: Arc<Order>,
6262
pub parents: Vec<Arc<Order>>,
6363
}
6464

65+
/// SimulationRequest with an extra bool to be able to cancel it externally.
66+
#[derive(Debug, Clone)]
67+
pub struct CancellableSimulationRequest {
68+
request: SimulationRequest,
69+
// If cancelled we don't simulate.
70+
cancelled: Arc<AtomicBool>,
71+
}
72+
73+
impl CancellableSimulationRequest {
74+
pub fn new(request: SimulationRequest, cancelled: Arc<AtomicBool>) -> Self {
75+
Self { request, cancelled }
76+
}
77+
78+
/// Returns `None` if the request was cancelled, otherwise the wrapped request.
79+
pub fn into_request(self) -> Option<SimulationRequest> {
80+
if self.cancelled.load(std::sync::atomic::Ordering::Relaxed) {
81+
None
82+
} else {
83+
Some(self.request)
84+
}
85+
}
86+
}
87+
6588
#[derive(Debug, Clone, PartialEq, Eq)]
6689
pub struct SimulatedResult {
6790
pub id: SimulationId,

crates/rbuilder/src/live_builder/simulation/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ pub mod simulation_job_tracer;
44

55
use crate::{
66
building::{
7-
sim::{SimTree, SimulatedResult, SimulationRequest},
7+
sim::{CancellableSimulationRequest, SimTree, SimulatedResult},
88
tx_sim_cache::TxExecutionCache,
99
BlockBuildingContext,
1010
},
@@ -36,7 +36,7 @@ type BlockContextId = u64;
3636
pub struct SimulationContext {
3737
pub block_ctx: BlockBuildingContext,
3838
/// Simulation requests come in through this channel.
39-
pub requests: flume::Receiver<SimulationRequest>,
39+
pub requests: flume::Receiver<CancellableSimulationRequest>,
4040
/// Simulation results go out through this channel.
4141
pub results: mpsc::Sender<SimulatedResult>,
4242
}

crates/rbuilder/src/live_builder/simulation/sim_worker.rs

Lines changed: 55 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -57,61 +57,63 @@ pub fn run_sim_worker<P>(
5757
continue 'main;
5858
}
5959
};
60-
while let Ok(task) = current_sim_context.requests.recv() {
61-
let sim_thread_wait_time = last_sim_finished.elapsed();
62-
let sim_start = Instant::now();
60+
while let Ok(cancellable_task) = current_sim_context.requests.recv() {
61+
if let Some(task) = cancellable_task.into_request() {
62+
let sim_thread_wait_time = last_sim_finished.elapsed();
63+
let sim_start = Instant::now();
6364

64-
let order_id = task.order.id();
65-
let start_time = Instant::now();
66-
let mut block_state = BlockState::new_arc(state_provider.clone());
67-
let sim_result = simulate_order(
68-
task.parents.clone(),
69-
task.order,
70-
&current_sim_context.block_ctx,
71-
&mut local_ctx,
72-
&mut block_state,
73-
);
74-
let sim_ok = match sim_result {
75-
Ok(sim_result) => {
76-
let sim_ok = match sim_result.result {
77-
OrderSimResult::Success(simulated_order, nonces_after) => {
78-
let result = SimulatedResult {
79-
id: task.id,
80-
simulated_order,
81-
previous_orders: task.parents,
82-
nonces_after: nonces_after
83-
.into_iter()
84-
.map(|(address, nonce)| NonceKey { address, nonce })
85-
.collect(),
86-
simulation_time: start_time.elapsed(),
87-
};
88-
current_sim_context
89-
.results
90-
.try_send(result)
91-
.unwrap_or_default();
92-
true
93-
}
94-
OrderSimResult::Failed(_) => false,
95-
};
96-
telemetry::inc_simulated_orders(sim_ok);
97-
telemetry::inc_simulation_gas_used(sim_result.gas_used);
98-
sim_ok
99-
}
100-
Err(err) => {
101-
error!(?err, ?order_id, "Critical error while simulating order");
102-
// @Metric
103-
break;
104-
}
105-
};
65+
let order_id = task.order.id();
66+
let start_time = Instant::now();
67+
let mut block_state = BlockState::new_arc(state_provider.clone());
68+
let sim_result = simulate_order(
69+
task.parents.clone(),
70+
task.order,
71+
&current_sim_context.block_ctx,
72+
&mut local_ctx,
73+
&mut block_state,
74+
);
75+
let sim_ok = match sim_result {
76+
Ok(sim_result) => {
77+
let sim_ok = match sim_result.result {
78+
OrderSimResult::Success(simulated_order, nonces_after) => {
79+
let result = SimulatedResult {
80+
id: task.id,
81+
simulated_order,
82+
previous_orders: task.parents,
83+
nonces_after: nonces_after
84+
.into_iter()
85+
.map(|(address, nonce)| NonceKey { address, nonce })
86+
.collect(),
87+
simulation_time: start_time.elapsed(),
88+
};
89+
current_sim_context
90+
.results
91+
.try_send(result)
92+
.unwrap_or_default();
93+
true
94+
}
95+
OrderSimResult::Failed(_) => false,
96+
};
97+
telemetry::inc_simulated_orders(sim_ok);
98+
telemetry::inc_simulation_gas_used(sim_result.gas_used);
99+
sim_ok
100+
}
101+
Err(err) => {
102+
error!(?err, ?order_id, "Critical error while simulating order");
103+
// @Metric
104+
break;
105+
}
106+
};
106107

107-
mark_order_simulation_end(order_id, sim_ok);
108-
last_sim_finished = Instant::now();
109-
let sim_thread_work_time = sim_start.elapsed();
110-
add_sim_thread_utilisation_timings(
111-
sim_thread_work_time,
112-
sim_thread_wait_time,
113-
worker_id,
114-
);
108+
mark_order_simulation_end(order_id, sim_ok);
109+
last_sim_finished = Instant::now();
110+
let sim_thread_work_time = sim_start.elapsed();
111+
add_sim_thread_utilisation_timings(
112+
sim_thread_work_time,
113+
sim_thread_wait_time,
114+
worker_id,
115+
);
116+
}
115117
}
116118
}
117119
}

crates/rbuilder/src/live_builder/simulation/simulation_job.rs

Lines changed: 44 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
1-
use std::{fmt, sync::Arc};
1+
use std::{
2+
fmt,
3+
sync::{
4+
atomic::{AtomicBool, Ordering},
5+
Arc,
6+
},
7+
};
28

39
use crate::{
4-
building::sim::{SimTree, SimulatedResult, SimulationRequest},
10+
building::sim::{CancellableSimulationRequest, SimTree, SimulatedResult},
511
live_builder::{
612
order_input::order_sink::OrderPoolCommand,
713
simulation::simulation_job_tracer::SimulationJobTracer,
814
},
915
};
10-
use ahash::HashSet;
16+
use ahash::{HashMap, HashSet};
1117
use alloy_primitives::utils::format_ether;
1218
use rbuilder_primitives::{BundleReplacementKey, Order, OrderId};
1319
use tokio::sync::mpsc;
@@ -32,7 +38,7 @@ pub struct SimulationJob {
3238
/// Input orders to be simulated
3339
new_order_sub: mpsc::UnboundedReceiver<OrderPoolCommand>,
3440
/// Here we send requests to the simulator pool
35-
sim_req_sender: flume::Sender<SimulationRequest>,
41+
sim_req_sender: flume::Sender<CancellableSimulationRequest>,
3642
/// Here we receive the results we asked to sim_req_sender
3743
sim_results_receiver: mpsc::Receiver<SimulatedResult>,
3844
/// Output of the simulations
@@ -50,7 +56,8 @@ pub struct SimulationJob {
5056

5157
/// Orders we got via new_order_sub and are still being processed (they could be inside the SimTree or in the sim queue)
5258
/// and were not cancelled.
53-
in_flight_orders: HashSet<OrderId>,
59+
/// When an order is cancelled we set the associated bool to true (which is checked before simulating)
60+
in_flight_orders: HashMap<OrderId, Arc<AtomicBool>>,
5461

5562
/// Orders for which we sent downstream SimulatedOrderCommand::Simulation but not SimulatedOrderCommand::Cancellation.
5663
/// We store them to avoid generating SimulatedOrderCommand::Cancellation for failed orders since they never generated
@@ -71,7 +78,7 @@ impl SimulationJob {
7178
pub fn new(
7279
block_cancellation: CancellationToken,
7380
new_order_sub: mpsc::UnboundedReceiver<OrderPoolCommand>,
74-
sim_req_sender: flume::Sender<SimulationRequest>,
81+
sim_req_sender: flume::Sender<CancellableSimulationRequest>,
7582
sim_results_receiver: mpsc::Receiver<SimulatedResult>,
7683
slot_sim_results_sender: mpsc::Sender<SimulatedOrderCommand>,
7784
sim_tree: SimTree,
@@ -144,40 +151,41 @@ impl SimulationJob {
144151
}
145152
}
146153

147-
/// Cancelled orders will return false
148-
fn order_still_valid(&self, order_id: &OrderId) -> bool {
149-
self.in_flight_orders.contains(order_id)
150-
}
151-
152154
/// Pops tasks from SimTree and sends them for simulation
153155
fn send_new_tasks_for_simulation(&mut self) {
154156
// submit sim tasks loop
155157
loop {
156-
let mut new_sim_request = self.sim_tree.pop_simulation_tasks(1024);
158+
let new_sim_request = self.sim_tree.pop_simulation_tasks(1024);
157159
if new_sim_request.is_empty() {
158160
break;
159161
}
160-
// filter out cancelled orders
161-
new_sim_request.retain(|s| self.order_still_valid(&s.order.id()));
162162

163163
for sim_request in new_sim_request {
164164
let order_id = sim_request.order.id();
165-
let delivered = match self.sim_req_sender.try_send(sim_request) {
166-
Ok(()) => true,
167-
Err(flume::TrySendError::Full(_)) => {
168-
warn!("Sim channel is full, dropping order");
169-
false
170-
// @Metric
171-
}
172-
Err(flume::TrySendError::Disconnected(_)) => {
173-
error!("Sim channel is closed, dropping order");
174-
false
175-
// @Metric
165+
if let Some(cancel_handle) = self.in_flight_orders.get(&order_id) {
166+
let delivered =
167+
match self
168+
.sim_req_sender
169+
.try_send(CancellableSimulationRequest::new(
170+
sim_request,
171+
cancel_handle.clone(),
172+
)) {
173+
Ok(()) => true,
174+
Err(flume::TrySendError::Full(_)) => {
175+
warn!("Sim channel is full, dropping order");
176+
false
177+
// @Metric
178+
}
179+
Err(flume::TrySendError::Disconnected(_)) => {
180+
error!("Sim channel is closed, dropping order");
181+
false
182+
// @Metric
183+
}
184+
};
185+
if !delivered {
186+
// Small bug, if a cancel arrives we are going to propagate it.
187+
self.in_flight_orders.remove(&order_id);
176188
}
177-
};
178-
if !delivered {
179-
// Small bug, if a cancel arrives we are going to propagate it.
180-
self.in_flight_orders.remove(&order_id);
181189
}
182190
}
183191
}
@@ -206,6 +214,7 @@ impl SimulationJob {
206214
if self
207215
.in_flight_orders
208216
.remove(&sim_result.simulated_order.id())
217+
.is_some()
209218
{
210219
valid_simulated_orders.push(sim_result.clone());
211220
// Only send if it's the first time.
@@ -260,8 +269,10 @@ impl SimulationJob {
260269

261270
/// return if everything went OK
262271
async fn process_order_cancellation(&mut self, cancellation_id: &OrderId) -> bool {
263-
if !self.in_flight_orders.remove(cancellation_id) {
264-
// if we removed from in_flight_orders it was never sent so there is no need to cancel
272+
if let Some(cancel_handle) = self.in_flight_orders.remove(cancellation_id) {
273+
cancel_handle.store(true, Ordering::Relaxed);
274+
} else {
275+
// Order was not in in_flight_orders (already simulated/sent), so forward the cancellation downstream.
265276
return self.send_cancel(cancellation_id).await;
266277
}
267278
true
@@ -280,7 +291,8 @@ impl SimulationJob {
280291
// @Metric
281292
return false;
282293
}
283-
self.in_flight_orders.insert(order_id);
294+
self.in_flight_orders
295+
.insert(order_id, Arc::new(AtomicBool::new(false)));
284296
true
285297
}
286298

0 commit comments

Comments
 (0)