Unbox and pin database streams.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-11-24 09:12:14 +00:00
parent 98affbdeaf
commit 71f3ccf140
26 changed files with 217 additions and 166 deletions

View File

@@ -1,6 +1,9 @@
use std::{convert::AsRef, fmt::Debug, sync::Arc};
use futures::{Future, FutureExt, TryFutureExt, future::ready};
use futures::{
Future, FutureExt, TryFutureExt,
future::{Either, ready},
};
use rocksdb::{DBPinnableSlice, ReadOptions};
use tokio::task;
use tuwunel_core::{Err, Result, err, implement, utils::result::MapExpect};
@@ -25,9 +28,9 @@ where
let cached = self.get_cached(key);
if matches!(cached, Err(_) | Ok(Some(_))) {
return task::consume_budget()
.map(move |()| cached.map_expect("data found in cache"))
.boxed();
return Either::Left(
task::consume_budget().map(move |()| cached.map_expect("data found in cache")),
);
}
debug_assert!(matches!(cached, Ok(None)), "expected status Incomplete");
@@ -37,11 +40,12 @@ where
res: None,
};
self.engine
.pool
.execute_get(cmd)
.and_then(|mut res| ready(res.remove(0)))
.boxed()
Either::Right(
self.engine
.pool
.execute_get(cmd)
.and_then(|mut res| ready(res.remove(0))),
)
}
/// Fetch a value from the cache without I/O.

View File

@@ -50,12 +50,12 @@ where
.widen_then(automatic_width(), |chunk| {
self.engine.pool.execute_get(Get {
map: self.clone(),
res: None,
key: chunk
.iter()
.map(AsRef::as_ref)
.map(Into::into)
.collect(),
res: None,
})
})
.map_ok(|results| results.into_iter().stream())

View File

@@ -1,6 +1,6 @@
use std::sync::Arc;
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::Either};
use rocksdb::Direction;
use serde::Deserialize;
use tokio::task;
@@ -27,11 +27,12 @@ pub fn raw_keys(self: &Arc<Self>) -> impl Stream<Item = Result<Key<'_>>> + Send
let state = stream::State::new(self, opts);
if is_cached(self) {
let state = state.init_fwd(None);
return task::consume_budget()
.map(move |()| stream::Keys::<'_>::from(state))
.into_stream()
.flatten()
.boxed();
return Either::Left(
task::consume_budget()
.map(move |()| stream::Keys::<'_>::from(state))
.into_stream()
.flatten(),
);
}
let seek = Seek {
@@ -42,11 +43,12 @@ pub fn raw_keys(self: &Arc<Self>) -> impl Stream<Item = Result<Key<'_>>> + Send
res: None,
};
self.engine
.pool
.execute_iter(seek)
.ok_into::<stream::Keys<'_>>()
.into_stream()
.try_flatten()
.boxed()
Either::Right(
self.engine
.pool
.execute_iter(seek)
.ok_into::<stream::Keys<'_>>()
.into_stream()
.try_flatten(),
)
}

View File

@@ -1,8 +1,9 @@
use std::{convert::AsRef, fmt::Debug, sync::Arc};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::Either};
use rocksdb::Direction;
use serde::{Deserialize, Serialize};
use tokio::task;
use tuwunel_core::{Result, implement};
use super::stream_from::is_cached;
@@ -64,7 +65,13 @@ where
let opts = super::iter_options_default(&self.engine);
let state = stream::State::new(self, opts);
if is_cached(self, from) {
return stream::Keys::<'_>::from(state.init_fwd(from.as_ref().into())).boxed();
let state = state.init_fwd(from.as_ref().into());
return Either::Left(
task::consume_budget()
.map(move |()| stream::Keys::<'_>::from(state))
.into_stream()
.flatten(),
);
}
let seek = Seek {
@@ -75,11 +82,12 @@ where
res: None,
};
self.engine
.pool
.execute_iter(seek)
.ok_into::<stream::Keys<'_>>()
.into_stream()
.try_flatten()
.boxed()
Either::Right(
self.engine
.pool
.execute_iter(seek)
.ok_into::<stream::Keys<'_>>()
.into_stream()
.try_flatten(),
)
}

