finished zero-copy migration!
now the entire networking and persistence stack is zero-copy with single-allocation, single-copy reads. Closes #128 Signed-off-by: Sienna Meridian Satterwhite <sienna@r3t.io>
This commit is contained in:
@@ -30,7 +30,7 @@ use crate::networking::{
|
|||||||
pub const BLOB_THRESHOLD: usize = 64 * 1024;
|
pub const BLOB_THRESHOLD: usize = 64 * 1024;
|
||||||
|
|
||||||
/// Hash type for blob references
|
/// Hash type for blob references
|
||||||
pub type BlobHash = Vec<u8>;
|
pub type BlobHash = bytes::Bytes;
|
||||||
|
|
||||||
/// Bevy resource for managing blobs
|
/// Bevy resource for managing blobs
|
||||||
///
|
///
|
||||||
@@ -40,7 +40,7 @@ pub type BlobHash = Vec<u8>;
|
|||||||
#[derive(Resource, Clone)]
|
#[derive(Resource, Clone)]
|
||||||
pub struct BlobStore {
|
pub struct BlobStore {
|
||||||
/// In-memory cache of blobs (hash -> data)
|
/// In-memory cache of blobs (hash -> data)
|
||||||
cache: Arc<Mutex<HashMap<BlobHash, Vec<u8>>>>,
|
cache: Arc<Mutex<HashMap<BlobHash, bytes::Bytes>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BlobStore {
|
impl BlobStore {
|
||||||
@@ -72,7 +72,7 @@ impl BlobStore {
|
|||||||
self.cache
|
self.cache
|
||||||
.lock()
|
.lock()
|
||||||
.map_err(|e| NetworkingError::Blob(format!("Failed to lock cache: {}", e)))?
|
.map_err(|e| NetworkingError::Blob(format!("Failed to lock cache: {}", e)))?
|
||||||
.insert(hash.clone(), data);
|
.insert(hash.clone(), bytes::Bytes::from(data));
|
||||||
|
|
||||||
Ok(hash)
|
Ok(hash)
|
||||||
}
|
}
|
||||||
@@ -80,7 +80,7 @@ impl BlobStore {
|
|||||||
/// Retrieve a blob by its hash
|
/// Retrieve a blob by its hash
|
||||||
///
|
///
|
||||||
/// Returns `None` if the blob is not in the cache.
|
/// Returns `None` if the blob is not in the cache.
|
||||||
pub fn get_blob(&self, hash: &BlobHash) -> Result<Option<Vec<u8>>> {
|
pub fn get_blob(&self, hash: &BlobHash) -> Result<Option<bytes::Bytes>> {
|
||||||
Ok(self
|
Ok(self
|
||||||
.cache
|
.cache
|
||||||
.lock()
|
.lock()
|
||||||
@@ -104,7 +104,7 @@ impl BlobStore {
|
|||||||
///
|
///
|
||||||
/// This is safer than calling `has_blob()` followed by `get_blob()` because
|
/// This is safer than calling `has_blob()` followed by `get_blob()` because
|
||||||
/// it's atomic - the blob can't be removed between the check and get.
|
/// it's atomic - the blob can't be removed between the check and get.
|
||||||
pub fn get_blob_if_exists(&self, hash: &BlobHash) -> Result<Option<Vec<u8>>> {
|
pub fn get_blob_if_exists(&self, hash: &BlobHash) -> Result<Option<bytes::Bytes>> {
|
||||||
Ok(self
|
Ok(self
|
||||||
.cache
|
.cache
|
||||||
.lock()
|
.lock()
|
||||||
@@ -142,7 +142,7 @@ impl BlobStore {
|
|||||||
|
|
||||||
let mut hasher = Sha256::new();
|
let mut hasher = Sha256::new();
|
||||||
hasher.update(data);
|
hasher.update(data);
|
||||||
hasher.finalize().to_vec()
|
bytes::Bytes::from(hasher.finalize().to_vec())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -192,11 +192,11 @@ pub fn should_use_blob(data: &[u8]) -> bool {
|
|||||||
/// let large_data = vec![0u8; 100_000];
|
/// let large_data = vec![0u8; 100_000];
|
||||||
/// let component_data = create_component_data(large_data, &store).unwrap();
|
/// let component_data = create_component_data(large_data, &store).unwrap();
|
||||||
/// ```
|
/// ```
|
||||||
pub fn create_component_data(data: Vec<u8>, blob_store: &BlobStore) -> Result<ComponentData> {
|
pub fn create_component_data(data: bytes::Bytes, blob_store: &BlobStore) -> Result<ComponentData> {
|
||||||
if should_use_blob(&data) {
|
if should_use_blob(&data) {
|
||||||
let size = data.len() as u64;
|
let size = data.len() as u64;
|
||||||
let hash = blob_store.store_blob(data)?;
|
let hash = blob_store.store_blob(data.to_vec())?;
|
||||||
Ok(ComponentData::BlobRef { hash, size })
|
Ok(ComponentData::BlobRef { hash: bytes::Bytes::from(hash), size })
|
||||||
} else {
|
} else {
|
||||||
Ok(ComponentData::Inline(data))
|
Ok(ComponentData::Inline(data))
|
||||||
}
|
}
|
||||||
@@ -218,11 +218,11 @@ pub fn create_component_data(data: Vec<u8>, blob_store: &BlobStore) -> Result<Co
|
|||||||
/// let store = BlobStore::new();
|
/// let store = BlobStore::new();
|
||||||
///
|
///
|
||||||
/// // Inline data
|
/// // Inline data
|
||||||
/// let inline = ComponentData::Inline(vec![1, 2, 3]);
|
/// let inline = ComponentData::Inline(bytes::Bytes::from(vec![1, 2, 3]));
|
||||||
/// let data = get_component_data(&inline, &store).unwrap();
|
/// let data = get_component_data(&inline, &store).unwrap();
|
||||||
/// assert_eq!(data, vec![1, 2, 3]);
|
/// assert_eq!(data, vec![1, 2, 3]);
|
||||||
/// ```
|
/// ```
|
||||||
pub fn get_component_data(data: &ComponentData, blob_store: &BlobStore) -> Result<Vec<u8>> {
|
pub fn get_component_data(data: &ComponentData, blob_store: &BlobStore) -> Result<bytes::Bytes> {
|
||||||
match data {
|
match data {
|
||||||
| ComponentData::Inline(bytes) => Ok(bytes.clone()),
|
| ComponentData::Inline(bytes) => Ok(bytes.clone()),
|
||||||
| ComponentData::BlobRef { hash, size: _ } => blob_store
|
| ComponentData::BlobRef { hash, size: _ } => blob_store
|
||||||
@@ -268,7 +268,7 @@ mod tests {
|
|||||||
let hash = store.store_blob(data.clone()).unwrap();
|
let hash = store.store_blob(data.clone()).unwrap();
|
||||||
let retrieved = store.get_blob(&hash).unwrap();
|
let retrieved = store.get_blob(&hash).unwrap();
|
||||||
|
|
||||||
assert_eq!(retrieved, Some(data));
|
assert_eq!(retrieved, Some(bytes::Bytes::from(data)));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@@ -291,7 +291,7 @@ mod tests {
|
|||||||
assert!(store.has_blob(&hash).unwrap());
|
assert!(store.has_blob(&hash).unwrap());
|
||||||
|
|
||||||
let fake_hash = vec![0; 32];
|
let fake_hash = vec![0; 32];
|
||||||
assert!(!store.has_blob(&fake_hash).unwrap());
|
assert!(!store.has_blob(&bytes::Bytes::from(fake_hash)).unwrap());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@@ -326,7 +326,7 @@ mod tests {
|
|||||||
let store = BlobStore::new();
|
let store = BlobStore::new();
|
||||||
let small_data = vec![1, 2, 3];
|
let small_data = vec![1, 2, 3];
|
||||||
|
|
||||||
let component_data = create_component_data(small_data.clone(), &store).unwrap();
|
let component_data = create_component_data(bytes::Bytes::from(small_data.clone()), &store).unwrap();
|
||||||
|
|
||||||
match component_data {
|
match component_data {
|
||||||
| ComponentData::Inline(data) => assert_eq!(data, small_data),
|
| ComponentData::Inline(data) => assert_eq!(data, small_data),
|
||||||
@@ -339,7 +339,7 @@ mod tests {
|
|||||||
let store = BlobStore::new();
|
let store = BlobStore::new();
|
||||||
let large_data = vec![0u8; 100_000];
|
let large_data = vec![0u8; 100_000];
|
||||||
|
|
||||||
let component_data = create_component_data(large_data.clone(), &store).unwrap();
|
let component_data = create_component_data(bytes::Bytes::from(large_data.clone()), &store).unwrap();
|
||||||
|
|
||||||
match component_data {
|
match component_data {
|
||||||
| ComponentData::BlobRef { hash, size } => {
|
| ComponentData::BlobRef { hash, size } => {
|
||||||
@@ -353,7 +353,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_get_component_data_inline() {
|
fn test_get_component_data_inline() {
|
||||||
let store = BlobStore::new();
|
let store = BlobStore::new();
|
||||||
let inline = ComponentData::Inline(vec![1, 2, 3]);
|
let inline = ComponentData::Inline(bytes::Bytes::from(vec![1, 2, 3]));
|
||||||
|
|
||||||
let data = get_component_data(&inline, &store).unwrap();
|
let data = get_component_data(&inline, &store).unwrap();
|
||||||
assert_eq!(data, vec![1, 2, 3]);
|
assert_eq!(data, vec![1, 2, 3]);
|
||||||
@@ -380,7 +380,7 @@ mod tests {
|
|||||||
let fake_hash = vec![0; 32];
|
let fake_hash = vec![0; 32];
|
||||||
|
|
||||||
let blob_ref = ComponentData::BlobRef {
|
let blob_ref = ComponentData::BlobRef {
|
||||||
hash: fake_hash,
|
hash: bytes::Bytes::from(fake_hash),
|
||||||
size: 1000,
|
size: 1000,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -51,7 +51,7 @@ use crate::networking::{
|
|||||||
pub fn build_join_request(
|
pub fn build_join_request(
|
||||||
node_id: uuid::Uuid,
|
node_id: uuid::Uuid,
|
||||||
session_id: SessionId,
|
session_id: SessionId,
|
||||||
session_secret: Option<Vec<u8>>,
|
session_secret: Option<bytes::Bytes>,
|
||||||
last_known_clock: Option<VectorClock>,
|
last_known_clock: Option<VectorClock>,
|
||||||
join_type: JoinType,
|
join_type: JoinType,
|
||||||
) -> VersionedMessage {
|
) -> VersionedMessage {
|
||||||
@@ -442,7 +442,7 @@ mod tests {
|
|||||||
let request = build_join_request(
|
let request = build_join_request(
|
||||||
node_id,
|
node_id,
|
||||||
session_id.clone(),
|
session_id.clone(),
|
||||||
Some(secret.clone()),
|
Some(bytes::Bytes::from(secret.clone())),
|
||||||
None,
|
None,
|
||||||
JoinType::Fresh,
|
JoinType::Fresh,
|
||||||
);
|
);
|
||||||
@@ -456,7 +456,7 @@ mod tests {
|
|||||||
join_type,
|
join_type,
|
||||||
} => {
|
} => {
|
||||||
assert_eq!(req_session_id, session_id);
|
assert_eq!(req_session_id, session_id);
|
||||||
assert_eq!(session_secret, Some(secret));
|
assert_eq!(session_secret, Some(bytes::Bytes::from(secret)));
|
||||||
assert!(last_known_clock.is_none());
|
assert!(last_known_clock.is_none());
|
||||||
assert!(matches!(join_type, JoinType::Fresh));
|
assert!(matches!(join_type, JoinType::Fresh));
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -218,13 +218,13 @@ mod tests {
|
|||||||
|
|
||||||
let op1 = ComponentOp::Set {
|
let op1 = ComponentOp::Set {
|
||||||
discriminant: 1,
|
discriminant: 1,
|
||||||
data: ComponentData::Inline(data.clone()),
|
data: ComponentData::Inline(bytes::Bytes::from(data.clone())),
|
||||||
vector_clock: clock.clone(),
|
vector_clock: clock.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let op2 = ComponentOp::Set {
|
let op2 = ComponentOp::Set {
|
||||||
discriminant: 1,
|
discriminant: 1,
|
||||||
data: ComponentData::Inline(data.clone()),
|
data: ComponentData::Inline(bytes::Bytes::from(data.clone())),
|
||||||
vector_clock: clock,
|
vector_clock: clock,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -245,13 +245,13 @@ mod tests {
|
|||||||
|
|
||||||
let op1 = ComponentOp::Set {
|
let op1 = ComponentOp::Set {
|
||||||
discriminant: 1,
|
discriminant: 1,
|
||||||
data: ComponentData::Inline(vec![1, 2, 3]),
|
data: ComponentData::Inline(bytes::Bytes::from(vec![1, 2, 3])),
|
||||||
vector_clock: clock1,
|
vector_clock: clock1,
|
||||||
};
|
};
|
||||||
|
|
||||||
let op2 = ComponentOp::Set {
|
let op2 = ComponentOp::Set {
|
||||||
discriminant: 1,
|
discriminant: 1,
|
||||||
data: ComponentData::Inline(vec![4, 5, 6]),
|
data: ComponentData::Inline(bytes::Bytes::from(vec![4, 5, 6])),
|
||||||
vector_clock: clock2,
|
vector_clock: clock2,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -82,7 +82,7 @@ pub enum SyncMessage {
|
|||||||
session_id: SessionId,
|
session_id: SessionId,
|
||||||
|
|
||||||
/// Optional session secret for authentication
|
/// Optional session secret for authentication
|
||||||
session_secret: Option<Vec<u8>>,
|
session_secret: Option<bytes::Bytes>,
|
||||||
|
|
||||||
/// Vector clock from when we last left this session
|
/// Vector clock from when we last left this session
|
||||||
/// None = fresh join, Some = rejoin
|
/// None = fresh join, Some = rejoin
|
||||||
@@ -189,12 +189,12 @@ pub struct ComponentState {
|
|||||||
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize, PartialEq, Eq)]
|
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize, PartialEq, Eq)]
|
||||||
pub enum ComponentData {
|
pub enum ComponentData {
|
||||||
/// Inline data for small components (<64KB)
|
/// Inline data for small components (<64KB)
|
||||||
Inline(Vec<u8>),
|
Inline(bytes::Bytes),
|
||||||
|
|
||||||
/// Reference to a blob for large components (>64KB)
|
/// Reference to a blob for large components (>64KB)
|
||||||
BlobRef {
|
BlobRef {
|
||||||
/// iroh-blobs hash
|
/// iroh-blobs hash
|
||||||
hash: Vec<u8>,
|
hash: bytes::Bytes,
|
||||||
|
|
||||||
/// Size of the blob in bytes
|
/// Size of the blob in bytes
|
||||||
size: u64,
|
size: u64,
|
||||||
@@ -206,11 +206,11 @@ impl ComponentData {
|
|||||||
pub const BLOB_THRESHOLD: usize = 64 * 1024;
|
pub const BLOB_THRESHOLD: usize = 64 * 1024;
|
||||||
|
|
||||||
/// Create component data, automatically choosing inline vs blob
|
/// Create component data, automatically choosing inline vs blob
|
||||||
pub fn new(data: Vec<u8>) -> Self {
|
pub fn new(data: bytes::Bytes) -> Self {
|
||||||
if data.len() > Self::BLOB_THRESHOLD {
|
if data.len() > Self::BLOB_THRESHOLD {
|
||||||
// Will be populated later when uploaded to iroh-blobs
|
// Will be populated later when uploaded to iroh-blobs
|
||||||
Self::BlobRef {
|
Self::BlobRef {
|
||||||
hash: Vec::new(),
|
hash: bytes::Bytes::new(),
|
||||||
size: data.len() as u64,
|
size: data.len() as u64,
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@@ -309,7 +309,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_component_data_inline() {
|
fn test_component_data_inline() {
|
||||||
let data = vec![1, 2, 3, 4];
|
let data = vec![1, 2, 3, 4];
|
||||||
let component_data = ComponentData::new(data.clone());
|
let component_data = ComponentData::new(bytes::Bytes::from(data.clone()));
|
||||||
|
|
||||||
assert!(!component_data.is_blob());
|
assert!(!component_data.is_blob());
|
||||||
assert_eq!(component_data.as_inline(), Some(data.as_slice()));
|
assert_eq!(component_data.as_inline(), Some(data.as_slice()));
|
||||||
@@ -319,7 +319,7 @@ mod tests {
|
|||||||
fn test_component_data_blob() {
|
fn test_component_data_blob() {
|
||||||
// Create data larger than threshold
|
// Create data larger than threshold
|
||||||
let data = vec![0u8; ComponentData::BLOB_THRESHOLD + 1];
|
let data = vec![0u8; ComponentData::BLOB_THRESHOLD + 1];
|
||||||
let component_data = ComponentData::new(data.clone());
|
let component_data = ComponentData::new(bytes::Bytes::from(data.clone()));
|
||||||
|
|
||||||
assert!(component_data.is_blob());
|
assert!(component_data.is_blob());
|
||||||
assert_eq!(component_data.as_inline(), None);
|
assert_eq!(component_data.as_inline(), None);
|
||||||
|
|||||||
@@ -27,7 +27,7 @@
|
|||||||
//! let builder = ComponentOpBuilder::new(node_id, clock.clone());
|
//! let builder = ComponentOpBuilder::new(node_id, clock.clone());
|
||||||
//! let op = builder.set(
|
//! let op = builder.set(
|
||||||
//! "Transform".to_string(),
|
//! "Transform".to_string(),
|
||||||
//! ComponentData::Inline(vec![1, 2, 3]),
|
//! ComponentData::Inline(bytes::Bytes::from(vec![1, 2, 3])),
|
||||||
//! );
|
//! );
|
||||||
//! ```
|
//! ```
|
||||||
|
|
||||||
|
|||||||
@@ -69,7 +69,7 @@ pub enum ComponentOp {
|
|||||||
operation_id: uuid::Uuid,
|
operation_id: uuid::Uuid,
|
||||||
|
|
||||||
/// Element being added (serialized)
|
/// Element being added (serialized)
|
||||||
element: Vec<u8>,
|
element: bytes::Bytes,
|
||||||
|
|
||||||
/// Vector clock when this add was created
|
/// Vector clock when this add was created
|
||||||
vector_clock: VectorClock,
|
vector_clock: VectorClock,
|
||||||
@@ -106,7 +106,7 @@ pub enum ComponentOp {
|
|||||||
after_id: Option<uuid::Uuid>,
|
after_id: Option<uuid::Uuid>,
|
||||||
|
|
||||||
/// Element being inserted (serialized)
|
/// Element being inserted (serialized)
|
||||||
element: Vec<u8>,
|
element: bytes::Bytes,
|
||||||
|
|
||||||
/// Vector clock when this insert was created
|
/// Vector clock when this insert was created
|
||||||
vector_clock: VectorClock,
|
vector_clock: VectorClock,
|
||||||
@@ -218,7 +218,7 @@ impl ComponentOpBuilder {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Build a SetAdd operation (OR-Set)
|
/// Build a SetAdd operation (OR-Set)
|
||||||
pub fn set_add(mut self, discriminant: u16, element: Vec<u8>) -> ComponentOp {
|
pub fn set_add(mut self, discriminant: u16, element: bytes::Bytes) -> ComponentOp {
|
||||||
self.vector_clock.increment(self.node_id);
|
self.vector_clock.increment(self.node_id);
|
||||||
ComponentOp::SetAdd {
|
ComponentOp::SetAdd {
|
||||||
discriminant,
|
discriminant,
|
||||||
@@ -247,7 +247,7 @@ impl ComponentOpBuilder {
|
|||||||
mut self,
|
mut self,
|
||||||
discriminant: u16,
|
discriminant: u16,
|
||||||
after_id: Option<uuid::Uuid>,
|
after_id: Option<uuid::Uuid>,
|
||||||
element: Vec<u8>,
|
element: bytes::Bytes,
|
||||||
) -> ComponentOp {
|
) -> ComponentOp {
|
||||||
self.vector_clock.increment(self.node_id);
|
self.vector_clock.increment(self.node_id);
|
||||||
ComponentOp::SequenceInsert {
|
ComponentOp::SequenceInsert {
|
||||||
@@ -290,7 +290,7 @@ mod tests {
|
|||||||
fn test_discriminant() {
|
fn test_discriminant() {
|
||||||
let op = ComponentOp::Set {
|
let op = ComponentOp::Set {
|
||||||
discriminant: 1,
|
discriminant: 1,
|
||||||
data: ComponentData::Inline(vec![1, 2, 3]),
|
data: ComponentData::Inline(bytes::Bytes::from(vec![1, 2, 3])),
|
||||||
vector_clock: VectorClock::new(),
|
vector_clock: VectorClock::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -310,7 +310,7 @@ mod tests {
|
|||||||
fn test_is_set() {
|
fn test_is_set() {
|
||||||
let op = ComponentOp::Set {
|
let op = ComponentOp::Set {
|
||||||
discriminant: 1,
|
discriminant: 1,
|
||||||
data: ComponentData::Inline(vec![1, 2, 3]),
|
data: ComponentData::Inline(bytes::Bytes::from(vec![1, 2, 3])),
|
||||||
vector_clock: VectorClock::new(),
|
vector_clock: VectorClock::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -325,7 +325,7 @@ mod tests {
|
|||||||
let op = ComponentOp::SetAdd {
|
let op = ComponentOp::SetAdd {
|
||||||
discriminant: 2,
|
discriminant: 2,
|
||||||
operation_id: uuid::Uuid::new_v4(),
|
operation_id: uuid::Uuid::new_v4(),
|
||||||
element: vec![1, 2, 3],
|
element: bytes::Bytes::from(vec![1, 2, 3]),
|
||||||
vector_clock: VectorClock::new(),
|
vector_clock: VectorClock::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -341,7 +341,7 @@ mod tests {
|
|||||||
discriminant: 3,
|
discriminant: 3,
|
||||||
operation_id: uuid::Uuid::new_v4(),
|
operation_id: uuid::Uuid::new_v4(),
|
||||||
after_id: None,
|
after_id: None,
|
||||||
element: vec![1, 2, 3],
|
element: bytes::Bytes::from(vec![1, 2, 3]),
|
||||||
vector_clock: VectorClock::new(),
|
vector_clock: VectorClock::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -359,7 +359,7 @@ mod tests {
|
|||||||
let builder = ComponentOpBuilder::new(node_id, clock);
|
let builder = ComponentOpBuilder::new(node_id, clock);
|
||||||
let op = builder.set(
|
let op = builder.set(
|
||||||
1,
|
1,
|
||||||
ComponentData::Inline(vec![1, 2, 3]),
|
ComponentData::Inline(bytes::Bytes::from(vec![1, 2, 3])),
|
||||||
);
|
);
|
||||||
|
|
||||||
assert!(op.is_set());
|
assert!(op.is_set());
|
||||||
@@ -372,7 +372,7 @@ mod tests {
|
|||||||
let clock = VectorClock::new();
|
let clock = VectorClock::new();
|
||||||
|
|
||||||
let builder = ComponentOpBuilder::new(node_id, clock);
|
let builder = ComponentOpBuilder::new(node_id, clock);
|
||||||
let op = builder.set_add(2, vec![1, 2, 3]);
|
let op = builder.set_add(2, bytes::Bytes::from(vec![1, 2, 3]));
|
||||||
|
|
||||||
assert!(op.is_or_set());
|
assert!(op.is_or_set());
|
||||||
assert_eq!(op.vector_clock().get(node_id), 1);
|
assert_eq!(op.vector_clock().get(node_id), 1);
|
||||||
@@ -382,7 +382,7 @@ mod tests {
|
|||||||
fn test_serialization() -> anyhow::Result<()> {
|
fn test_serialization() -> anyhow::Result<()> {
|
||||||
let op = ComponentOp::Set {
|
let op = ComponentOp::Set {
|
||||||
discriminant: 1,
|
discriminant: 1,
|
||||||
data: ComponentData::Inline(vec![1, 2, 3]),
|
data: ComponentData::Inline(bytes::Bytes::from(vec![1, 2, 3])),
|
||||||
vector_clock: VectorClock::new(),
|
vector_clock: VectorClock::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -128,12 +128,12 @@ impl Default for NetworkingConfig {
|
|||||||
/// .run();
|
/// .run();
|
||||||
/// ```
|
/// ```
|
||||||
#[derive(Resource, Clone)]
|
#[derive(Resource, Clone)]
|
||||||
pub struct SessionSecret(Vec<u8>);
|
pub struct SessionSecret(bytes::Bytes);
|
||||||
|
|
||||||
impl SessionSecret {
|
impl SessionSecret {
|
||||||
/// Create a new session secret from bytes
|
/// Create a new session secret from bytes
|
||||||
pub fn new(secret: impl Into<Vec<u8>>) -> Self {
|
pub fn new(secret: impl Into<Vec<u8>>) -> Self {
|
||||||
Self(secret.into())
|
Self(bytes::Bytes::from(secret.into()))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the secret as a byte slice
|
/// Get the secret as a byte slice
|
||||||
|
|||||||
@@ -196,7 +196,7 @@ pub struct Session {
|
|||||||
pub state: SessionState,
|
pub state: SessionState,
|
||||||
|
|
||||||
/// Optional encrypted session secret for access control
|
/// Optional encrypted session secret for access control
|
||||||
pub secret: Option<Vec<u8>>,
|
pub secret: Option<bytes::Bytes>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Session {
|
impl Session {
|
||||||
|
|||||||
@@ -72,7 +72,7 @@ pub trait SyncComponent: Component + Reflect + Sized {
|
|||||||
/// Serialize this component to bytes
|
/// Serialize this component to bytes
|
||||||
///
|
///
|
||||||
/// Uses rkyv for zero-copy binary serialization.
|
/// Uses rkyv for zero-copy binary serialization.
|
||||||
fn serialize_sync(&self) -> anyhow::Result<Vec<u8>>;
|
fn serialize_sync(&self) -> anyhow::Result<bytes::Bytes>;
|
||||||
|
|
||||||
/// Deserialize this component from bytes
|
/// Deserialize this component from bytes
|
||||||
///
|
///
|
||||||
|
|||||||
@@ -485,7 +485,7 @@ pub fn save_session(conn: &mut Connection, session: &crate::networking::Session)
|
|||||||
session.last_active,
|
session.last_active,
|
||||||
session.entity_count as i64,
|
session.entity_count as i64,
|
||||||
session.state.to_string(),
|
session.state.to_string(),
|
||||||
session.secret,
|
session.secret.as_ref().map(|b| b.as_ref()),
|
||||||
],
|
],
|
||||||
)?;
|
)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -517,7 +517,8 @@ pub fn load_session(
|
|||||||
last_active: row.get(3)?,
|
last_active: row.get(3)?,
|
||||||
entity_count: row.get::<_, i64>(4)? as usize,
|
entity_count: row.get::<_, i64>(4)? as usize,
|
||||||
state,
|
state,
|
||||||
secret: row.get(6)?,
|
secret: row.get::<_, Option<std::borrow::Cow<'_, [u8]>>>(6)?
|
||||||
|
.map(|cow| bytes::Bytes::copy_from_slice(&cow)),
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
@@ -548,7 +549,8 @@ pub fn get_last_active_session(conn: &Connection) -> Result<Option<crate::networ
|
|||||||
last_active: row.get(3)?,
|
last_active: row.get(3)?,
|
||||||
entity_count: row.get::<_, i64>(4)? as usize,
|
entity_count: row.get::<_, i64>(4)? as usize,
|
||||||
state,
|
state,
|
||||||
secret: row.get(6)?,
|
secret: row.get::<_, Option<std::borrow::Cow<'_, [u8]>>>(6)?
|
||||||
|
.map(|cow| bytes::Bytes::copy_from_slice(&cow)),
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
@@ -643,10 +645,10 @@ pub fn load_entity_components(
|
|||||||
|
|
||||||
let components: Vec<LoadedComponent> = stmt
|
let components: Vec<LoadedComponent> = stmt
|
||||||
.query_map([entity_id.as_bytes()], |row| {
|
.query_map([entity_id.as_bytes()], |row| {
|
||||||
let data_vec: Vec<u8> = row.get(1)?;
|
let data_cow: std::borrow::Cow<'_, [u8]> = row.get(1)?;
|
||||||
Ok(LoadedComponent {
|
Ok(LoadedComponent {
|
||||||
component_type: row.get(0)?,
|
component_type: row.get(0)?,
|
||||||
data: bytes::Bytes::from(data_vec),
|
data: bytes::Bytes::copy_from_slice(&data_cow),
|
||||||
})
|
})
|
||||||
})?
|
})?
|
||||||
.collect::<std::result::Result<Vec<_>, _>>()?;
|
.collect::<std::result::Result<Vec<_>, _>>()?;
|
||||||
@@ -669,7 +671,7 @@ pub fn load_entity_by_network_id(
|
|||||||
WHERE id = ?1",
|
WHERE id = ?1",
|
||||||
[network_id.as_bytes()],
|
[network_id.as_bytes()],
|
||||||
|row| {
|
|row| {
|
||||||
let id_bytes: Vec<u8> = row.get(0)?;
|
let id_bytes: std::borrow::Cow<'_, [u8]> = row.get(0)?;
|
||||||
let mut id_array = [0u8; 16];
|
let mut id_array = [0u8; 16];
|
||||||
id_array.copy_from_slice(&id_bytes);
|
id_array.copy_from_slice(&id_bytes);
|
||||||
let id = uuid::Uuid::from_bytes(id_array);
|
let id = uuid::Uuid::from_bytes(id_array);
|
||||||
@@ -714,7 +716,7 @@ pub fn load_all_entities(conn: &Connection) -> Result<Vec<LoadedEntity>> {
|
|||||||
)?;
|
)?;
|
||||||
|
|
||||||
let entity_rows = stmt.query_map([], |row| {
|
let entity_rows = stmt.query_map([], |row| {
|
||||||
let id_bytes: Vec<u8> = row.get(0)?;
|
let id_bytes: std::borrow::Cow<'_, [u8]> = row.get(0)?;
|
||||||
let mut id_array = [0u8; 16];
|
let mut id_array = [0u8; 16];
|
||||||
id_array.copy_from_slice(&id_bytes);
|
id_array.copy_from_slice(&id_bytes);
|
||||||
let id = uuid::Uuid::from_bytes(id_array);
|
let id = uuid::Uuid::from_bytes(id_array);
|
||||||
@@ -761,7 +763,7 @@ pub fn load_entities_by_type(conn: &Connection, entity_type: &str) -> Result<Vec
|
|||||||
)?;
|
)?;
|
||||||
|
|
||||||
let entity_rows = stmt.query_map([entity_type], |row| {
|
let entity_rows = stmt.query_map([entity_type], |row| {
|
||||||
let id_bytes: Vec<u8> = row.get(0)?;
|
let id_bytes: std::borrow::Cow<'_, [u8]> = row.get(0)?;
|
||||||
let mut id_array = [0u8; 16];
|
let mut id_array = [0u8; 16];
|
||||||
id_array.copy_from_slice(&id_bytes);
|
id_array.copy_from_slice(&id_bytes);
|
||||||
let id = uuid::Uuid::from_bytes(id_array);
|
let id = uuid::Uuid::from_bytes(id_array);
|
||||||
|
|||||||
@@ -1,8 +1,8 @@
|
|||||||
//! Zero-copy component type registry using rkyv and inventory
|
//! Type registry using rkyv and inventory
|
||||||
//!
|
//!
|
||||||
//! This module provides a runtime type registry that collects all synced components
|
//! This module provides a runtime type registry that collects all synced
|
||||||
//! via the `inventory` crate and assigns them numeric discriminants for efficient
|
//! components via the `inventory` crate and assigns them numeric discriminants
|
||||||
//! serialization.
|
//! for efficient serialization.
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
any::TypeId,
|
any::TypeId,
|
||||||
@@ -26,10 +26,13 @@ pub struct ComponentMeta {
|
|||||||
/// Deserialization function that returns a boxed component
|
/// Deserialization function that returns a boxed component
|
||||||
pub deserialize_fn: fn(&[u8]) -> Result<Box<dyn std::any::Any>>,
|
pub deserialize_fn: fn(&[u8]) -> Result<Box<dyn std::any::Any>>,
|
||||||
|
|
||||||
/// Serialization function that reads from an entity (returns None if entity doesn't have this component)
|
/// Serialization function that reads from an entity (returns None if entity
|
||||||
pub serialize_fn: fn(&bevy::ecs::world::World, bevy::ecs::entity::Entity) -> Option<bytes::Bytes>,
|
/// doesn't have this component)
|
||||||
|
pub serialize_fn:
|
||||||
|
fn(&bevy::ecs::world::World, bevy::ecs::entity::Entity) -> Option<bytes::Bytes>,
|
||||||
|
|
||||||
/// Insert function that takes a boxed component and inserts it into an entity
|
/// Insert function that takes a boxed component and inserts it into an
|
||||||
|
/// entity
|
||||||
pub insert_fn: fn(&mut bevy::ecs::world::EntityWorldMut, Box<dyn std::any::Any>),
|
pub insert_fn: fn(&mut bevy::ecs::world::EntityWorldMut, Box<dyn std::any::Any>),
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -47,10 +50,14 @@ pub struct ComponentTypeRegistry {
|
|||||||
discriminant_to_deserializer: HashMap<u16, fn(&[u8]) -> Result<Box<dyn std::any::Any>>>,
|
discriminant_to_deserializer: HashMap<u16, fn(&[u8]) -> Result<Box<dyn std::any::Any>>>,
|
||||||
|
|
||||||
/// Discriminant to serialization function
|
/// Discriminant to serialization function
|
||||||
discriminant_to_serializer: HashMap<u16, fn(&bevy::ecs::world::World, bevy::ecs::entity::Entity) -> Option<bytes::Bytes>>,
|
discriminant_to_serializer: HashMap<
|
||||||
|
u16,
|
||||||
|
fn(&bevy::ecs::world::World, bevy::ecs::entity::Entity) -> Option<bytes::Bytes>,
|
||||||
|
>,
|
||||||
|
|
||||||
/// Discriminant to insert function
|
/// Discriminant to insert function
|
||||||
discriminant_to_inserter: HashMap<u16, fn(&mut bevy::ecs::world::EntityWorldMut, Box<dyn std::any::Any>)>,
|
discriminant_to_inserter:
|
||||||
|
HashMap<u16, fn(&mut bevy::ecs::world::EntityWorldMut, Box<dyn std::any::Any>)>,
|
||||||
|
|
||||||
/// Discriminant to type name (for debugging)
|
/// Discriminant to type name (for debugging)
|
||||||
discriminant_to_name: HashMap<u16, &'static str>,
|
discriminant_to_name: HashMap<u16, &'static str>,
|
||||||
@@ -138,7 +145,10 @@ impl ComponentTypeRegistry {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Get the insert function for a discriminant
|
/// Get the insert function for a discriminant
|
||||||
pub fn get_insert_fn(&self, discriminant: u16) -> Option<fn(&mut bevy::ecs::world::EntityWorldMut, Box<dyn std::any::Any>)> {
|
pub fn get_insert_fn(
|
||||||
|
&self,
|
||||||
|
discriminant: u16,
|
||||||
|
) -> Option<fn(&mut bevy::ecs::world::EntityWorldMut, Box<dyn std::any::Any>)> {
|
||||||
self.discriminant_to_inserter.get(&discriminant).copied()
|
self.discriminant_to_inserter.get(&discriminant).copied()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -148,8 +158,13 @@ impl ComponentTypeRegistry {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Get the deserialize function for a discriminant
|
/// Get the deserialize function for a discriminant
|
||||||
pub fn get_deserialize_fn(&self, discriminant: u16) -> Option<fn(&[u8]) -> Result<Box<dyn std::any::Any>>> {
|
pub fn get_deserialize_fn(
|
||||||
self.discriminant_to_deserializer.get(&discriminant).copied()
|
&self,
|
||||||
|
discriminant: u16,
|
||||||
|
) -> Option<fn(&[u8]) -> Result<Box<dyn std::any::Any>>> {
|
||||||
|
self.discriminant_to_deserializer
|
||||||
|
.get(&discriminant)
|
||||||
|
.copied()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get type path for a discriminant
|
/// Get type path for a discriminant
|
||||||
@@ -158,7 +173,10 @@ impl ComponentTypeRegistry {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Get the deserialize function by type path
|
/// Get the deserialize function by type path
|
||||||
pub fn get_deserialize_fn_by_path(&self, type_path: &str) -> Option<fn(&[u8]) -> Result<Box<dyn std::any::Any>>> {
|
pub fn get_deserialize_fn_by_path(
|
||||||
|
&self,
|
||||||
|
type_path: &str,
|
||||||
|
) -> Option<fn(&[u8]) -> Result<Box<dyn std::any::Any>>> {
|
||||||
// Linear search through discriminant_to_path to find matching type_path
|
// Linear search through discriminant_to_path to find matching type_path
|
||||||
for (discriminant, path) in &self.discriminant_to_path {
|
for (discriminant, path) in &self.discriminant_to_path {
|
||||||
if *path == type_path {
|
if *path == type_path {
|
||||||
@@ -169,7 +187,10 @@ impl ComponentTypeRegistry {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Get the insert function by type path
|
/// Get the insert function by type path
|
||||||
pub fn get_insert_fn_by_path(&self, type_path: &str) -> Option<fn(&mut bevy::ecs::world::EntityWorldMut, Box<dyn std::any::Any>)> {
|
pub fn get_insert_fn_by_path(
|
||||||
|
&self,
|
||||||
|
type_path: &str,
|
||||||
|
) -> Option<fn(&mut bevy::ecs::world::EntityWorldMut, Box<dyn std::any::Any>)> {
|
||||||
// Linear search through discriminant_to_path to find matching type_path
|
// Linear search through discriminant_to_path to find matching type_path
|
||||||
for (discriminant, path) in &self.discriminant_to_path {
|
for (discriminant, path) in &self.discriminant_to_path {
|
||||||
if *path == type_path {
|
if *path == type_path {
|
||||||
@@ -191,7 +212,8 @@ impl ComponentTypeRegistry {
|
|||||||
|
|
||||||
/// Serialize all registered components from an entity
|
/// Serialize all registered components from an entity
|
||||||
///
|
///
|
||||||
/// Returns Vec<(discriminant, type_path, serialized_bytes)> for all components that exist on the entity.
|
/// Returns Vec<(discriminant, type_path, serialized_bytes)> for all
|
||||||
|
/// components that exist on the entity.
|
||||||
pub fn serialize_entity_components(
|
pub fn serialize_entity_components(
|
||||||
&self,
|
&self,
|
||||||
world: &bevy::ecs::world::World,
|
world: &bevy::ecs::world::World,
|
||||||
|
|||||||
@@ -178,7 +178,7 @@ proptest! {
|
|||||||
let op = PersistenceOp::UpsertComponent {
|
let op = PersistenceOp::UpsertComponent {
|
||||||
entity_id,
|
entity_id,
|
||||||
component_type,
|
component_type,
|
||||||
data,
|
data: bytes::Bytes::from(data),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Should never fail for valid data
|
// Should never fail for valid data
|
||||||
@@ -205,7 +205,7 @@ proptest! {
|
|||||||
let op1 = PersistenceOp::UpsertComponent {
|
let op1 = PersistenceOp::UpsertComponent {
|
||||||
entity_id,
|
entity_id,
|
||||||
component_type: component_type.clone(),
|
component_type: component_type.clone(),
|
||||||
data: data1.clone(),
|
data: bytes::Bytes::from(data1.clone()),
|
||||||
};
|
};
|
||||||
prop_assert!(buffer.add(op1).is_ok());
|
prop_assert!(buffer.add(op1).is_ok());
|
||||||
|
|
||||||
@@ -213,7 +213,7 @@ proptest! {
|
|||||||
let op2 = PersistenceOp::UpsertComponent {
|
let op2 = PersistenceOp::UpsertComponent {
|
||||||
entity_id,
|
entity_id,
|
||||||
component_type: component_type.clone(),
|
component_type: component_type.clone(),
|
||||||
data: data2.clone(),
|
data: bytes::Bytes::from(data2.clone()),
|
||||||
};
|
};
|
||||||
prop_assert!(buffer.add(op2).is_ok());
|
prop_assert!(buffer.add(op2).is_ok());
|
||||||
|
|
||||||
@@ -247,7 +247,7 @@ proptest! {
|
|||||||
let op = PersistenceOp::UpsertComponent {
|
let op = PersistenceOp::UpsertComponent {
|
||||||
entity_id,
|
entity_id,
|
||||||
component_type,
|
component_type,
|
||||||
data: oversized_data,
|
data: bytes::Bytes::from(oversized_data),
|
||||||
};
|
};
|
||||||
|
|
||||||
let result = buffer.add(op);
|
let result = buffer.add(op);
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ syn = { version = "2.0", features = ["full"] }
|
|||||||
quote = "1.0"
|
quote = "1.0"
|
||||||
proc-macro2 = "1.0"
|
proc-macro2 = "1.0"
|
||||||
inventory = { workspace = true }
|
inventory = { workspace = true }
|
||||||
|
bytes = "1.0"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
libmarathon = { path = "../libmarathon" }
|
libmarathon = { path = "../libmarathon" }
|
||||||
|
|||||||
@@ -183,10 +183,10 @@ pub fn derive_synced(input: TokenStream) -> TokenStream {
|
|||||||
let component: #name = rkyv::from_bytes::<#name, rkyv::rancor::Failure>(bytes)?;
|
let component: #name = rkyv::from_bytes::<#name, rkyv::rancor::Failure>(bytes)?;
|
||||||
Ok(Box::new(component))
|
Ok(Box::new(component))
|
||||||
},
|
},
|
||||||
serialize_fn: |world: &bevy::ecs::world::World, entity: bevy::ecs::entity::Entity| -> Option<Vec<u8>> {
|
serialize_fn: |world: &bevy::ecs::world::World, entity: bevy::ecs::entity::Entity| -> Option<bytes::Bytes> {
|
||||||
world.get::<#name>(entity).and_then(|component| {
|
world.get::<#name>(entity).and_then(|component| {
|
||||||
rkyv::to_bytes::<rkyv::rancor::Failure>(component)
|
rkyv::to_bytes::<rkyv::rancor::Failure>(component)
|
||||||
.map(|bytes| bytes.to_vec())
|
.map(|vec| bytes::Bytes::from(vec.to_vec()))
|
||||||
.ok()
|
.ok()
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
@@ -204,7 +204,7 @@ pub fn derive_synced(input: TokenStream) -> TokenStream {
|
|||||||
const STRATEGY: libmarathon::networking::SyncStrategy = #strategy_tokens;
|
const STRATEGY: libmarathon::networking::SyncStrategy = #strategy_tokens;
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn serialize_sync(&self) -> anyhow::Result<Vec<u8>> {
|
fn serialize_sync(&self) -> anyhow::Result<bytes::Bytes> {
|
||||||
#serialize_impl
|
#serialize_impl
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -228,7 +228,7 @@ fn generate_serialize(_input: &DeriveInput) -> proc_macro2::TokenStream {
|
|||||||
// Use rkyv for zero-copy serialization
|
// Use rkyv for zero-copy serialization
|
||||||
// Later we can optimize for specific types (e.g., f32 -> to_le_bytes)
|
// Later we can optimize for specific types (e.g., f32 -> to_le_bytes)
|
||||||
quote! {
|
quote! {
|
||||||
rkyv::to_bytes::<rkyv::rancor::Failure>(self).map(|bytes| bytes.to_vec()).map_err(|e| anyhow::anyhow!("Serialization failed: {}", e))
|
rkyv::to_bytes::<rkyv::rancor::Failure>(self).map(|bytes| bytes::Bytes::from(bytes.to_vec())).map_err(|e| anyhow::anyhow!("Serialization failed: {}", e))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -499,10 +499,10 @@ pub fn synced(attr: TokenStream, item: TokenStream) -> TokenStream {
|
|||||||
let component: #name = rkyv::from_bytes::<#name, rkyv::rancor::Failure>(bytes)?;
|
let component: #name = rkyv::from_bytes::<#name, rkyv::rancor::Failure>(bytes)?;
|
||||||
Ok(Box::new(component))
|
Ok(Box::new(component))
|
||||||
},
|
},
|
||||||
serialize_fn: |world: &bevy::ecs::world::World, entity: bevy::ecs::entity::Entity| -> Option<Vec<u8>> {
|
serialize_fn: |world: &bevy::ecs::world::World, entity: bevy::ecs::entity::Entity| -> Option<bytes::Bytes> {
|
||||||
world.get::<#name>(entity).and_then(|component| {
|
world.get::<#name>(entity).and_then(|component| {
|
||||||
rkyv::to_bytes::<rkyv::rancor::Failure>(component)
|
rkyv::to_bytes::<rkyv::rancor::Failure>(component)
|
||||||
.map(|bytes| bytes.to_vec())
|
.map(|vec| bytes::Bytes::from(vec.to_vec()))
|
||||||
.ok()
|
.ok()
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
@@ -520,7 +520,7 @@ pub fn synced(attr: TokenStream, item: TokenStream) -> TokenStream {
|
|||||||
const STRATEGY: libmarathon::networking::SyncStrategy = #strategy_tokens;
|
const STRATEGY: libmarathon::networking::SyncStrategy = #strategy_tokens;
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn serialize_sync(&self) -> anyhow::Result<Vec<u8>> {
|
fn serialize_sync(&self) -> anyhow::Result<bytes::Bytes> {
|
||||||
#serialize_impl
|
#serialize_impl
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user