Créer un worker asynchrone de bout en bout (capstone)
Objectifs
À la fin de ce tutoriel, vous saurez :
- O1. Concevoir le modèle de données SeaORM pour une table de jobs avec une state machine à 5 états.
- O2. Implémenter la boucle d'un worker Tokio avec sémaphore de concurrence et timeout.
- O3. Tester l'ensemble du cycle (enqueue → exécution → SSE) avec des tests d'intégration hermétiques.
Pré-requis
- Technique : tutoriels 01 et 02 complétés ; notions de Tokio (async/await,
mpsc,spawn) ; PostgreSQL local ou testcontainer disponible. - Pédagogique : tutoriel
02-premiere-contributioncomplété. - Temps estimé : ~120 minutes.
Vue d'ensemble
Ce tutoriel est un capstone : vous combinez des recettes connues (migration, service, handler) sans guide pas-à-pas détaillé. L'objectif est de reproduire le patron du worker d'import (import_service + import_worker) pour un cas générique appelé notify_worker — un worker qui envoie des notifications externes de manière asynchrone.
flowchart LR
UI[Handler POST\n/notify] -->|create_job| DB[(notify_jobs)]
UI -->|try_send| Chan[mpsc NotifyTask]
UI -->|302| Page[GET /notify/:id]
Chan --> Worker[NotifyWorker\nSémaphore N]
Worker -->|update status| DB
Worker -->|HTTP POST externe| External[Endpoint distant]
Page -. SSE .-> DB
Le patron est identique à celui utilisé pour l'import de dépôts externe (voir crates/gitrust-core/src/services/import_service.rs) et pour le worker CI. Vous n'inventez rien : vous combinez des pièces existantes.
Modèle mental : pensez à une file de caissiers de supermarché. La file d'attente (
mpsc channel) accumule les clients (tâches). Le nombre de caisses ouvertes est le sémaphore. Chaque caissier (worker goroutine) traite un client et met à jour le tableau d'affichage (DB + SSE).
Étape 1 : Concevoir la state machine
Avant d'écrire une ligne de code, formalisez la state machine de votre job.
stateDiagram-v2
[*] --> pending : create_job()
pending --> running : worker démarre
pending --> cancelled : POST /cancel
running --> success : HTTP 200 reçu
running --> failed : erreur ou timeout
running --> cancelled : flag DB checked
Les 5 états sont : pending, running, success, failed, cancelled.
Règles de transition :
- Seul
pendingpeut être annulé avant démarrage. runningpeut être annulé via un flag en DB vérifié par le worker.successetfailedsont terminaux.
Checkpoint : dessinez la state machine sur papier avant de continuer. Répondez à : « que se passe-t-il si le serveur redémarre pendant un job running ? » (réponse : au redémarrage, les jobs running doivent être marqués failed avec le message « server restarted »).
Étape 2 : Créer la migration SeaORM
Créez le fichier de migration :
crates/gitrust-core/src/migrations/m20260501_000023_create_notify_jobs.rs
use sea_orm_migration::prelude::*; #[derive(DeriveMigrationName)] pub struct Migration; #[async_trait::async_trait] impl MigrationTrait for Migration { async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { manager .create_table( Table::create() .table(NotifyJobs::Table) .if_not_exists() .col(ColumnDef::new(NotifyJobs::Id).uuid().not_null().primary_key()) .col(ColumnDef::new(NotifyJobs::OwnerId).uuid().not_null()) .col(ColumnDef::new(NotifyJobs::TargetUrl).text().not_null()) .col(ColumnDef::new(NotifyJobs::Payload).text().not_null()) .col( ColumnDef::new(NotifyJobs::Status) .string() .not_null() .default("pending"), ) .col(ColumnDef::new(NotifyJobs::ErrorMessage).text().null()) .col(ColumnDef::new(NotifyJobs::StartedAt).timestamp_with_time_zone().null()) .col(ColumnDef::new(NotifyJobs::FinishedAt).timestamp_with_time_zone().null()) .col( ColumnDef::new(NotifyJobs::CreatedAt) .timestamp_with_time_zone() .not_null(), ) .col( ColumnDef::new(NotifyJobs::UpdatedAt) .timestamp_with_time_zone() .not_null(), ) .to_owned(), ) .await?; manager .create_index( Index::create() .table(NotifyJobs::Table) .name("notify_jobs_owner_status_idx") .col(NotifyJobs::OwnerId) .col(NotifyJobs::Status) .to_owned(), ) .await } async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { manager .drop_table(Table::drop().table(NotifyJobs::Table).to_owned()) .await } } #[derive(DeriveIden)] enum NotifyJobs { Table, Id, OwnerId, TargetUrl, Payload, Status, ErrorMessage, StartedAt, FinishedAt, CreatedAt, UpdatedAt, }
Enregistrez la migration dans crates/gitrust-core/src/migrations/mod.rs en ajoutant m20260501_000023_create_notify_jobs::Migration à la liste du Migrator.
Test rouge à faire passer :
#[tokio::test] async fn migration_creates_notify_jobs_table() { let db = setup_test_db().await; // Si la migration s'est appliquée, la requête ne renvoie pas d'erreur. let count: i64 = db .query_one(Statement::from_string( DatabaseBackend::Postgres, "SELECT COUNT(*) FROM notify_jobs".to_owned(), )) .await .expect("La table notify_jobs doit exister") .unwrap() .try_get("", "count") .unwrap(); assert_eq!(count, 0); }
Checkpoint : cargo test --package gitrust-core migration_creates_notify_jobs_table passe au vert.
Étape 3 : Définir les DTOs et l'enum de statut
Créez crates/gitrust-core/src/dto/notify_dto.rs :
use serde::{Deserialize, Serialize}; /// Statut d'un job de notification — state machine à 5 états. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum NotifyStatus { Pending, Running, Success, Failed, Cancelled, } impl NotifyStatus { pub fn as_str(&self) -> &'static str { match self { Self::Pending => "pending", Self::Running => "running", Self::Success => "success", Self::Failed => "failed", Self::Cancelled => "cancelled", } } pub fn from_str(s: &str) -> Option<Self> { match s { "pending" => Some(Self::Pending), "running" => Some(Self::Running), "success" => Some(Self::Success), "failed" => Some(Self::Failed), "cancelled" => Some(Self::Cancelled), _ => None, } } } /// Payload envoyé via SSE au client. #[derive(Debug, Serialize)] pub struct NotifyJobSse { pub status: String, pub error_message: Option<String>, } /// Input pour créer un job. #[derive(Debug, Deserialize)] pub struct CreateNotifyJobInput { pub target_url: String, pub payload: String, }
Tests round-trip obligatoires :
#[cfg(test)] mod tests { use super::*; #[test] fn status_round_trip() { for (s, expected) in [ ("pending", NotifyStatus::Pending), ("running", NotifyStatus::Running), ("success", NotifyStatus::Success), ("failed", NotifyStatus::Failed), ("cancelled", NotifyStatus::Cancelled), ] { let status = NotifyStatus::from_str(s).expect("doit parser"); assert_eq!(status, expected); assert_eq!(status.as_str(), s); } } #[test] fn unknown_status_returns_none() { assert!(NotifyStatus::from_str("unknown").is_none()); } }
Checkpoint : cargo test --package gitrust-core status_round_trip passe.
Étape 4 : Implémenter le service
Créez crates/gitrust-core/src/services/notify_service.rs. Le service ne fait que orchestrer la DB — aucune logique métier IO ici :
use chrono::Utc; use sea_orm::{ActiveModelTrait, ActiveValue::Set, DatabaseConnection, EntityTrait}; use uuid::Uuid; use crate::{ dto::notify_dto::{CreateNotifyJobInput, NotifyStatus}, error::GitrustError, models::notify_job, }; pub struct NotifyService; impl NotifyService { /// Crée un job en statut `pending` et retourne son ID. pub async fn create_job( db: &DatabaseConnection, owner_id: Uuid, input: CreateNotifyJobInput, ) -> Result<Uuid, GitrustError> { let id = Uuid::new_v4(); let now = Utc::now(); let model = notify_job::ActiveModel { id: Set(id), owner_id: Set(owner_id), target_url: Set(input.target_url), payload: Set(input.payload), status: Set(NotifyStatus::Pending.as_str().to_owned()), error_message: Set(None), started_at: Set(None), finished_at: Set(None), created_at: Set(now), updated_at: Set(now), }; model.insert(db).await.map_err(GitrustError::Database)?; Ok(id) } /// Passe le job en `running`. pub async fn mark_running(db: &DatabaseConnection, id: Uuid) -> Result<(), GitrustError> { Self::update_status(db, id, NotifyStatus::Running, None).await } /// Passe le job en `success`. pub async fn mark_success(db: &DatabaseConnection, id: Uuid) -> Result<(), GitrustError> { Self::update_status(db, id, NotifyStatus::Success, None).await } /// Passe le job en `failed` avec un message d'erreur. pub async fn mark_failed( db: &DatabaseConnection, id: Uuid, error: &str, ) -> Result<(), GitrustError> { Self::update_status(db, id, NotifyStatus::Failed, Some(error)).await } /// Passe le job en `cancelled`. pub async fn mark_cancelled(db: &DatabaseConnection, id: Uuid) -> Result<(), GitrustError> { Self::update_status(db, id, NotifyStatus::Cancelled, None).await } /// Vérifie si le job a été annulé (pour le poll du worker). pub async fn is_cancelled( db: &DatabaseConnection, id: Uuid, ) -> Result<bool, GitrustError> { let job = notify_job::Entity::find_by_id(id) .one(db) .await .map_err(GitrustError::Database)? .ok_or_else(|| GitrustError::NotFound("job introuvable".into()))?; Ok(job.status == NotifyStatus::Cancelled.as_str()) } async fn update_status( db: &DatabaseConnection, id: Uuid, status: NotifyStatus, error: Option<&str>, ) -> Result<(), GitrustError> { use sea_orm::ActiveModelTrait; let now = Utc::now(); let mut model: notify_job::ActiveModel = notify_job::Entity::find_by_id(id) .one(db) .await .map_err(GitrustError::Database)? .ok_or_else(|| GitrustError::NotFound("job introuvable".into()))? .into(); model.status = Set(status.as_str().to_owned()); model.updated_at = Set(now); model.error_message = Set(error.map(str::to_owned)); if status == NotifyStatus::Running { model.started_at = Set(Some(now)); } if matches!(status, NotifyStatus::Success | NotifyStatus::Failed | NotifyStatus::Cancelled) { model.finished_at = Set(Some(now)); } model.update(db).await.map_err(GitrustError::Database)?; Ok(()) } }
Étape 5 : Implémenter le worker
Le worker tourne dans un tokio::spawn séparé. Il lit les tâches depuis un canal mpsc et les exécute avec un sémaphore de concurrence :
// crates/gitrust-core/src/services/notify_worker.rs use std::sync::Arc; use sea_orm::DatabaseConnection; use tokio::sync::{mpsc, Semaphore}; use uuid::Uuid; use super::notify_service::NotifyService; /// Tâche envoyée via le canal mpsc au worker. pub struct NotifyTask { pub job_id: Uuid, pub target_url: String, pub payload: String, } pub struct NotifyWorkerConfig { /// Nombre maximal de jobs concurrents. pub max_concurrent: usize, /// Timeout en secondes par job. pub timeout_secs: u64, } /// Boucle principale du worker. À appeler avec `tokio::spawn`. pub async fn run( db: DatabaseConnection, mut rx: mpsc::Receiver<NotifyTask>, config: NotifyWorkerConfig, ) { let semaphore = Arc::new(Semaphore::new(config.max_concurrent)); while let Some(task) = rx.recv().await { let permit = semaphore.clone().acquire_owned().await.expect("semaphore fermé"); let db = db.clone(); let timeout = config.timeout_secs; tokio::spawn(async move { let _permit = permit; // libéré à la fin du bloc execute_task(&db, task, timeout).await; }); } } async fn execute_task(db: &DatabaseConnection, task: NotifyTask, timeout_secs: u64) { // Vérifier si annulé avant de démarrer match NotifyService::is_cancelled(db, task.job_id).await { Ok(true) => return, Err(e) => { tracing::error!(job_id = %task.job_id, error = %e, "Erreur lecture job"); return; } Ok(false) => {} } if let Err(e) = NotifyService::mark_running(db, task.job_id).await { tracing::error!(job_id = %task.job_id, error = %e, "Impossible de marquer running"); return; } let result = tokio::time::timeout( std::time::Duration::from_secs(timeout_secs), send_notification(&task.target_url, &task.payload), ) .await; match result { Ok(Ok(())) => { if let Err(e) = NotifyService::mark_success(db, task.job_id).await { tracing::error!(job_id = %task.job_id, error = %e, "Impossible de marquer success"); } } Ok(Err(e)) => { let msg = format!("Erreur HTTP: {e}"); if let Err(db_err) = NotifyService::mark_failed(db, task.job_id, &msg).await { tracing::error!(job_id = %task.job_id, error = %db_err, "Impossible de marquer failed"); } } Err(_elapsed) => { let msg = format!("Timeout après {timeout_secs}s"); if let Err(e) = NotifyService::mark_failed(db, task.job_id, &msg).await { tracing::error!(job_id = %task.job_id, error = %e, "Impossible de marquer failed (timeout)"); } } } } async fn send_notification(url: &str, payload: &str) -> Result<(), reqwest::Error> { let client = reqwest::Client::new(); client .post(url) .header("Content-Type", "application/json") .body(payload.to_owned()) .send() .await? .error_for_status()?; Ok(()) }
Checkpoint : cargo build --package gitrust-core compile sans erreur.
Étape 6 : Ajouter le handler et la route SSE
Dans crates/gitrust-web/src/handlers/notify.rs, créez le handler POST qui enqueues le job et redirige, ainsi que le handler SSE :
use axum::{ extract::{Path, State}, response::{sse::{Event, KeepAlive, Sse}, Redirect}, Extension, Form, }; use sea_orm::DatabaseConnection; use tokio::time::{interval, Duration}; use tokio_stream::wrappers::IntervalStream; use tokio_stream::StreamExt; use uuid::Uuid; use gitrust_core::{ dto::notify_dto::CreateNotifyJobInput, services::{notify_service::NotifyService, notify_worker::NotifyTask}, }; use crate::auth::AuthUser; pub type NotifySender = tokio::sync::mpsc::Sender<NotifyTask>; /// POST /notify — crée un job et redirige vers la page de suivi. pub async fn create_notify_job( State(db): State<DatabaseConnection>, Extension(tx): Extension<NotifySender>, user: AuthUser, Form(input): Form<CreateNotifyJobInput>, ) -> Result<Redirect, crate::error::AppError> { let job_id = NotifyService::create_job(&db, user.user_id, input.clone()).await?; // Essai d'envoi dans le canal — si plein, le job reste `pending` jusqu'au // prochain redémarrage (les jobs `pending` peuvent être re-envoyés au démarrage). let _ = tx.try_send(NotifyTask { job_id, target_url: input.target_url, payload: input.payload, }); Ok(Redirect::to(&format!("/notify/{job_id}"))) } /// GET /notify/:id/stream — SSE : pousse les mises à jour de statut au client. pub async fn notify_job_stream( State(db): State<DatabaseConnection>, Path(id): Path<Uuid>, ) -> Sse<impl tokio_stream::Stream<Item = Result<Event, std::convert::Infallible>>> { let stream = IntervalStream::new(interval(Duration::from_secs(1))) .map(move |_| { let db = db.clone(); async move { // Poll la DB pour l'état courant use gitrust_core::models::notify_job; use sea_orm::EntityTrait; let job = notify_job::Entity::find_by_id(id) .one(&db) .await .ok() .flatten(); let data = match job { Some(j) => format!( r#"{{"status":"{}","error":{}}}"#, j.status, j.error_message .as_deref() .map(|e| format!(r#""{}""#, e.replace('"', "\\\""))) .unwrap_or("null".to_owned()) ), None => r#"{"status":"not_found"}"#.to_owned(), }; Ok(Event::default().data(data)) } }) .then(|fut| fut); Sse::new(stream).keep_alive(KeepAlive::default()) }
Enregistrez les routes dans routes.rs :
.route("/notify", post(handlers::notify::create_notify_job)) .route("/notify/:id", get(handlers::notify::notify_job_page)) .route("/notify/:id/stream", get(handlers::notify::notify_job_stream)) .route("/notify/:id/cancel", post(handlers::notify::cancel_notify_job))
Étape 7 : Intégrer le worker dans main.rs
Dans crates/gitrust-web/src/main.rs, initialisez le canal et démarrez le worker :
// Dans la fonction main(), après la construction de l'app : let (notify_tx, notify_rx) = tokio::sync::mpsc::channel::<NotifyTask>(100); let notify_db = db.clone(); tokio::spawn(gitrust_core::services::notify_worker::run( notify_db, notify_rx, NotifyWorkerConfig { max_concurrent: std::env::var("NOTIFY_MAX_CONCURRENT") .unwrap_or("4".into()) .parse() .unwrap_or(4), timeout_secs: std::env::var("NOTIFY_TIMEOUT_SECS") .unwrap_or("30".into()) .parse() .unwrap_or(30), }, )); // Injecter l'émetteur dans le router via Extension let app = routes::routes(&static_path) .layer(Extension(notify_tx)) .with_state(db);
Étape 8 : Tests d'intégration
Écrivez au minimum ces trois tests dans crates/gitrust-core/src/services/notify_service_test.rs :
#[tokio::test] async fn create_job_inserts_with_pending_status() { let db = setup_test_db().await; let id = NotifyService::create_job( &db, Uuid::new_v4(), CreateNotifyJobInput { target_url: "https://example.com/hook".into(), payload: r#"{"event":"test"}"#.into(), }, ) .await .expect("create_job doit réussir"); let job = notify_job::Entity::find_by_id(id) .one(&db) .await .unwrap() .unwrap(); assert_eq!(job.status, "pending"); assert!(job.started_at.is_none()); } #[tokio::test] async fn mark_running_sets_started_at() { let db = setup_test_db().await; let id = create_test_job(&db).await; NotifyService::mark_running(&db, id).await.unwrap(); let job = notify_job::Entity::find_by_id(id).one(&db).await.unwrap().unwrap(); assert_eq!(job.status, "running"); assert!(job.started_at.is_some()); } #[tokio::test] async fn cancel_before_running_is_terminal() { let db = setup_test_db().await; let id = create_test_job(&db).await; NotifyService::mark_cancelled(&db, id).await.unwrap(); let is_cancelled = NotifyService::is_cancelled(&db, id).await.unwrap(); assert!(is_cancelled); }
Checkpoint : cargo test --package gitrust-core notify — tous les tests passent.
Récapitulatif
- O1 accompli en concevant la table
notify_jobsavec 5 états (pending/running/success/failed/cancelled) et la migration SeaORM correspondante. - O2 accompli en implémentant
notify_worker::runavecSemaphore,tokio::time::timeout, et lecture du flag d'annulation via la DB. - O3 accompli en écrivant les tests
create_job_inserts_with_pending_status,mark_running_sets_started_atetcancel_before_running_is_terminal, tous hermétiques (pas de réseau réel).
Et si ça ne marche pas
| Symptôme | Cause probable | Correction |
|---|---|---|
channel closed au démarrage | Le Receiver est droppé avant le spawn du worker | Vérifiez que tokio::spawn(run(..., notify_rx, ...)) est appelé avant que notify_rx ne soit droppé |
Le job reste pending indéfiniment | Le canal est plein ou le worker n'est pas démarré | Ajoutez un log dans execute_task ; vérifiez que tokio::spawn est bien appelé dans main |
| SSE ne reçoit aucun événement | Le Content-Type: text/event-stream n'est pas reconnu | Vérifiez que le handler retourne bien Sse<...> et non String |
Prochaine étape
Vous êtes désormais core contributor. Consultez les how-to pour les recettes quotidiennes :
GitRust