logo   async
machine.dev

/pkg/machine/README.md

/pkg/machine is a nondeterministic, multi-state, clock-based, relational, optionally accepting, and non-blocking state machine. It’s a form of a rules engine that can orchestrate blocking APIs into fully controllable async state-machines. Write ops are state mutations, read ops are state checking, and subscriptions are state waiting. It’s dependency-free and a building block of a larger project.

Installation

import am "github.com/pancsta/asyncmachine-go/pkg/machine"

Features

Features are explained using Mermaid flow diagrams, and the headers link to relevant sections of the manual.

Multi-state

Many states can be active at the same time.

Clock and state contexts

States have clocks that produce contexts (odd = active; even = inactive).

Queue

Queue of mutations enable lock-free actor model.

AOP handlers

States are Aspects with Enter, State, Exit, and End handlers.

Negotiation

Transitions are cancellable (during the negotiation phase).

Relations

States are connected via Require, Remove, and Add relations.

Subscriptions

Channel-broadcast waiting on clock values.

Error handling

Error is a state, handled just like any other mutation.

val, err := someOp()
if err != nil {
    mach.AddErr(err, nil)
    return // no err needed
}

Tracers

Synchronous tracers for internal events.

TransitionInit TransitionStart TransitionEnd HandlerStart HandlerEnd
MachineInit MachineDispose NewSubmachine QueueEnd SchemaChange VerifyStates

Usage

Raw Strings

// ProcessingFile to FileProcessed
// 1 async and 1 sync state
package main

import am "github.com/pancsta/asyncmachine-go/pkg/machine"

func main() {
    // init the state machine
    mach := am.New(nil, am.Schema{
        "ProcessingFile": { // async
            Remove: am.S{"FileProcessed"},
        },
        "FileProcessed": { // async
            Remove: am.S{"ProcessingFile"},
        },
        "InProgress": { // sync
            Auto:    true,
            Require: am.S{"ProcessingFile"},
        },
    }, nil)
    mach.HandlersBind(&Handlers{
        Filename: "README.md",
    })
    // change the state
    mach.Add1("ProcessingFile", nil)
    // wait for completed
    select {
    case <-time.After(5 * time.Second):
        println("timeout")
    case <-mach.WhenErr(nil):
        println("err:", mach.Err())
    case <-mach.When1("FileProcessed", nil):
        println("done")
    }
}

type Handlers struct {
    Filename string
}

// negotiation handler
func (h *Handlers) ProcessingFileEnter(e *am.Event) bool {
    // read-only ops
    // decide if moving fwd is ok
    // no blocking
    // lock-free critical section
    return true
}

// final handler
func (h *Handlers) ProcessingFileState(e *am.Event) {
    // read & write ops
    // no blocking
    // lock-free critical section
    mach := e.Machine
    // clock-based expiration context
    stateCtx := mach.NewStateCtx("ProcessingFile")
    // unblock
    go func() {
        // re-check the state ctx
        if stateCtx.Err() != nil {
            return // expired
        }
        // blocking call
        err := processFile(h.Filename, stateCtx)
        // re-check the state ctx after a blocking call
        if stateCtx.Err() != nil {
            return // expired
        }
        if err != nil {
            mach.AddErr(err, nil)
            return
        }
        // move to the next state in the flow
        mach.Add1("FileProcessed", nil)
    }()
}

Waiting

Subscriptions do not allocate goroutines and channels are reused.

// wait until Foo becomes active
<-mach.When1("Foo", nil)

// wait until Foo becomes inactive
<-mach.WhenNot1("Foo", nil)

// wait for Foo to be activated with an arg ID=123
<-mach.WhenArgs("Foo", am.A{"ID": 123}, nil)

// wait for Foo to have a tick >= 6
<-mach.WhenTime1("Foo", 6, nil)

// wait for Foo to have a tick >= 6 and Bar tick >= 10
<-mach.WhenTime(am.S{"Foo", "Bar"}, am.Time{6, 10}, nil)

// wait for Foo to have a tick increased by 2
<-mach.WhenTicks("Foo", 2, nil)

// wait for next time Foo is active (even if currently active)
<-mach.WhenNextActive("Foo", nil)

// wait for a mutation to execute
<-mach.WhenQueue(mach.Add1("Foo", nil))

// wait for an error
<-mach.WhenErr(nil)

