Improve config options for missing and dropping db columns.
Implement actual drop functionality. Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -5079,6 +5079,7 @@ dependencies = [
|
|||||||
"criterion",
|
"criterion",
|
||||||
"ctor",
|
"ctor",
|
||||||
"futures",
|
"futures",
|
||||||
|
"itertools 0.14.0",
|
||||||
"log",
|
"log",
|
||||||
"minicbor",
|
"minicbor",
|
||||||
"minicbor-serde",
|
"minicbor-serde",
|
||||||
|
|||||||
@@ -1288,19 +1288,20 @@ pub struct Config {
|
|||||||
#[serde(default = "default_rocksdb_stats_level")]
|
#[serde(default = "default_rocksdb_stats_level")]
|
||||||
pub rocksdb_stats_level: u8,
|
pub rocksdb_stats_level: u8,
|
||||||
|
|
||||||
/// Erases data no longer reachable in the current schema. The developers
|
/// Ignores the list of dropped columns set by developers.
|
||||||
/// expect this to be set to true which simplifies the schema and prevents
|
|
||||||
/// accumulation of old schemas remaining in the codebase forever. If this
|
|
||||||
/// is set to false, old columns which are not described in the current
|
|
||||||
/// schema will be ignored rather than erased, leaking their space.
|
|
||||||
///
|
///
|
||||||
/// This can be set to false when moving between versions in ways which are
|
/// This should be set to true when knowingly moving between versions in
|
||||||
/// not recommended or otherwise forbidden, or for diagnostic and
|
/// ways which are not recommended or otherwise forbidden, or for
|
||||||
/// development purposes; requiring preservation across such movements.
|
/// diagnostic and development purposes; requiring preservation across such
|
||||||
|
/// movements.
|
||||||
///
|
///
|
||||||
/// default: true
|
/// The developers' list of dropped columns is meant to safely reduce space
|
||||||
#[serde(default = "true_fn")]
|
/// by erasing data no longer in use. If this is set to true, that storage
|
||||||
pub rocksdb_drop_missing_columns: bool,
|
/// will not be reclaimed as intended.
|
||||||
|
///
|
||||||
|
/// default: false
|
||||||
|
#[serde(default)]
|
||||||
|
pub rocksdb_never_drop_columns: bool,
|
||||||
|
|
||||||
/// This is a password that can be configured that will let you login to the
|
/// This is a password that can be configured that will let you login to the
|
||||||
/// server bot account (currently `@conduit`) for emergency troubleshooting
|
/// server bot account (currently `@conduit`) for emergency troubleshooting
|
||||||
|
|||||||
@@ -57,6 +57,7 @@ async-channel.workspace = true
|
|||||||
const-str.workspace = true
|
const-str.workspace = true
|
||||||
ctor.workspace = true
|
ctor.workspace = true
|
||||||
futures.workspace = true
|
futures.workspace = true
|
||||||
|
itertools.workspace = true
|
||||||
log.workspace = true
|
log.workspace = true
|
||||||
minicbor.workspace = true
|
minicbor.workspace = true
|
||||||
minicbor-serde.workspace = true
|
minicbor-serde.workspace = true
|
||||||
|
|||||||
@@ -4,15 +4,12 @@ use std::{
|
|||||||
sync::{Arc, atomic::AtomicU32},
|
sync::{Arc, atomic::AtomicU32},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use itertools::Itertools;
|
||||||
use rocksdb::{ColumnFamilyDescriptor, Options};
|
use rocksdb::{ColumnFamilyDescriptor, Options};
|
||||||
use tuwunel_core::{Result, debug, implement, info, warn};
|
use tuwunel_core::{Result, debug, debug_warn, implement, info, warn};
|
||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
Db, Engine,
|
Db, Engine, cf_opts::cf_options, context, db_opts::db_options, descriptor::Descriptor,
|
||||||
cf_opts::cf_options,
|
|
||||||
context,
|
|
||||||
db_opts::db_options,
|
|
||||||
descriptor::{self, Descriptor},
|
|
||||||
repair::repair,
|
repair::repair,
|
||||||
};
|
};
|
||||||
use crate::{Context, or_else};
|
use crate::{Context, or_else};
|
||||||
@@ -31,7 +28,7 @@ pub(crate) async fn open(ctx: Arc<Context>, desc: &[Descriptor]) -> Result<Arc<S
|
|||||||
&ctx.row_cache.lock().expect("row cache locked"),
|
&ctx.row_cache.lock().expect("row cache locked"),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
let cfds = Self::configure_cfds(&ctx, &db_opts, desc)?;
|
let (cfds, dropped) = Self::configure_cfds(&ctx, &db_opts, desc)?;
|
||||||
let num_cfds = cfds.len();
|
let num_cfds = cfds.len();
|
||||||
debug!("Configured {num_cfds} column descriptors...");
|
debug!("Configured {num_cfds} column descriptors...");
|
||||||
|
|
||||||
@@ -50,6 +47,13 @@ pub(crate) async fn open(ctx: Arc<Context>, desc: &[Descriptor]) -> Result<Arc<S
|
|||||||
}
|
}
|
||||||
.or_else(or_else)?;
|
.or_else(or_else)?;
|
||||||
|
|
||||||
|
if !config.rocksdb_read_only && !config.rocksdb_secondary {
|
||||||
|
for name in &dropped {
|
||||||
|
debug!("Deleting dropped column {name:?} ...");
|
||||||
|
db.drop_cf(name).or_else(or_else)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
columns = num_cfds,
|
columns = num_cfds,
|
||||||
sequence = %db.latest_sequence_number(),
|
sequence = %db.latest_sequence_number(),
|
||||||
@@ -74,59 +78,86 @@ fn configure_cfds(
|
|||||||
ctx: &Arc<Context>,
|
ctx: &Arc<Context>,
|
||||||
db_opts: &Options,
|
db_opts: &Options,
|
||||||
desc: &[Descriptor],
|
desc: &[Descriptor],
|
||||||
) -> Result<Vec<ColumnFamilyDescriptor>> {
|
) -> Result<(Vec<ColumnFamilyDescriptor>, Vec<String>)> {
|
||||||
let server = &ctx.server;
|
let server = &ctx.server;
|
||||||
let config = &server.config;
|
let config = &server.config;
|
||||||
let path = &config.database_path;
|
let path = &config.database_path;
|
||||||
let existing = Self::discover_cfs(path, db_opts);
|
let existing = Self::discover_cfs(path, db_opts);
|
||||||
|
|
||||||
let creating = desc
|
// Found columns which are not described.
|
||||||
.iter()
|
|
||||||
.filter(|desc| !existing.contains(desc.name));
|
|
||||||
|
|
||||||
let missing = existing
|
let missing = existing
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|&name| name != "default")
|
.filter(|&name| name != "default")
|
||||||
.filter(|&name| !desc.iter().any(|desc| desc.name == name));
|
.filter(|&name| !desc.iter().any(|desc| desc.name == name));
|
||||||
|
|
||||||
|
// Described columns which are not found.
|
||||||
|
let creating = desc
|
||||||
|
.iter()
|
||||||
|
.filter(|desc| !desc.dropped)
|
||||||
|
.filter(|desc| !existing.contains(desc.name));
|
||||||
|
|
||||||
|
// Found columns which are described as dropped.
|
||||||
|
let dropping = desc
|
||||||
|
.iter()
|
||||||
|
.filter(|desc| desc.dropped)
|
||||||
|
.filter(|desc| existing.contains(desc.name))
|
||||||
|
.filter(|_| !config.rocksdb_never_drop_columns);
|
||||||
|
|
||||||
|
// Described dropped columns which are no longer found.
|
||||||
|
let dropped = desc
|
||||||
|
.iter()
|
||||||
|
.filter(|desc| desc.dropped)
|
||||||
|
.filter(|desc| !existing.contains(desc.name));
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
existing = existing.len(),
|
existing = existing.len(),
|
||||||
described = desc.len(),
|
described = desc.len(),
|
||||||
missing = missing.clone().count(),
|
missing = missing.clone().count(),
|
||||||
|
dropped = dropped.clone().count(),
|
||||||
creating = creating.clone().count(),
|
creating = creating.clone().count(),
|
||||||
|
dropping = dropping.clone().count(),
|
||||||
"Discovered database columns"
|
"Discovered database columns"
|
||||||
);
|
);
|
||||||
|
|
||||||
missing.clone().for_each(|name| {
|
missing.clone().for_each(|name| {
|
||||||
debug!("Found unrecognized column {name:?} in existing database.");
|
debug_warn!("Found undescribed column {name:?} in existing database.");
|
||||||
|
});
|
||||||
|
|
||||||
|
dropped.map(|desc| desc.name).for_each(|name| {
|
||||||
|
debug!("Previously dropped column {name:?} no longer found in database.");
|
||||||
});
|
});
|
||||||
|
|
||||||
creating.map(|desc| desc.name).for_each(|name| {
|
creating.map(|desc| desc.name).for_each(|name| {
|
||||||
debug!("Creating new column {name:?} not previously found in existing database.");
|
debug!("Creating new column {name:?} not previously found in existing database.");
|
||||||
});
|
});
|
||||||
|
|
||||||
let missing_descriptors = missing
|
dropping
|
||||||
.clone()
|
.clone()
|
||||||
.filter(|_| config.rocksdb_drop_missing_columns)
|
.map(|desc| desc.name)
|
||||||
.map(|_| descriptor::DROPPED);
|
.for_each(|name| {
|
||||||
|
warn!(
|
||||||
|
"Column {name:?} has been scheduled for deletion. Storage may not appear \
|
||||||
|
reclaimed until further restart or compaction."
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
let cfopts: Vec<_> = desc
|
let dropping_names: Vec<_> = dropping
|
||||||
.iter()
|
.clone()
|
||||||
.copied()
|
.map(|desc| desc.name)
|
||||||
.chain(missing_descriptors)
|
.map(ToOwned::to_owned)
|
||||||
.map(|ref desc| cf_options(ctx, db_opts.clone(), desc))
|
.collect();
|
||||||
.collect::<Result<_>>()?;
|
|
||||||
|
|
||||||
let cfds: Vec<_> = desc
|
let cfds: Vec<_> = desc
|
||||||
.iter()
|
.iter()
|
||||||
.map(|desc| desc.name)
|
.filter(|desc| !desc.dropped)
|
||||||
.map(ToOwned::to_owned)
|
.chain(dropping)
|
||||||
.chain(missing.cloned())
|
.copied()
|
||||||
.zip(cfopts.into_iter())
|
.inspect(|desc| debug!(name = desc.name, "Described column"))
|
||||||
.map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts))
|
.map(|desc| Ok((desc.name.to_owned(), cf_options(ctx, db_opts.clone(), &desc)?)))
|
||||||
.collect();
|
.map_ok(|(name, opts)| ColumnFamilyDescriptor::new(name, opts))
|
||||||
|
.collect::<Result<_>>()?;
|
||||||
|
|
||||||
Ok(cfds)
|
Ok((cfds, dropping_names))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[implement(Engine)]
|
#[implement(Engine)]
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ pub(super) fn open(engine: &Arc<Engine>) -> Result<Maps> { open_list(engine, MAP
|
|||||||
#[tracing::instrument(name = "maps", level = "debug", skip_all)]
|
#[tracing::instrument(name = "maps", level = "debug", skip_all)]
|
||||||
pub(super) fn open_list(engine: &Arc<Engine>, maps: &[Descriptor]) -> Result<Maps> {
|
pub(super) fn open_list(engine: &Arc<Engine>, maps: &[Descriptor]) -> Result<Maps> {
|
||||||
maps.iter()
|
maps.iter()
|
||||||
|
.filter(|desc| !desc.dropped)
|
||||||
.map(|desc| Ok((desc.name, Map::open(engine, desc.name)?)))
|
.map(|desc| Ok((desc.name, Map::open(engine, desc.name)?)))
|
||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
@@ -165,6 +166,10 @@ pub(super) static MAPS: &[Descriptor] = &[
|
|||||||
name: "roomid_joinedcount",
|
name: "roomid_joinedcount",
|
||||||
..descriptor::RANDOM_SMALL
|
..descriptor::RANDOM_SMALL
|
||||||
},
|
},
|
||||||
|
Descriptor {
|
||||||
|
name: "roomid_maxremotepowerlevel",
|
||||||
|
..descriptor::RANDOM_SMALL
|
||||||
|
},
|
||||||
Descriptor {
|
Descriptor {
|
||||||
name: "roomid_pduleaves",
|
name: "roomid_pduleaves",
|
||||||
..descriptor::RANDOM_SMALL
|
..descriptor::RANDOM_SMALL
|
||||||
@@ -447,8 +452,4 @@ pub(super) static MAPS: &[Descriptor] = &[
|
|||||||
name: "userroomid_notificationcount",
|
name: "userroomid_notificationcount",
|
||||||
..descriptor::RANDOM
|
..descriptor::RANDOM
|
||||||
},
|
},
|
||||||
Descriptor {
|
|
||||||
name: "roomid_maxremotepowerlevel",
|
|
||||||
..descriptor::RANDOM_SMALL
|
|
||||||
},
|
|
||||||
];
|
];
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
#![type_length_limit = "8192"]
|
#![type_length_limit = "65536"]
|
||||||
|
|
||||||
extern crate rust_rocksdb as rocksdb;
|
extern crate rust_rocksdb as rocksdb;
|
||||||
|
|
||||||
|
|||||||
@@ -1091,17 +1091,18 @@
|
|||||||
#
|
#
|
||||||
#rocksdb_stats_level = 1
|
#rocksdb_stats_level = 1
|
||||||
|
|
||||||
# Erases data no longer reachable in the current schema. The developers
|
# Ignores the list of dropped columns set by developers.
|
||||||
# expect this to be set to true which simplifies the schema and prevents
|
|
||||||
# accumulation of old schemas remaining in the codebase forever. If this
|
|
||||||
# is set to false, old columns which are not described in the current
|
|
||||||
# schema will be ignored rather than erased, leaking their space.
|
|
||||||
#
|
#
|
||||||
# This can be set to false when moving between versions in ways which are
|
# This should be set to true when knowingly moving between versions in
|
||||||
# not recommended or otherwise forbidden, or for diagnostic and
|
# ways which are not recommended or otherwise forbidden, or for
|
||||||
# development purposes; requiring preservation across such movements.
|
# diagnostic and development purposes; requiring preservation across such
|
||||||
|
# movements.
|
||||||
#
|
#
|
||||||
#rocksdb_drop_missing_columns = true
|
# The developers' list of dropped columns is meant to safely reduce space
|
||||||
|
# by erasing data no longer in use. If this is set to true, that storage
|
||||||
|
# will not be reclaimed as intended.
|
||||||
|
#
|
||||||
|
#rocksdb_never_drop_columns = false
|
||||||
|
|
||||||
# This is a password that can be configured that will let you login to the
|
# This is a password that can be configured that will let you login to the
|
||||||
# server bot account (currently `@conduit`) for emergency troubleshooting
|
# server bot account (currently `@conduit`) for emergency troubleshooting
|
||||||
|
|||||||
Reference in New Issue
Block a user