logo   async
machine.dev

/docs/cookbook.md

This cookbook contains numerous copy-pasta snippets of common patterns for asyncmachine-go; version v0.18.5. See also:

Activation handler with negotiation

// negotiation handler
func (h *Handlers) NameEnter(e *am.Event) bool {}
// final handler
func (h *Handlers) NameState(e *am.Event) {}

De-activation handler with negotiation

// negotiation handler
func (h *Handlers) NameExit(e *am.Event) bool {}
// final handler
func (h *Handlers) NameEnd(e *am.Event) {}

State to state handlers

// with Foo active, can Bar activate? (negotiation)
func (h *Handlers) FooBar(e *am.Event) {}
// with Bar active, can Baz activate? (negotiation)
func (h *Handlers) BarBaz(e *am.Event) {}

Global negotiation handler

// called at the end of negotiation
func (h *Handlers) AnyEnter(e *am.Event) bool {}

Global final handler

// called as the last final handler
func (h *Handlers) AnyState(e *am.Event) {}

Common imports

import (
    amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
    am "github.com/pancsta/asyncmachine-go/pkg/machine"
    arpc "github.com/pancsta/asyncmachine-go/pkg/rpc"
    ampipe "github.com/pancsta/asyncmachine-go/pkg/states/pipes"
    amtele "github.com/pancsta/asyncmachine-go/pkg/telemetry"
)

Common env vars

AM_DEBUG=1
AM_DBG_ADDR=1
AM_LOG=3
AM_LOG_FULL=1
AM_HEALTHCHECK=1
AM_SERVICE=orchestrator
AM_OTEL_TRACE=1
AM_OTEL_TRACE_TXS=1
AM_OTEL_TRACE_SKIP_STATES_RE=(^rm-|relay-|rc-)|(RegisterDisposal$)
#
#AM_RPC_LOG_SERVER=1
#AM_RPC_LOG_CLIENT=1
#AM_RPC_LOG_MUX=1
#AM_RPC_DBG=1
#AM_RELAY_DBG=1

Debugging a machine

am-dbg --dir tmp
amhelp.MachDebugEnv(mach)

Enable env debugging

// instead of setting using .env
amhelp.EnableDebugging(false)
amhelp.MachDebugEnv(mach)

Simple logging

mach.SemLogger().SetLevel(am.LogChanges)

Custom logging

// max the log level
mach.SemLogger().SetLevel(am.LogEverything)
// level based dispatcher: entries above am.LogChanges go to the details
// sink, everything else goes to the regular one
mach.SemLogger().SetLogger(func(level am.LogLevel, msg string, args ...any) {
    if level > am.LogChanges {
        customLogDetails(msg, args...)
        return
    }
    customLog(msg, args...)
})

Logging args

// custom list
mach.SemLogger().SetArgsMapper(am.NewLogArgsMapper(0, []string{"id", "name"}))
// defaults + custom
mach.SemLogger().SetArgsMapperDef("id", "name")
// log typed args
mach.SemLogger().SetArgsMapper(LogArgs)

Minimal machine init

import am "github.com/pancsta/asyncmachine-go/pkg/machine"
// ...
states := am.Schema{"Foo":{}, "Bar":{Require: am.S{"Foo"}}}
mach := am.New(ctx, states, &am.Opts{
    Id: "mach1",
})

Common machine init

import (
    am "github.com/pancsta/asyncmachine-go/pkg/machine"
    ss "PACKAGE/states"
)
// ...
mach, err := am.NewCommon(ctx, "mach1", ss.Schema, ss.Names(), nil, nil, &am.Opts{
    LogLevel: am.LogChanges,
    Parent: machParent,
})

Waiting (subscriptions)

// wait until Foo becomes active
<-mach.When1("Foo", nil)

// wait until Foo becomes inactive
<-mach.WhenNot1("Foo", nil)

// wait for Foo to be activated with an arg ID=123
<-mach.WhenArgs("Foo", am.A{"ID": 123}, nil)

// wait for Foo to have a tick >= 6
<-mach.WhenTime1("Foo", 6, nil)

// wait for Foo to have a tick >= 6 and Bar tick >= 10
<-mach.WhenTime(am.S{"Foo", "Bar"}, am.Time{6, 10}, nil)

