diff --git a/.sqlx/query-7b27319ca510748399f45dc950569b01f4fd7a74b7f2edbef4982c3d4d9a72d8.json b/.sqlx/query-565c545ebc779a7df338f4efa0813f3672bd5e1d5639fc2b06686bdbca16aa2d.json similarity index 61% rename from .sqlx/query-7b27319ca510748399f45dc950569b01f4fd7a74b7f2edbef4982c3d4d9a72d8.json rename to .sqlx/query-565c545ebc779a7df338f4efa0813f3672bd5e1d5639fc2b06686bdbca16aa2d.json index 0392f94..7d42851 100644 --- a/.sqlx/query-7b27319ca510748399f45dc950569b01f4fd7a74b7f2edbef4982c3d4d9a72d8.json +++ b/.sqlx/query-565c545ebc779a7df338f4efa0813f3672bd5e1d5639fc2b06686bdbca16aa2d.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE pg_task\n SET is_running = false,\n error = $2,\n wakeup_at = now()\n WHERE id = $1\n RETURNING tried, step::TEXT as \"step!\"\n ", + "query": "\n UPDATE pg_task\n SET is_running = false,\n tried = tried + 1,\n error = $2,\n wakeup_at = now()\n WHERE id = $1\n RETURNING tried, step::TEXT as \"step!\"\n ", "describe": { "columns": [ { @@ -25,5 +25,5 @@ false ] }, - "hash": "7b27319ca510748399f45dc950569b01f4fd7a74b7f2edbef4982c3d4d9a72d8" + "hash": "565c545ebc779a7df338f4efa0813f3672bd5e1d5639fc2b06686bdbca16aa2d" } diff --git a/README.md b/README.md index 190208a..1768123 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,208 @@ # pg_task -Resumable state machine based Postgres tasks +FSM-based Resumable Postgres tasks + +- **FSM-based** - each task is a granular state machine +- **Resumable** - on error, after you fix the step logic or the external + world, the task is able to pick up where it stopped +- **Postgres** - a single table is enough to handle task scheduling, state + transitions, and error processing + +## Table of Contents + +- [Tutorial](#tutorial) + - [Defining Tasks](#defining-tasks) + - [Investigating Errors](#investigating-errors) + - [Fixing the World](#fixing-the-world) +- [Scheduling Tasks](#scheduling-tasks) +- [Running Workers](#running-workers) +- [Stopping Workers](#stopping-workers) +- [Delaying Steps](#delaying-steps) +- [Retrying Steps](#retrying-steps) + +## Tutorial + +_The full runnable code is in [examples/tutorial.rs][tutorial-example]._ + +### Defining Tasks + +We create a greeter task consisting of two steps: + +```rust,ignore +#[derive(Debug, Deserialize, Serialize)] +pub struct ReadName { + filename: String, +} + +#[async_trait] +impl Step for ReadName { + const RETRY_LIMIT: i32 = 5; + + async fn step(self, _db: &PgPool) -> StepResult { + let name = std::fs::read_to_string(&self.filename)?; + NextStep::now(SayHello { name }) + } +} +``` + +The first step tries to read a name from a file: + +- `filename` - the only state we need in this step +- `impl Step for ReadName` - our step is a part of a `Greeter` task +- `RETRY_LIMIT` - the step is fallible, let's retry it a few times +- `NextStep::now(SayHello { name })` - move our task to the `SayHello` step + right now + +```rust,ignore +#[derive(Debug, Deserialize, Serialize)] +pub struct SayHello { + name: String, +} +#[async_trait] +impl Step for SayHello { + async fn step(self, _db: &PgPool) -> StepResult { + println!("Hello, {}", self.name); + NextStep::none() + } +} +``` + +The second step prints the greeting and finishes the task returning +`NextStep::none()`. + +That's essentially all, except for some boilerplate you can find in the +[full code][tutorial-example]. Let's run it: + +```bash +cargo run --example hello +``` + +### Investigating Errors + +You'll see log messages about the 6 (first try + `RETRY_LIMIT`) attempts and +the final error message. Let's look into the DB to find out what happened: + +```bash +~$ psql pg_task -c 'table pg_task' +-[ RECORD 1 ]------------------------------------------------ +id | cddf7de1-1194-4bee-90c6-af73d9206ce2 +step | {"Greeter":{"ReadName":{"filename":"name.txt"}}} +wakeup_at | 2024-06-30 09:32:27.703599+06 +tried | 6 +is_running | f +error | No such file or directory (os error 2) +created_at | 2024-06-30 09:32:22.628563+06 +updated_at | 2024-06-30 09:32:27.703599+06 +``` + +- a non-null `error` field indicates that the task has errored and contains + the error message +- the `step` field provides you with the information about a particular step + and its state when the error occurred + +### Fixing the World + +In this case, the error is due to the external world state. Let's fix it by +creating the file: + +```bash +echo 'Fixed World' > name.txt +``` + +To rerun the task, we just need to clear its `error`: + +```bash +psql pg_task -c 'update pg_task set error = null' +``` + +You'll see the log messages about rerunning the task and the greeting +message of the final step. That's all 🎉. + +## Scheduling Tasks + +Essentially scheduling a task is done by inserting a corresponding row into +the `pg_task` table. You can do in by hands from `psql` or code in any +language. + +There's also a few helpers to take care of the first step serialization and +time scheduling: +- [`enqueue`] - to run the task immediately +- [`delay`] - to run it with a delay +- [`schedule`] - to schedule it to a particular time + +## Running Workers + +After [defining](#defining-tasks) the steps of each task, we need to +wrap them into enums representing whole tasks via [`task!`]: + +```rust,ignore +pg_task::task!(Task1 { StepA, StepB }); +pg_task::task!(Task2 { StepC }); +``` + +One more enum is needed to combine all the possible tasks: + +```rust,ignore +pg_task::scheduler!(Tasks { Task1, Task2 }); +``` + +Now we can run the worker: + +```rust,ignore +pg_task::Worker::::new(db).run().await?; +``` + +All the communication is synchronized by the DB, so it doesn't matter how or +how many workers you run. It could be a separate process as well as +in-process `tokio::spawn`. + +## Stopping Workers + +You can gracefully stop task runners by sending a notification using the +DB: + +```sql +SELECT pg_notify('pg_task_changed', 'stop_worker'); +``` + +The workers would wait until the current step of all the tasks is finished +and then exit. You can wait for this by checking for the existence of +running tasks: + +```sql +SELECT EXISTS(SELECT 1 FROM pg_task WHERE is_running = true); +``` + +## Delaying Steps + +Sometimes you need to delay the next step. Using [`tokio::time::sleep`] +before returning the next step creates a couple of issues: + +- if the process is crashed while sleeping it wont be considered done and + will rerun on restart +- you'd have to wait for the sleeping task to finish on [gracefulshutdown](#stopping-workers) + +Use [`NextStep::delay`] instead - it schedules the next step with the delay +and finishes the current one right away. + +## Retrying Steps + +Use [`Step::RETRY_LIMIT`] and [`Step::RETRY_DELAY`] when you need to retry a +task on errors: + +```rust,ignore +impl Step for ApiRequest { + const RETRY_LIMIT: i32 = 5; + const RETRY_DELAY: Duration = Duration::from_secs(5); + + async fn step(self, _db: &PgPool) -> StepResult { + let result = api_request().await?; + NextStep::now(ProcessResult { result }) + } +} +``` +[tutorial-example]: https://github.com/imbolc/pg_task/blob/main/examples/tutorial.rs diff --git a/examples/tutorial.rs b/examples/tutorial.rs new file mode 100644 index 0000000..76df2eb --- /dev/null +++ b/examples/tutorial.rs @@ -0,0 +1,60 @@ +use async_trait::async_trait; +use pg_task::{NextStep, Step, StepResult}; +use serde::{Deserialize, Serialize}; +use sqlx::PgPool; + +mod util; + +// Creates a enum `Greeter` containing our task steps +pg_task::task!(Greeter { ReadName, SayHello }); + +// Creates a enum `Tasks` representing all the possible tasks +pg_task::scheduler!(Tasks { Greeter }); + +#[derive(Debug, Deserialize, Serialize)] +pub struct ReadName { + filename: String, +} +#[async_trait] +impl Step for ReadName { + const RETRY_LIMIT: i32 = 5; + + async fn step(self, _db: &PgPool) -> StepResult { + let name = std::fs::read_to_string(self.filename)?; + NextStep::now(SayHello { name }) + } +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct SayHello { + name: String, +} +#[async_trait] +impl Step for SayHello { + async fn step(self, _db: &PgPool) -> StepResult { + println!("Hello, {}", self.name); + NextStep::none() + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let db = util::init().await?; + + // Let's schedule the task + pg_task::enqueue( + &db, + &Tasks::Greeter( + ReadName { + filename: "name.txt".into(), + } + .into(), + ), + ) + .await?; + + // And run a worker + pg_task::Worker::::new(db).run().await?; + + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs index 916df3e..3259baa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,210 @@ //! # pg_task //! -//! Resumable state machine based Postgres tasks +//! FSM-based Resumable Postgres tasks +//! +//! - **FSM-based** - each task is a granular state machine +//! - **Resumable** - on error, after you fix the step logic or the external +//! world, the task is able to pick up where it stopped +//! - **Postgres** - a single table is enough to handle task scheduling, state +//! transitions, and error processing +//! +//! ## Table of Contents +//! +//! - [Tutorial](#tutorial) +//! - [Defining Tasks](#defining-tasks) +//! - [Investigating Errors](#investigating-errors) +//! - [Fixing the World](#fixing-the-world) +//! - [Scheduling Tasks](#scheduling-tasks) +//! - [Running Workers](#running-workers) +//! - [Stopping Workers](#stopping-workers) +//! - [Delaying Steps](#delaying-steps) +//! - [Retrying Steps](#retrying-steps) +//! +//! ## Tutorial +//! +//! _The full runnable code is in [examples/tutorial.rs][tutorial-example]._ +//! +//! ### Defining Tasks +//! +//! We create a greeter task consisting of two steps: +//! +//! ```rust,ignore +//! #[derive(Debug, Deserialize, Serialize)] +//! pub struct ReadName { +//! filename: String, +//! } +//! +//! #[async_trait] +//! impl Step for ReadName { +//! const RETRY_LIMIT: i32 = 5; +//! +//! async fn step(self, _db: &PgPool) -> StepResult { +//! let name = std::fs::read_to_string(&self.filename)?; +//! NextStep::now(SayHello { name }) +//! } +//! } +//! ``` +//! +//! The first step tries to read a name from a file: +//! +//! - `filename` - the only state we need in this step +//! - `impl Step for ReadName` - our step is a part of a `Greeter` task +//! - `RETRY_LIMIT` - the step is fallible, let's retry it a few times +//! - `NextStep::now(SayHello { name })` - move our task to the `SayHello` step +//! right now +//! +//! ```rust,ignore +//! #[derive(Debug, Deserialize, Serialize)] +//! pub struct SayHello { +//! name: String, +//! } +//! #[async_trait] +//! impl Step for SayHello { +//! async fn step(self, _db: &PgPool) -> StepResult { +//! println!("Hello, {}", self.name); +//! NextStep::none() +//! } +//! } +//! ``` +//! +//! The second step prints the greeting and finishes the task returning +//! `NextStep::none()`. +//! +//! That's essentially all, except for some boilerplate you can find in the +//! [full code][tutorial-example]. Let's run it: +//! +//! ```bash +//! cargo run --example hello +//! ``` +//! +//! ### Investigating Errors +//! +//! You'll see log messages about the 6 (first try + `RETRY_LIMIT`) attempts and +//! the final error message. Let's look into the DB to find out what happened: +//! +//! ```bash +//! ~$ psql pg_task -c 'table pg_task' +//! -[ RECORD 1 ]------------------------------------------------ +//! id | cddf7de1-1194-4bee-90c6-af73d9206ce2 +//! step | {"Greeter":{"ReadName":{"filename":"name.txt"}}} +//! wakeup_at | 2024-06-30 09:32:27.703599+06 +//! tried | 6 +//! is_running | f +//! error | No such file or directory (os error 2) +//! created_at | 2024-06-30 09:32:22.628563+06 +//! updated_at | 2024-06-30 09:32:27.703599+06 +//! ``` +//! +//! - a non-null `error` field indicates that the task has errored and contains +//! the error message +//! - the `step` field provides you with the information about a particular step +//! and its state when the error occurred +//! +//! ### Fixing the World +//! +//! In this case, the error is due to the external world state. Let's fix it by +//! creating the file: +//! +//! ```bash +//! echo 'Fixed World' > name.txt +//! ``` +//! +//! To rerun the task, we just need to clear its `error`: +//! +//! ```bash +//! psql pg_task -c 'update pg_task set error = null' +//! ``` +//! +//! You'll see the log messages about rerunning the task and the greeting +//! message of the final step. That's all 🎉. +//! +//! ## Scheduling Tasks +//! +//! Essentially scheduling a task is done by inserting a corresponding row into +//! the `pg_task` table. You can do in by hands from `psql` or code in any +//! language. +//! +//! There's also a few helpers to take care of the first step serialization and +//! time scheduling: +//! - [`enqueue`] - to run the task immediately +//! - [`delay`] - to run it with a delay +//! - [`schedule`] - to schedule it to a particular time +//! +//! ## Running Workers +//! +//! After [defining](#defining-tasks) the steps of each task, we need to +//! wrap them into enums representing whole tasks via [`task!`]: +//! +//! ```rust,ignore +//! pg_task::task!(Task1 { StepA, StepB }); +//! pg_task::task!(Task2 { StepC }); +//! ``` +//! +//! One more enum is needed to combine all the possible tasks: +//! +//! ```rust,ignore +//! # use pg_task::NextStep; +//! pg_task::scheduler!(Tasks { Task1, Task2 }); +//! ``` +//! +//! Now we can run the worker: +//! +//! ```rust,ignore +//! pg_task::Worker::::new(db).run().await?; +//! ``` +//! +//! All the communication is synchronized by the DB, so it doesn't matter how or +//! how many workers you run. It could be a separate process as well as +//! in-process `tokio::spawn`. +//! +//! ## Stopping Workers +//! +//! You can gracefully stop task runners by sending a notification using the +//! DB: +//! +//! ```sql +//! SELECT pg_notify('pg_task_changed', 'stop_worker'); +//! ``` +//! +//! The workers would wait until the current step of all the tasks is finished +//! and then exit. You can wait for this by checking for the existence of +//! running tasks: +//! +//! ```sql +//! SELECT EXISTS(SELECT 1 FROM pg_task WHERE is_running = true); +//! ``` +//! +//! ## Delaying Steps +//! +//! Sometimes you need to delay the next step. Using [`tokio::time::sleep`] +//! before returning the next step creates a couple of issues: +//! +//! - if the process is crashed while sleeping it wont be considered done and +//! will rerun on restart +//! - you'd have to wait for the sleeping task to finish on [graceful +//! shutdown](#stopping-workers) +//! +//! Use [`NextStep::delay`] instead - it schedules the next step with the delay +//! and finishes the current one right away. +//! +//! ## Retrying Steps +//! +//! Use [`Step::RETRY_LIMIT`] and [`Step::RETRY_DELAY`] when you need to retry a +//! task on errors: +//! +//! ```rust,ignore +//! impl Step for ApiRequest { +//! const RETRY_LIMIT: i32 = 5; +//! const RETRY_DELAY: Duration = Duration::from_secs(5); +//! +//! async fn step(self, _db: &PgPool) -> StepResult { +//! let result = api_request().await?; +//! NextStep::now(ProcessResult { result }) +//! } +//! } +//! ``` +//! [tutorial-example]: https://github.com/imbolc/pg_task/blob/main/examples/tutorial.rs + #![warn(clippy::all, missing_docs, nonstandard_style, future_incompatible)] mod error; diff --git a/src/task.rs b/src/task.rs index da83b73..f669383 100644 --- a/src/task.rs +++ b/src/task.rs @@ -115,6 +115,7 @@ impl Task { r#" UPDATE pg_task SET is_running = false, + tried = tried + 1, error = $2, wakeup_at = now() WHERE id = $1