View File

@@ -1,6 +1,6 @@
use std::sync::Arc;
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::Either};
use rocksdb::Direction;
use serde::Deserialize;
use tokio::task;
@@ -27,11 +27,12 @@ pub fn rev_raw_keys(self: &Arc<Self>) -> impl Stream<Item = Result<Key<'_>>> + S
let state = stream::State::new(self, opts);
if is_cached(self) {
let state = state.init_rev(None);
return task::consume_budget()
.map(move |()| stream::KeysRev::<'_>::from(state))
.into_stream()
.flatten()
.boxed();
return Either::Left(
task::consume_budget()
.map(move |()| stream::KeysRev::<'_>::from(state))
.into_stream()
.flatten(),
);
}
let seek = Seek {
@@ -42,11 +43,12 @@ pub fn rev_raw_keys(self: &Arc<Self>) -> impl Stream<Item = Result<Key<'_>>> + S
res: None,
};
self.engine
.pool
.execute_iter(seek)
.ok_into::<stream::KeysRev<'_>>()
.into_stream()
.try_flatten()
.boxed()
Either::Right(
self.engine
.pool
.execute_iter(seek)
.ok_into::<stream::KeysRev<'_>>()
.into_stream()
.try_flatten(),
)
}

View File

@@ -1,8 +1,9 @@
use std::{convert::AsRef, fmt::Debug, sync::Arc};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::Either};
use rocksdb::Direction;
use serde::{Deserialize, Serialize};
use tokio::task;
use tuwunel_core::{Result, implement};
use super::rev_stream_from::is_cached;
@@ -64,7 +65,13 @@ where
let opts = super::iter_options_default(&self.engine);
let state = stream::State::new(self, opts);
if is_cached(self, from) {
return stream::KeysRev::<'_>::from(state.init_rev(from.as_ref().into())).boxed();
let state = state.init_rev(from.as_ref().into());
return Either::Left(
task::consume_budget()
.map(move |()| stream::KeysRev::<'_>::from(state))
.into_stream()
.flatten(),
);
}
let seek = Seek {
@@ -75,11 +82,12 @@ where
res: None,
};
self.engine
.pool
.execute_iter(seek)
.ok_into::<stream::KeysRev<'_>>()
.into_stream()
.try_flatten()
.boxed()
Either::Right(
self.engine
.pool
.execute_iter(seek)
.ok_into::<stream::KeysRev<'_>>()
.into_stream()
.try_flatten(),
)
}

View File