// wait for Foo to have a tick increased by 2
<-mach.WhenTicks("Foo", 2, nil)

// wait for a mutation to execute
<-mach.WhenQueue(mach.Add1("Foo", nil))

// wait for an error
<-mach.WhenErr(nil)

// wait on a time query
<-mach.WhenQuery(func(c am.Clock) bool {
    // Foo activated >5 times and Bar activated twice as much
    return c["Foo"] >= 10 && c["Bar"] >= 2*c["Foo"]
}, nil)

State Targeting

var (
    tx am.Transition
    t1 am.Time
    t2 am.Time
)

// was Foo added and Bar removed?
added, removed := tx.TimeIndexDiff()
added.Is1("Foo") && removed.Is1("Bar")

// was Foo called?
tx.TimeIndexCalled().Is1("Foo")

// was Foo active before?
tx.TimeIndexBefore().Is1("Foo")

// will Foo be active after?
tx.TimeIndexAfter().Is1("Foo")

// is Foo queued?
mach.WillBe1("Foo")

// number of states which ticked
len(tx.TimeIndexTimeDiff().ActiveStates())

// did Foo tick between t1 and t2?
t2.DiffSince(t1).NonZeroStates().Is1("Foo")

Synchronous state (single)

Ready: {},

Asynchronous state (double)

DownloadingFile: {
    Remove: groupFileDownloaded,
},
FileDownloaded: {
    Remove: groupFileDownloaded,
},

Asynchronous boolean state (triple)

Connected: {
    Remove:  groupConnected,
},
Connecting: {
    Remove:  groupConnected,
},
Disconnecting: {
    Remove: groupConnected,
},

Full asynchronous boolean state (quadruple)

Connected: {
    Remove: groupConnected,
},
Connecting: {
    Remove: groupConnected,
},
Disconnecting: {
    Remove: groupConnected,
},
Disconnected: {
    // Auto is a boolean property
    Auto:   true,
    Remove: groupConnected,
},

Input Multi state

var States = am.Schema{
    ConnectEvent:    {Multi: true},
}
// ...
// called even if the state is already active
func (h *Handlers) ConnectEventState(e *am.Event) {}

Throttled Input Multi state

var States = am.Schema{
    ConnectEvent:    {Multi: true},
}
// ...
mach.PoolSetLimit("ConnectEvent", 10)
// ...
// ConnectEventState forks the work through the machine's pool, under the
// Start state context.
func (h *Handlers) ConnectEventState(e *am.Event) {
    // called even if the state is already active
    ctx := h.mach.NewStateCtx("Start")
    h.mach.PoolFork(ctx, e, func() {
        // max 10 concurrent forks
    })
}

Self removal state

func (h *Handlers) FwdStepState(_ *am.Event) {
    // removes itself AFTER the end of the handler
    // like defer, but with a queue
    h.Mach.Remove1("FwdStep", nil)
}

State context fork (raw)

// DownloadingFileState obtains the DownloadingFile state context and forks a
// goroutine, so the handler itself returns right away.
func (h *Handlers) DownloadingFileState(e *am.Event) {
    // stays open for as long as the state remains active
    ctx := e.Machine.NewStateCtx("DownloadingFile")
    // fork to unblock
    go func() {
        // check if still valid
        if ctx.Err() != nil {
            return // expired
        }
        print("foo")
    }()
}

State context fork (sugar)

// sugar (preferred)
// DownloadingFileState hands the forked function to the machine's Fork
// helper, together with the DownloadingFile state context and the event.
func (h *Handlers) DownloadingFileState(e *am.Event) {
    // stays open for as long as the state remains active
    ctx := h.mach.NewStateCtx("DownloadingFile")
    // fork to unblock
    h.mach.Fork(ctx, e, func() {
        print("foo")
    })
}

Step context (raw)

var ctx context.Context
// create a ctx just for this select statement (step)
ctxStep, cancelStep := context.WithCancel(context.Background())
// safety net for the early returns below
defer cancelStep()
select {
case <-ctx.Done():
    return nil // expired
case <-mach.WhenErr(ctxStep):
    return mach.Err()
case <-time.After(5 * time.Second):
    return am.ErrTimeout
case <-mach.When1("Foo", ctxStep):
case <-mach.WhenArgs("Bar", am.A{"id": 1}, ctxStep):
}
// dispose "when" listeners as soon as they aren't needed
cancelStep()

