Simplify default Result generics.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -889,7 +889,7 @@ pub async fn full_user_deactivate(
|
|||||||
services: &Services,
|
services: &Services,
|
||||||
user_id: &UserId,
|
user_id: &UserId,
|
||||||
all_joined_rooms: &[OwnedRoomId],
|
all_joined_rooms: &[OwnedRoomId],
|
||||||
) -> Result<()> {
|
) -> Result {
|
||||||
services
|
services
|
||||||
.users
|
.users
|
||||||
.deactivate_account(user_id)
|
.deactivate_account(user_id)
|
||||||
|
|||||||
@@ -597,7 +597,7 @@ fn add_unsigned_device_display_name(
|
|||||||
keys: &mut Raw<ruma::encryption::DeviceKeys>,
|
keys: &mut Raw<ruma::encryption::DeviceKeys>,
|
||||||
metadata: ruma::api::client::device::Device,
|
metadata: ruma::api::client::device::Device,
|
||||||
include_display_names: bool,
|
include_display_names: bool,
|
||||||
) -> serde_json::Result<()> {
|
) -> Result {
|
||||||
if let Some(display_name) = metadata.display_name {
|
if let Some(display_name) = metadata.display_name {
|
||||||
let mut object = keys.deserialize_as::<serde_json::Map<String, serde_json::Value>>()?;
|
let mut object = keys.deserialize_as::<serde_json::Map<String, serde_json::Value>>()?;
|
||||||
|
|
||||||
|
|||||||
@@ -230,11 +230,7 @@ pub async fn leave_room(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn remote_leave_room(
|
async fn remote_leave_room(services: &Services, user_id: &UserId, room_id: &RoomId) -> Result {
|
||||||
services: &Services,
|
|
||||||
user_id: &UserId,
|
|
||||||
room_id: &RoomId,
|
|
||||||
) -> Result<()> {
|
|
||||||
let mut make_leave_response_and_server =
|
let mut make_leave_response_and_server =
|
||||||
Err!(BadServerResponse("No remote server available to assist in leaving {room_id}."));
|
Err!(BadServerResponse("No remote server available to assist in leaving {room_id}."));
|
||||||
|
|
||||||
|
|||||||
@@ -149,7 +149,7 @@ async fn is_event_report_valid(
|
|||||||
reason: Option<&String>,
|
reason: Option<&String>,
|
||||||
score: Option<ruma::Int>,
|
score: Option<ruma::Int>,
|
||||||
pdu: &PduEvent,
|
pdu: &PduEvent,
|
||||||
) -> Result<()> {
|
) -> Result {
|
||||||
debug_info!(
|
debug_info!(
|
||||||
"Checking if report from user {sender_user} for event {event_id} in room {room_id} is \
|
"Checking if report from user {sender_user} for event {event_id} in room {room_id} is \
|
||||||
valid"
|
valid"
|
||||||
|
|||||||
@@ -309,7 +309,7 @@ async fn auth_server(
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn auth_server_checks(services: &Services, x_matrix: &XMatrix) -> Result<()> {
|
fn auth_server_checks(services: &Services, x_matrix: &XMatrix) -> Result {
|
||||||
if !services.server.config.allow_federation {
|
if !services.server.config.allow_federation {
|
||||||
return Err!(Config("allow_federation", "Federation is disabled."));
|
return Err!(Config("allow_federation", "Federation is disabled."));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -321,7 +321,7 @@ fn warn_unknown_key(config: &Config) {
|
|||||||
|
|
||||||
/// Checks the presence of the `address` and `unix_socket_path` keys in the
|
/// Checks the presence of the `address` and `unix_socket_path` keys in the
|
||||||
/// raw_config, exiting the process if both keys were detected.
|
/// raw_config, exiting the process if both keys were detected.
|
||||||
pub(super) fn is_dual_listening(raw_config: &Figment) -> Result<()> {
|
pub(super) fn is_dual_listening(raw_config: &Figment) -> Result {
|
||||||
let contains_address = raw_config.contains("address");
|
let contains_address = raw_config.contains("address");
|
||||||
let contains_unix_socket = raw_config.contains("unix_socket_path");
|
let contains_unix_socket = raw_config.contains("unix_socket_path");
|
||||||
if contains_address && contains_unix_socket {
|
if contains_address && contains_unix_socket {
|
||||||
|
|||||||
@@ -78,7 +78,7 @@ fn init_features() -> Result<Vec<String>> {
|
|||||||
Ok(features)
|
Ok(features)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn append_features(features: &mut Vec<String>, manifest: &str) -> Result<()> {
|
fn append_features(features: &mut Vec<String>, manifest: &str) -> Result {
|
||||||
let manifest = Manifest::from_str(manifest)?;
|
let manifest = Manifest::from_str(manifest)?;
|
||||||
features.extend(manifest.features.keys().cloned());
|
features.extend(manifest.features.keys().cloned());
|
||||||
|
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ where
|
|||||||
|
|
||||||
pub fn fmt<F, S>(fun: F, out: Arc<Mutex<S>>) -> Box<Closure>
|
pub fn fmt<F, S>(fun: F, out: Arc<Mutex<S>>) -> Box<Closure>
|
||||||
where
|
where
|
||||||
F: Fn(&mut S, &Level, &str, &str) -> Result<()> + Send + Sync + Copy + 'static,
|
F: Fn(&mut S, &Level, &str, &str) -> Result + Send + Sync + Copy + 'static,
|
||||||
S: std::fmt::Write + Send + 'static,
|
S: std::fmt::Write + Send + 'static,
|
||||||
{
|
{
|
||||||
Box::new(move |data| call(fun, &mut *out.lock().expect("locked"), &data))
|
Box::new(move |data| call(fun, &mut *out.lock().expect("locked"), &data))
|
||||||
@@ -30,7 +30,7 @@ where
|
|||||||
|
|
||||||
fn call<F, S>(fun: F, out: &mut S, data: &Data<'_>)
|
fn call<F, S>(fun: F, out: &mut S, data: &Data<'_>)
|
||||||
where
|
where
|
||||||
F: Fn(&mut S, &Level, &str, &str) -> Result<()>,
|
F: Fn(&mut S, &Level, &str, &str) -> Result,
|
||||||
S: std::fmt::Write,
|
S: std::fmt::Write,
|
||||||
{
|
{
|
||||||
fun(out, &data.level(), data.span_name(), data.message()).expect("log line appended");
|
fun(out, &data.level(), data.span_name(), data.message()).expect("log line appended");
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ use std::fmt::Write;
|
|||||||
use super::{Level, color};
|
use super::{Level, color};
|
||||||
use crate::Result;
|
use crate::Result;
|
||||||
|
|
||||||
pub fn html<S>(out: &mut S, level: &Level, span: &str, msg: &str) -> Result<()>
|
pub fn html<S>(out: &mut S, level: &Level, span: &str, msg: &str) -> Result
|
||||||
where
|
where
|
||||||
S: Write + ?Sized,
|
S: Write + ?Sized,
|
||||||
{
|
{
|
||||||
@@ -18,7 +18,7 @@ where
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn markdown<S>(out: &mut S, level: &Level, span: &str, msg: &str) -> Result<()>
|
pub fn markdown<S>(out: &mut S, level: &Level, span: &str, msg: &str) -> Result
|
||||||
where
|
where
|
||||||
S: Write + ?Sized,
|
S: Write + ?Sized,
|
||||||
{
|
{
|
||||||
@@ -28,7 +28,7 @@ where
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn markdown_table<S>(out: &mut S, level: &Level, span: &str, msg: &str) -> Result<()>
|
pub fn markdown_table<S>(out: &mut S, level: &Level, span: &str, msg: &str) -> Result
|
||||||
where
|
where
|
||||||
S: Write + ?Sized,
|
S: Write + ?Sized,
|
||||||
{
|
{
|
||||||
@@ -38,7 +38,7 @@ where
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn markdown_table_head<S>(out: &mut S) -> Result<()>
|
pub fn markdown_table_head<S>(out: &mut S) -> Result
|
||||||
where
|
where
|
||||||
S: Write + ?Sized,
|
S: Write + ?Sized,
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -49,7 +49,7 @@ impl LogLevelReloadHandles {
|
|||||||
.insert(name.into(), handle);
|
.insert(name.into(), handle);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn reload(&self, new_value: &EnvFilter, names: Option<&[&str]>) -> Result<()> {
|
pub fn reload(&self, new_value: &EnvFilter, names: Option<&[&str]>) -> Result {
|
||||||
self.handles
|
self.handles
|
||||||
.lock()
|
.lock()
|
||||||
.expect("locked")
|
.expect("locked")
|
||||||
|
|||||||
@@ -64,7 +64,7 @@ impl Server {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn reload(&self) -> Result<()> {
|
pub fn reload(&self) -> Result {
|
||||||
if cfg!(any(not(tuwunel_mods), not(feature = "tuwunel_mods"))) {
|
if cfg!(any(not(tuwunel_mods), not(feature = "tuwunel_mods"))) {
|
||||||
return Err!("Reloading not enabled");
|
return Err!("Reloading not enabled");
|
||||||
}
|
}
|
||||||
@@ -103,7 +103,7 @@ impl Server {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn signal(&self, sig: &'static str) -> Result<()> {
|
pub fn signal(&self, sig: &'static str) -> Result {
|
||||||
if let Err(e) = self.signal.send(sig) {
|
if let Err(e) = self.signal.send(sig) {
|
||||||
return Err!("Failed to send signal: {e}");
|
return Err!("Failed to send signal: {e}");
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -36,7 +36,7 @@ pub(super) fn password(password: &str) -> Result<String> {
|
|||||||
.map_err(map_err)
|
.map_err(map_err)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn verify_password(password: &str, password_hash: &str) -> Result<()> {
|
pub(super) fn verify_password(password: &str, password_hash: &str) -> Result {
|
||||||
let password_hash = PasswordHash::new(password_hash).map_err(map_err)?;
|
let password_hash = PasswordHash::new(password_hash).map_err(map_err)?;
|
||||||
ARGON
|
ARGON
|
||||||
.get_or_init(init_argon)
|
.get_or_init(init_argon)
|
||||||
|
|||||||
@@ -42,7 +42,7 @@ macro_rules! is_format {
|
|||||||
#[inline]
|
#[inline]
|
||||||
pub fn collect_stream<F>(func: F) -> Result<String>
|
pub fn collect_stream<F>(func: F) -> Result<String>
|
||||||
where
|
where
|
||||||
F: FnOnce(&mut dyn std::fmt::Write) -> Result<()>,
|
F: FnOnce(&mut dyn std::fmt::Write) -> Result,
|
||||||
{
|
{
|
||||||
let mut out = String::new();
|
let mut out = String::new();
|
||||||
func(&mut out)?;
|
func(&mut out)?;
|
||||||
@@ -63,7 +63,7 @@ pub fn camel_to_snake_string(s: &str) -> String {
|
|||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
#[allow(clippy::unbuffered_bytes)] // these are allocated string utilities, not file I/O utils
|
#[allow(clippy::unbuffered_bytes)] // these are allocated string utilities, not file I/O utils
|
||||||
pub fn camel_to_snake_case<I, O>(output: &mut O, input: I) -> Result<()>
|
pub fn camel_to_snake_case<I, O>(output: &mut O, input: I) -> Result
|
||||||
where
|
where
|
||||||
I: std::io::Read,
|
I: std::io::Read,
|
||||||
O: std::fmt::Write,
|
O: std::fmt::Write,
|
||||||
|
|||||||
@@ -54,7 +54,7 @@ impl<'de> Deserializer<'de> {
|
|||||||
|
|
||||||
/// Determine if the input was fully consumed and error if bytes remaining.
|
/// Determine if the input was fully consumed and error if bytes remaining.
|
||||||
/// This is intended for debug assertions; not optimized for parsing logic.
|
/// This is intended for debug assertions; not optimized for parsing logic.
|
||||||
fn finished(&self) -> Result<()> {
|
fn finished(&self) -> Result {
|
||||||
let pos = self.pos;
|
let pos = self.pos;
|
||||||
let len = self.buf.len();
|
let len = self.buf.len();
|
||||||
let parsed = &self.buf[0..pos];
|
let parsed = &self.buf[0..pos];
|
||||||
|
|||||||
@@ -17,10 +17,10 @@ use crate::Server;
|
|||||||
type StartFuncResult = Pin<Box<dyn Future<Output = Result<Arc<Services>>> + Send>>;
|
type StartFuncResult = Pin<Box<dyn Future<Output = Result<Arc<Services>>> + Send>>;
|
||||||
type StartFuncProto = fn(&Arc<tuwunel_core::Server>) -> StartFuncResult;
|
type StartFuncProto = fn(&Arc<tuwunel_core::Server>) -> StartFuncResult;
|
||||||
|
|
||||||
type RunFuncResult = Pin<Box<dyn Future<Output = Result<()>> + Send>>;
|
type RunFuncResult = Pin<Box<dyn Future<Output = Result> + Send>>;
|
||||||
type RunFuncProto = fn(&Arc<Services>) -> RunFuncResult;
|
type RunFuncProto = fn(&Arc<Services>) -> RunFuncResult;
|
||||||
|
|
||||||
type StopFuncResult = Pin<Box<dyn Future<Output = Result<()>> + Send>>;
|
type StopFuncResult = Pin<Box<dyn Future<Output = Result> + Send>>;
|
||||||
type StopFuncProto = fn(Arc<Services>) -> StopFuncResult;
|
type StopFuncProto = fn(Arc<Services>) -> StopFuncResult;
|
||||||
|
|
||||||
const RESTART_THRESH: &str = "tuwunel_service";
|
const RESTART_THRESH: &str = "tuwunel_service";
|
||||||
|
|||||||
@@ -30,7 +30,7 @@ pub extern "Rust" fn start(
|
|||||||
#[unsafe(no_mangle)]
|
#[unsafe(no_mangle)]
|
||||||
pub extern "Rust" fn stop(
|
pub extern "Rust" fn stop(
|
||||||
services: Arc<Services>,
|
services: Arc<Services>,
|
||||||
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
|
) -> Pin<Box<dyn Future<Output = Result> + Send>> {
|
||||||
AssertUnwindSafe(run::stop(services))
|
AssertUnwindSafe(run::stop(services))
|
||||||
.catch_unwind()
|
.catch_unwind()
|
||||||
.map_err(Error::from_panic)
|
.map_err(Error::from_panic)
|
||||||
@@ -41,7 +41,7 @@ pub extern "Rust" fn stop(
|
|||||||
#[unsafe(no_mangle)]
|
#[unsafe(no_mangle)]
|
||||||
pub extern "Rust" fn run(
|
pub extern "Rust" fn run(
|
||||||
services: &Arc<Services>,
|
services: &Arc<Services>,
|
||||||
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
|
) -> Pin<Box<dyn Future<Output = Result> + Send>> {
|
||||||
AssertUnwindSafe(run::run(services.clone()))
|
AssertUnwindSafe(run::run(services.clone()))
|
||||||
.catch_unwind()
|
.catch_unwind()
|
||||||
.map_err(Error::from_panic)
|
.map_err(Error::from_panic)
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ use crate::serve;
|
|||||||
|
|
||||||
/// Main loop base
|
/// Main loop base
|
||||||
#[tracing::instrument(skip_all)]
|
#[tracing::instrument(skip_all)]
|
||||||
pub(crate) async fn run(services: Arc<Services>) -> Result<()> {
|
pub(crate) async fn run(services: Arc<Services>) -> Result {
|
||||||
let server = &services.server;
|
let server = &services.server;
|
||||||
debug!("Start");
|
debug!("Start");
|
||||||
|
|
||||||
@@ -70,7 +70,7 @@ pub(crate) async fn start(server: Arc<Server>) -> Result<Arc<Services>> {
|
|||||||
|
|
||||||
/// Async destructions
|
/// Async destructions
|
||||||
#[tracing::instrument(skip_all)]
|
#[tracing::instrument(skip_all)]
|
||||||
pub(crate) async fn stop(services: Arc<Services>) -> Result<()> {
|
pub(crate) async fn stop(services: Arc<Services>) -> Result {
|
||||||
debug!("Shutting down...");
|
debug!("Shutting down...");
|
||||||
|
|
||||||
#[cfg(all(feature = "systemd", target_os = "linux"))]
|
#[cfg(all(feature = "systemd", target_os = "linux"))]
|
||||||
@@ -131,9 +131,9 @@ async fn handle_shutdown(server: Arc<Server>, tx: Sender<()>, handle: axum_serve
|
|||||||
|
|
||||||
async fn handle_services_poll(
|
async fn handle_services_poll(
|
||||||
server: &Arc<Server>,
|
server: &Arc<Server>,
|
||||||
result: Result<()>,
|
result: Result,
|
||||||
listener: JoinHandle<Result<()>>,
|
listener: JoinHandle<Result>,
|
||||||
) -> Result<()> {
|
) -> Result {
|
||||||
debug!("Service manager finished: {result:?}");
|
debug!("Service manager finished: {result:?}");
|
||||||
|
|
||||||
if server.running() {
|
if server.running() {
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ pub(super) async fn serve(
|
|||||||
app: Router,
|
app: Router,
|
||||||
handle: ServerHandle,
|
handle: ServerHandle,
|
||||||
addrs: Vec<SocketAddr>,
|
addrs: Vec<SocketAddr>,
|
||||||
) -> Result<()> {
|
) -> Result {
|
||||||
let app = app.into_make_service_with_connect_info::<SocketAddr>();
|
let app = app.into_make_service_with_connect_info::<SocketAddr>();
|
||||||
let mut join_set = JoinSet::new();
|
let mut join_set = JoinSet::new();
|
||||||
for addr in &addrs {
|
for addr in &addrs {
|
||||||
|
|||||||
@@ -38,7 +38,7 @@ pub(super) async fn serve(
|
|||||||
server: &Arc<Server>,
|
server: &Arc<Server>,
|
||||||
app: Router,
|
app: Router,
|
||||||
mut shutdown: broadcast::Receiver<()>,
|
mut shutdown: broadcast::Receiver<()>,
|
||||||
) -> Result<()> {
|
) -> Result {
|
||||||
let mut tasks = JoinSet::<()>::new();
|
let mut tasks = JoinSet::<()>::new();
|
||||||
let executor = TokioExecutor::new();
|
let executor = TokioExecutor::new();
|
||||||
let app = app.into_make_service_with_connect_info::<net::SocketAddr>();
|
let app = app.into_make_service_with_connect_info::<net::SocketAddr>();
|
||||||
|
|||||||
@@ -58,7 +58,7 @@ pub async fn update(
|
|||||||
user_id: &UserId,
|
user_id: &UserId,
|
||||||
event_type: RoomAccountDataEventType,
|
event_type: RoomAccountDataEventType,
|
||||||
data: &serde_json::Value,
|
data: &serde_json::Value,
|
||||||
) -> Result<()> {
|
) -> Result {
|
||||||
if data.get("type").is_none() || data.get("content").is_none() {
|
if data.get("type").is_none() || data.get("content").is_none() {
|
||||||
return Err!(Request(InvalidParam("Account data doesn't have all required fields.")));
|
return Err!(Request(InvalidParam("Account data doesn't have all required fields.")));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -275,7 +275,7 @@ impl Service {
|
|||||||
.ok_or_else(|| err!(Request(NotFound("Admin user not joined to admin room"))))
|
.ok_or_else(|| err!(Request(NotFound("Admin user not joined to admin room"))))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_response(&self, content: RoomMessageEventContent) -> Result<()> {
|
async fn handle_response(&self, content: RoomMessageEventContent) -> Result {
|
||||||
let Some(Relation::Reply { in_reply_to }) = content.relates_to.as_ref() else {
|
let Some(Relation::Reply { in_reply_to }) = content.relates_to.as_ref() else {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
};
|
};
|
||||||
@@ -309,7 +309,7 @@ impl Service {
|
|||||||
content: RoomMessageEventContent,
|
content: RoomMessageEventContent,
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
user_id: &UserId,
|
user_id: &UserId,
|
||||||
) -> Result<()> {
|
) -> Result {
|
||||||
assert!(self.user_is_admin(user_id).await, "sender is not admin");
|
assert!(self.user_is_admin(user_id).await, "sender is not admin");
|
||||||
|
|
||||||
let state_lock = self.services.state.mutex.lock(room_id).await;
|
let state_lock = self.services.state.mutex.lock(room_id).await;
|
||||||
@@ -334,7 +334,7 @@ impl Service {
|
|||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
user_id: &UserId,
|
user_id: &UserId,
|
||||||
state_lock: &RoomMutexGuard,
|
state_lock: &RoomMutexGuard,
|
||||||
) -> Result<()> {
|
) -> Result {
|
||||||
error!("Failed to build and append admin room response PDU: \"{e}\"");
|
error!("Failed to build and append admin room response PDU: \"{e}\"");
|
||||||
let content = RoomMessageEventContent::text_plain(format!(
|
let content = RoomMessageEventContent::text_plain(format!(
|
||||||
"Failed to build and append admin room PDU: \"{e}\"\n\nThe original admin command \
|
"Failed to build and append admin room PDU: \"{e}\"\n\nThe original admin command \
|
||||||
|
|||||||
@@ -119,7 +119,7 @@ fn prepare(&self, dest: &ServerName, mut request: http::Request<Vec<u8>>) -> Res
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[implement(super::Service)]
|
#[implement(super::Service)]
|
||||||
fn validate_url(&self, url: &Url) -> Result<()> {
|
fn validate_url(&self, url: &Url) -> Result {
|
||||||
if let Some(url_host) = url.host_str() {
|
if let Some(url_host) = url.host_str() {
|
||||||
if let Ok(ip) = IPAddress::parse(url_host) {
|
if let Ok(ip) = IPAddress::parse(url_host) {
|
||||||
trace!("Checking request URL IP {ip:?}");
|
trace!("Checking request URL IP {ip:?}");
|
||||||
|
|||||||
@@ -163,7 +163,7 @@ pub async fn add_key(
|
|||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
session_id: &str,
|
session_id: &str,
|
||||||
key_data: &Raw<KeyBackupData>,
|
key_data: &Raw<KeyBackupData>,
|
||||||
) -> Result<()> {
|
) -> Result {
|
||||||
let key = (user_id, version);
|
let key = (user_id, version);
|
||||||
if self
|
if self
|
||||||
.db
|
.db
|
||||||
|
|||||||
@@ -13,14 +13,14 @@ use tuwunel_core::{
|
|||||||
use crate::{Services, service, service::Service};
|
use crate::{Services, service, service::Service};
|
||||||
|
|
||||||
pub(crate) struct Manager {
|
pub(crate) struct Manager {
|
||||||
manager: Mutex<Option<JoinHandle<Result<()>>>>,
|
manager: Mutex<Option<JoinHandle<Result>>>,
|
||||||
workers: Mutex<Workers>,
|
workers: Mutex<Workers>,
|
||||||
server: Arc<Server>,
|
server: Arc<Server>,
|
||||||
service: Arc<service::Map>,
|
service: Arc<service::Map>,
|
||||||
}
|
}
|
||||||
|
|
||||||
type Workers = JoinSet<WorkerResult>;
|
type Workers = JoinSet<WorkerResult>;
|
||||||
type WorkerResult = (Arc<dyn Service>, Result<()>);
|
type WorkerResult = (Arc<dyn Service>, Result);
|
||||||
type WorkersLocked<'a> = MutexGuard<'a, Workers>;
|
type WorkersLocked<'a> = MutexGuard<'a, Workers>;
|
||||||
|
|
||||||
const RESTART_DELAY_MS: u64 = 2500;
|
const RESTART_DELAY_MS: u64 = 2500;
|
||||||
@@ -35,7 +35,7 @@ impl Manager {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) async fn poll(&self) -> Result<()> {
|
pub(super) async fn poll(&self) -> Result {
|
||||||
if let Some(manager) = &mut *self.manager.lock().await {
|
if let Some(manager) = &mut *self.manager.lock().await {
|
||||||
trace!("Polling service manager...");
|
trace!("Polling service manager...");
|
||||||
return manager.await?;
|
return manager.await?;
|
||||||
@@ -44,7 +44,7 @@ impl Manager {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) async fn start(self: Arc<Self>) -> Result<()> {
|
pub(super) async fn start(self: Arc<Self>) -> Result {
|
||||||
let mut workers = self.workers.lock().await;
|
let mut workers = self.workers.lock().await;
|
||||||
|
|
||||||
debug!("Starting service manager...");
|
debug!("Starting service manager...");
|
||||||
@@ -83,7 +83,7 @@ impl Manager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn worker(&self) -> Result<()> {
|
async fn worker(&self) -> Result {
|
||||||
loop {
|
loop {
|
||||||
let mut workers = self.workers.lock().await;
|
let mut workers = self.workers.lock().await;
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
@@ -99,7 +99,7 @@ impl Manager {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_abort(&self, _workers: &mut WorkersLocked<'_>, error: Error) -> Result<()> {
|
async fn handle_abort(&self, _workers: &mut WorkersLocked<'_>, error: Error) -> Result {
|
||||||
// not supported until service can be associated with abort
|
// not supported until service can be associated with abort
|
||||||
unimplemented!("unexpected worker task abort {error:?}");
|
unimplemented!("unexpected worker task abort {error:?}");
|
||||||
}
|
}
|
||||||
@@ -108,7 +108,7 @@ impl Manager {
|
|||||||
&self,
|
&self,
|
||||||
workers: &mut WorkersLocked<'_>,
|
workers: &mut WorkersLocked<'_>,
|
||||||
result: WorkerResult,
|
result: WorkerResult,
|
||||||
) -> Result<()> {
|
) -> Result {
|
||||||
let (service, result) = result;
|
let (service, result) = result;
|
||||||
match result {
|
match result {
|
||||||
| Ok(()) => self.handle_finished(workers, &service).await,
|
| Ok(()) => self.handle_finished(workers, &service).await,
|
||||||
@@ -120,7 +120,7 @@ impl Manager {
|
|||||||
&self,
|
&self,
|
||||||
_workers: &mut WorkersLocked<'_>,
|
_workers: &mut WorkersLocked<'_>,
|
||||||
service: &Arc<dyn Service>,
|
service: &Arc<dyn Service>,
|
||||||
) -> Result<()> {
|
) -> Result {
|
||||||
debug!("service {:?} worker finished", service.name());
|
debug!("service {:?} worker finished", service.name());
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -130,7 +130,7 @@ impl Manager {
|
|||||||
workers: &mut WorkersLocked<'_>,
|
workers: &mut WorkersLocked<'_>,
|
||||||
service: &Arc<dyn Service>,
|
service: &Arc<dyn Service>,
|
||||||
error: Error,
|
error: Error,
|
||||||
) -> Result<()> {
|
) -> Result {
|
||||||
let name = service.name();
|
let name = service.name();
|
||||||
error!("service {name:?} aborted: {error}");
|
error!("service {name:?} aborted: {error}");
|
||||||
|
|
||||||
@@ -155,7 +155,7 @@ impl Manager {
|
|||||||
&self,
|
&self,
|
||||||
workers: &mut WorkersLocked<'_>,
|
workers: &mut WorkersLocked<'_>,
|
||||||
service: &Arc<dyn Service>,
|
service: &Arc<dyn Service>,
|
||||||
) -> Result<()> {
|
) -> Result {
|
||||||
if !self.server.running() {
|
if !self.server.running() {
|
||||||
return Err!(
|
return Err!(
|
||||||
"Service {:?} worker not starting during server shutdown.",
|
"Service {:?} worker not starting during server shutdown.",
|
||||||
|
|||||||
@@ -165,7 +165,7 @@ impl Data {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
pub(super) fn remove_url_preview(&self, url: &str) -> Result<()> {
|
pub(super) fn remove_url_preview(&self, url: &str) -> Result {
|
||||||
self.url_previews.remove(url.as_bytes());
|
self.url_previews.remove(url.as_bytes());
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -175,7 +175,7 @@ impl Data {
|
|||||||
url: &str,
|
url: &str,
|
||||||
data: &UrlPreviewData,
|
data: &UrlPreviewData,
|
||||||
timestamp: Duration,
|
timestamp: Duration,
|
||||||
) -> Result<()> {
|
) -> Result {
|
||||||
let mut value = Vec::<u8>::new();
|
let mut value = Vec::<u8>::new();
|
||||||
value.extend_from_slice(×tamp.as_secs().to_be_bytes());
|
value.extend_from_slice(×tamp.as_secs().to_be_bytes());
|
||||||
value.push(0xFF);
|
value.push(0xFF);
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ use crate::Services;
|
|||||||
/// Migrates a media directory from legacy base64 file names to sha2 file names.
|
/// Migrates a media directory from legacy base64 file names to sha2 file names.
|
||||||
/// All errors are fatal. Upon success the database is keyed to not perform this
|
/// All errors are fatal. Upon success the database is keyed to not perform this
|
||||||
/// again.
|
/// again.
|
||||||
pub(crate) async fn migrate_sha256_media(services: &Services) -> Result<()> {
|
pub(crate) async fn migrate_sha256_media(services: &Services) -> Result {
|
||||||
let db = &services.db;
|
let db = &services.db;
|
||||||
let config = &services.server.config;
|
let config = &services.server.config;
|
||||||
|
|
||||||
@@ -57,7 +57,7 @@ pub(crate) async fn migrate_sha256_media(services: &Services) -> Result<()> {
|
|||||||
/// - Going back and forth to non-sha256 legacy binaries (e.g. upstream).
|
/// - Going back and forth to non-sha256 legacy binaries (e.g. upstream).
|
||||||
/// - Deletion of artifacts in the media directory which will then fall out of
|
/// - Deletion of artifacts in the media directory which will then fall out of
|
||||||
/// sync with the database.
|
/// sync with the database.
|
||||||
pub(crate) async fn checkup_sha256_media(services: &Services) -> Result<()> {
|
pub(crate) async fn checkup_sha256_media(services: &Services) -> Result {
|
||||||
use crate::media::encode_key;
|
use crate::media::encode_key;
|
||||||
|
|
||||||
debug!("Checking integrity of media directory");
|
debug!("Checking integrity of media directory");
|
||||||
@@ -101,7 +101,7 @@ async fn handle_media_check(
|
|||||||
key: &[u8],
|
key: &[u8],
|
||||||
new_path: &OsStr,
|
new_path: &OsStr,
|
||||||
old_path: &OsStr,
|
old_path: &OsStr,
|
||||||
) -> Result<()> {
|
) -> Result {
|
||||||
use crate::media::encode_key;
|
use crate::media::encode_key;
|
||||||
|
|
||||||
let (mediaid_file, mediaid_user) = dbs;
|
let (mediaid_file, mediaid_user) = dbs;
|
||||||
|
|||||||
@@ -68,7 +68,7 @@ impl crate::Service for Service {
|
|||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn worker(self: Arc<Self>) -> Result<()> {
|
async fn worker(self: Arc<Self>) -> Result {
|
||||||
self.create_media_dir().await?;
|
self.create_media_dir().await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -86,7 +86,7 @@ impl Service {
|
|||||||
content_disposition: Option<&ContentDisposition>,
|
content_disposition: Option<&ContentDisposition>,
|
||||||
content_type: Option<&str>,
|
content_type: Option<&str>,
|
||||||
file: &[u8],
|
file: &[u8],
|
||||||
) -> Result<()> {
|
) -> Result {
|
||||||
// Width, Height = 0 if it's not a thumbnail
|
// Width, Height = 0 if it's not a thumbnail
|
||||||
let key = self.db.create_file_metadata(
|
let key = self.db.create_file_metadata(
|
||||||
mxc,
|
mxc,
|
||||||
@@ -104,7 +104,7 @@ impl Service {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Deletes a file in the database and from the media directory via an MXC
|
/// Deletes a file in the database and from the media directory via an MXC
|
||||||
pub async fn delete(&self, mxc: &Mxc<'_>) -> Result<()> {
|
pub async fn delete(&self, mxc: &Mxc<'_>) -> Result {
|
||||||
match self.db.search_mxc_metadata_prefix(mxc).await {
|
match self.db.search_mxc_metadata_prefix(mxc).await {
|
||||||
| Ok(keys) => {
|
| Ok(keys) => {
|
||||||
for key in keys {
|
for key in keys {
|
||||||
@@ -341,12 +341,12 @@ impl Service {
|
|||||||
Ok(deletion_count)
|
Ok(deletion_count)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn create_media_dir(&self) -> Result<()> {
|
pub async fn create_media_dir(&self) -> Result {
|
||||||
let dir = self.get_media_dir();
|
let dir = self.get_media_dir();
|
||||||
Ok(fs::create_dir_all(dir).await?)
|
Ok(fs::create_dir_all(dir).await?)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn remove_media_file(&self, key: &[u8]) -> Result<()> {
|
async fn remove_media_file(&self, key: &[u8]) -> Result {
|
||||||
let path = self.get_media_file(key);
|
let path = self.get_media_file(key);
|
||||||
let legacy = self.get_media_file_b64(key);
|
let legacy = self.get_media_file_b64(key);
|
||||||
debug!(?key, ?path, ?legacy, "Removing media file");
|
debug!(?key, ?path, ?legacy, "Removing media file");
|
||||||
|
|||||||
@@ -49,13 +49,13 @@ pub struct UrlPreviewData {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[implement(Service)]
|
#[implement(Service)]
|
||||||
pub async fn remove_url_preview(&self, url: &str) -> Result<()> {
|
pub async fn remove_url_preview(&self, url: &str) -> Result {
|
||||||
// TODO: also remove the downloaded image
|
// TODO: also remove the downloaded image
|
||||||
self.db.remove_url_preview(url)
|
self.db.remove_url_preview(url)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[implement(Service)]
|
#[implement(Service)]
|
||||||
pub async fn set_url_preview(&self, url: &str, data: &UrlPreviewData) -> Result<()> {
|
pub async fn set_url_preview(&self, url: &str, data: &UrlPreviewData) -> Result {
|
||||||
let now = SystemTime::now()
|
let now = SystemTime::now()
|
||||||
.duration_since(SystemTime::UNIX_EPOCH)
|
.duration_since(SystemTime::UNIX_EPOCH)
|
||||||
.expect("valid system time");
|
.expect("valid system time");
|
||||||
|
|||||||
@@ -439,7 +439,7 @@ pub async fn fetch_remote_content_legacy(
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[implement(super::Service)]
|
#[implement(super::Service)]
|
||||||
fn check_fetch_authorized(&self, mxc: &Mxc<'_>) -> Result<()> {
|
fn check_fetch_authorized(&self, mxc: &Mxc<'_>) -> Result {
|
||||||
if self
|
if self
|
||||||
.services
|
.services
|
||||||
.server
|
.server
|
||||||
@@ -463,7 +463,7 @@ fn check_fetch_authorized(&self, mxc: &Mxc<'_>) -> Result<()> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[implement(super::Service)]
|
#[implement(super::Service)]
|
||||||
fn check_legacy_freeze(&self) -> Result<()> {
|
fn check_legacy_freeze(&self) -> Result {
|
||||||
self.services
|
self.services
|
||||||
.server
|
.server
|
||||||
.config
|
.config
|
||||||
|
|||||||
@@ -44,7 +44,7 @@ async fn long_file_names_works() {
|
|||||||
Ok(key)
|
Ok(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn delete_file_mxc(&self, _mxc: String) -> Result<()> { todo!() }
|
fn delete_file_mxc(&self, _mxc: String) -> Result { todo!() }
|
||||||
|
|
||||||
fn search_mxc_metadata_prefix(&self, _mxc: String) -> Result<Vec<Vec<u8>>> { todo!() }
|
fn search_mxc_metadata_prefix(&self, _mxc: String) -> Result<Vec<Vec<u8>>> { todo!() }
|
||||||
|
|
||||||
@@ -59,14 +59,14 @@ async fn long_file_names_works() {
|
|||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn remove_url_preview(&self, _url: &str) -> Result<()> { todo!() }
|
fn remove_url_preview(&self, _url: &str) -> Result { todo!() }
|
||||||
|
|
||||||
fn set_url_preview(
|
fn set_url_preview(
|
||||||
&self,
|
&self,
|
||||||
_url: &str,
|
_url: &str,
|
||||||
_data: &UrlPreviewData,
|
_data: &UrlPreviewData,
|
||||||
_timestamp: std::time::Duration,
|
_timestamp: std::time::Duration,
|
||||||
) -> Result<()> {
|
) -> Result {
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -35,7 +35,7 @@ impl super::Service {
|
|||||||
content_type: Option<&str>,
|
content_type: Option<&str>,
|
||||||
dim: &Dim,
|
dim: &Dim,
|
||||||
file: &[u8],
|
file: &[u8],
|
||||||
) -> Result<()> {
|
) -> Result {
|
||||||
let key =
|
let key =
|
||||||
self.db
|
self.db
|
||||||
.create_file_metadata(mxc, user, dim, content_disposition, content_type)?;
|
.create_file_metadata(mxc, user, dim, content_disposition, content_type)?;
|
||||||
|
|||||||
@@ -29,7 +29,7 @@ use crate::{Services, media};
|
|||||||
/// equal or lesser version. These are expected to be backward-compatible.
|
/// equal or lesser version. These are expected to be backward-compatible.
|
||||||
pub(crate) const DATABASE_VERSION: u64 = 17;
|
pub(crate) const DATABASE_VERSION: u64 = 17;
|
||||||
|
|
||||||
pub(crate) async fn migrations(services: &Services) -> Result<()> {
|
pub(crate) async fn migrations(services: &Services) -> Result {
|
||||||
let users_count = services.users.count().await;
|
let users_count = services.users.count().await;
|
||||||
|
|
||||||
// Matrix resource ownership is based on the server name; changing it
|
// Matrix resource ownership is based on the server name; changing it
|
||||||
@@ -52,7 +52,7 @@ pub(crate) async fn migrations(services: &Services) -> Result<()> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fresh(services: &Services) -> Result<()> {
|
async fn fresh(services: &Services) -> Result {
|
||||||
let db = &services.db;
|
let db = &services.db;
|
||||||
|
|
||||||
services
|
services
|
||||||
@@ -77,7 +77,7 @@ async fn fresh(services: &Services) -> Result<()> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Apply any migrations
|
/// Apply any migrations
|
||||||
async fn migrate(services: &Services) -> Result<()> {
|
async fn migrate(services: &Services) -> Result {
|
||||||
let db = &services.db;
|
let db = &services.db;
|
||||||
let config = &services.server.config;
|
let config = &services.server.config;
|
||||||
|
|
||||||
@@ -220,7 +220,7 @@ async fn migrate(services: &Services) -> Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn db_lt_12(services: &Services) -> Result<()> {
|
async fn db_lt_12(services: &Services) -> Result {
|
||||||
for username in &services
|
for username in &services
|
||||||
.users
|
.users
|
||||||
.list_local_users()
|
.list_local_users()
|
||||||
@@ -306,7 +306,7 @@ async fn db_lt_12(services: &Services) -> Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn db_lt_13(services: &Services) -> Result<()> {
|
async fn db_lt_13(services: &Services) -> Result {
|
||||||
for username in &services
|
for username in &services
|
||||||
.users
|
.users
|
||||||
.list_local_users()
|
.list_local_users()
|
||||||
@@ -353,7 +353,7 @@ async fn db_lt_13(services: &Services) -> Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fix_bad_double_separator_in_state_cache(services: &Services) -> Result<()> {
|
async fn fix_bad_double_separator_in_state_cache(services: &Services) -> Result {
|
||||||
warn!("Fixing bad double separator in state_cache roomuserid_joined");
|
warn!("Fixing bad double separator in state_cache roomuserid_joined");
|
||||||
|
|
||||||
let db = &services.db;
|
let db = &services.db;
|
||||||
@@ -397,7 +397,7 @@ async fn fix_bad_double_separator_in_state_cache(services: &Services) -> Result<
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn retroactively_fix_bad_data_from_roomuserid_joined(services: &Services) -> Result<()> {
|
async fn retroactively_fix_bad_data_from_roomuserid_joined(services: &Services) -> Result {
|
||||||
warn!("Retroactively fixing bad data from broken roomuserid_joined");
|
warn!("Retroactively fixing bad data from broken roomuserid_joined");
|
||||||
|
|
||||||
let db = &services.db;
|
let db = &services.db;
|
||||||
|
|||||||
@@ -58,7 +58,7 @@ impl Data {
|
|||||||
currently_active: Option<bool>,
|
currently_active: Option<bool>,
|
||||||
last_active_ago: Option<UInt>,
|
last_active_ago: Option<UInt>,
|
||||||
status_msg: Option<String>,
|
status_msg: Option<String>,
|
||||||
) -> Result<()> {
|
) -> Result {
|
||||||
let last_presence = self.get_presence(user_id).await;
|
let last_presence = self.get_presence(user_id).await;
|
||||||
let state_changed = match last_presence {
|
let state_changed = match last_presence {
|
||||||
| Err(_) => true,
|
| Err(_) => true,
|
||||||
|
|||||||
@@ -55,7 +55,7 @@ impl crate::Service for Service {
|
|||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn worker(self: Arc<Self>) -> Result<()> {
|
async fn worker(self: Arc<Self>) -> Result {
|
||||||
let receiver = self.timer_channel.1.clone();
|
let receiver = self.timer_channel.1.clone();
|
||||||
|
|
||||||
let mut presence_timers = FuturesUnordered::new();
|
let mut presence_timers = FuturesUnordered::new();
|
||||||
@@ -99,7 +99,7 @@ impl Service {
|
|||||||
|
|
||||||
/// Pings the presence of the given user in the given room, setting the
|
/// Pings the presence of the given user in the given room, setting the
|
||||||
/// specified state.
|
/// specified state.
|
||||||
pub async fn ping_presence(&self, user_id: &UserId, new_state: &PresenceState) -> Result<()> {
|
pub async fn ping_presence(&self, user_id: &UserId, new_state: &PresenceState) -> Result {
|
||||||
const REFRESH_TIMEOUT: u64 = 60 * 1000;
|
const REFRESH_TIMEOUT: u64 = 60 * 1000;
|
||||||
|
|
||||||
let last_presence = self.db.get_presence(user_id).await;
|
let last_presence = self.db.get_presence(user_id).await;
|
||||||
@@ -140,7 +140,7 @@ impl Service {
|
|||||||
currently_active: Option<bool>,
|
currently_active: Option<bool>,
|
||||||
last_active_ago: Option<UInt>,
|
last_active_ago: Option<UInt>,
|
||||||
status_msg: Option<String>,
|
status_msg: Option<String>,
|
||||||
) -> Result<()> {
|
) -> Result {
|
||||||
let presence_state = match state.as_str() {
|
let presence_state = match state.as_str() {
|
||||||
| "" => &PresenceState::Offline, // default an empty string to 'offline'
|
| "" => &PresenceState::Offline, // default an empty string to 'offline'
|
||||||
| &_ => state,
|
| &_ => state,
|
||||||
@@ -257,7 +257,7 @@ impl Service {
|
|||||||
Ok(event)
|
Ok(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn process_presence_timer(&self, user_id: &OwnedUserId) -> Result<()> {
|
async fn process_presence_timer(&self, user_id: &OwnedUserId) -> Result {
|
||||||
let mut presence_state = PresenceState::Offline;
|
let mut presence_state = PresenceState::Offline;
|
||||||
let mut last_active_ago = None;
|
let mut last_active_ago = None;
|
||||||
let mut status_msg = None;
|
let mut status_msg = None;
|
||||||
|
|||||||
@@ -390,8 +390,7 @@ impl Service {
|
|||||||
pusher: &Pusher,
|
pusher: &Pusher,
|
||||||
tweaks: Vec<Tweak>,
|
tweaks: Vec<Tweak>,
|
||||||
event: &Pdu,
|
event: &Pdu,
|
||||||
) -> Result
|
) -> Result {
|
||||||
{
|
|
||||||
// TODO: email
|
// TODO: email
|
||||||
match &pusher.kind {
|
match &pusher.kind {
|
||||||
| PusherKind::Http(http) => {
|
| PusherKind::Http(http) => {
|
||||||
|
|||||||
@@ -348,7 +348,7 @@ impl super::Service {
|
|||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_resolve_error(e: &ResolveError, host: &'_ str) -> Result<()> {
|
fn handle_resolve_error(e: &ResolveError, host: &'_ str) -> Result {
|
||||||
use hickory_resolver::{ResolveErrorKind::Proto, proto::ProtoErrorKind};
|
use hickory_resolver::{ResolveErrorKind::Proto, proto::ProtoErrorKind};
|
||||||
|
|
||||||
match e.kind() {
|
match e.kind() {
|
||||||
@@ -376,7 +376,7 @@ impl super::Service {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn validate_dest(&self, dest: &ServerName) -> Result<()> {
|
fn validate_dest(&self, dest: &ServerName) -> Result {
|
||||||
if dest == self.services.server.name && !self.services.server.config.federation_loopback {
|
if dest == self.services.server.name && !self.services.server.config.federation_loopback {
|
||||||
return Err!("Won't send federation request to ourselves");
|
return Err!("Won't send federation request to ourselves");
|
||||||
}
|
}
|
||||||
@@ -388,7 +388,7 @@ impl super::Service {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn validate_dest_ip_literal(&self, dest: &ServerName) -> Result<()> {
|
fn validate_dest_ip_literal(&self, dest: &ServerName) -> Result {
|
||||||
trace!("Destination is an IP literal, checking against IP range denylist.",);
|
trace!("Destination is an IP literal, checking against IP range denylist.",);
|
||||||
debug_assert!(
|
debug_assert!(
|
||||||
dest.is_ip_literal() || !IPAddress::is_valid(dest.host()),
|
dest.is_ip_literal() || !IPAddress::is_valid(dest.host()),
|
||||||
@@ -403,7 +403,7 @@ impl super::Service {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn validate_ip(&self, ip: &IPAddress) -> Result<()> {
|
pub(crate) fn validate_ip(&self, ip: &IPAddress) -> Result {
|
||||||
if !self.services.client.valid_cidr_range(ip) {
|
if !self.services.client.valid_cidr_range(ip) {
|
||||||
return Err!(BadServerResponse("Not allowed to send requests to this IP"));
|
return Err!(BadServerResponse("Not allowed to send requests to this IP"));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -64,12 +64,7 @@ impl crate::Service for Service {
|
|||||||
|
|
||||||
impl Service {
|
impl Service {
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
pub fn set_alias(
|
pub fn set_alias(&self, alias: &RoomAliasId, room_id: &RoomId, user_id: &UserId) -> Result {
|
||||||
&self,
|
|
||||||
alias: &RoomAliasId,
|
|
||||||
room_id: &RoomId,
|
|
||||||
user_id: &UserId,
|
|
||||||
) -> Result<()> {
|
|
||||||
if alias == self.services.globals.admin_alias
|
if alias == self.services.globals.admin_alias
|
||||||
&& user_id != self.services.globals.server_user
|
&& user_id != self.services.globals.server_user
|
||||||
{
|
{
|
||||||
@@ -96,7 +91,7 @@ impl Service {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
pub async fn remove_alias(&self, alias: &RoomAliasId, user_id: &UserId) -> Result<()> {
|
pub async fn remove_alias(&self, alias: &RoomAliasId, user_id: &UserId) -> Result {
|
||||||
if !self.user_can_remove_alias(alias, user_id).await? {
|
if !self.user_can_remove_alias(alias, user_id).await? {
|
||||||
return Err!(Request(Forbidden("User is not permitted to remove this alias.")));
|
return Err!(Request(Forbidden("User is not permitted to remove this alias.")));
|
||||||
}
|
}
|
||||||
@@ -295,7 +290,7 @@ impl Service {
|
|||||||
&self,
|
&self,
|
||||||
room_alias: &RoomAliasId,
|
room_alias: &RoomAliasId,
|
||||||
appservice_info: &Option<RegistrationInfo>,
|
appservice_info: &Option<RegistrationInfo>,
|
||||||
) -> Result<()> {
|
) -> Result {
|
||||||
if !self
|
if !self
|
||||||
.services
|
.services
|
||||||
.globals
|
.globals
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ use super::ExtractBody;
|
|||||||
|
|
||||||
#[implement(super::Service)]
|
#[implement(super::Service)]
|
||||||
#[tracing::instrument(name = "backfill", level = "debug", skip(self))]
|
#[tracing::instrument(name = "backfill", level = "debug", skip(self))]
|
||||||
pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Result<()> {
|
pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Result {
|
||||||
if self
|
if self
|
||||||
.services
|
.services
|
||||||
.state_cache
|
.state_cache
|
||||||
@@ -143,7 +143,7 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re
|
|||||||
|
|
||||||
#[implement(super::Service)]
|
#[implement(super::Service)]
|
||||||
#[tracing::instrument(skip(self, pdu), level = "debug")]
|
#[tracing::instrument(skip(self, pdu), level = "debug")]
|
||||||
pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box<RawJsonValue>) -> Result<()> {
|
pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box<RawJsonValue>) -> Result {
|
||||||
let (room_id, event_id, value) = self
|
let (room_id, event_id, value) = self
|
||||||
.services
|
.services
|
||||||
.event_handler
|
.event_handler
|
||||||
|
|||||||
@@ -51,12 +51,7 @@ impl crate::Service for Service {
|
|||||||
impl Service {
|
impl Service {
|
||||||
/// Sets a user as typing until the timeout timestamp is reached or
|
/// Sets a user as typing until the timeout timestamp is reached or
|
||||||
/// roomtyping_remove is called.
|
/// roomtyping_remove is called.
|
||||||
pub async fn typing_add(
|
pub async fn typing_add(&self, user_id: &UserId, room_id: &RoomId, timeout: u64) -> Result {
|
||||||
&self,
|
|
||||||
user_id: &UserId,
|
|
||||||
room_id: &RoomId,
|
|
||||||
timeout: u64,
|
|
||||||
) -> Result<()> {
|
|
||||||
debug_info!("typing started {user_id:?} in {room_id:?} timeout:{timeout:?}");
|
debug_info!("typing started {user_id:?} in {room_id:?} timeout:{timeout:?}");
|
||||||
// update clients
|
// update clients
|
||||||
self.typing
|
self.typing
|
||||||
@@ -89,7 +84,7 @@ impl Service {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Removes a user from typing before the timeout is reached.
|
/// Removes a user from typing before the timeout is reached.
|
||||||
pub async fn typing_remove(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> {
|
pub async fn typing_remove(&self, user_id: &UserId, room_id: &RoomId) -> Result {
|
||||||
debug_info!("typing stopped {user_id:?} in {room_id:?}");
|
debug_info!("typing stopped {user_id:?} in {room_id:?}");
|
||||||
// update clients
|
// update clients
|
||||||
self.typing
|
self.typing
|
||||||
@@ -131,7 +126,7 @@ impl Service {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Makes sure that typing events with old timestamps get removed.
|
/// Makes sure that typing events with old timestamps get removed.
|
||||||
async fn typings_maintain(&self, room_id: &RoomId) -> Result<()> {
|
async fn typings_maintain(&self, room_id: &RoomId) -> Result {
|
||||||
let current_timestamp = utils::millis_since_unix_epoch();
|
let current_timestamp = utils::millis_since_unix_epoch();
|
||||||
let mut removable = Vec::new();
|
let mut removable = Vec::new();
|
||||||
|
|
||||||
@@ -226,12 +221,7 @@ impl Service {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn federation_send(
|
async fn federation_send(&self, room_id: &RoomId, user_id: &UserId, typing: bool) -> Result {
|
||||||
&self,
|
|
||||||
room_id: &RoomId,
|
|
||||||
user_id: &UserId,
|
|
||||||
typing: bool,
|
|
||||||
) -> Result<()> {
|
|
||||||
debug_assert!(
|
debug_assert!(
|
||||||
self.services.globals.user_is_local(user_id),
|
self.services.globals.user_is_local(user_id),
|
||||||
"tried to broadcast typing status of remote user",
|
"tried to broadcast typing status of remote user",
|
||||||
|
|||||||
@@ -275,7 +275,7 @@ impl Service {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self, room_id), level = "debug")]
|
#[tracing::instrument(skip(self, room_id), level = "debug")]
|
||||||
pub async fn flush_room(&self, room_id: &RoomId) -> Result<()> {
|
pub async fn flush_room(&self, room_id: &RoomId) -> Result {
|
||||||
let servers = self
|
let servers = self
|
||||||
.services
|
.services
|
||||||
.state_cache
|
.state_cache
|
||||||
@@ -286,7 +286,7 @@ impl Service {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self, servers), level = "debug")]
|
#[tracing::instrument(skip(self, servers), level = "debug")]
|
||||||
pub async fn flush_servers<'a, S>(&self, servers: S) -> Result<()>
|
pub async fn flush_servers<'a, S>(&self, servers: S) -> Result
|
||||||
where
|
where
|
||||||
S: Stream<Item = &'a ServerName> + Send + 'a,
|
S: Stream<Item = &'a ServerName> + Send + 'a,
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -26,7 +26,7 @@ pub(crate) trait Service: Any + Send + Sync {
|
|||||||
|
|
||||||
/// Implement the service's worker loop. The service manager spawns a
|
/// Implement the service's worker loop. The service manager spawns a
|
||||||
/// task and calls this function after all services have been built.
|
/// task and calls this function after all services have been built.
|
||||||
async fn worker(self: Arc<Self>) -> Result<()> { Ok(()) }
|
async fn worker(self: Arc<Self>) -> Result { Ok(()) }
|
||||||
|
|
||||||
/// Interrupt the service. This is sent to initiate a graceful shutdown.
|
/// Interrupt the service. This is sent to initiate a graceful shutdown.
|
||||||
/// The service worker should return from its work loop.
|
/// The service worker should return from its work loop.
|
||||||
|
|||||||
@@ -161,7 +161,7 @@ impl Services {
|
|||||||
debug_info!("Services shutdown complete.");
|
debug_info!("Services shutdown complete.");
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn poll(&self) -> Result<()> {
|
pub async fn poll(&self) -> Result {
|
||||||
if let Some(manager) = self.manager.lock().await.as_ref() {
|
if let Some(manager) = self.manager.lock().await.as_ref() {
|
||||||
return manager.poll().await;
|
return manager.poll().await;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user