diff --git a/Cargo.lock b/Cargo.lock index ccc93dc..89ee505 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3591,7 +3591,7 @@ dependencies = [ [[package]] name = "sunbeam-sdk" -version = "0.7.0" +version = "0.8.0" dependencies = [ "base64", "bytes", diff --git a/sunbeam-sdk/Cargo.toml b/sunbeam-sdk/Cargo.toml index b063c2e..665881e 100644 --- a/sunbeam-sdk/Cargo.toml +++ b/sunbeam-sdk/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sunbeam-sdk" -version = "0.8.0" +version = "0.9.0" edition = "2024" description = "Sunbeam SDK — reusable library for cluster management" repository = "https://src.sunbeam.pt/studio/cli" diff --git a/sunbeam-sdk/src/media/mod.rs b/sunbeam-sdk/src/media/mod.rs new file mode 100644 index 0000000..c7c9c8d --- /dev/null +++ b/sunbeam-sdk/src/media/mod.rs @@ -0,0 +1,308 @@ +//! LiveKit media service client. + +pub mod types; + +use crate::client::{AuthMethod, HttpTransport, ServiceClient}; +use crate::error::{Result, SunbeamError}; +use base64::Engine; +use reqwest::Method; +use types::*; + +/// Client for the LiveKit Twirp API. +pub struct LiveKitClient { + pub(crate) transport: HttpTransport, +} + +impl ServiceClient for LiveKitClient { + fn service_name(&self) -> &'static str { + "livekit" + } + + fn base_url(&self) -> &str { + &self.transport.base_url + } + + fn from_parts(base_url: String, auth: AuthMethod) -> Self { + Self { + transport: HttpTransport::new(&base_url, auth), + } + } +} + +impl LiveKitClient { + /// Build a LiveKitClient from domain (e.g. `https://livekit.{domain}`). + pub fn connect(domain: &str) -> Self { + let base_url = format!("https://livekit.{domain}"); + Self::from_parts(base_url, AuthMethod::Bearer(String::new())) + } + + /// Replace the auth method. + pub fn set_auth(&mut self, auth: AuthMethod) { + self.transport.set_auth(auth); + } + + // -- Rooms --------------------------------------------------------------- + + /// Create a room. + pub async fn create_room(&self, body: &(impl serde::Serialize + Sync)) -> Result { + self.twirp("livekit.RoomService/CreateRoom", body).await + } + + /// List all rooms. + pub async fn list_rooms(&self) -> Result { + self.twirp("livekit.RoomService/ListRooms", &serde_json::json!({})) + .await + } + + /// Delete a room. + pub async fn delete_room(&self, body: &(impl serde::Serialize + Sync)) -> Result<()> { + self.twirp_send("livekit.RoomService/DeleteRoom", body) + .await + } + + /// Update room metadata. + pub async fn update_room_metadata(&self, body: &(impl serde::Serialize + Sync)) -> Result { + self.twirp("livekit.RoomService/UpdateRoomMetadata", body) + .await + } + + /// Send data to a room. + pub async fn send_data(&self, body: &(impl serde::Serialize + Sync)) -> Result<()> { + self.twirp_send("livekit.RoomService/SendData", body).await + } + + // -- Participants -------------------------------------------------------- + + /// List participants in a room. + pub async fn list_participants( + &self, + body: &(impl serde::Serialize + Sync), + ) -> Result { + self.twirp("livekit.RoomService/ListParticipants", body) + .await + } + + /// Get a single participant. + pub async fn get_participant(&self, body: &(impl serde::Serialize + Sync)) -> Result { + self.twirp("livekit.RoomService/GetParticipant", body) + .await + } + + /// Remove a participant from a room. + pub async fn remove_participant(&self, body: &(impl serde::Serialize + Sync)) -> Result<()> { + self.twirp_send("livekit.RoomService/RemoveParticipant", body) + .await + } + + /// Update a participant. + pub async fn update_participant( + &self, + body: &(impl serde::Serialize + Sync), + ) -> Result { + self.twirp("livekit.RoomService/UpdateParticipant", body) + .await + } + + /// Mute a published track. + pub async fn mute_track(&self, body: &(impl serde::Serialize + Sync)) -> Result { + self.twirp("livekit.RoomService/MutePublishedTrack", body) + .await + } + + // -- Egress -------------------------------------------------------------- + + /// Start a room composite egress. + pub async fn start_room_composite_egress( + &self, + body: &(impl serde::Serialize + Sync), + ) -> Result { + self.twirp("livekit.Egress/StartRoomCompositeEgress", body) + .await + } + + /// Start a track egress. + pub async fn start_track_egress(&self, body: &(impl serde::Serialize + Sync)) -> Result { + self.twirp("livekit.Egress/StartTrackEgress", body).await + } + + /// List egress sessions. + pub async fn list_egress(&self, body: &(impl serde::Serialize + Sync)) -> Result { + self.twirp("livekit.Egress/ListEgress", body).await + } + + /// Stop an egress session. + pub async fn stop_egress(&self, body: &(impl serde::Serialize + Sync)) -> Result { + self.twirp("livekit.Egress/StopEgress", body).await + } + + // -- Token --------------------------------------------------------------- + + /// Generate a LiveKit access token (JWT signed with HMAC-SHA256). + /// + /// - `api_key`: LiveKit API key (used as `iss` claim). + /// - `api_secret`: LiveKit API secret (HMAC key). + /// - `identity`: participant identity (used as `sub` claim). + /// - `grants`: video grant permissions. + /// - `ttl_secs`: token lifetime in seconds. + pub fn generate_access_token( + api_key: &str, + api_secret: &str, + identity: &str, + grants: &VideoGrants, + ttl_secs: u64, + ) -> Result { + use hmac::{Hmac, Mac}; + use sha2::Sha256; + + let b64 = base64::engine::general_purpose::URL_SAFE_NO_PAD; + + let header = serde_json::json!({"alg": "HS256", "typ": "JWT"}); + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map_err(|e| SunbeamError::Other(format!("system time error: {e}")))? + .as_secs(); + + let claims = serde_json::json!({ + "iss": api_key, + "sub": identity, + "nbf": now, + "exp": now + ttl_secs, + "video": grants, + }); + + let header_b64 = b64.encode(serde_json::to_vec(&header)?); + let claims_b64 = b64.encode(serde_json::to_vec(&claims)?); + let signing_input = format!("{header_b64}.{claims_b64}"); + + let mut mac = Hmac::::new_from_slice(api_secret.as_bytes()) + .map_err(|e| SunbeamError::Other(format!("HMAC key error: {e}")))?; + mac.update(signing_input.as_bytes()); + let signature = b64.encode(mac.finalize().into_bytes()); + + Ok(format!("{signing_input}.{signature}")) + } + + // -- Internal helpers ---------------------------------------------------- + + /// Twirp POST that returns a parsed JSON response. + async fn twirp( + &self, + method: &str, + body: &(impl serde::Serialize + Sync), + ) -> Result { + self.transport + .json(Method::POST, &format!("twirp/{method}"), Some(body), method) + .await + } + + /// Twirp POST that discards the response body. + async fn twirp_send(&self, method: &str, body: &(impl serde::Serialize + Sync)) -> Result<()> { + self.transport + .send(Method::POST, &format!("twirp/{method}"), Some(body), method) + .await + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_connect_url() { + let c = LiveKitClient::connect("sunbeam.pt"); + assert_eq!(c.base_url(), "https://livekit.sunbeam.pt"); + assert_eq!(c.service_name(), "livekit"); + } + + #[test] + fn test_from_parts() { + let c = LiveKitClient::from_parts( + "http://localhost:7880".into(), + AuthMethod::Bearer("test-token".into()), + ); + assert_eq!(c.base_url(), "http://localhost:7880"); + } + + #[test] + fn test_generate_access_token() { + let grants = VideoGrants { + room_join: Some(true), + room: Some("test-room".into()), + can_publish: Some(true), + can_subscribe: Some(true), + ..Default::default() + }; + let token = LiveKitClient::generate_access_token( + "api-key", + "api-secret", + "user-1", + &grants, + 3600, + ) + .unwrap(); + + // JWT has three dot-separated parts + let parts: Vec<&str> = token.split('.').collect(); + assert_eq!(parts.len(), 3); + + // Verify header + let b64 = base64::engine::general_purpose::URL_SAFE_NO_PAD; + let header_bytes = b64.decode(parts[0]).unwrap(); + let header: serde_json::Value = serde_json::from_slice(&header_bytes).unwrap(); + assert_eq!(header["alg"], "HS256"); + assert_eq!(header["typ"], "JWT"); + + // Verify claims + let claims_bytes = b64.decode(parts[1]).unwrap(); + let claims: serde_json::Value = serde_json::from_slice(&claims_bytes).unwrap(); + assert_eq!(claims["iss"], "api-key"); + assert_eq!(claims["sub"], "user-1"); + assert!(claims["exp"].as_u64().unwrap() > claims["nbf"].as_u64().unwrap()); + assert_eq!(claims["video"]["roomJoin"], true); + assert_eq!(claims["video"]["room"], "test-room"); + } + + #[test] + fn test_generate_access_token_signature_valid() { + use hmac::{Hmac, Mac}; + use sha2::Sha256; + + let grants = VideoGrants { + room_create: Some(true), + ..Default::default() + }; + let secret = "my-secret-key"; + let token = + LiveKitClient::generate_access_token("key", secret, "id", &grants, 600).unwrap(); + + let parts: Vec<&str> = token.split('.').collect(); + let signing_input = format!("{}.{}", parts[0], parts[1]); + let b64 = base64::engine::general_purpose::URL_SAFE_NO_PAD; + let sig_bytes = b64.decode(parts[2]).unwrap(); + + let mut mac = Hmac::::new_from_slice(secret.as_bytes()).unwrap(); + mac.update(signing_input.as_bytes()); + assert!(mac.verify_slice(&sig_bytes).is_ok()); + } + + #[tokio::test] + async fn test_list_rooms_unreachable() { + let c = LiveKitClient::from_parts( + "http://127.0.0.1:19998".into(), + AuthMethod::Bearer("tok".into()), + ); + let result = c.list_rooms().await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_create_room_unreachable() { + let c = LiveKitClient::from_parts( + "http://127.0.0.1:19998".into(), + AuthMethod::Bearer("tok".into()), + ); + let body = serde_json::json!({"name": "test-room"}); + let result = c.create_room(&body).await; + assert!(result.is_err()); + } +} diff --git a/sunbeam-sdk/src/media/types.rs b/sunbeam-sdk/src/media/types.rs new file mode 100644 index 0000000..2d69822 --- /dev/null +++ b/sunbeam-sdk/src/media/types.rs @@ -0,0 +1,212 @@ +//! LiveKit media service types. + +use serde::{Deserialize, Deserializer, Serialize}; + +/// Deserialize a value that may be either a string or an integer as i64. +fn deserialize_string_or_i64<'de, D: Deserializer<'de>>(d: D) -> Result, D::Error> { + let v: Option = Option::deserialize(d)?; + match v { + None | Some(serde_json::Value::Null) => Ok(None), + Some(serde_json::Value::Number(n)) => Ok(n.as_i64()), + Some(serde_json::Value::String(s)) => Ok(s.parse().ok()), + _ => Ok(None), + } +} + +/// A LiveKit room. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Room { + #[serde(default)] + pub sid: String, + #[serde(default)] + pub name: String, + #[serde(default)] + pub empty_timeout: Option, + #[serde(default)] + pub max_participants: Option, + #[serde(default, deserialize_with = "deserialize_string_or_i64")] + pub creation_time: Option, + #[serde(default)] + pub metadata: Option, + #[serde(default)] + pub num_participants: Option, + #[serde(default)] + pub num_publishers: Option, +} + +/// Response from ListRooms. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ListRoomsResponse { + #[serde(default)] + pub rooms: Vec, +} + +/// Participant information. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ParticipantInfo { + #[serde(default)] + pub sid: String, + #[serde(default)] + pub identity: String, + #[serde(default)] + pub name: Option, + #[serde(default)] + pub state: Option, + #[serde(default)] + pub metadata: Option, + #[serde(default)] + pub joined_at: Option, + #[serde(default)] + pub is_publisher: Option, +} + +/// Response from ListParticipants. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ListParticipantsResponse { + #[serde(default)] + pub participants: Vec, +} + +/// Response from MutePublishedTrack. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MuteTrackResponse { + #[serde(default)] + pub track: Option, +} + +/// Egress information. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EgressInfo { + #[serde(default)] + pub egress_id: String, + #[serde(default)] + pub room_id: Option, + #[serde(default)] + pub room_name: Option, + #[serde(default)] + pub status: Option, + #[serde(default)] + pub started_at: Option, + #[serde(default)] + pub ended_at: Option, + #[serde(default)] + pub error: Option, +} + +/// Response from ListEgress. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ListEgressResponse { + #[serde(default)] + pub items: Vec, +} + +/// Video grant claims for access tokens. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct VideoGrants { + #[serde(default, skip_serializing_if = "Option::is_none", rename = "roomCreate")] + pub room_create: Option, + #[serde(default, skip_serializing_if = "Option::is_none", rename = "roomList")] + pub room_list: Option, + #[serde(default, skip_serializing_if = "Option::is_none", rename = "roomJoin")] + pub room_join: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub room: Option, + #[serde(default, skip_serializing_if = "Option::is_none", rename = "canPublish")] + pub can_publish: Option, + #[serde(default, skip_serializing_if = "Option::is_none", rename = "canSubscribe")] + pub can_subscribe: Option, + #[serde(default, skip_serializing_if = "Option::is_none", rename = "canPublishData")] + pub can_publish_data: Option, + #[serde(default, skip_serializing_if = "Option::is_none", rename = "roomAdmin")] + pub room_admin: Option, + #[serde(default, skip_serializing_if = "Option::is_none", rename = "roomRecord")] + pub room_record: Option, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_room_roundtrip() { + let json = serde_json::json!({ + "sid": "RM_abc123", + "name": "my-room", + "max_participants": 50, + "creation_time": 1700000000i64, + "num_participants": 3 + }); + let room: Room = serde_json::from_value(json).unwrap(); + assert_eq!(room.sid, "RM_abc123"); + assert_eq!(room.name, "my-room"); + assert_eq!(room.max_participants, Some(50)); + } + + #[test] + fn test_list_rooms_response() { + let json = serde_json::json!({ + "rooms": [ + {"sid": "RM_1", "name": "room-1"}, + {"sid": "RM_2", "name": "room-2"} + ] + }); + let resp: ListRoomsResponse = serde_json::from_value(json).unwrap(); + assert_eq!(resp.rooms.len(), 2); + } + + #[test] + fn test_participant_info() { + let json = serde_json::json!({ + "sid": "PA_abc", + "identity": "user@example.com", + "name": "Alice", + "is_publisher": true + }); + let p: ParticipantInfo = serde_json::from_value(json).unwrap(); + assert_eq!(p.identity, "user@example.com"); + assert_eq!(p.is_publisher, Some(true)); + } + + #[test] + fn test_egress_info() { + let json = serde_json::json!({ + "egress_id": "EG_abc", + "room_name": "my-room", + "status": 1, + "started_at": 1700000000i64 + }); + let e: EgressInfo = serde_json::from_value(json).unwrap(); + assert_eq!(e.egress_id, "EG_abc"); + } + + #[test] + fn test_video_grants_serialization() { + let grants = VideoGrants { + room_create: Some(true), + room_join: Some(true), + room: Some("my-room".into()), + can_publish: Some(true), + can_subscribe: Some(true), + ..Default::default() + }; + let json = serde_json::to_value(&grants).unwrap(); + assert_eq!(json["roomCreate"], true); + assert_eq!(json["roomJoin"], true); + assert!(json.get("roomList").is_none()); + assert!(json.get("canPublishData").is_none()); + } + + #[test] + fn test_empty_list_rooms() { + let json = serde_json::json!({}); + let resp: ListRoomsResponse = serde_json::from_value(json).unwrap(); + assert!(resp.rooms.is_empty()); + } + + #[test] + fn test_mute_track_response() { + let json = serde_json::json!({"track": {"sid": "TR_abc"}}); + let r: MuteTrackResponse = serde_json::from_value(json).unwrap(); + assert!(r.track.is_some()); + } +}