@@ -1,6 +1,6 @@
use std::sync::Arc;
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::Either};
use rocksdb::Direction;
use serde::Deserialize;
use tokio::task;
@@ -35,11 +35,12 @@ pub fn rev_raw_stream(self: &Arc<Self>) -> impl Stream<Item = Result<KeyVal<'_>>
let state = stream::State::new(self, opts);
if is_cached(self) {
let state = state.init_rev(None);
return task::consume_budget()
.map(move |()| stream::ItemsRev::<'_>::from(state))
.into_stream()
.flatten()
.boxed();
return Either::Left(
task::consume_budget()
.map(move |()| stream::ItemsRev::<'_>::from(state))
.into_stream()
.flatten(),
);
}
let seek = Seek {
@@ -50,13 +51,14 @@ pub fn rev_raw_stream(self: &Arc<Self>) -> impl Stream<Item = Result<KeyVal<'_>>
res: None,
};
self.engine
.pool
.execute_iter(seek)
.ok_into::<stream::ItemsRev<'_>>()
.into_stream()
.try_flatten()
.boxed()
Either::Right(
self.engine
.pool
.execute_iter(seek)
.ok_into::<stream::ItemsRev<'_>>()
.into_stream()
.try_flatten(),
)
}
#[tracing::instrument(

View File

@@ -1,6 +1,6 @@
use std::{convert::AsRef, fmt::Debug, sync::Arc};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::Either};
use rocksdb::Direction;
use serde::{Deserialize, Serialize};
use tokio::task;
@@ -84,11 +84,12 @@ where
let state = stream::State::new(self, opts);
if is_cached(self, from) {
let state = state.init_rev(from.as_ref().into());
return task::consume_budget()
.map(move |()| stream::ItemsRev::<'_>::from(state))
.into_stream()
.flatten()
.boxed();
return Either::Left(
task::consume_budget()
.map(move |()| stream::ItemsRev::<'_>::from(state))
.into_stream()
.flatten(),
);
}
let seek = Seek {
@@ -99,13 +100,14 @@ where
res: None,
};
self.engine
.pool
.execute_iter(seek)
.ok_into::<stream::ItemsRev<'_>>()
.into_stream()
.try_flatten()
.boxed()
Either::Right(
self.engine
.pool
.execute_iter(seek)
.ok_into::<stream::ItemsRev<'_>>()
.into_stream()
.try_flatten(),
)
}
#[tracing::instrument(

View File

@@ -1,6 +1,6 @@
use std::sync::Arc;
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::Either};
use rocksdb::Direction;
use serde::Deserialize;
use tokio::task;
@@ -35,11 +35,12 @@ pub fn raw_stream(self: &Arc<Self>) -> impl Stream<Item = Result<KeyVal<'_>>> +
let state = stream::State::new(self, opts);
if is_cached(self) {
let state = state.init_fwd(None);
return task::consume_budget()
.map(move |()| stream::Items::<'_>::from(state))
.into_stream()
.flatten()
.boxed();
return Either::Left(
task::consume_budget()
.map(move |()| stream::Items::<'_>::from(state))
.into_stream()
.flatten(),
);
}
let seek = Seek {
@@ -50,13 +51,14 @@ pub fn raw_stream(self: &Arc<Self>) -> impl Stream<Item = Result<KeyVal<'_>>> +
res: None,
};
self.engine
.pool
.execute_iter(seek)
.ok_into::<stream::Items<'_>>()
.into_stream()
.try_flatten()
.boxed()
Either::Right(
self.engine
.pool
.execute_iter(seek)
.ok_into::<stream::Items<'_>>()
.into_stream()
.try_flatten(),
)
}
#[tracing::instrument(

View File

@@ -1,6 +1,6 @@
use std::{convert::AsRef, fmt::Debug, sync::Arc};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::Either};
use rocksdb::Direction;
use serde::{Deserialize, Serialize};
use tokio::task;
@@ -83,11 +83,12 @@ where
let state = stream::State::new(self, opts);
if is_cached(self, from) {
let state = state.init_fwd(from.as_ref().into());
return task::consume_budget()
.map(move |()| stream::Items::<'_>::from(state))
.into_stream()
.flatten()
.boxed();
return Either::Left(
task::consume_budget()
.map(move |()| stream::Items::<'_>::from(state))
.into_stream()
.flatten(),
);
}
let seek = Seek {
@@ -98,13 +99,14 @@ where
res: None,
};
self.engine
.pool
.execute_iter(seek)
.ok_into::<stream::Items<'_>>()
.into_stream()
.try_flatten()
.boxed()
Either::Right(
self.engine
.pool
.execute_iter(seek)
.ok_into::<stream::Items<'_>>()
.into_stream()
.try_flatten(),
)
}
#[tracing::instrument(

View File

@@ -22,7 +22,7 @@ pub(crate) struct State<'a> {
init: bool,
}
pub(crate) trait Cursor<'a, T> {
pub(crate) trait Cursor<'a, T>: Send {
fn state(&self) -> &State<'a>;
fn fetch(&self) -> Option<T>;
@@ -50,12 +50,12 @@ impl<'a> State<'a> {
#[inline]
pub(super) fn new(map: &'a Arc<Map>, opts: ReadOptions) -> Self {
Self {
init: true,
seek: false,
inner: map
.engine()
.db
.raw_iterator_cf_opt(&map.cf(), opts),
init: true,
seek: false,
}
}