mahler is an automated job orchestration library that builds and executes dynamic workflows.
The library uses automated planning (heavily based on HTNs) to compose user-defined jobs into a workflow (represented as a DAG) that achieves a desired target system state.
This library’s API is heavily inspired by Axum’s.
§Features
- Simple API - system state is defined as Rust structs with the help of the provided derive crate. Allowed tasks are defined as pure Rust functions acting on a part or the whole system state.
- State engine with integrated planner - tasks are configured as jobs in a Worker domain. On a new target state, the worker determines the changes needed to reach the target and searches for a workflow that reaches it from the current state.
- Concurrent execution - the internal planner detects when tasks can run concurrently based on state paths
- Automatic re-planning - re-computes workflow when runtime conditions change
- Observable runtime - monitor the evolving state of the system from the Worker API. For more detailed logging, the library uses the tracing crate.
- Easy to debug - the worker's observable state and known goals make issues easy to reproduce.
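As an illustration of the "Concurrent execution" feature above, the following sketch shows one plausible way to decide whether two tasks touch overlapping parts of the state. It assumes that two tasks conflict when the path of one is a prefix of the path of the other; the function names `segments` and `paths_conflict` are hypothetical, and mahler's planner may apply a different or more refined rule.

```rust
/// Split a JSON-pointer-like path such as "/a/b" into its segments.
/// The root path "" yields no segments.
fn segments(path: &str) -> Vec<&str> {
    path.split('/').filter(|s| !s.is_empty()).collect()
}

/// Assumption for this sketch: two paths conflict when one is a prefix
/// of the other, i.e. one task operates on a part of the state that
/// contains the part the other task operates on.
fn paths_conflict(a: &str, b: &str) -> bool {
    let (a, b) = (segments(a), segments(b));
    // `zip` stops at the shorter path, so this checks the common prefix
    a.iter().zip(b.iter()).all(|(x, y)| x == y)
}
```

Under this rule, tasks on `/a/b` and `/a/c` could run side by side, while a task on the root path conflicts with every other task.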
§Worker
A Worker orchestrates jobs into a workflow and executes the workflow tasks when given a target. The worker also manages the system state model, making changes as the workflow is executed.
When creating a Worker, the first step is to assign jobs to specific routes and operations within the system state. Once the jobs are set up, providing the worker with an initial state makes the worker ready to operate on the system.
The only way to control the worker and effect changes in the controlled system is to provide the worker with a target state.
use mahler::state::State;
use mahler::worker::Worker;
use mahler::job::{create, update};
#[derive(State)]
struct MySystem;
let mut worker = Worker::new()
// assign possible jobs to worker
.job("", update(global))
.job("/{foo}", update(foo))
.job("/{foo}/{bar}", create(foo_bar))
// initialize the worker state
.initial_state(MySystem {/* .. */})
.unwrap();
fn global() {}
fn foo() {}
fn foo_bar() {}
// Control the system by providing a new target state
worker.seek_target(MySystemTarget { /* .. */ }).await.unwrap();
When comparing the internal state with the target, the Worker’s planner will generate a list of differences between the states (see JSON Patch) and try to find a plan within the jobs that are applicable to a path.
For instance, in the worker defined above, if the planner finds that a new value is created at the path /a/b, it will try jobs from more general to more specific, meaning it will try, in order:
- global
- foo
- foo_bar
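The general-to-specific ordering above can be sketched in plain Rust. This is only an illustration under stated assumptions: routes use the `{name}` placeholder syntax shown in the worker example, a route applies when it matches the leading segments of the path, and fewer segments means more general. The functions `match_route` and `candidates` are hypothetical and are not part of the mahler API.

```rust
/// Try to match a route such as "/{foo}/{bar}" against the leading
/// segments of `path`, returning the bound arguments on success.
fn match_route(route: &str, path: &str) -> Option<Vec<(String, String)>> {
    let route_segs: Vec<&str> = route.split('/').filter(|s| !s.is_empty()).collect();
    let path_segs: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
    if route_segs.len() > path_segs.len() {
        return None;
    }
    let mut args = Vec::new();
    for (r, p) in route_segs.iter().zip(path_segs.iter()) {
        match r.strip_prefix('{').and_then(|s| s.strip_suffix('}')) {
            // a placeholder binds whatever segment is in that position
            Some(name) => args.push((name.to_string(), p.to_string())),
            // a literal segment must match exactly
            None if r == p => {}
            None => return None,
        }
    }
    Some(args)
}

/// Return the routes applicable to `path`, most general first.
fn candidates<'a>(routes: &[&'a str], path: &str) -> Vec<&'a str> {
    let mut found: Vec<&'a str> = routes
        .iter()
        .copied()
        .filter(|r| match_route(r, path).is_some())
        .collect();
    // fewer segments means a more general route
    found.sort_by_key(|r| r.split('/').filter(|s| !s.is_empty()).count());
    found
}
```

For the path `/a/b` and the three routes registered above, `candidates` yields the root route, then `/{foo}`, then `/{foo}/{bar}`, mirroring the order global, foo, foo_bar.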
§Operations
Jobs may be applicable to operations create (add), update (replace), delete (remove), any and none,
meaning they may be selected when a new property is created/updated or removed from the system
state. A task assigned to none is never selected by the planner, but may be used as part of
compound tasks. All potentially runnable jobs need to be linked to the worker, hence the
need for none jobs.
See Operation for more information.
§State
The library relies internally on JSON values for state representation. Parts of the state can be referenced using JSON pointers (see RFC 6901) and state differences are calculated using JSON patch (see RFC 6902).
For this reason, the system state can only be modelled using serializable data structures.
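To show how a JSON pointer addresses part of the state, here is a small sketch that splits a pointer into its reference tokens following the escaping rules of RFC 6901 (`~1` stands for `/` and `~0` stands for `~`). The helper name `pointer_tokens` is hypothetical; mahler ships its own JSON utilities, which this sketch does not attempt to reproduce.

```rust
/// Split an RFC 6901 JSON pointer into unescaped reference tokens.
/// Returns `None` when the pointer is non-empty and lacks the leading '/'.
fn pointer_tokens(pointer: &str) -> Option<Vec<String>> {
    if pointer.is_empty() {
        // the empty pointer references the whole document
        return Some(Vec::new());
    }
    let rest = pointer.strip_prefix('/')?;
    Some(
        rest.split('/')
            // "~1" must be decoded before "~0" so that "~01" becomes "~1"
            .map(|t| t.replace("~1", "/").replace("~0", "~"))
            .collect(),
    )
}
```

A pointer such as `/services/web/cmd` therefore names the `cmd` property of the `web` entry under `services`.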
The worker and planner make a distinction between the internal state of the system and what can be used as target state. For instance, the start date of a process is good for reference information, but it doesn’t make a good target. When modelling state, internal state can be annotated using #[mahler(internal)], which means these properties will not be used in the comparison with the target state.
use std::time::SystemTime;
use mahler::state::{State, List};
// the `State` macro implements `Serialize` and `Deserialize` for
// the struct and creates an associated type `ServiceTarget` without
// any internal properties
#[derive(State)]
struct Service {
// will be used when planning
cmd: List<String>,
// will not be used for planning
#[mahler(internal)]
start_time: SystemTime,
}
For accessing read-only, non-serializable resources from tasks, Mahler provides a separate mechanism.
§Shared resources
Sometimes it may be desirable for jobs to access a shared resource (e.g. database connection, file descriptor, etc.). These can be provided when creating the worker; the only restriction is that these structures must be Send and Sync.
use mahler::state::State;
use mahler::extract::Res;
use mahler::job::update;
use mahler::worker::Worker;
// the system state
#[derive(State)]
struct MySystem;
// MyConnection represents a shared resource
struct MyConnection;
// Tasks can make use of resources via the `Res` extractor
fn some_task(conn: Res<MyConnection>) {}
let conn = MyConnection {/* .. */};
let worker = Worker::new()
.resource::<MyConnection>(conn)
.job("/", update(some_task))
.initial_state(MySystem {/* .. */})
.unwrap();
Note that only one resource of each type can be provided to the worker.
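One way to see why a single resource per type is a natural restriction is to key the storage by type. The sketch below uses `std::any::TypeId` for that purpose; the `Resources` struct and its methods are hypothetical and are not mahler's implementation. In this sketch a second insert of the same type replaces the first, which is an assumption of the example rather than documented library behavior.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// A hypothetical registry holding at most one value per type.
#[derive(Default)]
struct Resources {
    items: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}

impl Resources {
    /// Store `value`, replacing any previous value of the same type.
    fn insert<T: Any + Send + Sync>(&mut self, value: T) {
        self.items.insert(TypeId::of::<T>(), Box::new(value));
    }

    /// Look up the value registered for type `T`, if any.
    fn get<T: Any + Send + Sync>(&self) -> Option<&T> {
        self.items
            .get(&TypeId::of::<T>())
            .and_then(|boxed| boxed.downcast_ref::<T>())
    }
}
```

To share two values of the same underlying type with such a registry, each can be wrapped in its own newtype so that it gets a distinct key.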
§Tasks and Jobs
A Task in Mahler is an operation on a part of the system state that may choose to make changes to the state given some target. It is defined as a pure Rust handler and it may or may not perform IO. A Job is the configuration of a task for an operation on the system state.
A task is defined via a Handler and is applied to a specific Context, which is composed of an application path (a JSON pointer to a part of the system state), an optional target and zero or more path arguments.
A handler in mahler is any function that accepts zero or more “extractors” as arguments and returns something that can be converted into an effect on the system.
§Extractors
An extractor is a type that implements FromSystem. Extractors are how the planning/execution context is passed to the handler.
use mahler::state::State;
use mahler::extract::{View, Args, Target, System, Res};
struct MyConnection;
#[derive(State)]
struct MySystem;
// `View` provides a view into the relevant part of the
// state for the handler and allows making changes to the state.
fn view(state: View<u32>) {}
// For nullable values, use `View<Option<T>>`
// for instance, in the case of `create` operations
fn nullable_view(state: View<Option<u32>>) {}
// `Args` gives you the path arguments and deserializes them
fn args(Args(counter_name): Args<String>) {}
// `Target` gives you the target value for the Job operation
// note that `delete` operations do not have a target.
fn target(Target(tgt): Target<u32>) {}
// `System` provides a view into the top level system state.
// A Job using the System extractor cannot run concurrently with other jobs
fn system(System(state): System<MySystem>) {}
// `Res` gives access to a shared resource
fn res(res: Res<MyConnection>) {}
For extractors using generics, using a type that cannot be deserialized from the internal worker state will result in an Error. This means the task won’t be usable by the planner, resulting in a warning (or a failure in debug builds).
§Modifying the system state
The View extractor provides a mechanism to modify the system state by returning the modified view.
use mahler::extract::{View, Target};
// create a task to update a counter
fn plus_one(mut counter: View<u32>, Target(tgt): Target<u32>) -> View<u32> {
// `View` implements Deref and DerefMut to
// operate on the internal value
if *counter < tgt {
// update the counter if below the target
*counter += 1;
}
// if the counter has not changed, then the task won't
// be selected by the planner
counter
}
// remove the counter
fn delete_counter(mut view: View<Option<u32>>) -> View<Option<u32>> {
view.take();
view
}
Internally, the cumulative changes to the View extractors are converted by the planner to a Patch and used to determine the applicability of the task to a given target (if no changes are performed by the task at planning, then the task is not applicable). At runtime, the same patch is used first to determine if the task is safe to apply, and later to update the internal worker state.
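The applicability check described above can be illustrated with a simplified diff. The sketch assumes a flat state of named integer counters and an `Op` enum loosely modelled on the add, replace and remove operations of RFC 6902. All names here (`Op`, `diff`, `is_applicable`) are hypothetical, and the real library works on arbitrary JSON values rather than a flat map.

```rust
use std::collections::BTreeMap;

/// A simplified patch operation, loosely modelled on RFC 6902.
#[derive(Debug, PartialEq)]
enum Op {
    Add { path: String, value: i64 },
    Replace { path: String, value: i64 },
    Remove { path: String },
}

/// Compare a flat state before and after a task ran on a copy of it.
/// Keys are assumed not to contain '/' or '~', so no escaping is done.
fn diff(before: &BTreeMap<String, i64>, after: &BTreeMap<String, i64>) -> Vec<Op> {
    let mut ops = Vec::new();
    for (key, old) in before {
        let path = format!("/{}", key);
        match after.get(key) {
            None => ops.push(Op::Remove { path }),
            Some(new) if new != old => ops.push(Op::Replace { path, value: *new }),
            Some(_) => {}
        }
    }
    for (key, new) in after {
        if !before.contains_key(key) {
            ops.push(Op::Add { path: format!("/{}", key), value: *new });
        }
    }
    ops
}

/// A task whose simulated run produces no changes is not applicable.
fn is_applicable(ops: &[Op]) -> bool {
    !ops.is_empty()
}
```

A task such as `plus_one` that leaves the counter untouched because it already equals the target would produce an empty list of operations here, and would therefore be skipped.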
§System Effects (I/O)
In mahler, the task handler needs to be executed in two different contexts:
- At planning, the context for the job is determined (current state, path, target) and the corresponding task is tested to simulate the changes it introduces without actually modifying the underlying system. The same job may be tested multiple times while planning.
- At runtime, the tasks composing the workflow are executed in the corresponding order determined by the planner and changes to the underlying system are performed.
This 2-in-1 function evaluation is enabled by the introduction of the IO
type. An IO combines both a pure operation on an input and an effectful or IO
operation.
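To make the idea concrete outside of the library, here is a minimal plain-Rust sketch of a value that pairs a pure result with a deferred side effect. The `Effect` type, its `dry_run` and `run` methods, and the `writes` counter are hypothetical names used only for illustration. This is not how mahler's IO type is implemented, and it is synchronous, whereas the library's I/O portion is async.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// A hypothetical effect type: a value computed purely, plus a deferred
/// action that performs the real side effects on that value.
struct Effect<T> {
    pure: T,
    io: Box<dyn FnOnce(T) -> Result<T, String>>,
}

impl<T> Effect<T> {
    fn new(pure: T, io: impl FnOnce(T) -> Result<T, String> + 'static) -> Self {
        Effect { pure, io: Box::new(io) }
    }

    /// Planning: return the simulated result without running the action.
    fn dry_run(self) -> T {
        self.pure
    }

    /// Runtime: run the deferred action on the simulated result.
    fn run(self) -> Result<T, String> {
        (self.io)(self.pure)
    }
}

/// Increment a counter; the `writes` cell stands in for the real system.
fn plus_one(counter: u32, writes: Rc<Cell<u32>>) -> Effect<u32> {
    // pure part: safe to evaluate any number of times while planning
    let next = counter + 1;
    Effect::new(next, move |value| {
        // effectful part: only reached when the effect is run
        writes.set(writes.get() + 1);
        Ok(value)
    })
}
```

Calling `plus_one(1, writes.clone()).dry_run()` yields the simulated value without touching `writes`, while `.run()` also performs the write. In mahler this role is played by the `IO` type and the `with_io` helper: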
use mahler::task::{with_io, IO};
use mahler::extract::View;
use tokio::time::{sleep, Duration};
fn plus_one(mut view: View<i32>) -> IO<i32> {
// Pure modification
*view += 1;
// Combine with IO operation
with_io(view, |view| async move {
// system changes should only be performed
// within the IO part
sleep(Duration::from_millis(10)).await;
// return the view
Ok(view)
})
}
This type is what can be used in mahler jobs to isolate effects on the underlying system.
use tokio::time::{sleep, Duration};
use mahler::extract::{View, Target};
use mahler::task::{with_io, IO};
// create a Job to update a counter
fn plus_one(mut counter: View<u32>, Target(tgt): Target<u32>) -> IO<u32> {
if *counter < tgt {
// update the counter if below the target
*counter += 1;
}
// return an IO type to isolate system changes
with_io(counter, |counter| async move {
// the IO portion will only ever be called if the job is
// selected by the planner
// perform IO here
sleep(Duration::from_millis(10)).await;
// with_io expects a Result output
Ok(counter)
})
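}
Performing I/O directly in the handler body, outside of the IO type, means it also runs every time the planner tests the task:
use tokio::time::{sleep, Duration};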
use tokio::runtime::Runtime;
use mahler::extract::{View, Target};
fn plus_one(mut counter: View<u32>, Target(tgt): Target<u32>) -> View<u32> {
if *counter < tgt {
// update the counter if below the target
*counter += 1;
}
Runtime::new().unwrap().block_on(async {
// This is a footgun as it adds 100ms every time
// the task is tested by the planner
sleep(Duration::from_millis(100)).await;
});
counter
}
§Compound tasks
Sometimes it may be desirable to re-use tasks in different contexts, or combine multiple tasks in order to guide the planner. This can be achieved by the use of compound tasks, called Methods in the Mahler API.
A Method handler is any function that receives zero or more extractors and
returns something that can be converted to a Vec of tasks.
use mahler::extract::{View, Target};
use mahler::task::{Task, Handler};
fn plus_one() {}
// define a method
fn plus_two(counter: View<i32>, Target(tgt): Target<i32>) -> Vec<Task> {
if tgt - *counter > 1 {
// Return two instances of the `plus_one` task
return vec![
// Provide a target for the task.
// `with_target` assigns a target to the task
plus_one.with_target(tgt),
plus_one.with_target(tgt)
];
}
// returning nothing means the method will not be picked
// by the planner
vec![]
}
// methods can also call other methods
fn plus_three(counter: View<i32>, Target(tgt): Target<i32>) -> Vec<Task> {
if tgt - *counter > 2 {
return vec![plus_two.with_target(tgt), plus_one.with_target(tgt)];
}
vec![]
}
A task Handler may be converted to a Task by using the into_task method or using one of the helper methods with_target or with_arg.
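The way compound tasks unfold into primitive steps can be sketched without the library. The example below assumes a hypothetical `Task` enum in which a method is a plain function from the current and target counter values to a list of sub-tasks, and in which every action adds one to the counter. It mirrors the `plus_two` and `plus_three` methods above but is not mahler's planner, which also validates each step against the state.

```rust
/// A hypothetical task model: an action is a primitive step, a method
/// expands into further tasks depending on the current and target values.
#[derive(Clone, Copy)]
enum Task {
    Action(&'static str),
    Method(fn(i32, i32) -> Vec<Task>),
}

fn plus_two(current: i32, target: i32) -> Vec<Task> {
    if target - current > 1 {
        return vec![Task::Action("plus_one"), Task::Action("plus_one")];
    }
    vec![]
}

fn plus_three(current: i32, target: i32) -> Vec<Task> {
    if target - current > 2 {
        return vec![Task::Method(plus_two), Task::Action("plus_one")];
    }
    vec![]
}

/// Recursively expand a task into the primitive actions it stands for.
/// Each action is assumed to add one to the counter.
fn expand(task: Task, current: &mut i32, target: i32, out: &mut Vec<&'static str>) {
    match task {
        Task::Action(name) => {
            *current += 1;
            out.push(name);
        }
        Task::Method(method) => {
            for sub in method(*current, target) {
                expand(sub, current, target, out);
            }
        }
    }
}
```

Expanding `Task::Method(plus_three)` from 0 towards a target of 3 produces three `plus_one` actions, while starting from 2 produces none, which corresponds to the method not being picked.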
§Error handling
All errors produced by Mahler operations are described by the ErrorKind type.
When calling Worker::seek_target, the worker returns an error if there is a problem with serialization, if an error happens at planning, or if there is an internal error. If an error happens during workflow execution, the method does not return an error but terminates with an Aborted value, which includes a Vec of all I/O errors that happened during the workflow execution, each with the ErrorKind::Runtime type.
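The two failure channels described above can be modelled as follows. This is a sketch with hypothetical stand-in types: `Outcome` and `PlanningError` are not mahler types, and errors are plain strings here. It only illustrates that failures before execution arrive as `Err`, while failures during execution arrive inside a successful return value.

```rust
/// Hypothetical stand-in for the value returned after seeking a target.
#[derive(Debug)]
enum Outcome {
    Success,
    /// Execution started but was aborted; carries the collected I/O errors.
    Aborted(Vec<String>),
}

/// Hypothetical error for failures that happen before execution
/// (serialization, planning, internal errors).
#[derive(Debug)]
struct PlanningError(String);

/// Turn the two failure channels into a single human-readable report.
fn report(result: Result<Outcome, PlanningError>) -> String {
    match result {
        Ok(Outcome::Success) => "target reached".to_string(),
        Ok(Outcome::Aborted(errors)) => format!(
            "workflow aborted with {} I/O error(s): {}",
            errors.len(),
            errors.join("; ")
        ),
        Err(PlanningError(msg)) => format!("could not start: {}", msg),
    }
}
```

Callers that only check for `Err` would miss aborted workflows under this model, so both arms need handling.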
§Monitoring system state
The Worker provides a follow method, returning a stream of state updates. A new value will be produced on the stream every time the internal worker state changes.
§Observability
For detailed Worker observability, mahler is instrumented with the tracing crate to report on the operation and progress of the Worker planning and workflow execution stages. These events can be processed using the tracing_subscriber crate to produce structured or human readable logs.
Key log levels:
- INFO: Workflow events, task execution
- DEBUG: Detailed planning information, state changes
- WARN: Task failures, interruptions
- ERROR: Fatal errors
- TRACE: Planner and internal worker operation
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
// Initialize tracing subscriber with recommended formatting
tracing_subscriber::registry()
.with(EnvFilter::from_default_env())
.with(
fmt::layer()
.with_span_events(fmt::format::FmtSpan::CLOSE)
.event_format(fmt::format().compact().with_target(false)),
)
.init();
§Testing
When debug_assertions is enabled, mahler exposes two testing methods for Worker.
- find_workflow generates a Workflow for a given initial and target state. The workflow can be compared with a manually created DAG to test against an expected plan.
- run_task runs a task in the context of a worker. This may be helpful to diagnose any extraction/expansion errors with the task definition or to debug a specific task.
It also exposes the following workflow types and macros:
- Dag, a DAG implementation used internally by mahler.
- dag, a declarative macro to combine DAGs into branches
- seq, a declarative macro to create a linear DAG from a list of values
- par, a declarative macro to create a branching DAG with single value branches
These utilities can be used for testing and comparing generated workflows with specific DAGs. See find_workflow for more info.
Modules§
- dag
- Debug-assertions enabled - Directed Acyclic Graph implementation and methods
- error
- Error handling types
- exception
- Types for defining planning exceptions
- extract
- Types and traits for extracting task runtime context
- job
- Types for creating and manipulating Worker Jobs
- json
- JSON referencing and manipulation
- result
- Alias over core::result::Result
- serde
- Re-export of serde_core for use by mahler-derive
- state
- State trait and standardized serialization along with State procedural macro
- sync
- State synchronization and runtime control
- task
- Types and traits for declaring and operating with Tasks
- worker
- Automated planning and execution of task workflows
Macros§
- enforce
- Enforces a condition and returns early with an aborted IO if the condition is false.