// wait on a time query
<-mach.WhenQuery(func(c am.Clock) bool {
    // Foo activated >5 times and Bar activated twice as much
    return c["Foo"] >= 10 && c["Bar"] >= 2*c["Foo"]
}, nil)

State Targeting

Transition is available within transition handlers.

var (
    tx am.Transition
    t1 am.Time
    t2 am.Time
)

// was Foo added and Bar removed?
added, removed := tx.TimeIndexDiff()
added.Is1("Foo") && removed.Is1("Bar")

// was Foo called?
tx.TimeIndexCalled().Is1("Foo")

// was Foo active before?
tx.TimeIndexBefore().Is1("Foo")

// will Foo be active after?
tx.TimeIndexAfter().Is1("Foo")

// is Foo queued?
mach.WillBe1("Foo")

// is Foo queued at the end?
mach.WillBe1("Foo", am.PositionLast)

// number of states which ticked
len(tx.TimeIndexTimeDiff().ActiveStates())

// did Foo tick between t1 and t2?
t2.DiffSince(t1).NonZeroStates().Is1("Foo")

Schema File

// BasicStatesDef contains all the states of the Basic state machine.
type BasicStatesDef struct {
    *am.StatesBase

    // ErrNetwork indicates a generic network error.
    ErrNetwork string
    // ErrHandlerTimeout indicates one of state machine handlers has timed out.
    ErrHandlerTimeout string

    // Start indicates the machine should be working. Removing start can force
    // stop the machine.
    Start string
    // Ready indicates the machine meets criteria to perform work, and requires
    // Start.
    Ready string
    // Healthcheck is a periodic request making sure that the machine is still
    // alive.
    Healthcheck string
}

var BasicSchema = am.Schema{

    // Errors

    ssB.ErrNetwork:        {Require: S{Exception}},
    ssB.ErrHandlerTimeout: {Require: S{Exception}},

    // Basics

    ssB.Start:       {},
    ssB.Ready:       {Require: S{ssB.Start}},
    ssB.Healthcheck: {},
}

Passing Args

// Example with typed state names (ss) and typed arguments (A).
mach.Add1(ss.KillingWorker, Pass(&A{
    ConnAddr:   ":5555",
    WorkerAddr: ":5556",
}))

Mutations and Relations

While mutations are the heartbeat of asyncmachine, it’s the relations which define the rules of the flow. Check out the relations playground and quiz yourself (maybe a fancier playground).

mach := newMach("DryWaterWet", am.Schema{
    "Wet": {
        Require: am.S{"Water"},
    },
    "Dry": {
        Remove: am.S{"Water"},
    },
    "Water": {
        Add:    am.S{"Wet"},
        Remove: am.S{"Dry"},
    },
})
mach.Add1("Dry", nil)
mach.Add1("Water", nil)
// TODO quiz: is Wet active?

Demos

go run github.com/pancsta/asyncmachine-go/tools/cmd/am-dbg@latest \
  --import-data https://pancsta.github.io/assets/asyncmachine-go/am-dbg-exports/secai-cook.gob.br \
  mach://cook

Examples

All examples and benchmarks can be found in /examples.

Devtools

am-dbg

Apps

asyncmachine-go synchronizes state for the following projects:

Documentation

API

The common API methods are listed below. There’s more for local state machines, but all of these are also implemented in the transparent RPC layer.

// TODO update
// A (arguments) is a map of named arguments for a Mutation.
type A map[string]any
// S (state names) is a string list of state names.
type S []string
type Time []uint64
type Clock map[string]uint64
type Result int
type Schema = map[string]State

