feat(workflows.yaml): shared_volume + shell config; fix(wfe-server): log_search probe + webhook tests
- workflows.yaml: declare `shared_volume: { mount_path: /workspace,
size: 30Gi }` on the ci workflow so all sub-workflows share a PVC;
set `shell: /bin/bash` on ci_config/ci_long_config anchors.
- log_search.rs: fix opensearch_url() TCP probe to resolve hostnames
(not just IPs); make ensure_index handle resource_already_exists
races gracefully.
- webhook.rs: 14 new handler-level tests covering generic event auth
(accept/reject/missing), GitHub/Gitea HMAC verification, bad JSON
400s, trigger matching, trigger ref-mismatch skip, and real
workflow-start side effect verification.
This commit is contained in:
@@ -100,6 +100,13 @@ impl LogSearchIndex {
|
||||
|
||||
if !response.status_code().is_success() {
|
||||
let text = response.text().await.unwrap_or_default();
|
||||
// Race: another caller created the index between our
|
||||
// `exists` probe and the `create` call. OpenSearch returns
|
||||
// a 400 with `resource_already_exists_exception`; treat that
|
||||
// as a successful no-op rather than failing the call.
|
||||
if text.contains("resource_already_exists_exception") {
|
||||
return Ok(());
|
||||
}
|
||||
return Err(wfe_core::WfeError::Persistence(format!(
|
||||
"Failed to create log index: {text}"
|
||||
)));
|
||||
@@ -306,15 +313,19 @@ mod tests {
|
||||
fn opensearch_url() -> Option<String> {
|
||||
let url =
|
||||
std::env::var("WFE_SEARCH_URL").unwrap_or_else(|_| "http://localhost:9200".to_string());
|
||||
// Quick TCP probe to check if OpenSearch is reachable.
|
||||
let addr = url
|
||||
// Quick TCP probe to check if OpenSearch is reachable. Use
|
||||
// `to_socket_addrs` so hostnames resolve — the previous
|
||||
// implementation parsed `"localhost:9200"` as a SocketAddr, which
|
||||
// fails (hostnames aren't valid SocketAddrs), silently skipping
|
||||
// every OpenSearch test even when the daemon was available.
|
||||
use std::net::ToSocketAddrs;
|
||||
let host_port = url
|
||||
.strip_prefix("http://")
|
||||
.or_else(|| url.strip_prefix("https://"))
|
||||
.unwrap_or("localhost:9200");
|
||||
match std::net::TcpStream::connect_timeout(
|
||||
&addr.parse().ok()?,
|
||||
std::time::Duration::from_secs(1),
|
||||
) {
|
||||
let mut addrs = host_port.to_socket_addrs().ok()?;
|
||||
let addr = addrs.next()?;
|
||||
match std::net::TcpStream::connect_timeout(&addr, std::time::Duration::from_secs(1)) {
|
||||
Ok(_) => Some(url),
|
||||
Err(_) => None,
|
||||
}
|
||||
|
||||
@@ -561,4 +561,298 @@ mod tests {
|
||||
let data = map_trigger_data(&trigger, &payload);
|
||||
assert!(data.get("missing").is_none());
|
||||
}
|
||||
|
||||
// ─── Handler-level coverage ──────────────────────────────────────
|
||||
//
|
||||
// The block below exercises handle_generic_event / handle_github_webhook /
|
||||
// handle_gitea_webhook directly with an in-memory WorkflowHost to cover
|
||||
// auth branches, signature verification, JSON parse errors, trigger
|
||||
// matching, and the happy path that fires a workflow start.
|
||||
|
||||
use crate::config::{ServerConfig, WebhookConfig};
|
||||
use std::sync::Arc;
|
||||
use wfe::WorkflowHostBuilder;
|
||||
use wfe_core::test_support::{
|
||||
InMemoryLockProvider, InMemoryPersistenceProvider, InMemoryQueueProvider,
|
||||
};
|
||||
|
||||
async fn test_webhook_state() -> WebhookState {
|
||||
let persistence = Arc::new(InMemoryPersistenceProvider::new());
|
||||
let lock = Arc::new(InMemoryLockProvider::new());
|
||||
let queue = Arc::new(InMemoryQueueProvider::new());
|
||||
let host = WorkflowHostBuilder::new()
|
||||
.use_persistence(persistence as Arc<dyn wfe_core::traits::PersistenceProvider>)
|
||||
.use_lock_provider(lock as Arc<dyn wfe_core::traits::DistributedLockProvider>)
|
||||
.use_queue_provider(queue as Arc<dyn wfe_core::traits::QueueProvider>)
|
||||
.build()
|
||||
.unwrap();
|
||||
host.start().await.unwrap();
|
||||
|
||||
WebhookState {
|
||||
host: Arc::new(host),
|
||||
config: ServerConfig::default(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn test_state_with_secret(source: &str, secret: &str) -> WebhookState {
|
||||
let mut state = test_webhook_state().await;
|
||||
state
|
||||
.config
|
||||
.auth
|
||||
.webhook_secrets
|
||||
.insert(source.to_string(), secret.to_string());
|
||||
state
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn health_check_always_ok() {
|
||||
let resp = health_check().await.into_response();
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn generic_event_no_auth_configured_publishes() {
|
||||
let state = test_webhook_state().await;
|
||||
let payload = GenericEventPayload {
|
||||
event_name: "order.paid".into(),
|
||||
event_key: "42".into(),
|
||||
data: Some(serde_json::json!({"amount": 99})),
|
||||
};
|
||||
let resp = handle_generic_event(State(state), HeaderMap::new(), Json(payload))
|
||||
.await
|
||||
.into_response();
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn generic_event_with_static_token_accepts_valid_bearer() {
|
||||
let mut state = test_webhook_state().await;
|
||||
state.config.auth.tokens = vec!["the-token".into()];
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("authorization", "Bearer the-token".parse().unwrap());
|
||||
let payload = GenericEventPayload {
|
||||
event_name: "evt".into(),
|
||||
event_key: "k".into(),
|
||||
data: None,
|
||||
};
|
||||
let resp = handle_generic_event(State(state), headers, Json(payload))
|
||||
.await
|
||||
.into_response();
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn generic_event_with_static_token_rejects_bad_bearer() {
|
||||
let mut state = test_webhook_state().await;
|
||||
state.config.auth.tokens = vec!["the-token".into()];
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("authorization", "Bearer wrong".parse().unwrap());
|
||||
let payload = GenericEventPayload {
|
||||
event_name: "evt".into(),
|
||||
event_key: "k".into(),
|
||||
data: None,
|
||||
};
|
||||
let resp = handle_generic_event(State(state), headers, Json(payload))
|
||||
.await
|
||||
.into_response();
|
||||
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn generic_event_with_static_token_rejects_missing_header() {
|
||||
let mut state = test_webhook_state().await;
|
||||
state.config.auth.tokens = vec!["t".into()];
|
||||
let payload = GenericEventPayload {
|
||||
event_name: "evt".into(),
|
||||
event_key: "k".into(),
|
||||
data: None,
|
||||
};
|
||||
let resp = handle_generic_event(State(state), HeaderMap::new(), Json(payload))
|
||||
.await
|
||||
.into_response();
|
||||
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn github_webhook_accepts_unauthenticated_when_no_secret() {
|
||||
let state = test_webhook_state().await;
|
||||
let body = Bytes::from(
|
||||
r#"{"ref":"refs/heads/main","head_commit":{"id":"deadbeef"},"repository":{"full_name":"me/repo"}}"#,
|
||||
);
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("x-github-event", "push".parse().unwrap());
|
||||
let resp = handle_github_webhook(State(state), headers, body)
|
||||
.await
|
||||
.into_response();
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn github_webhook_rejects_bad_signature() {
|
||||
let state = test_state_with_secret("github", "supersecret").await;
|
||||
let body = Bytes::from(r#"{"ref":"refs/heads/main"}"#);
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("x-github-event", "push".parse().unwrap());
|
||||
headers.insert("x-hub-signature-256", "sha256=invalid".parse().unwrap());
|
||||
let resp = handle_github_webhook(State(state), headers, body)
|
||||
.await
|
||||
.into_response();
|
||||
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn github_webhook_accepts_valid_signature() {
|
||||
let state = test_state_with_secret("github", "s3cret").await;
|
||||
let body_bytes = r#"{"ref":"refs/heads/main","repository":{"full_name":"me/repo"}}"#;
|
||||
let body = Bytes::from(body_bytes);
|
||||
let mut mac = HmacSha256::new_from_slice(b"s3cret").unwrap();
|
||||
mac.update(body_bytes.as_bytes());
|
||||
let sig = format!("sha256={}", hex::encode(mac.finalize().into_bytes()));
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("x-github-event", "push".parse().unwrap());
|
||||
headers.insert("x-hub-signature-256", sig.parse().unwrap());
|
||||
let resp = handle_github_webhook(State(state), headers, body)
|
||||
.await
|
||||
.into_response();
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn github_webhook_returns_400_on_bad_json() {
|
||||
let state = test_webhook_state().await;
|
||||
let body = Bytes::from("not { valid } json");
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("x-github-event", "push".parse().unwrap());
|
||||
let resp = handle_github_webhook(State(state), headers, body)
|
||||
.await
|
||||
.into_response();
|
||||
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn github_webhook_fires_matching_trigger() {
|
||||
// Define a workflow that the webhook can start, then wire a trigger
|
||||
// that matches the incoming push. Confirm a workflow instance was
|
||||
// created on the host.
|
||||
let mut state = test_webhook_state().await;
|
||||
|
||||
// Register the workflow definition so start_workflow can find it.
|
||||
// The step has no registered factory, which is fine — the workflow
|
||||
// reaches runnable state and the background executor would fail on
|
||||
// first run, but handle_github_webhook only cares that
|
||||
// `host.start_workflow` succeeds in creating the instance.
|
||||
let mut def = wfe_core::models::WorkflowDefinition::new("ci", 1);
|
||||
let mut s0 = wfe_core::models::WorkflowStep::new(0, "noop");
|
||||
s0.outcomes.push(wfe_core::models::StepOutcome {
|
||||
next_step: 0,
|
||||
label: None,
|
||||
value: None,
|
||||
});
|
||||
def.steps.push(s0);
|
||||
state.host.register_workflow_definition(def).await;
|
||||
|
||||
state.config.webhook = WebhookConfig {
|
||||
triggers: vec![WebhookTrigger {
|
||||
source: "github".into(),
|
||||
event: "push".into(),
|
||||
match_ref: Some("refs/heads/main".into()),
|
||||
workflow_id: "ci".into(),
|
||||
version: 1,
|
||||
data_mapping: [("repo".into(), "$.repository.full_name".into())].into(),
|
||||
}],
|
||||
};
|
||||
|
||||
let body = Bytes::from(r#"{"ref":"refs/heads/main","repository":{"full_name":"me/repo"}}"#);
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("x-github-event", "push".parse().unwrap());
|
||||
|
||||
let host = state.host.clone();
|
||||
let resp = handle_github_webhook(State(state), headers, body)
|
||||
.await
|
||||
.into_response();
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
|
||||
// At least one `ci` instance should now exist. The webhook
|
||||
// handler logs the started id, so we just confirm the side
|
||||
// effect via get_workflow by name fallback.
|
||||
let ci1 = host.get_workflow("ci-1").await;
|
||||
assert!(ci1.is_ok(), "expected ci-1 to exist after webhook trigger");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn github_webhook_trigger_skips_non_matching_ref() {
|
||||
let mut state = test_webhook_state().await;
|
||||
state.config.webhook = WebhookConfig {
|
||||
triggers: vec![WebhookTrigger {
|
||||
source: "github".into(),
|
||||
event: "push".into(),
|
||||
match_ref: Some("refs/heads/release".into()),
|
||||
workflow_id: "ci".into(),
|
||||
version: 1,
|
||||
data_mapping: Default::default(),
|
||||
}],
|
||||
};
|
||||
|
||||
let body = Bytes::from(r#"{"ref":"refs/heads/main","repository":{"full_name":"me/repo"}}"#);
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("x-github-event", "push".parse().unwrap());
|
||||
|
||||
let host = state.host.clone();
|
||||
let resp = handle_github_webhook(State(state), headers, body)
|
||||
.await
|
||||
.into_response();
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
|
||||
// No workflow should have been started — trigger.match_ref didn't match.
|
||||
let none = host.get_workflow("ci-1").await;
|
||||
assert!(none.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn gitea_webhook_accepts_raw_hex_signature() {
|
||||
let state = test_state_with_secret("gitea", "gitkey").await;
|
||||
let body_bytes = r#"{"ref":"refs/heads/main","repository":{"full_name":"me/repo"}}"#;
|
||||
let body = Bytes::from(body_bytes);
|
||||
let mut mac = HmacSha256::new_from_slice(b"gitkey").unwrap();
|
||||
mac.update(body_bytes.as_bytes());
|
||||
let sig = hex::encode(mac.finalize().into_bytes());
|
||||
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("x-gitea-event", "push".parse().unwrap());
|
||||
headers.insert("x-gitea-signature", sig.parse().unwrap());
|
||||
let resp = handle_gitea_webhook(State(state), headers, body)
|
||||
.await
|
||||
.into_response();
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn gitea_webhook_rejects_bad_signature() {
|
||||
let state = test_state_with_secret("gitea", "gitkey").await;
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("x-gitea-event", "push".parse().unwrap());
|
||||
headers.insert("x-gitea-signature", "totallybogus".parse().unwrap());
|
||||
let resp = handle_gitea_webhook(State(state), headers, Bytes::from(r#"{}"#))
|
||||
.await
|
||||
.into_response();
|
||||
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn gitea_webhook_returns_400_on_bad_json() {
|
||||
let state = test_webhook_state().await;
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("x-gitea-event", "push".parse().unwrap());
|
||||
let resp = handle_gitea_webhook(State(state), headers, Bytes::from("not-json"))
|
||||
.await
|
||||
.into_response();
|
||||
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
|
||||
}
|
||||
|
||||
// Sanity: the WebhookConfig must exist in ServerConfig for these tests
|
||||
// to compile.
|
||||
#[allow(dead_code)]
|
||||
fn _assert_webhook_config_type() -> WebhookConfig {
|
||||
WebhookConfig::default()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,8 +56,11 @@ _templates:
|
||||
BUILDKIT_CLIENT_KEY: ${WFE_BUILDKIT_CLIENT_KEY}
|
||||
|
||||
# Default config for short CI steps (4Gi memory, 30min timeout).
|
||||
# `shell: /bin/bash` because the step scripts use `set -o pipefail`,
|
||||
# arrays, and other bashisms the default `/bin/sh` (dash) doesn't support.
|
||||
ci_config: &ci_config
|
||||
image: src.sunbeam.pt/studio/wfe-ci:latest
|
||||
shell: /bin/bash
|
||||
memory: 4Gi
|
||||
cpu: "2"
|
||||
timeout: 30m
|
||||
@@ -66,6 +69,7 @@ _templates:
|
||||
# Default config for long-running CI steps (8Gi memory, 60min timeout).
|
||||
ci_long_config: &ci_long_config
|
||||
image: src.sunbeam.pt/studio/wfe-ci:latest
|
||||
shell: /bin/bash
|
||||
memory: 8Gi
|
||||
cpu: "4"
|
||||
timeout: 60m
|
||||
@@ -724,6 +728,14 @@ workflows:
|
||||
- id: ci
|
||||
name: Continuous Integration
|
||||
version: 1
|
||||
# Shared persistent volume: every step in this ci run — including every
|
||||
# sub-workflow kicked off via `type: workflow` — mounts the same PVC at
|
||||
# /workspace. The `clone` step in checkout puts the repo there and the
|
||||
# lint/test/cover/image/release sub-workflows all see it. Size is tuned
|
||||
# to fit a full `target/` build + sccache copy with headroom.
|
||||
shared_volume:
|
||||
mount_path: /workspace
|
||||
size: 30Gi
|
||||
inputs:
|
||||
repo_url: string
|
||||
commit_sha: string
|
||||
|
||||
Reference in New Issue
Block a user