logo   async
machine.dev

/pkg/machine/README.md

/pkg/machine is a nondeterministic, multi-state, clock-based, relational, optionally accepting, and non-blocking state machine. It’s a form of rules engine that can orchestrate blocking APIs into fully controllable async state machines. Write ops are state mutations, read ops are state checking, and subscriptions are state waiting. It’s dependency-free and a building block of a larger project.

Installation

import am "github.com/pancsta/asyncmachine-go/pkg/machine"

Features

Features are explained using Mermaid flow diagrams, and headers link to relevant sections of the manual.

Multi-state

Many states can be active at the same time.

Diagram showing multi-state capability

Clock and state contexts

States have clocks that produce contexts (odd = active; even = inactive).

Diagram showing state clocks and contexts

Queue

A queue of mutations enables a lock-free Actor Model.

Diagram showing queue and mutations

AOP handlers

States are Aspects with Enter, State, Exit, and End handlers.

Diagram showing AOP handlers

Negotiation

Transitions are cancellable (during the negotiation phase).

Diagram showing negotiation phase

Relations

States are connected via Require, Remove, and Add relations.

Diagram showing state relations

Subscriptions

Channel-based broadcast for waiting on clock values.

Diagram showing subscriptions

Error handling

Error is a state, handled just like any other mutation.

val, err := someOp()
if err != nil {
    mach.AddErr(err, nil)
    return // no err needed
}

Tracers

Synchronous tracers for internal events.

TransitionInit TransitionStart TransitionEnd HandlerStart HandlerEnd
MachineInit MachineDispose NewSubmachine QueueEnd SchemaChange VerifyStates

Usage

Raw Strings

// ProcessingFile to FileProcessed
// 1 async and 1 sync state
package main

import (
    "time"

    am "github.com/pancsta/asyncmachine-go/pkg/machine"
)

// main builds the file-processing machine, binds its handlers, starts the
// flow by activating ProcessingFile, and then blocks until the flow completes,
// an error is reported, or a 5-second timeout elapses.
func main() {
    // init the state machine
    mach := am.New(nil, am.Schema{
        "ProcessingFile": { // async
            Remove: am.S{"FileProcessed"},
        },
        "FileProcessed": { // async
            Remove: am.S{"ProcessingFile"},
        },
        "InProgress": { // sync
            Auto:    true,
            Require: am.S{"ProcessingFile"},
        },
    }, nil)
    // BindHandlers returns an error; don't start the flow without handlers
    err := mach.BindHandlers(&Handlers{
        Filename: "README.md",
    })
    if err != nil {
        println("bind handlers:", err.Error())
        return
    }
    // change the state
    mach.Add1("ProcessingFile", nil)
    // wait for completed
    select {
    case <-time.After(5 * time.Second):
        println("timeout")
    case <-mach.WhenErr(nil):
        // the println builtin doesn't format interface values, so print the
        // error's message instead of the error itself
        msg := "unknown"
        if err := mach.Err(); err != nil {
            msg = err.Error()
        }
        println("err:", msg)
    case <-mach.When1("FileProcessed", nil):
        println("done")
    }
}

// Handlers groups the state handlers which get bound to the machine via
// BindHandlers.
type Handlers struct {
    // Filename is the file passed to processFile by ProcessingFileState.
    Filename string
}

// ProcessingFileEnter is the negotiation handler for ProcessingFile. This
// example always returns true.
// NOTE(review): presumably returning false cancels the transition - confirm
// against the manual.
func (h *Handlers) ProcessingFileEnter(e *am.Event) bool {
    // read-only ops
    // decide if moving fwd is ok
    // no blocking
    // lock-free critical section
    return true
}

// ProcessingFileState is the final handler for ProcessingFile. The handler
// itself must not block, so the blocking call runs in a goroutine which is
// guarded by the state context.
func (h *Handlers) ProcessingFileState(e *am.Event) {
    // read & write ops
    // no blocking
    // lock-free critical section
    mach := e.Machine
    // clock-based expiration context
    ctx := mach.NewStateCtx("ProcessingFile")
    // unblock
    go func() {
        // bail out if the state ctx has already expired
        if ctx.Err() != nil {
            return
        }
        // blocking call
        err := processFile(h.Filename, ctx)
        switch {
        case ctx.Err() != nil:
            // expired during the blocking call, drop the result
        case err != nil:
            mach.AddErr(err, nil)
        default:
            // move to the next state in the flow
            mach.Add1("FileProcessed", nil)
        }
    }()
}

Waiting

Subscriptions do not allocate goroutines.

