add conf item to toggle startup netburst (for developers).
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -90,6 +90,7 @@ pub struct Service {
|
||||
pub(super) maximum_requests: Arc<Semaphore>,
|
||||
pub sender: mpsc::UnboundedSender<(OutgoingKind, SendingEventType, Vec<u8>)>,
|
||||
receiver: Mutex<mpsc::UnboundedReceiver<(OutgoingKind, SendingEventType, Vec<u8>)>>,
|
||||
startup_netburst: bool,
|
||||
}
|
||||
|
||||
enum TransactionStatus {
|
||||
@@ -106,6 +107,7 @@ impl Service {
|
||||
sender,
|
||||
receiver: Mutex::new(receiver),
|
||||
maximum_requests: Arc::new(Semaphore::new(config.max_concurrent_requests as usize)),
|
||||
startup_netburst: config.startup_netburst,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -123,29 +125,29 @@ impl Service {
|
||||
let mut receiver = self.receiver.lock().await;
|
||||
|
||||
let mut futures = FuturesUnordered::new();
|
||||
|
||||
let mut current_transaction_status = HashMap::<OutgoingKind, TransactionStatus>::new();
|
||||
|
||||
// Retry requests we could not finish yet
|
||||
let mut initial_transactions = HashMap::<OutgoingKind, Vec<SendingEventType>>::new();
|
||||
if self.startup_netburst {
|
||||
let mut initial_transactions = HashMap::<OutgoingKind, Vec<SendingEventType>>::new();
|
||||
for (key, outgoing_kind, event) in self.db.active_requests().filter_map(Result::ok) {
|
||||
let entry = initial_transactions
|
||||
.entry(outgoing_kind.clone())
|
||||
.or_default();
|
||||
|
||||
for (key, outgoing_kind, event) in self.db.active_requests().filter_map(Result::ok) {
|
||||
let entry = initial_transactions
|
||||
.entry(outgoing_kind.clone())
|
||||
.or_default();
|
||||
if entry.len() > 30 {
|
||||
warn!("Dropping some current events: {:?} {:?} {:?}", key, outgoing_kind, event);
|
||||
self.db.delete_active_request(key)?;
|
||||
continue;
|
||||
}
|
||||
|
||||
if entry.len() > 30 {
|
||||
warn!("Dropping some current events: {:?} {:?} {:?}", key, outgoing_kind, event);
|
||||
self.db.delete_active_request(key)?;
|
||||
continue;
|
||||
entry.push(event);
|
||||
}
|
||||
|
||||
entry.push(event);
|
||||
}
|
||||
|
||||
for (outgoing_kind, events) in initial_transactions {
|
||||
current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running);
|
||||
futures.push(Self::handle_events(outgoing_kind.clone(), events));
|
||||
for (outgoing_kind, events) in initial_transactions {
|
||||
current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running);
|
||||
futures.push(Self::handle_events(outgoing_kind.clone(), events));
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
|
||||
Reference in New Issue
Block a user