/pkg/rpc/README.md
aRPC is a transparent RPC for state machines implemented using asyncmachine-go. It’s
clock-based and features many optimizations, e.g. having most of the API methods executed locally (as state changes are
regularly pushed to the client). It’s built on top of cenkalti/rpc2, net/rpc,
and soheilhy/cmux. Check out a dedicated example, gRPC benchmark,
and integration tests tutorial.
Features
- mutation methods
- wait methods
- clock pushes (from source mutations)
- remote contexts
- multiplexing
- reconnect / fail-safety
- worker sending payloads to the client
- REPL
- queue ticks support
- initial optimizations
Not implemented (yet):
WhenArgs,Err()PushAllTicks- chunked payloads
- TLS
- compression
- msgpack encoding
Each RPC server can handle 1 RPC client at a time, but 1 state source (asyncmachine) can have many RPC servers attached to itself (via Tracer API). Additionally, remote RPC workers can also have RPC servers attached to themselves, creating a tree structure (see /examples/benchmark_state_source).
Components
Worker
Any state machine can be exposed as an RPC worker, as long as it implements /pkg/rpc/states/WorkerStructDef.
This can be done either manually, or by using state helpers (SchemaMerge,
SAdd), or by generating a states file with
am-gen. It’s also required to have the states verified by Machine.VerifyStates.
Worker can send data to the client via the SendPayload state.
import (
am "github.com/pancsta/asyncmachine-go/pkg/machine"
arpc "github.com/pancsta/asyncmachine-go/pkg/rpc"
ssrpc "github.com/pancsta/asyncmachine-go/pkg/rpc/states"
)
// ...
// inherit from RPC worker
ssStruct := am.SchemaMerge(ssrpc.WorkerStruct, am.Schema{
"Foo": {Require: am.S{"Bar"}},
"Bar": {},
})
ssNames := am.SAdd(ssrpc.WorkerStates.Names(), am.S{"Foo", "Bar"})
// init
worker := am.New(ctx, ssStruct, nil)
worker.VerifyStates(ssNames)
// ...
// send data to the client
worker.Add1(ssrpc.WorkerStates.SendPayload, arpc.Pass(&arpc.A{
Name: "mypayload",
Payload: &arpc.ArgsPayload{
Name: "mypayload",
Source: "worker1",
Data: []byte{1,2,3},
},
}))
Worker Schema
State schema from /pkg/rpc/states/ss_rpc_worker.go.
type WorkerStatesDef struct {
ErrProviding string
ErrSendPayload string
SendPayload string
}
Server
Each RPC server can handle 1 client at a time. Both client and server need the same worker states definition (structure map and ordered list of states). After the initial handshake, server will be pushing local state changes every PushInterval, while state changes made by an RPC client are delivered synchronously. Server starts listening on either Addr, Listener, or Conn. Basic ACL is possible via AllowId.
import (
amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
arpc "github.com/pancsta/asyncmachine-go/pkg/rpc"
ssrpc "github.com/pancsta/asyncmachine-go/pkg/rpc/states"
)
// ...
// var addr string
// var worker *am.Machine
// init
s, err := arpc.NewServer(ctx, addr, worker.ID, worker, nil)
if err != nil {
panic(err)
}
// start
s.Start()
err = amhelp.WaitForAll(ctx, 2*time.Second,
s.Mach.When1(ssrpc.ServerStates.RpcReady, ctx))
if ctx.Err() != nil {
return
}
if err != nil {
return err
}
// react to the client
<-worker.When1("Foo", nil)
print("Client added Foo")
worker.Add1("Bar", nil)
Server Schema
State schema from /pkg/rpc/states/ss_rpc_server.go.
Client
Each RPC client can connect to 1 server and needs to know worker’s machine schema and order. Data send by a worker via
SendPayload will be received by a Consumer machine
(passed via ClientOpts.Consumer) as an Add
mutation of the WorkerPayload state (see a detailed diagram). Client supports
fail-safety for both connection (eg ConnRetries,
ConnRetryBackoff) and calls (eg CallRetries,
CallRetryBackoff).
After the client’s Ready state becomes active, it exposes a remote worker at client.Worker. Remote worker implements
most of Machine’s methods, many of which
are evaluated locally (like Is, When,
NewStateCtx). See machine.Api
for a full list.
import (
amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
arpc "github.com/pancsta/asyncmachine-go/pkg/rpc"
ssrpc "github.com/pancsta/asyncmachine-go/pkg/rpc/states"
)
// ...
var addr string
// worker state structure
var ssStruct am.Schema
// worker state names
var ssNames am.S
// consumer
consumer := am.New(ctx, ssrpc.ConsumerStruct, nil)
// init
c, err := arpc.NewClient(ctx, addr, "clientid", ssStruct, ssNames, &arpc.ClientOpts{
Consumer: consumer,
})
if err != nil {
panic(err)
}
// start
c.Start()
err := amhelp.WaitForAll(ctx, 2*time.Second,
c.Mach.When1(ssrpc.ClientStates.Ready, ctx))
if ctx.Err() != nil {
return
}
if err != nil {
return err
}
// use the remote worker
c.Worker.Add1("Foo", nil)
<-c.Worker.When1("Bar", nil)
print("Server added Bar")
Client Schema
State schema from /pkg/rpc/states/ss_rpc_client.go.
Multiplexer
Because 1 server can serve only 1 client (for simplicity), it’s often required to use a port multiplexer. It’s very simple to create one using NewMux and a callback function, which returns a new server instance.
import (
amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
arpc "github.com/pancsta/asyncmachine-go/pkg/rpc"
ssrpc "github.com/pancsta/asyncmachine-go/pkg/rpc/states"
)
// ...
// new server per each new client (optional)
var newServer arpc.MuxNewServer = func(num int64, _ net.Conn) (*Server, error) {
name := fmt.Sprintf("%s-%d", t.Name(), num)
s, err := NewServer(ctx, "", name, w, nil)
if err != nil {
t.Fatal(err)
}
return s, nil
}
// start cmux
mux, err := arpc.NewMux(ctx, t.Name(), newServer, nil)
if err != nil {
t.Fatal(err)
}
mux.Listener = listener // or mux.Addr := ":1234"
mux.Start()
err := amhelp.WaitForAll(ctx, 2*time.Second,
mux.Mach.When1(ssrpc.MuxStates.Ready, ctx))
if ctx.Err() != nil {
return
}
if err != nil {
return err
}
Multiplexer Schema
State schema from /pkg/rpc/states/ss_mux.go.
Documentation
Benchmark: aRPC vs gRPC
A simple and opinionated benchmark showing a subscribe-get-process scenario, implemented in both gRPC and aRPC. See
/examples/benchmark_grpc for details and source code.
> task benchmark-grpc
...
BenchmarkClientArpc
client_arpc_test.go:136: Transferred: 609 bytes
client_arpc_test.go:137: Calls: 4
client_arpc_test.go:138: Errors: 0
client_arpc_test.go:136: Transferred: 1,149,424 bytes
client_arpc_test.go:137: Calls: 10,003
client_arpc_test.go:138: Errors: 0
BenchmarkClientArpc-8 10000 248913 ns/op 28405 B/op 766 allocs/op
BenchmarkClientGrpc
client_grpc_test.go:117: Transferred: 1,113 bytes
client_grpc_test.go:118: Calls: 9
client_grpc_test.go:119: Errors: 0
client_grpc_test.go:117: Transferred: 3,400,812 bytes
client_grpc_test.go:118: Calls: 30,006
client_grpc_test.go:119: Errors: 0
BenchmarkClientGrpc-8 10000 262693 ns/op 19593 B/op 391 allocs/op
BenchmarkClientLocal
BenchmarkClientLocal-8 10000 434.4 ns/op 16 B/op 1 allocs/op
PASS
ok github.com/pancsta/asyncmachine-go/examples/benchmark_grpc 5.187s
API
aRPC implements /pkg/machine#Api, which is a large subset of /pkg/machine#Machine methods. Below the full list,
with distinction which methods happen where (locally or on remote).
// TODO update
// A (arguments) is a map of named arguments for a Mutation.
type A map[string]any
// S (state names) is a string list of state names.
type S []string
type Time []uint64
type Clock map[string]uint64
type Result int
type Schema = map[string]State
// Api is a subset of Machine for alternative implementations.
type Api interface {
// ///// REMOTE
// Mutations (remote)
Add1(state string, args A) Result
Add(states S, args A) Result
Remove1(state string, args A) Result
Remove(states S, args A) Result
AddErr(err error, args A) Result
AddErrState(state string, err error, args A) Result
Toggle(states S, args A) Result
Toggle1(state string, args A) Result
Set(states S, args A) Result
// Traced mutations (remote)
EvAdd1(event *Event, state string, args A) Result
EvAdd(event *Event, states S, args A) Result
EvRemove1(event *Event, state string, args A) Result
EvRemove(event *Event, states S, args A) Result
EvAddErr(event *Event, err error, args A) Result
EvAddErrState(event *Event, state string, err error, args A) Result
EvToggle(event *Event, states S, args A) Result
EvToggle1(event *Event, state string, args A) Result
// Waiting (remote)
WhenArgs(state string, args A, ctx context.Context) <-chan struct{}
// Getters (remote)
Err() error
// ///// LOCAL
// Checking (local)
IsErr() bool
Is(states S) bool
Is1(state string) bool
Any(states ...S) bool
Any1(state ...string) bool
Not(states S) bool
Not1(state string) bool
IsTime(time Time, states S) bool
WasTime(time Time, states S) bool
IsClock(clock Clock) bool
WasClock(clock Clock) bool
Has(states S) bool
Has1(state string) bool
CanAdd(states S, args A) Result
CanAdd1(state string, args A) Result
CanRemove(states S, args A) Result
CanRemove1(state string, args A) Result
CountActive(states S) int
// Waiting (local)
When(states S, ctx context.Context) <-chan struct{}
When1(state string, ctx context.Context) <-chan struct{}
WhenNot(states S, ctx context.Context) <-chan struct{}
WhenNot1(state string, ctx context.Context) <-chan struct{}
WhenTime(states S, times Time, ctx context.Context) <-chan struct{}
WhenTime1(state string, tick uint64, ctx context.Context) <-chan struct{}
WhenTicks(state string, ticks int, ctx context.Context) <-chan struct{}
WhenQuery(query func(clock Clock) bool, ctx context.Context) <-chan struct{}
WhenErr(ctx context.Context) <-chan struct{}
WhenQueue(tick Result) <-chan struct{}
// Getters (local)
StateNames() S
StateNamesMatch(re *regexp.Regexp) S
ActiveStates() S
Tick(state string) uint64
Clock(states S) Clock
Time(states S) Time
TimeSum(states S) uint64
QueueTick() uint64
NewStateCtx(state string) context.Context
Export() *Serialized
Schema() Schema
Switch(groups ...S) string
Groups() (map[string][]int, []string)
Index(states S) []int
Index1(state string) int
// Misc (local)
Id() string
ParentId() string
Tags() []string
Ctx() context.Context
String() string
StringAll() string
Log(msg string, args ...any)
SemLogger() SemLogger
Inspect(states S) string
BindHandlers(handlers any) error
DetachHandlers(handlers any) error
HasHandlers() bool
StatesVerified() bool
Tracers() []Tracer
DetachTracer(tracer Tracer) error
BindTracer(tracer Tracer) error
AddBreakpoint1(added string, removed string, strict bool)
AddBreakpoint(added S, removed S, strict bool)
Dispose()
WhenDisposed() <-chan struct{}
IsDisposed() bool
}
Tests
aRPC passes the whole test suite of /pkg/machine
for the exposed methods and provides a couple of optimization-focused tests (on top of tests for
basic RPC).
Optimizations
aRPC implements several optimization strategies to achieve the results.
net/rpcmethod names as runes- binary format of
encoding/gob - index-based clock
[0, 100, 0, 120]
- diff-based clock updates
[0, 1, 0, 1]
- debounced server-mutation clock pushes
[0, 5, 2, 1]
- partial clock updates
[[1, 1], [3, 1]]
Status
Testing, not semantically versioned.