// Api is a subset of Machine for alternative implementations.
type Api interface {
  // ///// REMOTE

  // Mutations (remote)

  Add1(state string, args A) Result
  Add(states S, args A) Result
  Remove1(state string, args A) Result
  Remove(states S, args A) Result
  AddErr(err error, args A) Result
  AddErrState(state string, err error, args A) Result
  Toggle(states S, args A) Result
  Toggle1(state string, args A) Result
  Set(states S, args A) Result

  // Traced mutations (remote)

  EvAdd1(event *Event, state string, args A) Result
  EvAdd(event *Event, states S, args A) Result
  EvRemove1(event *Event, state string, args A) Result
  EvRemove(event *Event, states S, args A) Result
  EvAddErr(event *Event, err error, args A) Result
  EvAddErrState(event *Event, state string, err error, args A) Result
  EvToggle(event *Event, states S, args A) Result
  EvToggle1(event *Event, state string, args A) Result

  // Waiting (remote)

  WhenArgs(state string, args A, ctx context.Context) <-chan struct{}

  // ///// LOCAL

  // Checking (local)

  Err() error
  IsErr() bool
  Is(states S) bool
  Is1(state string) bool
  Any(states ...S) bool
  Any1(state ...string) bool
  Not(states S) bool
  Not1(state string) bool
  IsTime(time Time, states S) bool
  WasTime(time Time, states S) bool
  IsClock(clock Clock) bool
  WasClock(clock Clock) bool
  Has(states S) bool
  Has1(state string) bool
  CanAdd(states S, args A) Result
  CanAdd1(state string, args A) Result
  CanRemove(states S, args A) Result
  CanRemove1(state string, args A) Result
  Transition() *Transition
  IsLocal() bool
  ErrInternal() <-chan error

  // Waiting (local)

  When(states S, ctx context.Context) <-chan struct{}
  When1(state string, ctx context.Context) <-chan struct{}
  WhenNot(states S, ctx context.Context) <-chan struct{}
  WhenNot1(state string, ctx context.Context) <-chan struct{}
  WhenTime(states S, times Time, ctx context.Context) <-chan struct{}
  WhenTime1(state string, tick uint64, ctx context.Context) <-chan struct{}
  WhenTicks(state string, ticks int, ctx context.Context) <-chan struct{}
  WhenNextActive(state string, ctx context.Context) <-chan struct{}
  WhenQuery(query func(clock Clock) bool, ctx context.Context) <-chan struct{}
  WhenErr(ctx context.Context) <-chan struct{}
  WhenQueue(tick Result) <-chan struct{}

  // Getters (local)

  StateNames() S
  ActiveStates(states S) S
  Tick(state string) uint64
  Clock(states S) Clock
  Time(states S) Time
  QueueTick() uint64
  MachineTick() uint32
  QueueLen() uint16
  NewStateCtx(state string, e ...*Event) context.Context
  Export() (*Serialized, Schema, error)
  Schema() Schema
  Switch(groups ...S) string
  Groups() (map[string][]int, []string)
  Index(states S) []int
  Index1(state string) int

  // Misc (local)

  Id() string
  ParentId() string
  ParseStates(states S) S
  Tags() []string
  Context() context.Context
  ContextParent() context.Context
  String() string
  StringAll() string
  Log(msg string, args ...any)
  SemLogger() SemLogger
  Inspect(states S) string
  HandlersBind(handlers any, opts ...BindOpts) (string, error)
  HandlersBindMaps(negotiations map[string]HandlerNegotiation,
    finals map[string]HandlerFinal, opts ...BindOpts) (string, error)
  HandlersDetach(bindingId string) error
  Handlers() []string
  StatesVerified() bool
  Tracers() []Tracer
  TracerDetach(id string) error
  TracerBind(tracer Tracer) (string, error)
  AddBreakpoint(added S, removed S, strict bool)
  AddBreakpoint1(added string, removed string, strict bool)
  Dispose()
  WhenDisposed() <-chan struct{}
  IsDisposed() bool
  OnDispose(fn HandlerDispose)
}

Tests

It’s very easy to get a grasp of how asyncmachine works by reading the idiomatic test suite. Consider the example below of a method used to wait for certain arguments passing via a state activation:

func TestWhenArgs(t *testing.T) {
    // init
    m := NewRels(t, nil)

    // bind
    whenCh := m.WhenArgs("B", A{"foo": "bar"}, nil)

    // incorrect args
    m.Add1("B", A{"foo": "foo"})
    select {
    case <-whenCh:
        t.Fatal("whenCh shouldnt be selected")
    default:
        // pass
    }

    // correct args
    m.Add1("B", A{"foo": "bar"})
    select {
    case <-whenCh:
        // pass
    default:
        t.Fatal("whenCh should be selected")
    }

    // dispose
    m.Dispose()
    <-m.WhenDisposed()
}

Status

Release Candidate, semantically versioned, partially optimized.

Concepts

asyncmachine is loosely based on the following concepts:

State-Oriented Programming

This is a new term which could possibly encapsulate the unique way of modeling the flow using asyncmachine. Unlike common state machines, there are no transition paths between states, and the activation / deactivation is decided by the state consensus. The consensus is calculated from a mutation, active states, relations between states, and negotiating methods. Just like object-oriented programming solves domain complexity, state-oriented programming tries to solve the unpredictability of the flow.

monorepo

Go back to the monorepo root to continue reading.