Step context (sugar)

var ctx context.Context
// create a ctx just for this wait (step)
ctxStep, cancelStep := context.WithCancel(context.Background())
err := amhelp.WaitForErrAny(ctx, 5*time.Second, mach,
    mach.When1("Foo", ctxStep),
    mach.WhenArgs("Bar", am.A{"id": 1}, ctxStep),
)
// dispose "when" listeners as soon as they aren't needed
cancelStep()
switch {
case ctx.Err() != nil:
    return nil // expired
case err != nil:
    // includes mach.Err() if mach.IsErr()
    return err
}
// either Foo or Bar[id=1] active

Nested forking

// DownloadingFileState forks under the DownloadingFile state context and
// starts a second, nested fork from inside the first one.
func (h *Handlers) DownloadingFileState(e *am.Event) {
    ctx := h.mach.NewStateCtx("DownloadingFile")
    h.mach.Fork(ctx, e, func() {
        print("DownloadingFileState.Fork")
        // nested unblocking goes without [e], which is not valid at this point
        h.mach.Go(ctx, func() {
            fmt.Println("DownloadingFileState.Go")
        })
    })
}

Wait for multiple subscriptions

// wait for all the subscriptions, with a 5 second timeout
err := amhelp.WaitForAll(ctx, 5*time.Second,
    mach.When1("Foo", ctx),
    mach.WhenArgs("Bar", am.A{"id": 1}, ctx),
)
switch {
case ctx.Err() != nil:
    return nil // expired
case err != nil:
    return err
case mach.IsErr():
    return mach.Err()
}
// both Foo and Bar[id=1] active

Channel responses via arguments

// buffered channels, so a single send does not need a waiting receiver;
// they are not closed here, because this side is the receiver and the handler
// may still send after this code has returned (e.g. on ctx expiration)
req := &myReq{
    resp: make(chan []string, 1),
    err:  make(chan error, 1),
}
// async push to the handler
mach.Add1("GetPeers", am.A{"myReq": req})
// await resp and err
select {
case resp := <-req.resp:
    return resp, nil
case err := <-req.err:
    return nil, err
case <-mach.Ctx.Done():
    return nil, mach.Ctx.Err()
}

// ...

// GetPeersState removes its own state and answers on the channel passed in
// the "myReq" argument.
func (p *PubSub) GetPeersState(e *am.Event) {
    p.Mach.Remove1(ss.GetPeers, nil)

    req := e.Args["myReq"].(*myReq)

    // ...

    // buffered
    req.resp <- out
}

Equivalent of select write with default

// vanilla
select {
case p.newPeers <- struct{}{}:
default:
    // canceled
}

// asyncmachine (empty queue)
res := mach.Add1("PeersPending", nil)
if res == am.Canceled {
    // canceled
}

// asyncmachine (with queue)
// wait for the queued mutation to execute, then check the outcome
<-mach.WhenQueue(
    mach.Add1("PeersPending", nil),
)
if mach.Not1("PeersPending") {
    // canceled
}

Custom exception handler

type Handlers struct {
    *am.ExceptionHandler
}

func (h *Handlers) ExceptionState(e *am.Event) {
    // custom handling logic
    // ...

    // call the parent error handler (or not)
    h.ExceptionHandler.ExceptionState(e)
}

State definition

am.Schema{
    "StateName": {

        // properties
        Auto:  true,
        Multi: true,

        // relations
        Require: am.S{"AnotherState1"},
        Add:     am.S{"AnotherState2"},
        Remove:  am.S{"AnotherState3", "AnotherState4"},
        After:   am.S{"AnotherState2"},
    },
}

Schema template

// BasicStatesDef contains all the states of the Basic state machine.
type BasicStatesDef struct {
    *am.StatesBase

    // ErrNetwork indicates a generic network error.
    ErrNetwork string
    // ErrHandlerTimeout indicates one of state machine handlers has timed out.
    ErrHandlerTimeout string

    // Start indicates the machine should be working. Removing start can force
    // stop the machine.
    Start string
    // Ready indicates the machine meets criteria to perform work, and requires
    // Start.
    Ready string
    // Healthcheck is a periodic request making sure that the machine is still
    // alive.
    Healthcheck string
}

