/pkg/machine/README.md
/pkg/machine is a nondeterministic, multi-state, clock-based, relational, optionally accepting, and non-blocking
state machine. It is a kind of rules engine that can orchestrate blocking APIs into fully controllable async
state machines. Write operations are state mutations, read operations are state checks,
and subscriptions wait on states. It is dependency-free and serves as a building block of a larger project.
Installation
import am "github.com/pancsta/asyncmachine-go/pkg/machine"
Features
Features are explained using Mermaid flow diagrams, and headers link to relevant sections of the manual.
Multi-state
Many states can be active at the same time.
Clock and state contexts
States have clocks that produce contexts (odd = active; even = inactive).
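The parity rule can be sketched in plain Go. This is a toy model under stated assumptions, not the library's implementation: `toyState`, `Activate`, and `Deactivate` are hypothetical names, and the context handling only illustrates how a tick change could expire a state context.

```go
package main

import (
	"context"
	"fmt"
)

// toyState is a hypothetical single state with a tick counter
// (an illustration only, not a type from the library).
type toyState struct {
	tick   uint64
	ctx    context.Context
	cancel context.CancelFunc
}

// IsActive applies the parity rule: odd tick = active, even tick = inactive.
func (s *toyState) IsActive() bool { return s.tick%2 == 1 }

// Activate bumps the tick to an odd value and opens a fresh context.
func (s *toyState) Activate() {
	if s.IsActive() {
		return
	}
	s.tick++
	s.ctx, s.cancel = context.WithCancel(context.Background())
}

// Deactivate bumps the tick to an even value and expires the context.
func (s *toyState) Deactivate() {
	if !s.IsActive() {
		return
	}
	s.tick++
	s.cancel()
}

func main() {
	s := &toyState{}
	s.Activate()
	ctx := s.ctx
	fmt.Println(s.tick, s.IsActive(), ctx.Err() == nil) // 1 true true
	s.Deactivate()
	fmt.Println(s.tick, s.IsActive(), ctx.Err() != nil) // 2 false true
}
```

Code that captured `ctx` while the state was active can check `ctx.Err()` later to learn that the state has since been deactivated.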
Queue
A queue of mutations enables a lock-free Actor Model.
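As a rough illustration of why a queue removes the need for locks, the sketch below runs mutations strictly one at a time and defers any mutation requested from inside a hook until the current one finishes. It is a single-goroutine toy, not the library's queue; `toyMachine`, `mutation`, `enqueue`, and `onAdd` are hypothetical names.

```go
package main

import "fmt"

// mutation is a hypothetical queued state change.
type mutation struct {
	add   bool // true = add the state, false = remove it
	state string
}

// toyMachine processes mutations one at a time from a FIFO queue.
type toyMachine struct {
	active   map[string]bool
	queue    []mutation
	draining bool
	log      []string
	// onAdd is an optional hook that may queue follow-up mutations.
	onAdd func(m *toyMachine, state string)
}

// enqueue appends a mutation and drains the queue, unless a drain is
// already in progress (a nested call from a hook only appends).
func (m *toyMachine) enqueue(mut mutation) {
	m.queue = append(m.queue, mut)
	if m.draining {
		return
	}
	m.draining = true
	for len(m.queue) > 0 {
		next := m.queue[0]
		m.queue = m.queue[1:]
		m.apply(next)
	}
	m.draining = false
}

// apply executes a single mutation and records it in the log.
func (m *toyMachine) apply(mut mutation) {
	if mut.add {
		m.active[mut.state] = true
		m.log = append(m.log, "+"+mut.state)
		if m.onAdd != nil {
			m.onAdd(m, mut.state)
		}
		return
	}
	delete(m.active, mut.state)
	m.log = append(m.log, "-"+mut.state)
}

func main() {
	m := &toyMachine{active: map[string]bool{}}
	m.onAdd = func(m *toyMachine, state string) {
		if state == "A" {
			m.enqueue(mutation{add: true, state: "B"})  // runs after A completes
			m.enqueue(mutation{add: false, state: "A"}) // runs after B
		}
	}
	m.enqueue(mutation{add: true, state: "A"})
	fmt.Println(m.log) // [+A +B -A]
}
```

Because only one mutation is ever applied at a time, the handler body never races with another mutation, which is the property the queue is meant to provide.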
AOP handlers
States are Aspects with Enter, State, Exit, and End handlers.
Negotiation
Transitions are cancellable (during the negotiation phase).
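A minimal sketch of the idea, assuming negotiation handlers are plain functions that return `false` to veto a transition. The names `negotiator` and `tryAdd` are hypothetical and only model the accept-or-cancel decision, not the library's transition pipeline.

```go
package main

import "fmt"

// negotiator is a hypothetical negotiation handler: returning false
// cancels the transition.
type negotiator func(target string) bool

// tryAdd runs all negotiation handlers first. The state is activated
// only when none of them returns false.
func tryAdd(active map[string]bool, target string, handlers ...negotiator) bool {
	for _, h := range handlers {
		if !h(target) {
			return false // cancelled, nothing was changed
		}
	}
	active[target] = true
	return true
}

func main() {
	active := map[string]bool{}
	accept := func(string) bool { return true }
	reject := func(string) bool { return false }

	// One handler vetoes, so the state stays inactive.
	fmt.Println(tryAdd(active, "ProcessingFile", accept, reject), active["ProcessingFile"]) // false false
	// All handlers accept, so the state becomes active.
	fmt.Println(tryAdd(active, "ProcessingFile", accept), active["ProcessingFile"]) // true true
}
```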
Relations
States are connected via Require, Remove, and Add relations.
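To give a feel for what a relation check looks like, here is a deliberately reduced sketch that models Require only. It ignores Remove, Add, and states added within the same mutation, so it does not reproduce the library's resolution rules; `toyRel` and `canActivate` are hypothetical names.

```go
package main

import "fmt"

// toyRel is a hypothetical relation record that models Require only.
type toyRel struct {
	Require []string
}

// canActivate reports whether every state required by target is active.
func canActivate(schema map[string]toyRel, active map[string]bool, target string) bool {
	for _, req := range schema[target].Require {
		if !active[req] {
			return false
		}
	}
	return true
}

func main() {
	schema := map[string]toyRel{
		"Start": {},
		"Ready": {Require: []string{"Start"}},
	}
	active := map[string]bool{}

	fmt.Println(canActivate(schema, active, "Ready")) // false, Start is inactive
	active["Start"] = true
	fmt.Println(canActivate(schema, active, "Ready")) // true, Start is active
}
```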
Subscriptions
Channel-based broadcast for waiting on clock values.
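One common Go way to broadcast without a goroutine per subscriber is to hand out a channel and close it once the condition holds. The sketch below applies that pattern to clock values. It is an assumption about the general technique, not the library's code, and it is not safe for concurrent use; `toySubs`, `WhenTick`, and `Tick` are hypothetical names.

```go
package main

import "fmt"

// waiter is a hypothetical subscription: ch is closed once the tick
// of the watched state reaches minTick.
type waiter struct {
	state   string
	minTick uint64
	ch      chan struct{}
}

// toySubs keeps ticks and pending waiters. No goroutine is started
// per waiter.
type toySubs struct {
	ticks   map[string]uint64
	waiters []*waiter
}

// WhenTick returns a channel that is closed when state's tick >= minTick.
func (s *toySubs) WhenTick(state string, minTick uint64) <-chan struct{} {
	ch := make(chan struct{})
	if s.ticks[state] >= minTick {
		close(ch) // already satisfied
		return ch
	}
	s.waiters = append(s.waiters, &waiter{state: state, minTick: minTick, ch: ch})
	return ch
}

// Tick increments the clock of state and closes every satisfied waiter.
func (s *toySubs) Tick(state string) {
	s.ticks[state]++
	pending := s.waiters[:0]
	for _, w := range s.waiters {
		if w.state == state && s.ticks[state] >= w.minTick {
			close(w.ch) // closing wakes all receivers at once
		} else {
			pending = append(pending, w)
		}
	}
	s.waiters = pending
}

// isClosed checks a channel without blocking.
func isClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	s := &toySubs{ticks: map[string]uint64{}}
	ch := s.WhenTick("Foo", 2)
	s.Tick("Foo")
	fmt.Println(isClosed(ch)) // false, tick is 1
	s.Tick("Foo")
	fmt.Println(isClosed(ch)) // true, tick is 2
}
```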
Error handling
Error is a state, handled just like any other mutation.
val, err := someOp()
if err != nil {
mach.AddErr(err, nil)
return // no err needed
}
Tracers
Synchronous tracers for internal events.
TransitionInit TransitionStart TransitionEnd HandlerStart HandlerEnd
MachineInit MachineDispose NewSubmachine QueueEnd SchemaChange VerifyStates
Usage
Raw Strings
// ProcessingFile to FileProcessed
// 2 async states and 1 sync state
package main
import (
"time"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
)
func main() {
// init the state machine
mach := am.New(nil, am.Schema{
"ProcessingFile": { // async
Remove: am.S{"FileProcessed"},
},
"FileProcessed": { // async
Remove: am.S{"ProcessingFile"},
},
"InProgress": { // sync
Auto: true,
Require: am.S{"ProcessingFile"},
},
}, nil)
mach.BindHandlers(&Handlers{
Filename: "README.md",
})
// change the state
mach.Add1("ProcessingFile", nil)
// wait for completed
select {
case <-time.After(5 * time.Second):
println("timeout")
case <-mach.WhenErr(nil):
println("err:", mach.Err())
case <-mach.When1("FileProcessed", nil):
println("done")
}
}
type Handlers struct {
Filename string
}
// negotiation handler
func (h *Handlers) ProcessingFileEnter(e *am.Event) bool {
// read-only ops
// decide if moving fwd is ok
// no blocking
// lock-free critical section
return true
}
// final handler
func (h *Handlers) ProcessingFileState(e *am.Event) {
// read & write ops
// no blocking
// lock-free critical section
mach := e.Machine
// clock-based expiration context
stateCtx := mach.NewStateCtx("ProcessingFile")
// unblock
go func() {
// re-check the state ctx
if stateCtx.Err() != nil {
return // expired
}
// blocking call
err := processFile(h.Filename, stateCtx)
// re-check the state ctx after a blocking call
if stateCtx.Err() != nil {
return // expired
}
if err != nil {
mach.AddErr(err, nil)
return
}
// move to the next state in the flow
mach.Add1("FileProcessed", nil)
}()
}
Waiting
Subscriptions do not allocate goroutines.
// wait until FileDownloaded becomes active
<-mach.When1("FileDownloaded", nil)
// wait until DownloadingFile becomes inactive
<-mach.WhenNot1("DownloadingFile", nil)
// wait for EventConnected to be activated with an arg ID=123
<-mach.WhenArgs("EventConnected", am.A{"ID": 123}, nil)
// wait for Foo to have a tick >= 6
<-mach.WhenTime1("Foo", 6, nil)
// wait for Foo to have a tick >= 6 and Bar tick >= 10
<-mach.WhenTime(am.S{"Foo", "Bar"}, am.Time{6, 10}, nil)
// wait for the DownloadingFile tick to increase by 2 from now
<-mach.WhenTicks("DownloadingFile", 2, nil)
// wait for a mutation to execute
<-mach.WhenQueue(mach.Add1("Foo", nil), nil)
// wait for an error
<-mach.WhenErr(nil)
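Since every subscription is just a channel, it composes with ordinary Go timeouts. The helper below is a hypothetical convenience (`waitFor` is not part of the library) that works with any `<-chan struct{}`. Plain channels stand in for the subscription channels so the sketch runs on its own.

```go
package main

import (
	"fmt"
	"time"
)

// waitFor is a hypothetical helper: it blocks until ch is closed or the
// timeout elapses, and reports whether ch was closed in time.
func waitFor(ch <-chan struct{}, timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-ch:
		return true
	case <-timer.C:
		return false
	}
}

func main() {
	// Stand-in for a subscription that has already fired.
	done := make(chan struct{})
	close(done)
	fmt.Println(waitFor(done, time.Second)) // true

	// Stand-in for a subscription that never fires.
	never := make(chan struct{})
	fmt.Println(waitFor(never, 10*time.Millisecond)) // false
}
```

With a real machine, the first argument would be a channel such as the one returned by `mach.When1("FileProcessed", nil)`.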
Schema File
// BasicStatesDef contains all the states of the Basic state machine.
type BasicStatesDef struct {
*am.StatesBase
// ErrNetwork indicates a generic network error.
ErrNetwork string
// ErrHandlerTimeout indicates one of state machine handlers has timed out.
ErrHandlerTimeout string
// Start indicates the machine should be working. Removing Start can
// force-stop the machine.
Start string
// Ready indicates the machine meets criteria to perform work, and requires
// Start.
Ready string
// Healthcheck is a periodic request making sure that the machine is still
// alive.
Healthcheck string
}
var BasicSchema = am.Schema{
// Errors
ssB.ErrNetwork: {Require: S{Exception}},
ssB.ErrHandlerTimeout: {Require: S{Exception}},
// Basics
ssB.Start: {},
ssB.Ready: {Require: S{ssB.Start}},
ssB.Healthcheck: {},
}
Passing Args
// Example with typed state names (ssS) and typed arguments (A).
mach.Add1(ssS.KillingWorker, Pass(&A{
ConnAddr: ":5555",
WorkerAddr: ":5556",
}))
Mutations and Relations
While mutations are the heartbeat of asyncmachine, it is the relations that define the rules of the flow. Check out the relations playground and quiz yourself (maybe a fancier playground).
mach := newMach("DryWaterWet", am.Schema{
"Wet": {
Require: am.S{"Water"},
},
"Dry": {
Remove: am.S{"Water"},
},
"Water": {
Add: am.S{"Wet"},
Remove: am.S{"Dry"},
},
})
mach.Add1("Dry", nil)
mach.Add1("Water", nil)
// TODO quiz: is Wet active?
Demos
- Relations playground
- Interactively use the TUI debugger with data pre-generated by a secai bot:
go run github.com/pancsta/asyncmachine-go/tools/cmd/am-dbg@latest \
--import-data https://assets.asyncmachine.dev/am-dbg-exports/secai-cook.gob.br \
mach://cook
Examples
All examples and benchmarks can be found in /examples.
Dev Tools
- /tools/cmd/am-dbg: Multi-client TUI debugger.
- /tools/cmd/am-gen: Generates schema files and Grafana dashboards.
- /tools/cmd/arpc: Network-native REPL and CLI.
- /tools/cmd/am-vis: Generates D2 diagrams.
- /tools/cmd/am-relay: Rotates logs.
Apps
- secai: AI Agents framework.
- arpc REPL: Cobra-based REPL.
- am-dbg TUI Debugger: Single state machine TUI app.
- libp2p PubSub Simulator: Sandbox simulator for libp2p-pubsub.
- libp2p PubSub Benchmark: Benchmark of libp2p-pubsub ported to asyncmachine-go.
Documentation
API
The most common API methods are listed below. There’s more for local state machines, but all of these are also implemented in the transparent RPC layer.
// TODO update
// A (arguments) is a map of named arguments for a Mutation.
type A map[string]any
// S (state names) is a string list of state names.
type S []string
type Time []uint64
type Clock map[string]uint64
type Result int
type Schema = map[string]State
// Api is a subset of Machine for alternative implementations.
type Api interface {
// ///// REMOTE
// Mutations (remote)
Add1(state string, args A) Result
Add(states S, args A) Result
Remove1(state string, args A) Result
Remove(states S, args A) Result
AddErr(err error, args A) Result
AddErrState(state string, err error, args A) Result
Toggle(states S, args A) Result
Toggle1(state string, args A) Result
Set(states S, args A) Result
// Traced mutations (remote)
EvAdd1(event *Event, state string, args A) Result
EvAdd(event *Event, states S, args A) Result
EvRemove1(event *Event, state string, args A) Result
EvRemove(event *Event, states S, args A) Result
EvAddErr(event *Event, err error, args A) Result
EvAddErrState(event *Event, state string, err error, args A) Result
EvToggle(event *Event, states S, args A) Result
EvToggle1(event *Event, state string, args A) Result
// Waiting (remote)
WhenArgs(state string, args A, ctx context.Context) <-chan struct{}
// Getters (remote)
Err() error
// ///// LOCAL
// Checking (local)
IsErr() bool
Is(states S) bool
Is1(state string) bool
Any(states ...S) bool
Any1(state ...string) bool
Not(states S) bool
Not1(state string) bool
IsTime(time Time, states S) bool
WasTime(time Time, states S) bool
IsClock(clock Clock) bool
WasClock(clock Clock) bool
Has(states S) bool
Has1(state string) bool
CanAdd(states S, args A) Result
CanAdd1(state string, args A) Result
CanRemove(states S, args A) Result
CanRemove1(state string, args A) Result
CountActive(states S) int
// Waiting (local)
When(states S, ctx context.Context) <-chan struct{}
When1(state string, ctx context.Context) <-chan struct{}
WhenNot(states S, ctx context.Context) <-chan struct{}
WhenNot1(state string, ctx context.Context) <-chan struct{}
WhenTime(states S, times Time, ctx context.Context) <-chan struct{}
WhenTime1(state string, tick uint64, ctx context.Context) <-chan struct{}
WhenTicks(state string, ticks int, ctx context.Context) <-chan struct{}
WhenQuery(query func(clock Clock) bool, ctx context.Context) <-chan struct{}
WhenErr(ctx context.Context) <-chan struct{}
WhenQueue(tick Result) <-chan struct{}
// Getters (local)
StateNames() S
StateNamesMatch(re *regexp.Regexp) S
ActiveStates() S
Tick(state string) uint64
Clock(states S) Clock
Time(states S) Time
TimeSum(states S) uint64
QueueTick() uint64
NewStateCtx(state string) context.Context
Export() *Serialized
Schema() Schema
Switch(groups ...S) string
Groups() (map[string][]int, []string)
Index(states S) []int
Index1(state string) int
// Misc (local)
Id() string
ParentId() string
Tags() []string
Ctx() context.Context
String() string
StringAll() string
Log(msg string, args ...any)
SemLogger() SemLogger
Inspect(states S) string
BindHandlers(handlers any) error
DetachHandlers(handlers any) error
HasHandlers() bool
StatesVerified() bool
Tracers() []Tracer
DetachTracer(tracer Tracer) error
BindTracer(tracer Tracer) error
AddBreakpoint1(added string, removed string, strict bool)
AddBreakpoint(added S, removed S, strict bool)
Dispose()
WhenDisposed() <-chan struct{}
IsDisposed() bool
}
Tests
The idiomatic test suite is an easy way to get a grasp of how asyncmachine works. Consider the example below, which tests a method that waits for specific arguments passed with a state activation:
func TestWhenArgs(t *testing.T) {
// init
m := NewRels(t, nil)
// bind
whenCh := m.WhenArgs("B", A{"foo": "bar"}, nil)
// incorrect args
m.Add1("B", A{"foo": "foo"})
select {
case <-whenCh:
t.Fatal("whenCh shouldn't be selected")
default:
// pass
}
// correct args
m.Add1("B", A{"foo": "bar"})
select {
case <-whenCh:
// pass
default:
t.Fatal("whenCh should be selected")
}
// dispose
m.Dispose()
<-m.WhenDisposed()
}
Status
Release Candidate, semantically versioned, partially optimized.
Concepts
asyncmachine is loosely based on the following concepts:
