/pkg/machine/README.md
/pkg/machine is a nondeterministic, multi-state, clock-based, relational, optionally accepting, and non-blocking
state machine. It’s a form of a rules engine that can orchestrate blocking APIs into fully controllable async
state-machines. Write ops are state mutations, read ops are state checking,
and subscriptions are state waiting. It’s dependency-free and a building block of a larger project.
Installation
import am "github.com/pancsta/asyncmachine-go/pkg/machine"
Features
Features are explained using Mermaid flow diagrams, and headers link to relevant sections of the manual.
Multi-state
Many states can be active at the same time.
Clock and state contexts
States have clocks that produce contexts (odd = active; even = inactive).
Queue
Queue of mutations enable lock-free actor model.
AOP handlers
States are Aspects with Enter, State, Exit, and End handlers.
Negotiation
Transitions are cancellable (during the negotiation phase).
Relations
States are connected via Require, Remove, and Add relations.
Subscriptions
Channel-broadcast waiting on clock values.
Error handling
Error is a state, handled just like any other mutation.
val, err := someOp()
if err != nil {
mach.AddErr(err, nil)
return // no err needed
}
Tracers
Synchronous tracers for internal events.
TransitionInit TransitionStart TransitionEnd HandlerStart HandlerEnd
MachineInit MachineDispose NewSubmachine QueueEnd SchemaChange VerifyStates
Usage
Raw Strings
// ProcessingFile to FileProcessed
// 1 async and 1 sync state
package main
import am "github.com/pancsta/asyncmachine-go/pkg/machine"
func main() {
// init the state machine
mach := am.New(nil, am.Schema{
"ProcessingFile": { // async
Remove: am.S{"FileProcessed"},
},
"FileProcessed": { // async
Remove: am.S{"ProcessingFile"},
},
"InProgress": { // sync
Auto: true,
Require: am.S{"ProcessingFile"},
},
}, nil)
mach.BindHandlers(&Handlers{
Filename: "README.md",
})
// change the state
mach.Add1("ProcessingFile", nil)
// wait for completed
select {
case <-time.After(5 * time.Second):
println("timeout")
case <-mach.WhenErr(nil):
println("err:", mach.Err())
case <-mach.When1("FileProcessed", nil):
println("done")
}
}
type Handlers struct {
Filename string
}
// negotiation handler
func (h *Handlers) ProcessingFileEnter(e *am.Event) bool {
// read-only ops
// decide if moving fwd is ok
// no blocking
// lock-free critical section
return true
}
// final handler
func (h *Handlers) ProcessingFileState(e *am.Event) {
// read & write ops
// no blocking
// lock-free critical section
mach := e.Machine
// clock-based expiration context
stateCtx := mach.NewStateCtx("ProcessingFile")
// unblock
go func() {
// re-check the state ctx
if stateCtx.Err() != nil {
return // expired
}
// blocking call
err := processFile(h.Filename, stateCtx)
// re-check the state ctx after a blocking call
if stateCtx.Err() != nil {
return // expired
}
if err != nil {
mach.AddErr(err, nil)
return
}
// move to the next state in the flow
mach.Add1("FileProcessed", nil)
}()
}
Waiting
Subscriptions do not allocate goroutines and channels are reused.
// wait until Foo becomes active
<-mach.When1("Foo", nil)
// wait until Foo becomes inactive
<-mach.WhenNot1("Foo", nil)
// wait for Foo to be activated with an arg ID=123
<-mach.WhenArgs("Foo", am.A{"ID": 123}, nil)
// wait for Foo to have a tick >= 6
<-mach.WhenTime1("Foo", 6, nil)
// wait for Foo to have a tick >= 6 and Bar tick >= 10
<-mach.WhenTime(am.S{"Foo", "Bar"}, am.Time{6, 10}, nil)
// wait for Foo to have a tick increased by 2
<-mach.WhenTicks("Foo", 2, nil)
// wait for a mutation to execute
<-mach.WhenQueue(mach.Add1("Foo", nil))
// wait for an error
<-mach.WhenErr(nil)
// wait on a time query
<-mach.WhenQuery(func(c am.Clock) bool {
// Foo activated >5 times and Bar activated twice as much
return c["Foo"] >= 10 && c["Bar"] >= 2*c["Foo"]
}, nil)
State Targeting
Transition is available within transition handlers.
var (
tx am.Transition
t1 am.Time
t2 am.Time
)
// was Foo added and Bar removed?
added, removed := tx.TimeIndexDiff()
added.Is1("Foo") && removed.Is1("Bar")
// was Foo called?
tx.TimeIndexCalled().Is1("Foo")
// was Foo active before?
tx.TimeIndexBefore().Is1("Foo")
// will Foo be active after?
tx.TimeIndexAfter().Is1("Foo")
// is Foo queued?
mach.WillBe1("Foo")
// number of states which ticked
len(tx.TimeIndexTimeDiff().ActiveStates())
// did Foo tick between t1 and t2?
t2.DiffSince(t1).NonZeroStates().Is1("Foo")
Schema File
// BasicStatesDef contains all the states of the Basic state machine.
type BasicStatesDef struct {
*am.StatesBase
// ErrNetwork indicates a generic network error.
ErrNetwork string
// ErrHandlerTimeout indicates one of state machine handlers has timed out.
ErrHandlerTimeout string
// Start indicates the machine should be working. Removing start can force
// stop the machine.
Start string
// Ready indicates the machine meets criteria to perform work, and requires
// Start.
Ready string
// Healthcheck is a periodic request making sure that the machine is still
// alive.
Healthcheck string
}
var BasicSchema = am.Schema{
// Errors
ssB.ErrNetwork: {Require: S{Exception}},
ssB.ErrHandlerTimeout: {Require: S{Exception}},
// Basics
ssB.Start: {},
ssB.Ready: {Require: S{ssB.Start}},
ssB.Healthcheck: {},
}
Passing Args
// Example with typed state names (ss) and typed arguments (A).
mach.Add1(ss.KillingWorker, Pass(&A{
ConnAddr: ":5555",
WorkerAddr: ":5556",
}))
Mutations and Relations
While mutations are the heartbeat of asyncmachine, it’s the relations which define the rules of the flow. Check out the relations playground and quiz yourself (maybe a fancier playground).
mach := newMach("DryWaterWet", am.Schema{
"Wet": {
Require: am.S{"Water"},
},
"Dry": {
Remove: am.S{"Water"},
},
"Water": {
Add: am.S{"Wet"},
Remove: am.S{"Dry"},
},
})
mach.Add1("Dry", nil)
mach.Add1("Water", nil)
// TODO quiz: is Wet active?
Demos
- Relations playground
- Interactively use the TUI debugger with data pre-generated by a secai bot:
go run github.com/pancsta/asyncmachine-go/tools/cmd/am-dbg@latest \
--import-data https://pancsta.github.io/assets/asyncmachine-go/am-dbg-exports/secai-cook.gob.br \
mach://cook
Examples
All examples and benchmarks can be found in /examples.
Devtools
/tools/cmd/am-dbgMulti-client TUI debugger./tools/cmd/arpcNetwork-native REPL and CLI./tools/cmd/am-visGenerates D2 diagrams./tools/cmd/am-genGenerates schema files and Grafana dashboards./tools/cmd/am-relayRotates logs and relays WASM.
Apps
asyncmachine-go synchronizes state for the following projects:
- secai - AI Workflows framework
- secai Web UI - WebAssembly go-app PWA
- Self-hosting of pkg/rpc, pkg/node, pkg/pubsub
- arpc REPL - Cobra-based REPL
- am-dbg TUI Debugger - Single state-machine TUI app
- libp2p PubSub Simulator - Sandbox simulator for libp2p-pubsub
- libp2p PubSub Benchmark - Benchmark of libp2p-pubsub ported to asyncmachine-go
Documentation
API
The common API methods are listed below. There’s more for local state machines, but all of these are also implemented in the transparent RPC layer.
// TODO update
// A (arguments) is a map of named arguments for a Mutation.
type A map[string]any
// S (state names) is a string list of state names.
type S []string
type Time []uint64
type Clock map[string]uint64
type Result int
type Schema = map[string]State
// Api is a subset of Machine for alternative implementations.
type Api interface {
// ///// REMOTE
// Mutations (remote)
Add1(state string, args A) Result
Add(states S, args A) Result
Remove1(state string, args A) Result
Remove(states S, args A) Result
AddErr(err error, args A) Result
AddErrState(state string, err error, args A) Result
Toggle(states S, args A) Result
Toggle1(state string, args A) Result
Set(states S, args A) Result
// Traced mutations (remote)
EvAdd1(event *Event, state string, args A) Result
EvAdd(event *Event, states S, args A) Result
EvRemove1(event *Event, state string, args A) Result
EvRemove(event *Event, states S, args A) Result
EvAddErr(event *Event, err error, args A) Result
EvAddErrState(event *Event, state string, err error, args A) Result
EvToggle(event *Event, states S, args A) Result
EvToggle1(event *Event, state string, args A) Result
// Waiting (remote)
WhenArgs(state string, args A, ctx context.Context) <-chan struct{}
// ///// LOCAL
// Checking (local)
Err() error
IsErr() bool
Is(states S) bool
Is1(state string) bool
Any(states ...S) bool
Any1(state ...string) bool
Not(states S) bool
Not1(state string) bool
IsTime(time Time, states S) bool
WasTime(time Time, states S) bool
IsClock(clock Clock) bool
WasClock(clock Clock) bool
Has(states S) bool
Has1(state string) bool
CanAdd(states S, args A) Result
CanAdd1(state string, args A) Result
CanRemove(states S, args A) Result
CanRemove1(state string, args A) Result
Transition() *Transition
// Waiting (local)
When(states S, ctx context.Context) <-chan struct{}
When1(state string, ctx context.Context) <-chan struct{}
WhenNot(states S, ctx context.Context) <-chan struct{}
WhenNot1(state string, ctx context.Context) <-chan struct{}
WhenTime(states S, times Time, ctx context.Context) <-chan struct{}
WhenTime1(state string, tick uint64, ctx context.Context) <-chan struct{}
WhenTicks(state string, ticks int, ctx context.Context) <-chan struct{}
WhenQuery(query func(clock Clock) bool, ctx context.Context) <-chan struct{}
WhenErr(ctx context.Context) <-chan struct{}
WhenQueue(tick Result) <-chan struct{}
// Getters (local)
StateNames() S
StateNamesMatch(re *regexp.Regexp) S
ActiveStates(states S) S
Tick(state string) uint64
Clock(states S) Clock
Time(states S) Time
QueueTick() uint64
MachineTick() uint32
QueueLen() uint16
NewStateCtx(state string) context.Context
Export() (*Serialized, Schema, error)
Schema() Schema
Switch(groups ...S) string
Groups() (map[string][]int, []string)
Index(states S) []int
Index1(state string) int
// Misc (local)
Id() string
ParentId() string
ParseStates(states S) S
Tags() []string
Context() context.Context
String() string
StringAll() string
Log(msg string, args ...any)
SemLogger() SemLogger
Inspect(states S) string
BindHandlers(handlers any) error
DetachHandlers(handlers any) error
HasHandlers() bool
StatesVerified() bool
Tracers() []Tracer
DetachTracer(tracer Tracer) error
BindTracer(tracer Tracer) error
AddBreakpoint(added S, removed S, strict bool)
AddBreakpoint1(added string, removed string, strict bool)
Dispose()
WhenDisposed() <-chan struct{}
IsDisposed() bool
OnDispose(fn HandlerDispose)
}
Tests
It’s very easy to get a grasp of how asyncmachine works by reading the idiomatic test suite. Consider the example below of a method used to wait for certain arguments passing via a state activation:
func TestWhenArgs(t *testing.T) {
// init
m := NewRels(t, nil)
// bind
whenCh := m.WhenArgs("B", A{"foo": "bar"}, nil)
// incorrect args
m.Add1("B", A{"foo": "foo"})
select {
case <-whenCh:
t.Fatal("whenCh shouldnt be selected")
default:
// pass
}
// correct args
m.Add1("B", A{"foo": "bar"})
select {
case <-whenCh:
// pass
default:
t.Fatal("whenCh should be selected")
}
// dispose
m.Dispose()
<-m.WhenDisposed()
}
Status
Release Candidate, semantically versioned, partially optimized.
Concepts
asyncmachine is loosely based on the following concepts:
- dependency graph
- async event emitter
- nondeterministic state machine
- queue
- aspect-oriented programming
- SQL relations
- Paxos negotiation
- logical clock
- programming by contract
- non-blocking
- actor model
- causal inference
- declarative logic
State Oriented Programming
This is a new term which could possibly encapsulate the unique way of modeling the flow using asyncmachine. Unlike common state machines, there are no transition paths between states, and the activation / deactivation is decided by the state consensus. Consensus is calculated from a mutation, active states, relations between states, and negotiating methods. Just like object-oriented programming solves domain complexity, state-oriented programming tries to solve unpredictability of the flow.
monorepo
Go back to the monorepo root to continue reading.