var BasicSchema = am.Schema{

    // Errors

    ssB.ErrNetwork:        {Require: S{Exception}},
    ssB.ErrHandlerTimeout: {Require: S{Exception}},

    // Basics

    ssB.Start:       {},
    ssB.Ready:       {Require: S{ssB.Start}},
    ssB.Healthcheck: {},
}

Transition-aware handler

See also State Targeting.

// ClientSelectedEnd runs the clean up, unless SelectingClient is among the
// called states of the current transition.
func (h *Handlers) ClientSelectedEnd(e *am.Event) {
    called := e.Transition().TimeIndexCalled()
    // clean up, except when switching to SelectingClient
    if !called.Is1(ss.SelectingClient) {
        h.cleanUp()
    }
}

Batch data into a single transition

var queue []*Msg
var queueMx sync.Mutex
var scheduled bool

func Msg(msgTx *Msg) {
    queueMx.Lock()
    defer queueMx.Unlock()

    if !scheduled {
        scheduled = true
        go func() {
            // wait
            time.Sleep(time.Second)

            queueMx.Lock()
            defer queueMx.Unlock()

            // add in bulk
            mach.Add1("Msgs", am.A{"msgs": queue})
            queue = nil
            scheduled = false
        }()
    }
    // enqueue
    queue = append(queue, msgTx)
}

Switch a state group

switch mach.Switch(ss.GroupPlaying) {
case ss.Playing:
case ss.TailMode:
default:
}

DiffStates to navigate the flow

// HelpDialogEnd brings "main" to the front when the diff between the dialog
// group and the transition's target states is as long as the whole group.
func (h *Handlers) HelpDialogEnd(e *am.Event) {
    targets := e.Transition().TargetStates()
    rest := am.StatesDiff(ss.GroupDialog, targets)
    if len(rest) != len(ss.GroupDialog) {
        return
    }
    // all dialogs closed, show main
    h.layoutRoot.SendToFront("main")
}

Pass data from a negotiation handler to the final handler

Not recommended…


// MsgRandomTopicEnter picks a random topic of a random peer and stores it in
// the event args under "Topic.id". It returns false when there is no peer,
// the peer has no topics, or the picked topic has fewer than 2 peers.
func (s *Sim) MsgRandomTopicEnter(e *am.Event) bool {
    peer := s.pickRandPeer()
    if peer == nil {
        return false
    }

    count := len(peer.simTopics)
    if count == 0 {
        // not enough topics
        return false
    }

    topic := peer.simTopics[rand.Intn(count)]
    if peers := s.GetTopicPeers(topic); len(peers) < 2 {
        // not enough peers in topic
        return false
    }

    // pass the topic
    e.Args["Topic.id"] = topic

    return len(s.topics) > 0
}

Check if a part of a group of states is active

if d.Mach.Any1(ss.GroupDialog...) {
    d.Mach.Remove(ss.GroupDialog, nil)
    return nil
}

Open Telemetry

err := amtele.MachBindOtelEnv(mach)
if err != nil {
    return err
}

RPC Server

srv, err := arpc.NewServer(ctx, addr, "server", sourceMach, &arpc.ServerOpts{
    Parent: machParent,
})
if err != nil {
    panic(err)
}

// start
srv.Start()
err = amhelp.WaitForAll(ctx, 2*time.Second,
    srv.Mach.When1(ssrpc.ServerStates.RpcReady, ctx))
if ctx.Err() != nil {
    return
}
if err != nil {
    return err
}

RPC Client

client, err := arpc.NewClient(ctx, addr, "clientid", schema, &arpc.ClientOpts{
    Consumer: consumer,
    Parent:   machParent,
})
if err != nil {
    panic(err)
}

// start
client.Start()
// err is already declared above, so assign instead of redeclaring
err = amhelp.WaitForAll(ctx, 2*time.Second,
    client.Mach.When1(ssrpc.ClientStates.Ready, ctx))
if ctx.Err() != nil {
    return
}
if err != nil {
    return err
}

RPC Multiplexer

mux, err := arpc.NewMux(ctx, addr, source.Id(), sourceMach, &arpc.MuxOpts{
    Parent: parentMach,
})
if err != nil {
    panic(err)
}
mux.Listener = listener // or mux.Addr = ":1234"
mux.Start()
// err is already declared above, so assign instead of redeclaring
err = amhelp.WaitForAll(ctx, 2*time.Second,
    mux.Mach.When1(ssrpc.MuxStates.Ready, ctx))

