Improve config options for missing and dropping db columns.
Implement actual drop functionality. Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -4,15 +4,12 @@ use std::{
|
||||
sync::{Arc, atomic::AtomicU32},
|
||||
};
|
||||
|
||||
use itertools::Itertools;
|
||||
use rocksdb::{ColumnFamilyDescriptor, Options};
|
||||
use tuwunel_core::{Result, debug, implement, info, warn};
|
||||
use tuwunel_core::{Result, debug, debug_warn, implement, info, warn};
|
||||
|
||||
use super::{
|
||||
Db, Engine,
|
||||
cf_opts::cf_options,
|
||||
context,
|
||||
db_opts::db_options,
|
||||
descriptor::{self, Descriptor},
|
||||
Db, Engine, cf_opts::cf_options, context, db_opts::db_options, descriptor::Descriptor,
|
||||
repair::repair,
|
||||
};
|
||||
use crate::{Context, or_else};
|
||||
@@ -31,7 +28,7 @@ pub(crate) async fn open(ctx: Arc<Context>, desc: &[Descriptor]) -> Result<Arc<S
|
||||
&ctx.row_cache.lock().expect("row cache locked"),
|
||||
)?;
|
||||
|
||||
let cfds = Self::configure_cfds(&ctx, &db_opts, desc)?;
|
||||
let (cfds, dropped) = Self::configure_cfds(&ctx, &db_opts, desc)?;
|
||||
let num_cfds = cfds.len();
|
||||
debug!("Configured {num_cfds} column descriptors...");
|
||||
|
||||
@@ -50,6 +47,13 @@ pub(crate) async fn open(ctx: Arc<Context>, desc: &[Descriptor]) -> Result<Arc<S
|
||||
}
|
||||
.or_else(or_else)?;
|
||||
|
||||
if !config.rocksdb_read_only && !config.rocksdb_secondary {
|
||||
for name in &dropped {
|
||||
debug!("Deleting dropped column {name:?} ...");
|
||||
db.drop_cf(name).or_else(or_else)?;
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
columns = num_cfds,
|
||||
sequence = %db.latest_sequence_number(),
|
||||
@@ -74,59 +78,86 @@ fn configure_cfds(
|
||||
ctx: &Arc<Context>,
|
||||
db_opts: &Options,
|
||||
desc: &[Descriptor],
|
||||
) -> Result<Vec<ColumnFamilyDescriptor>> {
|
||||
) -> Result<(Vec<ColumnFamilyDescriptor>, Vec<String>)> {
|
||||
let server = &ctx.server;
|
||||
let config = &server.config;
|
||||
let path = &config.database_path;
|
||||
let existing = Self::discover_cfs(path, db_opts);
|
||||
|
||||
let creating = desc
|
||||
.iter()
|
||||
.filter(|desc| !existing.contains(desc.name));
|
||||
|
||||
// Found columns which are not described.
|
||||
let missing = existing
|
||||
.iter()
|
||||
.filter(|&name| name != "default")
|
||||
.filter(|&name| !desc.iter().any(|desc| desc.name == name));
|
||||
|
||||
// Described columns which are not found.
|
||||
let creating = desc
|
||||
.iter()
|
||||
.filter(|desc| !desc.dropped)
|
||||
.filter(|desc| !existing.contains(desc.name));
|
||||
|
||||
// Found columns which are described as dropped.
|
||||
let dropping = desc
|
||||
.iter()
|
||||
.filter(|desc| desc.dropped)
|
||||
.filter(|desc| existing.contains(desc.name))
|
||||
.filter(|_| !config.rocksdb_never_drop_columns);
|
||||
|
||||
// Described dropped columns which are no longer found.
|
||||
let dropped = desc
|
||||
.iter()
|
||||
.filter(|desc| desc.dropped)
|
||||
.filter(|desc| !existing.contains(desc.name));
|
||||
|
||||
debug!(
|
||||
existing = existing.len(),
|
||||
described = desc.len(),
|
||||
missing = missing.clone().count(),
|
||||
dropped = dropped.clone().count(),
|
||||
creating = creating.clone().count(),
|
||||
dropping = dropping.clone().count(),
|
||||
"Discovered database columns"
|
||||
);
|
||||
|
||||
missing.clone().for_each(|name| {
|
||||
debug!("Found unrecognized column {name:?} in existing database.");
|
||||
debug_warn!("Found undescribed column {name:?} in existing database.");
|
||||
});
|
||||
|
||||
dropped.map(|desc| desc.name).for_each(|name| {
|
||||
debug!("Previously dropped column {name:?} no longer found in database.");
|
||||
});
|
||||
|
||||
creating.map(|desc| desc.name).for_each(|name| {
|
||||
debug!("Creating new column {name:?} not previously found in existing database.");
|
||||
});
|
||||
|
||||
let missing_descriptors = missing
|
||||
dropping
|
||||
.clone()
|
||||
.filter(|_| config.rocksdb_drop_missing_columns)
|
||||
.map(|_| descriptor::DROPPED);
|
||||
.map(|desc| desc.name)
|
||||
.for_each(|name| {
|
||||
warn!(
|
||||
"Column {name:?} has been scheduled for deletion. Storage may not appear \
|
||||
reclaimed until further restart or compaction."
|
||||
);
|
||||
});
|
||||
|
||||
let cfopts: Vec<_> = desc
|
||||
.iter()
|
||||
.copied()
|
||||
.chain(missing_descriptors)
|
||||
.map(|ref desc| cf_options(ctx, db_opts.clone(), desc))
|
||||
.collect::<Result<_>>()?;
|
||||
let dropping_names: Vec<_> = dropping
|
||||
.clone()
|
||||
.map(|desc| desc.name)
|
||||
.map(ToOwned::to_owned)
|
||||
.collect();
|
||||
|
||||
let cfds: Vec<_> = desc
|
||||
.iter()
|
||||
.map(|desc| desc.name)
|
||||
.map(ToOwned::to_owned)
|
||||
.chain(missing.cloned())
|
||||
.zip(cfopts.into_iter())
|
||||
.map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts))
|
||||
.collect();
|
||||
.filter(|desc| !desc.dropped)
|
||||
.chain(dropping)
|
||||
.copied()
|
||||
.inspect(|desc| debug!(name = desc.name, "Described column"))
|
||||
.map(|desc| Ok((desc.name.to_owned(), cf_options(ctx, db_opts.clone(), &desc)?)))
|
||||
.map_ok(|(name, opts)| ColumnFamilyDescriptor::new(name, opts))
|
||||
.collect::<Result<_>>()?;
|
||||
|
||||
Ok(cfds)
|
||||
Ok((cfds, dropping_names))
|
||||
}
|
||||
|
||||
#[implement(Engine)]
|
||||
|
||||
Reference in New Issue
Block a user