// wait until FileDownloaded becomes active
<-mach.When1("FileDownloaded", nil)

// wait until DownloadingFile becomes inactive
<-mach.WhenNot1("DownloadingFile", nil)

// wait for EventConnected to be activated with an arg ID=123
<-mach.WhenArgs("EventConnected", am.A{"ID": 123}, nil)

// wait for Foo to have a tick >= 6
<-mach.WhenTime1("Foo", 6, nil)

// wait for Foo to have a tick >= 6 and Bar tick >= 10
<-mach.WhenTime(am.S{"Foo", "Bar"}, am.Time{6, 10}, nil)

// wait for DownloadingFile to have a tick increased by 2 since now
<-mach.WhenTicks("DownloadingFile", 2, nil)

// wait for a mutation to execute
<-mach.WhenQueue(mach.Add1("Foo", nil), nil)

// wait for an error
<-mach.WhenErr(nil)

Schema File

// BasicStatesDef contains all the states of the Basic state machine. Each
// field names a single state.
type BasicStatesDef struct {
    *am.StatesBase

    // ErrNetwork indicates a generic network error.
    ErrNetwork string
    // ErrHandlerTimeout indicates that one of the state machine's handlers has
    // timed out.
    ErrHandlerTimeout string

    // Start indicates the machine should be working. Removing Start can
    // force-stop the machine.
    Start string
    // Ready indicates the machine meets the criteria to perform work, and
    // requires Start.
    Ready string
    // Healthcheck is a periodic request making sure that the machine is still
    // alive.
    Healthcheck string
}

// BasicSchema defines the relations of the Basic states: both error states
// require Exception, and Ready requires Start.
// NOTE(review): S and Exception are unqualified here while Schema carries the
// am prefix - confirm the imports this snippet assumes.
var BasicSchema = am.Schema{

    // Errors

    ssB.ErrNetwork:        {Require: S{Exception}},
    ssB.ErrHandlerTimeout: {Require: S{Exception}},

    // Basics

    ssB.Start:       {},
    ssB.Ready:       {Require: S{ssB.Start}},
    ssB.Healthcheck: {},
}

Passing Args

// Example with typed state names (ssS) and typed arguments (A).
mach.Add1(ssS.KillingWorker, Pass(&A{
    ConnAddr:   ":5555",
    WorkerAddr: ":5556",
}))

Mutations and Relations

While mutations are the heartbeat of asyncmachine, it’s the relations which define the rules of the flow. Check out the relations playground and quiz yourself (maybe a fancier playground).

mach := newMach("DryWaterWet", am.Schema{
    "Wet": {
        Require: am.S{"Water"},
    },
    "Dry": {
        Remove: am.S{"Water"},
    },
    "Water": {
        Add:    am.S{"Wet"},
        Remove: am.S{"Dry"},
    },
})
mach.Add1("Dry", nil)
mach.Add1("Water", nil)
// TODO quiz: is Wet active?

Demos

go run github.com/pancsta/asyncmachine-go/tools/cmd/am-dbg@latest \
  --import-data https://assets.asyncmachine.dev/am-dbg-exports/secai-cook.gob.br \
  mach://cook

Examples

All examples and benchmarks can be found in /examples.

Dev Tools

am-dbg

Apps

Documentation

API

The most common API methods are listed below. There’s more for local state machines, but all of these are also implemented in the transparent RPC layer.

// TODO update
type (
    // A (arguments) is a map of named arguments for a Mutation.
    A map[string]any
    // S (state names) is a string list of state names.
    S []string
    // Time is a list of state ticks, matched by position with a list of
    // states (see WhenTime).
    Time []uint64
    // Clock maps names to ticks.
    // NOTE(review): presumably keyed by state name - confirm.
    Clock map[string]uint64
    // Result is the value returned by mutation methods (see Add1).
    Result int
    // Schema maps state names to their State definitions.
    Schema = map[string]State
)

