Add rocksdb event listener callbacks.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2026-03-08 12:29:14 +00:00
parent 57d4ae243a
commit 3fcfcafdd2
3 changed files with 246 additions and 1 deletions

View File

@@ -3,6 +3,7 @@ mod cf_opts;
pub(crate) mod context;
mod db_opts;
pub(crate) mod descriptor;
mod events;
mod files;
mod logger;
mod memory_usage;

View File

@@ -3,7 +3,7 @@ use std::{cmp, convert::TryFrom};
use rocksdb::{Cache, DBRecoveryMode, Env, LogLevel, Options, statistics::StatsLevel};
use tuwunel_core::{Config, Result, utils};
use super::{cf_opts::cache_size_f64, logger::handle as handle_log};
use super::{cf_opts::cache_size_f64, events::Events, logger::handle as handle_log};
use crate::util::map_err;
/// Create database-wide options suitable for opening the database. This also
@@ -22,6 +22,7 @@ pub(crate) fn db_options(config: &Config, env: &Env, row_cache: &Cache) -> Resul
// Logging
set_logging_defaults(&mut opts, config);
opts.add_event_listener(Events::new(config, env));
// Processing
opts.set_max_background_jobs(num_threads::<i32>(config)?);

View File

@@ -0,0 +1,243 @@
use rocksdb::{
Env,
event_listener::{
CompactionJobInfo, DBBackgroundErrorReason, DBWriteStallCondition, EventListener,
FlushJobInfo, IngestionInfo, MemTableInfo, MutableStatus, SubcompactionJobInfo,
WriteStallInfo,
},
};
use tuwunel_core::{Config, debug, debug::INFO_SPAN_LEVEL, debug_info, error, info, warn};
pub(super) struct Events;
impl Events {
pub(super) fn new(_config: &Config, _env: &Env) -> Self { Self {} }
}
impl EventListener for Events {
#[tracing::instrument(name = "error", level = "error", skip_all)]
fn on_background_error(&self, reason: DBBackgroundErrorReason, _status: MutableStatus) {
error!(error = ?reason, "Critical RocksDB Error");
}
#[tracing::instrument(name = "stall", level = "warn", skip_all)]
fn on_stall_conditions_changed(&self, info: &WriteStallInfo) {
let col = info.cf_name();
let col = col
.as_deref()
.map(str::from_utf8)
.expect("column has a name")
.expect("column name is valid utf8");
let prev = info.prev();
match info.cur() {
| DBWriteStallCondition::KStopped => {
error!(?col, ?prev, "Database Stalled");
},
| DBWriteStallCondition::KDelayed if prev == DBWriteStallCondition::KStopped => {
warn!(?col, ?prev, "Database Stall Recovering");
},
| DBWriteStallCondition::KDelayed => {
warn!(?col, ?prev, "Database Stalling");
},
| DBWriteStallCondition::KNormal
if prev == DBWriteStallCondition::KStopped
|| prev == DBWriteStallCondition::KDelayed =>
{
info!(?col, ?prev, "Database Stall Recovered");
},
| DBWriteStallCondition::KNormal => {
debug!(?col, ?prev, "Database Normal");
},
}
}
#[tracing::instrument(
name = "compaction",
level = INFO_SPAN_LEVEL,
skip_all,
)]
fn on_compaction_begin(&self, info: &CompactionJobInfo) {
let col = info.cf_name();
let col = col
.as_deref()
.map(str::from_utf8)
.expect("column has a name")
.expect("column name is valid utf8");
let level = (info.base_input_level(), info.output_level());
let records = (info.input_records(), info.output_records());
let bytes = (info.total_input_bytes(), info.total_output_bytes());
let files = (
info.input_file_count(),
info.output_file_count(),
info.num_input_files_at_output_level(),
);
debug!(
status = ?info.status(),
?level,
?files,
?records,
?bytes,
micros = info.elapsed_micros(),
errs = info.num_corrupt_keys(),
reason = ?info.compaction_reason(),
?col,
"Compaction Starting",
);
}
#[tracing::instrument(
name = "compaction",
level = INFO_SPAN_LEVEL,
skip_all,
)]
fn on_compaction_completed(&self, info: &CompactionJobInfo) {
let col = info.cf_name();
let col = col
.as_deref()
.map(str::from_utf8)
.expect("column has a name")
.expect("column name is valid utf8");
let level = (info.base_input_level(), info.output_level());
let records = (info.input_records(), info.output_records());
let bytes = (info.total_input_bytes(), info.total_output_bytes());
let files = (
info.input_file_count(),
info.output_file_count(),
info.num_input_files_at_output_level(),
);
debug_info!(
status = ?info.status(),
?level,
?files,
?records,
?bytes,
micros = info.elapsed_micros(),
errs = info.num_corrupt_keys(),
reason = ?info.compaction_reason(),
?col,
"Compaction Complete",
);
}
#[tracing::instrument(name = "compaction", level = "debug", skip_all)]
fn on_subcompaction_begin(&self, info: &SubcompactionJobInfo) {
let col = info.cf_name();
let col = col
.as_deref()
.map(str::from_utf8)
.expect("column has a name")
.expect("column name is valid utf8");
let level = (info.base_input_level(), info.output_level());
debug!(
status = ?info.status(),
?level,
tid = info.thread_id(),
reason = ?info.compaction_reason(),
?col,
"Compaction Starting",
);
}
#[tracing::instrument(name = "compaction", level = "debug", skip_all)]
fn on_subcompaction_completed(&self, info: &SubcompactionJobInfo) {
let col = info.cf_name();
let col = col
.as_deref()
.map(str::from_utf8)
.expect("column has a name")
.expect("column name is valid utf8");
let level = (info.base_input_level(), info.output_level());
debug!(
status = ?info.status(),
?level,
tid = info.thread_id(),
reason = ?info.compaction_reason(),
?col,
"Compaction Complete",
);
}
#[tracing::instrument(
name = "flush",
level = INFO_SPAN_LEVEL,
skip_all,
)]
fn on_flush_begin(&self, info: &FlushJobInfo) {
let col = info.cf_name();
let col = col
.as_deref()
.map(str::from_utf8)
.expect("column has a name")
.expect("column name is valid utf8");
debug!(
seq_start = info.smallest_seqno(),
seq_end = info.largest_seqno(),
slow = info.triggered_writes_slowdown(),
stop = info.triggered_writes_stop(),
reason = ?info.flush_reason(),
?col,
"Flush Starting",
);
}
#[tracing::instrument(
name = "flush",
level = INFO_SPAN_LEVEL,
skip_all,
)]
fn on_flush_completed(&self, info: &FlushJobInfo) {
let col = info.cf_name();
let col = col
.as_deref()
.map(str::from_utf8)
.expect("column has a name")
.expect("column name is valid utf8");
debug_info!(
seq_start = info.smallest_seqno(),
seq_end = info.largest_seqno(),
slow = info.triggered_writes_slowdown(),
stop = info.triggered_writes_stop(),
reason = ?info.flush_reason(),
?col,
"Flush Complete",
);
}
#[tracing::instrument(
name = "memtable",
level = INFO_SPAN_LEVEL,
skip_all,
)]
fn on_memtable_sealed(&self, info: &MemTableInfo) {
let col = info.cf_name();
let col = col
.as_deref()
.map(str::from_utf8)
.expect("column has a name")
.expect("column name is valid utf8");
debug_info!(
seq_first = info.first_seqno(),
seq_early = info.earliest_seqno(),
ents = info.num_entries(),
dels = info.num_deletes(),
?col,
"Buffer Filled",
);
}
fn on_external_file_ingested(&self, _info: &IngestionInfo) {
unimplemented!();
}
}