RPC Multiplexer with custom server

mux, err := arpc.NewMux(ctx, cfg.Web.AddrAgent(), "server-"+mach.Id(), mach, &arpc.MuxOpts{
    Parent: mach,
    NewServerFn: func(mux *arpc.Mux, id string, conn net.Conn) (*arpc.Server, error) {
        srv, err := mux.NewDefaultServer(id)
        if err != nil {
            return nil, err
        }

        // instant push
        srv.PushInterval.Store(new(10 * time.Millisecond))
        srv.Conn = conn
        srv.Start(e)

        return srv, nil
    },
})

RPC REPL with args name completion

arpc.MachReplEnv(mach, &arpc.ReplOpts{
    AddrDir:  "tmp",
    Args:     ARpc{},
    ParseRpc: ParseRpc,
})

Pass typesafe args

// Example with typed state names (ss) and typed arguments (A).
mach.Add1(ss.KillingWorker, Pass(&A{
    ConnAddr:   ":5555",
    WorkerAddr: ":5556",
}))

Parse typesafe args

func (p *BasePage) ConfigState(e *am.Event) {
    args := ParseArgs(e.Args)

    p.boot.Config = args.Config
}

Validate args in negotiation

// ConfigEnter is the negotiation handler: it accepts the mutation only when
// the parsed args carry a Config.
func (p *BasePage) ConfigEnter(e *am.Event) bool {
    // e.Args is an untyped map, so parse it first and inspect the typed result
    // NOTE(review): assumes Config is a nil-able field - confirm
    args := ParseArgs(e.Args)
    return args != nil && args.Config != nil
}

func (p *BasePage) ConfigState(e *am.Event) {
    args := ParseArgs(e.Args)

    p.boot.Config = args.Config
}

Error handling

err := t.tab.Runtime.Enable(ctx)
// check ctx first
if ctx.Err() != nil {
    return // expired
}
// check err second
if err != nil {
    // mutation to the Exception state
    mach.EvAddErrState(e, ss.ErrConnecting, err, nil)
    return
}

Error setters

// AddErr wraps err in [ErrWeb] and adds it to mach as the ErrWeb error
// state, passing event and the optional args along. A nil err is a no-op
// reported as am.Executed.
func AddErr(
    event *am.Event, mach *am.Machine, err error, args ...am.A,
) am.Result {
    if err != nil {
        wrapped := fmt.Errorf("%w: %w", ErrWeb, err)
        return mach.EvAddErrState(event, ss.ErrWeb, wrapped, am.OptArgs(args))
    }

    return am.Executed
}

Block until state is added

amhelp.Add1Sync(ctx, mach, ss.Foo, args)

Block until async state is added

// wait for Foo, by adding Bar
amhelp.Add1Async(ctx, mach, ss.Foo, ss.Bar, args)

Async disposal handlers

type Dispatcher struct {
  *am.ExceptionHandler
  *ssam.DisposedHandlers
}

// NewDispatcher returns a Dispatcher with the embedded DisposedHandlers
// initialized.
func NewDispatcher() *Dispatcher {
  return &Dispatcher{
      // init required
      DisposedHandlers: &ssam.DisposedHandlers{},
  }
}

// ...

var dispose am.HandlerDispose = func(id string, ctx context.Context) {
    pt.Close()
}
mach.Add1(ssam.DisposedStates.RegisterDisposal, am.A{
    ssam.DisposedArgHandler: dispose,
})

// ...

func (h *Dispatcher) DisposingState(e *am.Event) {
    // self disposal
    for _, client := range h.clients {
        client.NetMach.Add1(ssam.DisposedStates.Disposing, nil)
    }
    // call super
    h.DisposedHandlers.DisposingState(e)
}

Traced mutations

// pass [e] as the first param
func (a *Agent) StartState(e *am.Event) {
  mach.EvAdd1(e, ss.Mock, nil)
  mach.EvRemove1(e, ss.State, nil)
  // only mutates if err != nil
  mach.EvAddErr(e,
    mach.BindHandlers(a.handlersWeb), nil)
}