// Api is a subset of Machine for alternative implementations.
//
// The methods are split into a REMOTE section (mutations, traced mutations,
// WhenArgs, and Err) and a LOCAL section (checking, waiting, getters, and
// misc).
type Api interface {
    // ///// REMOTE

    // Mutations (remote)

    Add1(state string, args A) Result
    Add(states S, args A) Result
    Remove1(state string, args A) Result
    Remove(states S, args A) Result
    AddErr(err error, args A) Result
    AddErrState(state string, err error, args A) Result
    Toggle(states S, args A) Result
    Toggle1(state string, args A) Result
    Set(states S, args A) Result

    // Traced mutations (remote)
    // These mirror the mutations above with a leading *Event parameter.

    EvAdd1(event *Event, state string, args A) Result
    EvAdd(event *Event, states S, args A) Result
    EvRemove1(event *Event, state string, args A) Result
    EvRemove(event *Event, states S, args A) Result
    EvAddErr(event *Event, err error, args A) Result
    EvAddErrState(event *Event, state string, err error, args A) Result
    EvToggle(event *Event, states S, args A) Result
    EvToggle1(event *Event, state string, args A) Result

    // Waiting (remote)

    WhenArgs(state string, args A, ctx context.Context) <-chan struct{}

    // Getters (remote)

    Err() error

    // ///// LOCAL

    // Checking (local)

    IsErr() bool
    Is(states S) bool
    Is1(state string) bool
    Any(states ...S) bool
    Any1(state ...string) bool
    Not(states S) bool
    Not1(state string) bool
    IsTime(time Time, states S) bool
    WasTime(time Time, states S) bool
    IsClock(clock Clock) bool
    WasClock(clock Clock) bool
    Has(states S) bool
    Has1(state string) bool
    CanAdd(states S, args A) Result
    CanAdd1(state string, args A) Result
    CanRemove(states S, args A) Result
    CanRemove1(state string, args A) Result
    CountActive(states S) int

    // Waiting (local)

    When(states S, ctx context.Context) <-chan struct{}
    When1(state string, ctx context.Context) <-chan struct{}
    WhenNot(states S, ctx context.Context) <-chan struct{}
    WhenNot1(state string, ctx context.Context) <-chan struct{}
    WhenTime(states S, times Time, ctx context.Context) <-chan struct{}
    WhenTime1(state string, tick uint64, ctx context.Context) <-chan struct{}
    WhenTicks(state string, ticks int, ctx context.Context) <-chan struct{}
    WhenQuery(query func(clock Clock) bool, ctx context.Context) <-chan struct{}
    WhenErr(ctx context.Context) <-chan struct{}
    // NOTE(review): the Waiting example in this README calls WhenQueue with
    // an extra trailing argument - confirm which signature is current.
    WhenQueue(tick Result) <-chan struct{}

    // Getters (local)

    StateNames() S
    StateNamesMatch(re *regexp.Regexp) S
    ActiveStates() S
    Tick(state string) uint64
    Clock(states S) Clock
    Time(states S) Time
    TimeSum(states S) uint64
    QueueTick() uint64
    NewStateCtx(state string) context.Context
    Export() *Serialized
    Schema() Schema
    Switch(groups ...S) string
    Groups() (map[string][]int, []string)
    Index(states S) []int
    Index1(state string) int

    // Misc (local)

    Id() string
    ParentId() string
    Tags() []string
    Ctx() context.Context
    String() string
    StringAll() string
    Log(msg string, args ...any)
    SemLogger() SemLogger
    Inspect(states S) string
    BindHandlers(handlers any) error
    DetachHandlers(handlers any) error
    HasHandlers() bool
    StatesVerified() bool
    Tracers() []Tracer
    DetachTracer(tracer Tracer) error
    BindTracer(tracer Tracer) error
    AddBreakpoint1(added string, removed string, strict bool)
    AddBreakpoint(added S, removed S, strict bool)
    Dispose()
    WhenDisposed() <-chan struct{}
    IsDisposed() bool
}

Tests

It’s very easy to get a grasp of how asyncmachine works by reading the idiomatic test suite. Consider the example below, which tests the method used to wait for specific arguments passed via a state activation:

// TestWhenArgs checks that a WhenArgs channel is not ready after an
// activation with non-matching arguments, and becomes ready after an
// activation with matching ones.
func TestWhenArgs(t *testing.T) {
    // init
    mach := NewRels(t, nil)

    // bind
    whenCh := mach.WhenArgs("B", A{"foo": "bar"}, nil)
    // non-blocking probe of the subscription channel
    selected := func() bool {
        select {
        case <-whenCh:
            return true
        default:
            return false
        }
    }

    // incorrect args
    mach.Add1("B", A{"foo": "foo"})
    if selected() {
        t.Fatal("whenCh shouldnt be selected")
    }

    // correct args
    mach.Add1("B", A{"foo": "bar"})
    if !selected() {
        t.Fatal("whenCh should be selected")
    }

    // dispose
    mach.Dispose()
    <-mach.WhenDisposed()
}

Status

Release Candidate, semantically versioned, partially optimized.

Concepts

asyncmachine is loosely based on the following concepts: