/pkg/helpers/README.md
/pkg/helpers is the swiss army knife for async state machines. It encapsulates many low-level details of
/pkg/machine and provide easy to reason about functions. It specifically deals with
machine time and multi states, which can be quite
verbose.
Highlights:
- synchronous calls
- debugging helpers
- waiting helpers
- failsafe Request object
- test helpers
- miscellaneous utils
Installation
// prod
import amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
// tests
import amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers/testing"
Synchronous Calls
Synchronous wrappers of async state machine calls, which assume a single, blocking scenario controlled with context. Multi states are handled automatically.
- Add1Block
- Add1AsyncBlock
Example - add state StateNameSelected and wait until it becomes active
res := amhelp.Add1Block(ctx, mach, ss.StateNameSelected, am.A{"state": state})
print(mach.Is1(ss.StateNameSelected)) // true
print(res) // am.Executed or am.Canceled, never am.Queued
Example - wait for ScrollToTx, triggered by ScrollToMutTx
res := amhelp.Add1AsyncBlock(ctx, mach, ss.ScrollToTx, ss.ScrollToMutTx, am.A{
"state": state,
"fwd": true,
})
print(mach.Is1(ss.ScrollToTx)) // true
print(res) // am.Executed or am.Canceled, never am.Queued
Debugging Helpers
Activate debugging in 1 line - either for a single machine, or for the whole process.
- EnableDebugging
- SetLogLevel
- MachDebugEnv
- MachDebug
- Healthcheck
- IsDebug
Example - enable dbg telemetry using conventional env vars.
// set env vars
amhelp.EnableDebugging(true)
// debug
amhelp.MachDebugEnv(mach)
Waiting Helpers
Waiting helpers mostly provide wrappers for when methods in select statements with a
state context, and timeout context. State context is checked before, and timeout
transformed into am.ErrTimeout. It reduces state context checking, if used as a first call after a blocking call /
start (which is a common case).
- WaitForAll
- WaitForErrAll
- WaitForAny
- WaitForErrAny
- GroupWhen1
- Wait
- ExecAndClose
Example - wait for bootstrap RPC to become ready.
//
err := amhelp.WaitForAll(ctx, s.ConnTimeout,
boot.server.Mach.When1(ssrpc.ServerStates.RpcReady, nil))
if ctx.Err() != nil {
return // expired
}
if err != nil {
AddErrWorker(s.Mach, err, Pass(argsOut))
return
}
Example - wait for all clients to be ready.
var clients []*node.Client
amhelp.WaitForAll(t, ctx, 2*time.Second,
amhelp.GroupWhen1(clients, ssC.Ready, nil)...)
Example - wait 1s with a state context.
if !amhelp.Wait(ctx, 1*time.Second) {
return // expired
}
// wait ok
Example - wait for Foo, Bar, or Exception.
ctx := client.Mach.NewStateCtx("Start")
err := amhelp.WaitForErrAll(ctx, 1*time.Second, mach,
mach.When1("Foo", nil),
mach.When1("Bar", nil))
if ctx.Err() != nil {
// no err
return nil // expired
}
if err != nil {
// either timeout happened or Exception has been activated
return err
}
Failsafe Request Object
Failsafe mutation requests use failsafe-go policies when mutating a state
machine. Useful for network communication, or when waiting on a state isn’t an option, but also helps with overloaded
machines and populated queues. Policies can be customized and MutRequest objects cloned.
- NewReqAdd
- NewReqAdd1
- NewReqRemove
- NewReqRemove1
- NewMutRequest
Example - try to add state WorkerRequested with 10 retries, 100ms delay, 5s backoff, and 5s max duration.
// failsafe worker request
_, err := amhelp.NewReqAdd1(c.Mach, ssC.WorkerRequested, nil).Run(ctx)
if err != nil {
return err
}
Test Helpers
Test helpers mostly provide assertions for stretchr/testify, but also wrappers
for regular helpers which go t.Fatal on errors (to save some lines).
- AssertIs
- AssertIs1
- AssertNot
- AssertNot1
- AssertNoErrNow
- AssertNoErrEver
- AssertErr
- MachDebug
- MachDebugEnv
- Wait
- WaitForAll
- WaitForAny
- WaitForErrAll
- WaitForErrAny
- GroupWhen1
Example - assert mach has ClientConnected active.
amhelpt.AssertIs1(t, mach, "ClientConnected")
Example - assert mach never had an error (not just currently).
amhelpt.AssertNoErrEver(t, mach)
Complete API
Package-Level Functions (total 56)
func Activations(u uint64) int
func Add1Async(ctx context.Context, mach am.Api, waitState string, addState string, args am.A) am.Result
func Add1Block(ctx context.Context, mach am.Api, state string, args am.A) am.Result
func Add1Sync(ctx context.Context, mach am.Api, state string, args am.A) am.Result
func ArgsToArgs[T](src interface{}, dest T) T
func ArgsToLogMap(args interface{}, maxLen int) map[string]string
func AskAdd(mach am.Api, states am.S, args am.A) am.Result
func AskAdd1(mach am.Api, state string, args am.A) am.Result
func AskEvAdd(e *am.Event, mach am.Api, states am.S, args am.A) am.Result
func AskEvAdd1(e *am.Event, mach am.Api, state string, args am.A) am.Result
func AskEvRemove(e *am.Event, mach am.Api, states am.S, args am.A) am.Result
func AskEvRemove1(e *am.Event, mach am.Api, state string, args am.A) am.Result
func AskRemove(mach am.Api, states am.S, args am.A) am.Result
func AskRemove1(mach am.Api, state string, args am.A) am.Result
func CantAdd(mach am.Api, states am.S, args am.A) bool
func CantAdd1(mach am.Api, state string, args am.A) bool
func CantRemove(mach am.Api, states am.S, args am.A) bool
func CantRemove1(mach am.Api, state string, args am.A) bool
func CopySchema(source am.Schema, target *am.Machine, states am.S) error
func CountRelations(state *am.State) int
func EnableDebugging(stdout bool)
func EvalGetter[T](ctx context.Context, source string, maxTries int, mach *am.Machine, eval func() (T, error)) (T, error)
func ExecAndClose(fn func()) <-chan struct{}
func GetTransitionStates(tx *am.Transition, index am.S) (added am.S, removed am.S, touched am.S)
func GroupWhen1(machs []am.Api, state string, ctx context.Context) ([]<-chan struct{}, error)
func Healthcheck(mach am.Api)
func Implements(statesChecked, statesNeeded am.S) error
func IndexesToStates(allStates am.S, indexes []int) am.S
func Interval(ctx context.Context, length time.Duration, interval time.Duration, fn func() bool) error
func IsDebug() bool
func IsMulti(mach am.Api, state string) bool
func IsTelemetry() bool
func IsTestRunner() bool
func MachDebug(mach am.Api, amDbgAddr string, logLvl am.LogLevel, stdout bool, semConfig *am.SemConfig)
func MachDebugEnv(mach am.Api)
func NewMirror(id string, flat bool, source *am.Machine, handlers any, states am.S) (*am.Machine, error)
func NewMutRequest(mach am.Api, mutType am.MutationType, states am.S, args am.A) *MutRequest
func NewReqAdd(mach am.Api, states am.S, args am.A) *MutRequest
func NewReqAdd1(mach am.Api, state string, args am.A) *MutRequest
func NewReqRemove(mach am.Api, states am.S, args am.A) *MutRequest
func NewReqRemove1(mach am.Api, state string, args am.A) *MutRequest
func NewStateLoop(mach *am.Machine, loopState string, optCheck func() bool) *StateLoop
func Pool(limit int) *errgroup.Group
func PrefixStates(schema am.Schema, prefix string, removeDups bool, optWhitelist, optBlacklist S) am.Schema
func RemoveMulti(mach am.Api, state string) am.HandlerFinal
func ResultToErr(result am.Result) error
func SchemaHash(schema am.Schema) string
func SemConfig(forceFull bool) *am.SemConfig
func SetEnvLogLevel(level am.LogLevel)
func StatesToIndexes(allStates am.S, states am.S) []int
func TagValue(tags []string, key string) string
func Wait(ctx context.Context, length time.Duration) bool
func WaitForAll(ctx context.Context, timeout time.Duration, chans ...<-chan struct{}) error
func WaitForAny(ctx context.Context, timeout time.Duration, chans ...<-chan struct{}) error
func WaitForErrAll(ctx context.Context, timeout time.Duration, mach am.Api, chans ...<-chan struct{}) error
func WaitForErrAny(ctx context.Context, timeout time.Duration, mach *am.Machine, chans ...<-chan struct{}) error
Package-Level Variables (only one)
var SlogToMachLogOpts *slog.HandlerOptions
Package-Level Constants (total 11)
const EnvAmHealthcheck = "AM_HEALTHCHECK"
const EnvAmLogArgs = "AM_LOG_ARGS"
const EnvAmLogChecks = "AM_LOG_CHECKS"
const EnvAmLogFile = "AM_LOG_FILE"
const EnvAmLogFull = "AM_LOG_FULL"
const EnvAmLogGraph = "AM_LOG_GRAPH"
const EnvAmLogQueued = "AM_LOG_QUEUED"
const EnvAmLogStateCtx = "AM_LOG_STATE_CTX"
const EnvAmLogSteps = "AM_LOG_STEPS"
const EnvAmLogWhen = "AM_LOG_WHEN"
const EnvAmTestRunner = "AM_TEST_RUNNER"
Documentation
Status
Testing, not semantically versioned.