Distributed Data provides replicated data structures (Conflict-free Replicated Data Types, or CRDTs) that can be updated independently on any node and are guaranteed to converge to a consistent state: no coordination, no locks, no consensus rounds. This enables use cases such as distributed counters, cluster-wide session registries, feature flags, and shopping carts, where every node can read and write locally with immediate response and all nodes converge automatically.

Architecture

Each node in the cluster runs its own Replicator system actor (GoAktReplicator). There is no central coordinator: each Replicator owns its node's local CRDT store and communicates with peer Replicators via GoAkt's existing TopicActor pub/sub system.

Figure: Distributed Data architecture. Each node runs a Replicator actor with a local CRDT store; all Replicators subscribe to a shared topic and exchange deltas via TopicActor fan-out.

How updates flow:
  1. A user actor sends an Update message to its local Replicator via Tell or Ask.
  2. The Replicator applies the mutation locally, extracts the delta, and publishes it to the shared goakt.crdt.deltas topic.
  3. The TopicActor delivers the delta to all peer Replicators (existing cluster pub/sub; no custom gossip).
  4. Each peer Replicator merges the delta into its local store and notifies any local watchers.
Figure: Delta replication flow. A user actor on Node A sends Update to its local Replicator; the Replicator publishes the delta via TopicActor; peer Replicators on Nodes B and C receive and merge it.

Anti-entropy runs periodically as a safety net. Each Replicator exchanges version digests with a random peer, and any divergent keys are repaired. This ensures convergence even if some deltas were lost during a network partition.
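The digest exchange can be sketched as a version-map comparison: each side summarizes its store as key → version, and any key whose versions differ is scheduled for repair. The following is a minimal illustration only; the `digest` type and `diverged` helper are assumptions for this sketch, not the library's API.

```go
package main

import (
	"fmt"
	"sort"
)

// digest maps CRDT key names to a monotonically increasing version.
// This stands in for the version digests real Replicators exchange.
type digest map[string]uint64

// diverged returns the keys whose versions differ between two digests,
// including keys present on only one side (a missing key reads as 0).
func diverged(local, remote digest) []string {
	keys := map[string]struct{}{}
	for k := range local {
		keys[k] = struct{}{}
	}
	for k := range remote {
		keys[k] = struct{}{}
	}
	var out []string
	for k := range keys {
		if local[k] != remote[k] {
			out = append(out, k)
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	local := digest{"request-count": 7, "active-sessions": 3}
	remote := digest{"request-count": 7, "active-sessions": 5, "dark-mode": 1}
	// Only the divergent keys need repair; identical keys are skipped.
	fmt.Println(diverged(local, remote))
}
```

Because only divergent keys are repaired, a healthy cluster pays almost nothing for each anti-entropy round.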

CRDT types

All types implement the ReplicatedData interface and are immutable values: every mutation returns a new value plus a delta.
| Type | Description | Primary use cases |
| --- | --- | --- |
| GCounter | Grow-only counter. Each node maintains its own increment slot; value = sum of all slots. | Monotonic metrics, event counts |
| PNCounter | Positive-negative counter. Two GCounters: one for increments, one for decrements. | Rate limiters, gauges, inventory |
| LWWRegister[T] | Last-writer-wins register. Stores a single value with a timestamp. | Configuration, feature flags |
| ORSet[T] | Observed-remove set. Supports add and remove without conflict (add-wins semantics). | Session tracking, subscriptions |
| ORMap[K, V] | Map where keys form an OR-Set and values are themselves CRDTs. | Shopping carts, user profiles |
| Flag | Boolean that can only transition from false to true. | One-time signals, feature activation |
| MVRegister[T] | Multi-value register. Concurrent writes are preserved, not overwritten. | Conflict-visible state |
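The convergence guarantee rests on merge functions that are commutative, associative, and idempotent. The slot model behind GCounter can be sketched in a few lines; this is an illustrative toy, not the library's implementation.

```go
package main

import "fmt"

// gcounter holds one increment slot per node; the value is the sum of slots.
type gcounter map[string]int64

func (g gcounter) increment(node string, n int64) { g[node] += n }

func (g gcounter) value() int64 {
	var sum int64
	for _, v := range g {
		sum += v
	}
	return sum
}

// merge takes the per-slot maximum, so applying the same state twice
// (idempotence) or in either order (commutativity) yields the same result.
func merge(a, b gcounter) gcounter {
	out := gcounter{}
	for n, v := range a {
		out[n] = v
	}
	for n, v := range b {
		if v > out[n] {
			out[n] = v
		}
	}
	return out
}

func main() {
	nodeA := gcounter{}
	nodeB := gcounter{}
	nodeA.increment("node-a", 3) // concurrent, uncoordinated updates
	nodeB.increment("node-b", 2)
	fmt.Println(merge(nodeA, nodeB).value()) // 5, regardless of merge order
}
```

PNCounter follows from the same idea by pairing two such counters, one for increments and one for decrements.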

Typed keys

CRDT keys are typed and serializable. The type parameter ensures compile-time safety: no type assertions on read.
var (
    requestCount   = crdt.PNCounterKey("request-count")
    activeSessions = crdt.ORSetKey[string]("active-sessions")
    featureFlag    = crdt.LWWRegisterKey[string]("dark-mode")
    cartItems      = crdt.ORMapKey[string, *crdt.GCounter]("cart")
)
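The compile-time safety comes from carrying the value type in the key's type parameter. The pattern can be sketched with plain Go generics; `Key[T]`, `put`, and `get` below are hypothetical stand-ins for illustration, not the library's types.

```go
package main

import "fmt"

// Key carries its value type T at compile time: a lookup through a
// Key[T] can only ever yield a T, so callers never type-assert.
type Key[T any] struct{ Name string }

// store is an untyped bag; the typed accessors below recover safety.
var store = map[string]any{}

func put[T any](k Key[T], v T) { store[k.Name] = v }

func get[T any](k Key[T]) (T, bool) {
	v, ok := store[k.Name]
	if !ok {
		var zero T
		return zero, false
	}
	// The assertion lives here, once, behind the typed API.
	return v.(T), true
}

func main() {
	count := Key[int64]{Name: "request-count"}
	put(count, 42)
	v, _ := get(count) // v is int64 at the call site
	fmt.Println(v)
}
```

Writing `put(count, "oops")` would fail to compile, which is exactly the guarantee the typed keys provide.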

Enabling CRDTs

CRDT replication is enabled through ClusterConfig.WithCRDT(...). When it is not enabled, no Replicator is spawned and there is zero overhead.
clusterConfig := actor.NewClusterConfig().
    WithDiscovery(discoveryProvider).
    WithCRDT(
        crdt.WithAntiEntropyInterval(30 * time.Second),
        crdt.WithPruneInterval(5 * time.Minute),
        crdt.WithTombstoneTTL(24 * time.Hour),
    )

system, _ := actor.NewActorSystem("my-system",
    actor.WithCluster(clusterConfig),
)

Configuration options

| Option | Default | Description |
| --- | --- | --- |
| WithAntiEntropyInterval(d) | 30s | Interval between anti-entropy digest exchanges with a random peer |
| WithMaxDeltaSize(n) | 64KB | Maximum serialized delta size per publish; larger deltas trigger full-state transfer |
| WithPruneInterval(d) | 5m | Interval for pruning expired tombstones and compacting CRDTs |
| WithTombstoneTTL(d) | 24h | How long deletion tombstones are retained to prevent key resurrection |
| WithCoordinationTimeout(d) | 5s | Timeout for coordinated WriteTo/ReadFrom operations |
| WithRole(r) | (all nodes) | Restrict CRDT replication to nodes with this role |
| WithSnapshotInterval(d) | (disabled) | Interval for periodic BoltDB snapshots (requires WithSnapshotDir) |
| WithSnapshotDir(dir) | (disabled) | Directory for snapshot files |

Using the Replicator

The Replicator PID is accessed via ActorSystem.Replicator(). It returns nil if CRDTs are not enabled. All interaction uses standard Tell and Ask from within an actor's Receive handler.

Writing data

Send an Update[T] to the Replicator. The Modify function receives the current typed value and returns the updated value.
func (a *RateLimiter) Receive(ctx *actor.ReceiveContext) {
    switch ctx.Message().(type) {
    case *IncomingRequest:
        replicator := ctx.ActorSystem().Replicator()
        ctx.Tell(replicator, &crdt.Update[*crdt.PNCounter]{
            Key:     crdt.PNCounterKey("request-count"),
            Initial: crdt.NewPNCounter(),
            Modify: func(current *crdt.PNCounter) *crdt.PNCounter {
                return current.Increment(ctx.ActorSystem().PeersAddress(), 1)
            },
        })
    }
}
| Field | Purpose |
| --- | --- |
| Key | Typed key identifying the CRDT |
| Initial | Zero value used if the key does not yet exist |
| Modify | Function that receives the current value and returns the updated value |
| WriteTo | (optional) Coordination level: crdt.Majority or crdt.All |

Reading data

Send a Get[T] to the Replicator. The response is typed; no type assertion is needed on the CRDT data itself.
func (a *Dashboard) Receive(ctx *actor.ReceiveContext) {
    switch ctx.Message().(type) {
    case *RefreshMetrics:
        replicator := ctx.ActorSystem().Replicator()
        reply := ctx.Ask(replicator, &crdt.Get[*crdt.PNCounter]{
            Key: crdt.PNCounterKey("request-count"),
        }, time.Second)
        if getResp, ok := reply.(*crdt.GetResponse[*crdt.PNCounter]); ok && getResp.Data != nil {
            a.currentCount = getResp.Data.Value()
        }
    }
}
| Field | Purpose |
| --- | --- |
| Key | Typed key to read |
| ReadFrom | (optional) Coordination level: crdt.Majority or crdt.All |

Subscribing to changes

Actors can watch keys and receive Changed[T] notifications whenever the value changes, whether from local updates or peer deltas.
func (a *SessionMonitor) Receive(ctx *actor.ReceiveContext) {
    switch msg := ctx.Message().(type) {
    case *actor.PostStart:
        replicator := ctx.ActorSystem().Replicator()
        ctx.Tell(replicator, &crdt.Subscribe[*crdt.ORSet[string]]{
            Key: crdt.ORSetKey[string]("active-sessions"),
        })
    case *crdt.Changed[crdt.ReplicatedData]:
        if set, ok := msg.Data.(*crdt.ORSet[string]); ok {
            log.Printf("active sessions updated: %v", set.Elements())
        }
    }
}

Deleting data

ctx.Tell(replicator, &crdt.Delete[*crdt.ORSet[string]]{
    Key:     crdt.ORSetKey[string]("active-sessions"),
    WriteTo: crdt.Majority, // optional coordination
})
Deletion removes the key from the local store and publishes a tombstone to all peers. Tombstones are retained for the configured TTL and pruned periodically. While a tombstone is active, updates for that key are rejected to prevent resurrection.

Consistency model

Default: local-first

Every operation is local-first by default:
  • Writes apply the mutation locally and publish the delta asynchronously. The Replicator returns immediately.
  • Reads return the local value immediately.
This is eventually consistent, with convergence bounded by TopicActor delivery latency (typically sub-second in a healthy cluster).

Optional coordination

When stronger guarantees are needed, set WriteTo or ReadFrom on the message:
| Level | WriteTo behavior | ReadFrom behavior |
| --- | --- | --- |
| (not set) | Apply locally + publish via TopicActor | Return local value |
| Majority | Apply locally + send delta to N/2+1 peers + wait for acks | Query N/2+1 peers, merge results with local value |
| All | Apply locally + send delta to all peers + wait for acks | Query all peers, merge results with local value |
When ReadFrom: Majority and WriteTo: Majority are used together, the system provides strong eventual consistency with read-your-writes guarantees.
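The quorum sizes in the table follow directly from cluster size: Majority waits for N/2+1 acknowledgements and All waits for N. A small sketch of the counting rule (illustrative; the `required` helper is an assumption, not the library's API):

```go
package main

import "fmt"

// required returns how many acknowledgements (including the local
// node's own apply) a coordinated operation must collect.
func required(level string, clusterSize int) int {
	switch level {
	case "majority":
		return clusterSize/2 + 1 // strict majority
	case "all":
		return clusterSize
	default: // local-first: the local apply alone suffices
		return 1
	}
}

func main() {
	for _, n := range []int{3, 5} {
		fmt.Printf("cluster=%d majority=%d all=%d\n",
			n, required("majority", n), required("all", n))
	}
}
```

Any two strict majorities of the same cluster overlap in at least one node, so a Majority read always reaches at least one node that acknowledged a preceding Majority write. That overlap is what yields the read-your-writes guarantee described above.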

Durable snapshots

By default, the CRDT store is purely in-memory. If the Replicator crashes, state is rebuilt from peers via anti-entropy. For faster recovery, enable durable snapshots to persist the store to BoltDB periodically.
crdt.WithSnapshotInterval(1 * time.Minute),
crdt.WithSnapshotDir("/var/data/goakt/crdt-snapshots"),
When enabled:
  • The Replicator saves the full store to BoltDB at the configured interval.
  • On startup (or supervisor restart), the Replicator restores from the latest snapshot before participating in anti-entropy.
  • A final snapshot is persisted during graceful shutdown. The snapshot file is retained on disk so the Replicator can restore quickly on the next startup.
Snapshots are a recovery optimization, not a durability guarantee. The source of truth is always the distributed CRDT state across all peers. If a snapshot is lost or corrupted, the Replicator rebuilds from peers.

Observability

When WithMetrics() is enabled on the actor system, the Replicator registers OpenTelemetry instruments:
| Instrument | Type | Description |
| --- | --- | --- |
| crdt.replicator.store.size | Int64ObservableGauge | Number of CRDT keys in the local store |
| crdt.replicator.merge.count | Int64ObservableCounter | Total merges performed from peer deltas |
| crdt.replicator.delta.publish.count | Int64ObservableCounter | Deltas published via TopicActor |
| crdt.replicator.delta.receive.count | Int64ObservableCounter | Deltas received from peers |
| crdt.replicator.coordinated.write.count | Int64ObservableCounter | Coordinated write operations |
| crdt.replicator.coordinated.read.count | Int64ObservableCounter | Coordinated read operations |
| crdt.replicator.antientropy.count | Int64ObservableCounter | Anti-entropy rounds completed |
| crdt.replicator.tombstone.count | Int64ObservableGauge | Active tombstones pending pruning |
All instruments carry an actor.system attribute with the actor system name.

Full example

A complete working example: a SessionTracker actor that uses an ORSet to maintain a cluster-wide set of active sessions, and a SessionReporter that watches for changes.
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/tochemey/goakt/v4/actor"
    "github.com/tochemey/goakt/v4/crdt"
    "github.com/tochemey/goakt/v4/discovery"
)

// -- Keys (defined once, typically package-level) --

var sessionsKey = crdt.ORSetKey[string]("active-sessions")

// -- Messages --

type AddSession struct{ SessionID string }
type RemoveSession struct{ SessionID string }
type PrintSessions struct{}

// -- SessionTracker: writes to the distributed set --

type SessionTracker struct{}

func (a *SessionTracker) PreStart(*actor.Context) error { return nil }
func (a *SessionTracker) PostStop(*actor.Context) error { return nil }

func (a *SessionTracker) Receive(ctx *actor.ReceiveContext) {
    replicator := ctx.ActorSystem().Replicator()
    if replicator == nil {
        return
    }

    switch msg := ctx.Message().(type) {
    case *AddSession:
        nodeID := ctx.ActorSystem().PeersAddress()
        ctx.Tell(replicator, &crdt.Update[*crdt.ORSet[string]]{
            Key:     sessionsKey,
            Initial: crdt.NewORSet[string](),
            Modify: func(current *crdt.ORSet[string]) *crdt.ORSet[string] {
                return current.Add(nodeID, msg.SessionID)
            },
        })
    case *RemoveSession:
        ctx.Tell(replicator, &crdt.Update[*crdt.ORSet[string]]{
            Key:     sessionsKey,
            Initial: crdt.NewORSet[string](),
            Modify: func(current *crdt.ORSet[string]) *crdt.ORSet[string] {
                return current.Remove(msg.SessionID)
            },
        })
    case *PrintSessions:
        reply := ctx.Ask(replicator, &crdt.Get[*crdt.ORSet[string]]{
            Key: sessionsKey,
        }, time.Second)
        if getResp, ok := reply.(*crdt.GetResponse[*crdt.ORSet[string]]); ok && getResp.Data != nil {
            fmt.Printf("active sessions: %v\n", getResp.Data.Elements())
        }
    }
}

// -- SessionReporter: watches for changes --

type SessionReporter struct{}

func (a *SessionReporter) PreStart(*actor.Context) error { return nil }
func (a *SessionReporter) PostStop(*actor.Context) error { return nil }

func (a *SessionReporter) Receive(ctx *actor.ReceiveContext) {
    switch msg := ctx.Message().(type) {
    case *actor.PostStart:
        replicator := ctx.ActorSystem().Replicator()
        if replicator != nil {
            ctx.Tell(replicator, &crdt.Subscribe[*crdt.ORSet[string]]{
                Key: sessionsKey,
            })
        }
    case *crdt.Changed[crdt.ReplicatedData]:
        if set, ok := msg.Data.(*crdt.ORSet[string]); ok {
            log.Printf("[reporter] sessions changed: %v (count=%d)",
                set.Elements(), set.Len())
        }
    }
}

// -- Main --

func main() {
    ctx := context.Background()

    // Replace with your discovery provider (NATS, Kubernetes, etc.)
    // See: /clustering/service-discovery
    var discoveryProvider discovery.Provider

    clusterConfig := actor.NewClusterConfig().
        WithDiscovery(discoveryProvider).
        WithKinds(&SessionTracker{}, &SessionReporter{}).
        WithCRDT(
            crdt.WithAntiEntropyInterval(30 * time.Second),
            crdt.WithPruneInterval(5 * time.Minute),
        )

    system, _ := actor.NewActorSystem("session-app",
        actor.WithCluster(clusterConfig),
    )
    _ = system.Start(ctx)
    defer system.Stop(ctx)

    // Spawn actors
    tracker, _ := system.Spawn(ctx, "tracker", &SessionTracker{})
    _, _ = system.Spawn(ctx, "reporter", &SessionReporter{})

    // Simulate usage
    _ = actor.Tell(ctx, tracker, &AddSession{SessionID: "sess-abc"})
    _ = actor.Tell(ctx, tracker, &AddSession{SessionID: "sess-def"})
    time.Sleep(time.Second)
    _ = actor.Tell(ctx, tracker, &PrintSessions{})

    // On other nodes, the SessionReporter will receive Changed notifications
    // as deltas replicate via TopicActor.
}

Limitations

  • Memory-bound: all CRDT state is held in memory. Memory usage grows linearly with the number of keys and the size of each value. ORSets and ORMaps carry metadata (causal dots) that grows with unique add operations; periodic compaction prunes redundant dots.
  • Single-topic dissemination: all deltas for all keys flow through one TopicActor topic (goakt.crdt.deltas). This simplifies the protocol but means a high update rate across many keys produces proportional TopicActor throughput. For most workloads this is not a bottleneck.
  • No cross-datacenter replication: CRDTs replicate within a single cluster. Cross-datacenter delta exchange via the DataCenter control plane is planned for a future release.
  • Tombstone window: deleted keys are protected by a tombstone for the configured TTL. If a node is partitioned for longer than the TTL and still holds the key, it may resurrect the key when it rejoins. Set WithTombstoneTTL appropriately for your partition tolerance requirements.
  • Generic type constraints: ORSet[T] requires T to be comparable. LWWRegister[T], MVRegister[T], and ORMap[K, V] require values to be serializable by the codec layer. The built-in codec currently supports GCounter, PNCounter, Flag, LWWRegister[string], MVRegister[string], ORSet[string], and ORMap[string, *GCounter].
  • No partial updates: the Modify function in Update[T] receives the full current value and must return the full updated value. There is no patch or field-level delta mechanism.