feat!: add client.chat_stream() method

BREAKING CHANGE: You can't set the `stream` option for `client.chat*()`.

Either use `client.chat_stream()` if you want to use streams
or use `client.chat()` / `client.chat_async()` otherwise.
This commit is contained in:
Ivan Gabriele
2024-03-04 08:16:06 +01:00
parent f91e794d71
commit 4a4219d3ea
7 changed files with 252 additions and 46 deletions

View File

@@ -2,6 +2,25 @@ use serde::{Deserialize, Serialize};
use crate::v1::{common, constants};
// -----------------------------------------------------------------------------
// Definitions
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ChatMessage {
pub role: ChatMessageRole,
pub content: String,
}
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[allow(non_camel_case_types)]
pub enum ChatMessageRole {
assistant,
user,
}
// -----------------------------------------------------------------------------
// Request
#[derive(Debug)]
pub struct ChatCompletionParams {
pub tools: Option<String>,
@@ -9,7 +28,6 @@ pub struct ChatCompletionParams {
pub max_tokens: Option<u32>,
pub top_p: Option<f32>,
pub random_seed: Option<u32>,
pub stream: Option<bool>,
pub safe_prompt: Option<bool>,
}
impl Default for ChatCompletionParams {
@@ -20,7 +38,6 @@ impl Default for ChatCompletionParams {
max_tokens: None,
top_p: None,
random_seed: None,
stream: None,
safe_prompt: None,
}
}
@@ -28,7 +45,7 @@ impl Default for ChatCompletionParams {
#[derive(Debug, Serialize, Deserialize)]
pub struct ChatCompletionRequest {
pub messages: Vec<ChatCompletionMessage>,
pub messages: Vec<ChatMessage>,
pub model: constants::Model,
#[serde(skip_serializing_if = "Option::is_none")]
pub tools: Option<String>,
@@ -40,8 +57,7 @@ pub struct ChatCompletionRequest {
pub top_p: Option<f32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub random_seed: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub stream: Option<bool>,
pub stream: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub safe_prompt: Option<bool>,
// TODO Check this prop (seen in official Python client but not in API doc).
@@ -52,7 +68,8 @@ pub struct ChatCompletionRequest {
impl ChatCompletionRequest {
pub fn new(
model: constants::Model,
messages: Vec<ChatCompletionMessage>,
messages: Vec<ChatMessage>,
stream: bool,
options: Option<ChatCompletionParams>,
) -> Self {
let ChatCompletionParams {
@@ -61,7 +78,6 @@ impl ChatCompletionRequest {
max_tokens,
top_p,
random_seed,
stream,
safe_prompt,
} = options.unwrap_or_default();
@@ -79,6 +95,9 @@ impl ChatCompletionRequest {
}
}
// -----------------------------------------------------------------------------
// Response
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ChatCompletionResponse {
pub id: String,
@@ -86,28 +105,45 @@ pub struct ChatCompletionResponse {
/// Unix timestamp (in seconds).
pub created: u32,
pub model: constants::Model,
pub choices: Vec<ChatCompletionChoice>,
pub choices: Vec<ChatCompletionResponseChoice>,
pub usage: common::ResponseUsage,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ChatCompletionChoice {
pub struct ChatCompletionResponseChoice {
pub index: u32,
pub message: ChatCompletionMessage,
pub message: ChatMessage,
pub finish_reason: String,
// TODO Check this prop (seen in API responses but undocumented).
// pub logprobs: ???
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ChatCompletionMessage {
pub role: ChatCompletionMessageRole,
pub content: String,
// -----------------------------------------------------------------------------
// Stream
#[derive(Debug, Deserialize)]
pub struct ChatCompletionStreamChunk {
pub id: String,
pub object: String,
/// Unix timestamp (in seconds).
pub created: u32,
pub model: constants::Model,
pub choices: Vec<ChatCompletionStreamChunkChoice>,
// TODO Check this prop (seen in API responses but undocumented).
// pub usage: ???,
}
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[allow(non_camel_case_types)]
pub enum ChatCompletionMessageRole {
assistant,
user,
#[derive(Debug, Deserialize)]
pub struct ChatCompletionStreamChunkChoice {
pub index: u32,
pub delta: ChatCompletionStreamChunkChoiceDelta,
pub finish_reason: Option<String>,
// TODO Check this prop (seen in API responses but undocumented).
// pub logprobs: ???,
}
#[derive(Debug, Deserialize)]
pub struct ChatCompletionStreamChunkChoiceDelta {
pub role: Option<ChatMessageRole>,
pub content: String,
}

View File

@@ -1,9 +1,13 @@
use crate::v1::error::ApiError;
use futures::stream::StreamExt;
use futures::Stream;
use reqwest::Error as ReqwestError;
use serde_json::from_str;
use crate::v1::error::ApiError;
use crate::v1::{
chat_completion::{
ChatCompletionMessage, ChatCompletionParams, ChatCompletionRequest, ChatCompletionResponse,
ChatCompletionParams, ChatCompletionRequest, ChatCompletionResponse, ChatMessage,
},
constants::{EmbedModel, Model, API_URL_BASE},
embedding::{EmbeddingRequest, EmbeddingRequestOptions, EmbeddingResponse},
@@ -11,6 +15,8 @@ use crate::v1::{
model_list::ModelListResponse,
};
use super::chat_completion::ChatCompletionStreamChunk;
pub struct Client {
pub api_key: String,
pub endpoint: String,
@@ -44,10 +50,10 @@ impl Client {
pub fn chat(
&self,
model: Model,
messages: Vec<ChatCompletionMessage>,
messages: Vec<ChatMessage>,
options: Option<ChatCompletionParams>,
) -> Result<ChatCompletionResponse, ApiError> {
let request = ChatCompletionRequest::new(model, messages, options);
let request = ChatCompletionRequest::new(model, messages, false, options);
let response = self.post_sync("/chat/completions", &request)?;
let result = response.json::<ChatCompletionResponse>();
@@ -60,10 +66,10 @@ impl Client {
pub async fn chat_async(
&self,
model: Model,
messages: Vec<ChatCompletionMessage>,
messages: Vec<ChatMessage>,
options: Option<ChatCompletionParams>,
) -> Result<ChatCompletionResponse, ApiError> {
let request = ChatCompletionRequest::new(model, messages, options);
let request = ChatCompletionRequest::new(model, messages, false, options);
let response = self.post_async("/chat/completions", &request).await?;
let result = response.json::<ChatCompletionResponse>().await;
@@ -73,6 +79,50 @@ impl Client {
}
}
pub async fn chat_stream(
&self,
model: Model,
messages: Vec<ChatMessage>,
options: Option<ChatCompletionParams>,
) -> Result<impl Stream<Item = Result<ChatCompletionStreamChunk, ApiError>>, ApiError> {
let request = ChatCompletionRequest::new(model, messages, true, options);
let response = self
.post_stream("/chat/completions", &request)
.await
.map_err(|e| ApiError {
message: e.to_string(),
})?;
if !response.status().is_success() {
let status = response.status();
let text = response.text().await.unwrap_or_default();
return Err(ApiError {
message: format!("{}: {}", status, text),
});
}
let deserialized_stream =
response
.bytes_stream()
.map(|item| -> Result<ChatCompletionStreamChunk, ApiError> {
match item {
Ok(bytes) => {
let text = String::from_utf8(bytes.to_vec()).map_err(|e| ApiError {
message: e.to_string(),
})?;
let text_trimmed = text.trim_start_matches("data: ");
from_str(&text_trimmed).map_err(|e| ApiError {
message: e.to_string(),
})
}
Err(e) => Err(ApiError {
message: e.to_string(),
}),
}
});
Ok(deserialized_stream)
}
pub fn embeddings(
&self,
model: EmbedModel,
@@ -156,10 +206,25 @@ impl Client {
request_builder
}
fn build_request_stream(&self, request: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
let user_agent = format!(
"ivangabriele/mistralai-client-rs/{}",
env!("CARGO_PKG_VERSION")
);
let request_builder = request
.bearer_auth(&self.api_key)
.header("Accept", "text/event-stream")
.header("Content-Type", "application/json")
.header("User-Agent", user_agent);
request_builder
}
fn get_sync(&self, path: &str) -> Result<reqwest::blocking::Response, ApiError> {
let client_sync = reqwest::blocking::Client::new();
let reqwest_client = reqwest::blocking::Client::new();
let url = format!("{}{}", self.endpoint, path);
let request = self.build_request_sync(client_sync.get(url));
let request = self.build_request_sync(reqwest_client.get(url));
let result = request.send();
match result {
@@ -263,6 +328,35 @@ impl Client {
}
}
async fn post_stream<T: serde::ser::Serialize + std::fmt::Debug>(
&self,
path: &str,
params: &T,
) -> Result<reqwest::Response, ApiError> {
let reqwest_client = reqwest::Client::new();
let url = format!("{}{}", self.endpoint, path);
let request_builder = reqwest_client.post(url).json(params);
let request = self.build_request_stream(request_builder);
let result = request.send().await;
match result {
Ok(response) => {
if response.status().is_success() {
Ok(response)
} else {
let status = response.status();
let text = response.text().await.unwrap_or_default();
Err(ApiError {
message: format!("{}: {}", status, text),
})
}
}
Err(error) => Err(ApiError {
message: error.to_string(),
}),
}
}
fn to_api_error(&self, err: ReqwestError) -> ApiError {
ApiError {
message: err.to_string(),