GoAkt Streams is a reactive, demand-driven data-processing library built directly on top of the GoAkt actor model. Every pipeline stage runs inside a GoAkt actor, which means it inherits supervision, lifecycle management, and location transparency for free. Pipelines are lazy: nothing executes until RunnableGraph.Run is called against a live ActorSystem.

Why

Raw actor messaging requires you to handle buffer overflow, message loss, and backpressure by hand: every actor that consumes faster or slower than its peers becomes a coordination problem you have to solve case-by-case. Streams handle all of this automatically, giving you a safe and composable programming model on top of actors so you can focus on what flows through the pipeline rather than how to keep it from breaking under load.

Overview

| Concept | What it means in GoAkt Streams |
| --- | --- |
| Correctness | Exactly-once delivery semantics within a local pipeline |
| Backpressure | Credit-based demand pull prevents unbounded buffering |
| Composability | Stages are plain Go values; pipelines are assembled declaratively |
| Actor-native | Every stage is an actor; supervision, scheduling, and routing apply naturally |
| Type-safety | Source[T], Flow[In, Out], Sink[T] carry type parameters through the pipeline |

Core abstractions

| Type | Role |
| --- | --- |
| Source[T] | Origin of stream elements; produces values of type T |
| Flow[In, Out] | Transformation stage; consumes In, emits Out |
| Sink[T] | Terminal consumer; processes elements and drives backpressure |
| SubFlow[K, T] | "Stream of streams": partition by key, transform per substream, merge back |
| RunnableGraph | Fully assembled pipeline; a value type that can be Run multiple times |
| StreamHandle | Live handle returned by Run; controls lifecycle and exposes metrics |

A minimal pipeline looks like this:
import "github.com/tochemey/goakt/v4/stream"

collector, sink := stream.Collect[int]()

handle, err := stream.From(stream.Of(1, 2, 3, 4, 5)).
    Via(stream.Filter[int](func(n int) bool { return n%2 == 0 })).
    Via(stream.Map(func(n int) int { return n * 10 })).
    To(sink).
    Run(ctx, sys)
if err != nil {
    return err
}

<-handle.Done()
fmt.Println(collector.Items()) // [20 40]

Quick start

Linear pipeline

Use the fluent From / Via / To builder for straight-line pipelines:
// Range emits int64 values, so the fold accumulates in int64 and the fluent
// chain needs no type-changing stage (see "Type-changing flows" for those).
foldResult, sink := stream.Fold(int64(0), func(acc, n int64) int64 { return acc + n })

handle, err := stream.From(stream.Range(1, 101)). // 1..100
    Via(stream.Filter[int64](func(n int64) bool { return n%2 == 0 })).
    To(sink).
    Run(ctx, sys)
if err != nil {
    return err
}

<-handle.Done()
fmt.Println(foldResult.Value()) // 2550

Type-changing flows

When a Flow changes the element type, use the package-level Via free function or ViaLinear:
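// Via free function: the result is a Source[int]
intSrc := stream.Via(
    stream.From(stream.Of("1", "2", "3")).Source(),
    // Parse errors are ignored here for brevity; see TryMap for an error path.
    stream.Map(func(s string) int { n, _ := strconv.Atoi(s); return n }),
)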

// ViaLinear on a LinearGraph
g := stream.ViaLinear(
    stream.From(stream.Of("1", "2", "3")),
    stream.Map(func(s string) int { n, _ := strconv.Atoi(s); return n }),
)

Sources

Sources are the origin of stream data. All sources apply backpressure — they produce only as many elements as downstream demands.

Finite sources

// Fixed values
stream.Of(1, 2, 3)

// Integer range [start, end)
stream.Range(0, 1000)

// Seed + step function (Fibonacci)
stream.Unfold([]int{0, 1}, func(s []int) ([]int, int, bool) {
    return []int{s[1], s[0] + s[1]}, s[0], true
})
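
To make the Unfold contract concrete, here is a minimal, library-free sketch of the same seed-and-step idea. The `unfold` helper and its `limit` parameter are hypothetical names introduced for illustration only. The sketch also assumes the step function returns (next state, emitted value, continue flag) in that order, as the Fibonacci example above suggests, and that nothing is emitted once the flag is false.

```go
package main

import "fmt"

// unfold is a hypothetical, eager stand-in for a seed + step source.
// It stops when step reports false or when limit values were produced.
func unfold[S, T any](seed S, limit int, step func(S) (S, T, bool)) []T {
	out := make([]T, 0, limit)
	state := seed
	for len(out) < limit {
		next, value, ok := step(state)
		if !ok {
			break
		}
		out = append(out, value)
		state = next
	}
	return out
}

// fib returns the first n Fibonacci numbers using the same step shape
// as the example above: emit s[0], then shift the window.
func fib(n int) []int {
	return unfold([2]int{0, 1}, n, func(s [2]int) ([2]int, int, bool) {
		return [2]int{s[1], s[0] + s[1]}, s[0], true
	})
}

func main() {
	fmt.Println(fib(8)) // [0 1 1 2 3 5 8 13]
}
```

In a real pipeline the limit comes from downstream demand rather than an explicit argument, which is why an infinite step function is safe there.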

Channel source

Bridges an external Go channel into the pipeline. Backpressure naturally limits how fast the goroutine sends:
ch := make(chan string, 100)
go feedChannel(ch) // your producer

handle, err := stream.From(stream.FromChannel(ch)).
    To(stream.ForEach(func(s string) { log.Println(s) })).
    Run(ctx, sys)

Actor source

Pulls elements from a GoAkt actor using the PullRequest / PullResponse[T] protocol. Useful for integrating existing actor-based producers:
// Your actor must handle *stream.PullRequest and reply with *stream.PullResponse[T].
handle, err := stream.From(stream.FromActor[Order](orderActorPID)).
    To(stream.ForEach(func(o Order) { process(o) })).
    Run(ctx, sys)

Tick source

Emits the current time on a fixed interval. Runs indefinitely until Stop or Abort is called:
handle, err := stream.From(stream.Tick(time.Second)).
    Via(stream.Map(func(t time.Time) string { return t.Format(time.RFC3339) })).
    To(stream.ForEach(func(s string) { log.Println(s) })).
    Run(ctx, sys)

time.Sleep(10 * time.Second)
handle.Stop(ctx)

Network source

Reads []byte frames from a net.Conn. Each Read call produces one element; demand controls how many reads are batched per upstream request:
conn, err := net.Dial("tcp", "host:port")
if err != nil {
    return err // a failed dial would otherwise hand a nil conn to FromConn
}
handle, err := stream.From(stream.FromConn(conn, 4096)).
    Via(stream.Map(func(b []byte) string { return string(b) })).
    To(stream.ForEach(func(s string) { log.Println(s) })).
    Run(ctx, sys)

Reference

| Constructor | Description |
| --- | --- |
| Of[T](values...) | Finite source from a fixed set of values |
| Range(start, end) | Integer range [start, end) |
| Unfold[S, T](seed, step) | Generates values from a seed with a stateful step function |
| FromChannel[T](ch) | Reads from a Go channel; completes when the channel closes |
| FromActor[T](pid) | Pulls from a GoAkt actor via PullRequest / PullResponse[T] |
| Tick(interval) | Emits time.Time on a fixed interval; runs until cancelled |
| Merge[T](sources...) | Fans N sources into one; completes when all inputs complete |
| Concat[T](sources...) | Consumes sub-sources sequentially; completes when the last completes |
| Combine[T, U, V](left, right, fn) | Zips two sources pairwise via fn (mixed input types) |
| Zip[T](sources...) | Zips N same-typed sources into []T tuples |
| ZipWith[T, V](fn, sources...) | N-input zip via user combine fn |
| MergePreferred[T](idx, sources...) | Like Merge but drains slot idx first when it has data |
| MergePrioritized[T](weights, srcs) | Weighted random slot selection over slots that have data |
| MergeLatest[T](sources...) | Emits []T snapshots of the latest value on every input |
| MergeSequence[T](extractSeq, srcs) | Emits in ascending seq order across N inputs |
| Broadcast[T](src, n) | Fans one source out to N independent branches |
| Balance[T](src, n) | Distributes one source across N branches with round-robin backpressure |
| Partition[T](src, n, fn) | Routes each element to one of N branches, using the branch index returned by fn |
| Unzip[T, A, B](src, fn) | Splits a source into two sources of different element types |
| FromConn(conn, bufSize) | Reads []byte frames from a net.Conn |

Flows

Flows are lazy transformation stages. Each flow operator returns a new Flow value; multiple flows can be chained without materializing anything.

Mapping and filtering

// Map: type-changing, no error path
stream.Map(func(n int) string { return strconv.Itoa(n) })

// TryMap: transformation that may return an error
stream.TryMap(func(s string) (int, error) { return strconv.Atoi(s) })

// Filter: keeps elements where predicate returns true
stream.Filter(func(n int) bool { return n > 0 })

// FlatMap: expands each element into a slice
stream.FlatMap(func(s string) []string { return strings.Split(s, ",") })

// Flatten: unwraps []T elements into individual elements
stream.Flatten[string]()

Streaming flat-map

When the per-element expansion is itself a stream (e.g. paginated API calls, per-row database queries), use the streaming flat-map operators instead of FlatMap. Each input element produces a Source[Out] whose elements are flattened into the main pipeline:
// FlatMapConcat: drain each sub-source fully before starting the next.
// Order is preserved end-to-end.
stream.FlatMapConcat(func(userID int) stream.Source[Order] {
    return fetchUserOrders(userID) // Source[Order]
})

// FlatMapMerge: up to N sub-sources active concurrently; outputs interleaved.
// Order across sub-sources is non-deterministic; order within each preserved.
stream.FlatMapMerge(8, func(userID int) stream.Source[Order] {
    return fetchUserOrders(userID)
})
Backpressure is bounded by the breadth: the FlatMapMerge stage requests at most breadth elements from upstream, and the next batch is requested only as sub-sources complete, so a slow inner source naturally throttles the outer pipeline. Sub-pipeline handles are tracked and aborted on cancellation or failure to avoid leaking goroutines.

Windowing and rate control

// Batch: groups into slices of at most n, flushing early after maxWait
stream.Batch[Event](100, 500*time.Millisecond)

// Throttle: limits throughput to n elements per duration
stream.Throttle[Request](100, time.Second)

// Buffer: asynchronous decoupling buffer with overflow strategy
stream.Buffer[int](256, stream.DropTail)

Stateful flows

// Deduplicate: suppresses consecutive duplicates (T must be comparable)
stream.Deduplicate[string]()

// Scan: running accumulation — emits each intermediate state
stream.Scan(0, func(acc, n int) int { return acc + n })

Parallel processing

ParallelMap applies a function concurrently with up to n goroutines. Use OrderedParallelMap when output order must match input order:
// Unordered — results arrive as goroutines complete
stream.ParallelMap(8, func(req Request) Response { return callAPI(req) })

// Ordered — min-heap resequences results to preserve input order
stream.OrderedParallelMap(8, func(req Request) Response { return callAPI(req) })

Reference

| Constructor | Description |
| --- | --- |
| Map[In, Out](fn) | Type-changing transformation; no error path |
| TryMap[In, Out](fn) | Transformation with error; ErrorStrategy controls failure handling |
| Filter[T](predicate) | Keeps only elements where predicate returns true |
| FlatMap[In, Out](fn) | Expands each element into a slice of outputs |
| FlatMapConcat[In, Out](fn) | Per-element streaming flat-map; sub-sources drained sequentially |
| FlatMapMerge[In, Out](breadth, fn) | Per-element streaming flat-map; up to breadth sub-sources concurrent |
| Flatten[T]() | Unwraps []T elements into individual elements |
| Batch[T](n, maxWait) | Groups into []T slices of at most n; flushes early after maxWait |
| Buffer[T](size, strategy) | Asynchronous buffer with configurable overflow strategy |
| Throttle[T](n, per) | Limits throughput to at most n elements per per duration |
| Deduplicate[T]() | Suppresses consecutive duplicate elements (T must be comparable) |
| Scan[In, State](zero, fn) | Running accumulation; emits each intermediate state |
| WithContext[T](key, value) | Marks a named tracing boundary; passes elements through unchanged |
| ParallelMap[In, Out](n, fn) | Applies fn concurrently with up to n goroutines; unordered output |
| OrderedParallelMap[In, Out](n, fn) | Like ParallelMap but preserves input order via min-heap resequencing |

Sinks

Sinks are the terminal consumers of a pipeline. They drive backpressure by signalling demand upstream; the pipeline does not produce faster than the sink can consume.

Common sinks

// ForEach: side-effect per element
stream.ForEach(func(n int) { log.Println(n) })

// Collect: accumulates all elements; read results after completion
collector, sink := stream.Collect[int]()
// ... run pipeline ...
<-handle.Done()
items := collector.Items()

// Fold: reduces to a single accumulated value
result, sink := stream.Fold(0, func(acc, n int) int { return acc + n })
// ... run pipeline ...
<-handle.Done()
total := result.Value()

// First: captures only the first element then cancels upstream
first, sink := stream.First[int]()
// ... run pipeline ...
<-handle.Done()
val := first.Value()

// Ignore: discards all elements (useful when side effects are in upstream flows)
stream.Ignore[int]()

Actor and channel sinks

// ToActor: forwards each element to a GoAkt actor via Tell
stream.ToActor[Event](handlerPID)

// ToActorNamed: resolves actor by name on each element and forwards
stream.ToActorNamed[Event](sys, "handler")

// Chan: writes to a Go channel; blocks when full (natural backpressure)
ch := make(chan int, 64)
stream.Chan(ch)

Reference

| Constructor | Description |
| --- | --- |
| ForEach[T](fn) | Calls fn for each element |
| Collect[T]() | Accumulates all elements; retrieve via Collector[T].Items() |
| Fold[T, U](zero, fn) | Reduces to a single value; retrieve via FoldResult[U].Value() |
| First[T]() | Captures the first element then cancels upstream |
| Ignore[T]() | Discards all elements |
| Chan[T](ch) | Writes each element to a Go channel; applies backpressure when full |
| ToActor[T](pid) | Forwards each element to a GoAkt actor via Tell |
| ToActorNamed[T](system, name) | Resolves actor by name and forwards via Tell |

Fan-out and fan-in

Broadcast

Broadcast fans a single source out to N independent branches. Every branch receives every element. Backpressure is enforced by the slowest branch: the hub pulls from upstream only when all active branches have outstanding demand.
branches := stream.Broadcast[Event](stream.FromChannel(eventCh), 2)

// Branch 1: write to database
handle1, _ := stream.From(branches[0]).
    To(stream.ForEach(func(e Event) { db.Insert(e) })).
    Run(ctx, sys)

// Branch 2: publish to Kafka
handle2, _ := stream.From(branches[1]).
    To(stream.ForEach(func(e Event) { kafka.Publish(e) })).
    Run(ctx, sys)
Both pipelines must be materialized. The upstream sub-pipeline starts once the last branch materializes.

Balance

Balance distributes elements across N branches using round-robin routing with backpressure. Each element goes to exactly one branch — the next one with available demand. Use this to parallelise work across independent consumers:
workers := stream.Balance[Job](stream.FromChannel(jobCh), 4)

for i, branch := range workers {
    i := i
    stream.From(branch).
        To(stream.ForEach(func(j Job) { process(i, j) })).
        Run(ctx, sys)
}

Partition

Partition routes each element to exactly one of N branches based on a user-supplied routing function that returns the target branch index. Elements whose index is out of range, or whose target branch has already cancelled, are dropped silently. Backpressure is conservative: the hub pulls from upstream only when every active branch has outstanding demand, so the destination slot for any incoming element is guaranteed capacity.
branches := stream.Partition(stream.FromChannel(eventCh), 3, func(e Event) int {
    switch e.Kind {
    case "audit":   return 0
    case "metric":  return 1
    default:        return 2
    }
})

for i, b := range branches {
    stream.From(b).To(stream.ForEach(handlers[i])).Run(ctx, sys)
}

Unzip

Unzip splits a single source into two sources of potentially different element types. It is composed of Broadcast(src, 2) plus a Map per branch and follows the same backpressure rules as Broadcast.
type kv struct { K string; V int }

keys, values := stream.Unzip(
    stream.FromChannel(kvCh),
    func(p kv) (string, int) { return p.K, p.V },
)

Merge

Merge fans N independent sources into a single downstream. Elements arrive in non-deterministic order; completion happens when all inputs have completed:
merged := stream.Merge(
    stream.FromChannel(highPriorityCh),
    stream.FromChannel(lowPriorityCh),
)

stream.From(merged).
    To(stream.ForEach(func(e Event) { handle(e) })).
    Run(ctx, sys)

Concat

Concat consumes sub-sources sequentially: the next sub-pipeline only spawns after the current one completes, bounding in-flight cost at one sub-pipeline regardless of source count. Per-source ordering is preserved across the boundary.
all := stream.Concat(
    stream.Of(1, 2, 3),
    stream.Of(4, 5),
    stream.Of(6, 7, 8, 9),
)
// emits: 1, 2, 3, 4, 5, 6, 7, 8, 9

Combine (zip, two-input mixed-type)

Combine pairs elements from two sources with a combine function. It completes when either source is exhausted. Use it when the two inputs have different element types:
prices := stream.FromChannel(priceCh)    // Source[Price]
orders := stream.FromChannel(orderCh)    // Source[Order]

quotes := stream.Combine(prices, orders, func(p Price, o Order) Quote {
    return Quote{Price: p, Order: o}
})

Zip / ZipWith (N-input same-type)

Zip combines N same-typed sources into a stream of []T tuples; ZipWith does the same with a user combine function. Both complete as soon as any input is exhausted.
// []int snapshots, one per matched group
trios := stream.Zip(
    stream.Of(1, 2, 3),
    stream.Of(10, 20, 30),
    stream.Of(100, 200, 300),
)

// User combine: sum across slots
sums := stream.ZipWith(
    func(parts []int) int { s := 0; for _, v := range parts { s += v }; return s },
    stream.Of(1, 2, 3),
    stream.Of(10, 20, 30),
    stream.Of(100, 200, 300),
)

MergePreferred

Like Merge, but always drains the priority slot first when it has data, falling back to the lowest-indexed non-empty slot otherwise. Useful when one input represents a higher-priority feed (e.g. a control channel) that should be drained ahead of background traffic. Panics on out-of-range index.
src := stream.MergePreferred(
    0, // slot 0 = control
    stream.FromChannel(controlCh),
    stream.FromChannel(workCh),
)

MergePrioritized

Picks the next slot by weighted random over slots that currently have data. Slots with weight 0 are only selected when they are the only ones with data. Panics on weights / sources length mismatch.
src := stream.MergePrioritized(
    []int{3, 1}, // src1 wins ~3:1
    src1,
    src2,
)
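
The selection rule can be illustrated without the library. The following is a sketch under assumptions, not the actual implementation: `pickSlot` and its injectable `randIntn` parameter are hypothetical names, and the tie-break for zero-weight slots (lowest index first) is a guess.

```go
package main

import "fmt"

// pickSlot chooses among slots that currently have data, with probability
// proportional to their weight. randIntn(n) must return a value in [0, n);
// real code would pass rand.Intn. It returns -1 when no slot has data.
func pickSlot(weights []int, hasData []bool, randIntn func(int) int) int {
	total := 0
	for i, w := range weights {
		if hasData[i] {
			total += w
		}
	}
	if total == 0 {
		// Only zero-weight slots have data (or none do): take the first one.
		for i := range weights {
			if hasData[i] {
				return i
			}
		}
		return -1
	}
	r := randIntn(total)
	for i, w := range weights {
		if !hasData[i] {
			continue
		}
		if r < w {
			return i
		}
		r -= w
	}
	return -1
}

func main() {
	weights := []int{3, 1}
	counts := make([]int, len(weights))
	// Enumerate every possible draw instead of sampling, so the split is exact.
	for r := 0; r < 4; r++ {
		slot := pickSlot(weights, []bool{true, true}, func(int) int { return r })
		counts[slot]++
	}
	fmt.Println(counts) // [3 1]
}
```

Because the total is computed only over slots with data, an idle high-weight input never starves the others.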

MergeLatest

Emits []T snapshots of the latest value seen on every input. Each upstream emission queues one snapshot, but the first only fires once every input has emitted at least once. Completes when every input completes.
snapshots := stream.MergeLatest(
    stream.FromChannel(temperatureCh),
    stream.FromChannel(humidityCh),
)

stream.From(snapshots).
    To(stream.ForEach(func(s []float64) {
        log.Printf("temp=%.1f humidity=%.1f", s[0], s[1])
    })).
    Run(ctx, sys)
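
The snapshot rule described above (one snapshot per upstream emission, but nothing until every input has emitted once) can be sketched on plain slices. The `update` type and `mergeLatest` function are hypothetical names used only for this illustration, which models arrivals as an already-ordered list.

```go
package main

import "fmt"

// update is one emission from input number slot.
type update struct {
	slot int
	val  float64
}

// mergeLatest replays updates in arrival order and returns the snapshots
// that would be emitted for n inputs.
func mergeLatest(n int, updates []update) [][]float64 {
	latest := make([]float64, n)
	seen := make([]bool, n)
	missing := n // inputs that have not emitted yet
	var out [][]float64
	for _, u := range updates {
		latest[u.slot] = u.val
		if !seen[u.slot] {
			seen[u.slot] = true
			missing--
		}
		if missing == 0 {
			snap := make([]float64, n)
			copy(snap, latest) // copy so later updates do not mutate old snapshots
			out = append(out, snap)
		}
	}
	return out
}

func main() {
	// slot 0 = temperature, slot 1 = humidity
	updates := []update{{0, 20.0}, {0, 20.5}, {1, 40.0}, {0, 21.0}}
	fmt.Println(mergeLatest(2, updates)) // [[20.5 40] [21 40]]
}
```

The first two temperature readings produce no output because humidity has not reported yet; after that, every update yields a snapshot.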

MergeSequence

Emits in strict ascending sequence-number order across N inputs by buffering out-of-order elements in a min-heap. Inputs must collectively produce a contiguous range of sequence numbers starting from 0. If all inputs complete with the next-expected sequence still missing, the stream fails with a missing-sequence error.
type item struct { Seq int64; Val string }

ordered := stream.MergeSequence(
    func(i item) int64 { return i.Seq },
    evenSeqSrc, // emits seqs 0, 2, 4, ...
    oddSeqSrc,  // emits seqs 1, 3, 5, ...
)
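
For intuition, the resequencing step can be sketched with the standard library's container/heap. This is an illustration of the technique, not the library's code: `resequence`, `seqHeap` and `errMissingSequence` are hypothetical names, arrivals are modelled as a slice in arrival order, and sequence numbers are assumed to be unique.

```go
package main

import (
	"container/heap"
	"errors"
	"fmt"
)

type item struct {
	Seq int64
	Val string
}

// seqHeap is a min-heap of items ordered by Seq.
type seqHeap []item

func (h seqHeap) Len() int           { return len(h) }
func (h seqHeap) Less(i, j int) bool { return h[i].Seq < h[j].Seq }
func (h seqHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *seqHeap) Push(x any)        { *h = append(*h, x.(item)) }
func (h *seqHeap) Pop() any {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

var errMissingSequence = errors.New("missing sequence")

// resequence buffers out-of-order arrivals and releases them in ascending
// Seq order starting at 0. It fails if a gap remains once input ends.
func resequence(arrivals []item) ([]item, error) {
	h := &seqHeap{}
	var out []item
	next := int64(0)
	for _, it := range arrivals {
		heap.Push(h, it)
		for h.Len() > 0 && (*h)[0].Seq == next {
			out = append(out, heap.Pop(h).(item))
			next++
		}
	}
	if h.Len() > 0 {
		return out, fmt.Errorf("%w: expected %d", errMissingSequence, next)
	}
	return out, nil
}

func main() {
	arrivals := []item{{0, "a"}, {2, "c"}, {4, "e"}, {1, "b"}, {3, "d"}}
	out, err := resequence(arrivals)
	fmt.Println(out, err) // [{0 a} {1 b} {2 c} {3 d} {4 e}] <nil>

	_, err = resequence([]item{{0, "a"}, {2, "c"}})
	fmt.Println(err) // missing sequence: expected 1
}
```

The heap holds only elements that arrived ahead of their turn, so its size tracks how far the inputs drift apart.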

Substreams (SubFlow)

Three constructors produce a SubFlow, each with different routing semantics:
  • GroupBy partitions by a key function (one substream per distinct key, materialised lazily).
  • SplitWhen starts a new substream when a predicate is true (the triggering element becomes the FIRST of the new substream).
  • SplitAfter ends the current substream after a predicate-true element (the triggering element becomes the LAST of the closing substream).
Per-substream state (Scan accumulators, Deduplicate’s last-seen value, anything stateful in the per-substream chain) is genuinely independent because each substream is a real GoAkt actor pipeline materialised lazily on its first element. MergeSubstreams collapses the partitioned stream back into a flat Source[T] whose elements interleave non-deterministically across substreams while preserving order within a single substream.
// Partition events by tenantID, count events per tenant via a per-substream
// Scan, then merge all per-tenant counters into a single flat stream.
src := stream.FromChannel(eventCh) // Source[Event]

byTenant := stream.GroupBy(src, 1024, func(e Event) string { return e.TenantID })
counted  := stream.SubFlowVia(byTenant, stream.Scan(0, func(acc int, _ Event) int { return acc + 1 }))
merged   := stream.MergeSubstreams(counted)

stream.From(merged).
    To(stream.ForEach(func(n int) { log.Println(n) })).
    Run(ctx, sys)
The key function is captured at GroupBy time and applied to upstream elements regardless of how many SubFlowVia calls follow — the per-substream output type can change freely without affecting routing.

Splitting on a delimiter

SplitWhen and SplitAfter take a predicate instead of a key function and produce a SubFlow[int, T] whose synthetic substream IDs increment on each rotation. They differ in where the triggering element lands:
// SplitWhen: predicate-true element starts a NEW substream.
//   input:        1, 2, 0, 3, 4, 0, 5
//   substreams:  [1, 2] [0, 3, 4] [0, 5]
src := stream.FromChannel(eventCh) // Source[int]
whenSF := stream.SplitWhen(src, func(n int) bool { return n == 0 })

// SplitAfter: predicate-true element ENDS the current substream.
//   input:        1, 2, 0, 3, 4, 0, 5
//   substreams:  [1, 2, 0] [3, 4, 0] [5]
// (distinct variable names: a second `sf :=` in the same scope would not compile)
afterSF := stream.SplitAfter(src, func(n int) bool { return n == 0 })
These are the natural primitives for parsing delimited streams (newline-terminated frames, control sentinels, batch markers) where each delimiter starts or ends a logical group rather than tagging a key. Substreams are sequential — at most one is actively receiving new elements at a time — so the maxSubstreams cap is generally unnecessary and 0 (unbounded) is fine.
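
The difference between the two splitting rules is easy to check on plain slices. The helpers below are hypothetical, library-free stand-ins (`splitWhen` and `splitAfter` operating on `[]T`), written only to reproduce the groupings listed in the comments above. As an assumption, they never emit an empty group when the very first element matches the predicate.

```go
package main

import "fmt"

// splitWhen starts a new group at every element for which pred is true.
func splitWhen[T any](in []T, pred func(T) bool) [][]T {
	var out [][]T
	var cur []T
	for _, v := range in {
		if pred(v) && len(cur) > 0 {
			out = append(out, cur)
			cur = nil
		}
		cur = append(cur, v)
	}
	if len(cur) > 0 {
		out = append(out, cur)
	}
	return out
}

// splitAfter closes the current group right after an element for which
// pred is true.
func splitAfter[T any](in []T, pred func(T) bool) [][]T {
	var out [][]T
	var cur []T
	for _, v := range in {
		cur = append(cur, v)
		if pred(v) {
			out = append(out, cur)
			cur = nil
		}
	}
	if len(cur) > 0 {
		out = append(out, cur)
	}
	return out
}

func main() {
	in := []int{1, 2, 0, 3, 4, 0, 5}
	isZero := func(n int) bool { return n == 0 }
	fmt.Println(splitWhen(in, isZero))  // [[1 2] [0 3 4] [0 5]]
	fmt.Println(splitAfter(in, isZero)) // [[1 2 0] [3 4 0] [5]]
}
```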

Cardinality cap

The maxSubstreams parameter caps the number of concurrently-open substreams; pass 0 for unbounded. Exceeding the cap terminates the stream with ErrTooManySubstreams, so unbounded key cardinality is a visible failure rather than a silent memory leak:
sf := stream.GroupBy(src, 100, keyFn) // up to 100 distinct keys
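
A library-free sketch of the cap, for intuition only: `groupBy` here is a hypothetical slice-based helper and `errTooManySubstreams` is a local stand-in for the library's ErrTooManySubstreams. Unlike real substreams, the groups in this sketch never close, so "concurrently open" simply means "distinct keys seen so far".

```go
package main

import (
	"errors"
	"fmt"
)

// errTooManySubstreams is a local stand-in, not the library's error value.
var errTooManySubstreams = errors.New("too many substreams")

// groupBy routes each element to the group for its key, opening groups
// lazily on first use. maxSubstreams == 0 means unbounded.
func groupBy[T any, K comparable](in []T, maxSubstreams int, keyFn func(T) K) (map[K][]T, error) {
	groups := make(map[K][]T)
	for _, v := range in {
		k := keyFn(v)
		_, open := groups[k]
		if !open && maxSubstreams > 0 && len(groups) >= maxSubstreams {
			// A new key would exceed the cap: fail visibly instead of growing.
			return groups, errTooManySubstreams
		}
		groups[k] = append(groups[k], v)
	}
	return groups, nil
}

func main() {
	words := []string{"apple", "avocado", "banana", "cherry"}
	firstLetter := func(s string) byte { return s[0] }

	groups, err := groupBy(words, 0, firstLetter)
	fmt.Println(len(groups), err) // 3 <nil>

	_, err = groupBy(words, 2, firstLetter)
	fmt.Println(err) // too many substreams
}
```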

Per-substream backpressure

Each substream has an independent in-flight cap (default 256 elements). When a substream is at capacity, the configured OverflowStrategy decides what happens to new elements with that key. Substream feed sources acknowledge consumption to the splitter in batches (matching the flowActor watermark refill at perKeyBuffer/4), so per-key memory is bounded without per-element protocol overhead.
sf := stream.GroupBy(src, 0, keyFn).
    WithSubstreamBuffer(64, stream.FailSource)

| Strategy | Behaviour |
| --- | --- |
| DropTail (default) | Discard the newest element when the substream is at capacity |
| DropHead / BackpressureSource | Currently collapse to drop-newest: DropHead would break per-key ordering and BackpressureSource would deadlock the splitter on inbound |
| FailSource | Terminate the stream with ErrSubstreamOverflow |

Drops are reported through the existing OnDrop hook and the droppedElements metric. For tighter or more elaborate bounds (e.g. time-based windowing per key), compose a Buffer or Throttle flow inside the per-substream chain.

Per-substream error strategy

When a per-key sub-pipeline fails, SubstreamErrorStrategy controls how the splitter reacts. The default mirrors the FailFast contract of linear flows; the alternatives let one substream’s failure stay isolated.

| Strategy | Behaviour |
| --- | --- |
| SubstreamFailAll (default) | Surface the error verbatim via StreamHandle.Err() and terminate the entire stream |
| SubstreamDrop | Blocklist the failing key and complete that substream silently; sibling substreams finish normally; further elements with the blocklisted key are dropped |
| SubstreamRestart | Discard the failed pipeline; the next element with that key spawns a fresh substream from scratch |

sf := stream.GroupBy(src, 0, keyFn).
    WithErrorStrategy(stream.SubstreamDrop)

Reference

| Constructor / method | Description |
| --- | --- |
| GroupBy[T, K](src, maxSubstreams, keyFn) | Partition src by keyFn; returns SubFlow[K, T] |
| SplitWhen[T](src, predicate) | New substream begins when predicate is true; element is FIRST of new substream |
| SplitAfter[T](src, predicate) | Current substream ends after predicate is true; element is LAST of that substream |
| SubFlowVia[K, In, Out](sf, flow) | Append a transformation to each substream’s pipeline |
| MergeSubstreams[K, T](sf) | Collapse substreams back into a flat Source[T] (non-deterministic interleave) |
| SubFlow.WithSubstreamBuffer(perKeyBuffer, strategy) | Per-substream in-flight cap and overflow strategy |
| SubFlow.WithErrorStrategy(strategy) | Per-substream failure handling policy |

Pipeline DSL (Graph builder)

For non-linear topologies (fan-out branches that later merge, or multiple independent pipelines sharing a source) use the Graph DSL. Nodes are identified by string names and connected by referencing upstream names:
// The Graph DSL is type-erased: nodes operate on Source[any], Flow[any, any],
// and Sink[any]. Use these typed constructors when assembling.
g := stream.NewGraph().
    AddSource("in",     stream.FromChannel[any](eventCh)).
    AddFlow("validate", stream.Filter[any](isValid),         "in").
    AddFlow("enrich",   stream.Map[any, any](enrich),        "validate").
    AddSink("db",       stream.ForEach[any](db.Insert),      "enrich").
    AddSink("metrics",  stream.ForEach[any](metrics.Record), "enrich")

rg, err := g.Build()
if err != nil { return err }

handle, err := rg.Run(ctx, sys)
Two fan-in node kinds are available alongside AddFlow and AddSink:
  • MergeInto(name, from...) interleaves elements from multiple upstream nodes (non-deterministic order).
  • ConcatInto(name, from...) consumes upstreams sequentially, preserving per-source ordering across the boundary.
g := stream.NewGraph().
    AddSource("a",   srcA).
    AddSource("b",   srcB).
    ConcatInto("ab", "a", "b").     // a then b, in order
    AddSink("out",   sink, "ab")

Cross-node stream refs

SourceRef[T] and SinkRef[T] are wire-portable handles to a stream endpoint actor. Pass one inside any registered remote message and a different node materialises it back into an ordinary Source[T] or Sink[T] — the same fluent builder works whether the producer and consumer share an actor system or live on opposite ends of a cluster. The model mirrors Akka StreamRefs. Refs encode the producer node’s address (host, port, actor name), so cross-node resolution is a direct RemoteLookup against the producer’s remote server. There is no reliance on cluster-registry replication — the consumer can resolve and subscribe the moment the producer node is reachable, regardless of how loaded the cluster’s actor registry is.

Setup

Cross-node refs only work when both sides of the connection have the wire control protocol and the element type registered with their remoting layer. Without this registration the consumer’s bridge can’t subscribe (control messages won’t deserialize) or the producer’s elements can’t be serialised — symptoms include hanging Run calls, stream-level "no serializer found for message type ..." errors, and elements silently routed to dead-letter. Required on every node that participates:
remoteCfg := remote.NewConfig("0.0.0.0", 9000,
    stream.RemoteOptions(),                  // (1) control-plane wire types
    remote.WithSerializables(new(MyEvent)),  // (2) your element type T
)

sys, err := actor.NewActorSystem("my-app",
    actor.WithRemote(remoteCfg),             // remoting must be enabled
    actor.WithCluster(clusterCfg),           // for cluster-based discovery
)
  1. stream.RemoteOptions() registers the small subscribe / request / complete / error / cancel control-plane wire types. Append it to your existing remote.NewConfig options on every node — producer side, consumer side, and any node that might hold or forward a ref. Forgetting it on either side is the most common cause of hangs: the bridge sends streamSubscribeWire and the endpoint never sees a recognisable message because its remoting layer can’t decode the type.
  2. Element types must be registered with remote.WithSerializables(new(MyEvent)) — same as for any other rctx.Tell to a remote PID. Refs do not wrap elements in an envelope, so missing element-type registration on the producer side fails the send, and missing it on the consumer side fails the receive.
Things that will silently break the feature otherwise:
  • Asymmetric registration. The producer registers MyEvent but the consumer does not (or vice versa). The end with the missing registration logs a "no serializer found" warning when the message arrives; for the consumer that means elements never reach the local sink. Always register the same element types on both ends.
  • Registering the value type as a non-pointer. remote.WithSerializables keys on the exact type passed in — typically *MyEvent from new(MyEvent). Sending a value-type element over the wire still works because the bridge’s wireForm helper auto-wraps non-pointer values into a *T registered key, but if you pass a struct value (instead of new(...) or an interface pointer) into WithSerializables itself, the registry stores the wrong type and lookups miss. Stick to the new(MyEvent) form.
  • Skipping stream.RemoteOptions() on a forwarding-only node. Any node that may receive a SourceRef[T] / SinkRef[T] inside another message — even just to inspect or relay it — needs stream.RemoteOptions() registered, because the ref struct itself is serialised through the same registry. A node missing it will fail to deserialize incoming refs.
  • Disabling remoting entirely. Cross-node refs require actor.WithRemote(...) on both ends. Without it ref.Source(sys) / ref.Sink(sys) materialise a bridge that can’t reach the remote endpoint and surfaces an "actor not found" resolution error after the retry budget expires.
  • Producer node unreachable at resolve time. Refs carry the producer node’s host:port, so resolution is a direct RemoteLookup against the producer’s remote server — there is no dependency on async cluster registry propagation. If the producer node is briefly unreachable when the consumer resolves (network blip, just-restarting node), the bridge retries with jittered exponential backoff for up to 10s. After that it surfaces a "resolve source ref" / "resolve sink ref" error rather than hanging — handle it in the receiving side just like any remote-Tell failure.
Element values cross the wire as ordinary remote messages — no extra envelope, no double encoding. Whatever is registered with remote.WithSerializables round-trips automatically, just as it would for any other rctx.Tell to a remote PID.
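
The "jittered exponential backoff for up to 10s" mentioned in the list above can be pictured with a small schedule calculator. Everything except the 10-second budget is an assumption made for illustration: the `backoffSchedule` name, the 100ms base delay, the 2s per-attempt ceiling, and the jitter formula are not taken from the library.

```go
package main

import (
	"fmt"
	"time"
)

// backoffSchedule returns the successive sleep durations a retry loop would
// use until the total budget is spent. jitter must return a value in [0, 1];
// real code would pass rand.Float64.
func backoffSchedule(base, maxDelay, budget time.Duration, jitter func() float64) []time.Duration {
	if base <= 0 {
		return nil // a zero base would never make progress
	}
	var (
		out   []time.Duration
		spent time.Duration
	)
	delay := base
	for spent < budget {
		// Scale the current delay to somewhere between 50% and 100%.
		d := time.Duration(float64(delay) * (0.5 + 0.5*jitter()))
		if spent+d > budget {
			d = budget - spent // never sleep past the budget
		}
		out = append(out, d)
		spent += d
		if delay *= 2; delay > maxDelay {
			delay = maxDelay
		}
	}
	return out
}

func main() {
	// Jitter pinned to 1 so the printed schedule is reproducible.
	noJitter := func() float64 { return 1 }
	fmt.Println(backoffSchedule(100*time.Millisecond, 2*time.Second, 10*time.Second, noJitter))
	// [100ms 200ms 400ms 800ms 1.6s 2s 2s 2s 900ms]
}
```

Whatever the real constants are, the shape is the same: early retries are cheap, later ones are spaced out, and the loop gives up with an error once the budget is gone.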

SourceRef — publish a producer

A SourceRef[T] exposes a local Source[T] to a remote consumer. The ref is a plain serialisable value — wrap it in any user-defined message type you like (registered with the same remoting layer) and ship it however your application already moves data between nodes.
// User-defined wrapper — anything carrying the ref alongside whatever
// other context the consumer needs.
type OffersFeed struct {
    Ref stream.SourceRef[int]
}

// Both nodes register the wrapper plus the int element type:
//   stream.RemoteOptions(),
//   remote.WithSerializables(new(OffersFeed), new(int)),

// Producer node (sysA)
ref, err := stream.Of(1, 2, 3, 4, 5).SourceRef(ctx, sysA)
if err != nil { return err }

// Hand the ref off through whatever channel makes sense for your app —
// a remote Tell, a cluster pub/sub topic, an actor registry, an HTTP
// response body decoded back into an OffersFeed, etc.
publisher.Tell(ctx, downstreamPID, &OffersFeed{Ref: ref})

// Consumer node (sysB), inside the actor that received OffersFeed:
feed := msg.(*OffersFeed)
col, sink := stream.Collect[int]()
h, err := feed.Ref.Source(sysB).
    Via(stream.Map(func(n int) int { return n * 2 })).
    To(sink).
    Run(ctx, sysB)
The underlying source pipeline is materialised lazily — only when a subscriber connects via ref.Source(...).Run(...). Constructing the ref is cheap.

SinkRef — publish a consumer

The mirror image: a SinkRef[T] exposes a local Sink[T] so a remote node can pipe data into it.
// Consumer node (sysA)
col, sink := stream.Collect[int]()
ref, err := sink.SinkRef(ctx, sysA)
if err != nil { return err }

// Producer node (sysB)
h, err := stream.From(stream.Of(10, 20, 30, 40, 50)).
    To(ref.Sink(sysB)).
    Run(ctx, sysB)

Semantics

  • Single subscription per ref. A second ref.Source(...) / ref.Sink(...) materialisation observes a stream-level error ("already subscribed" while the first stream is active, "already consumed" after it finishes). Refs are one-shot, matching Akka StreamRefs.
  • Wire-level credit. The consumer’s downstream demand drives streamRequestWire to the producer; the producer ships exactly that many elements before pausing. A bounded pending queue (1024 elements) on the producer side converts a slow-consumer scenario into a visible "backpressure overflow" stream error rather than unbounded buffering on the producer node.
  • Bounded lifecycle. After the stream completes (success or error), the endpoint actor stays alive for a 30-second grace window so a racing late subscriber gets a clean rejection rather than telling a dead actor and hanging. After the grace window the endpoint reaps itself via system.ScheduleOnce. Refs that are never consumed are reaped at actor-system shutdown.
  • Death detection. Bridges Watch their remote endpoint as defense in depth — an unexpected endpoint death (panic, system shutdown mid-stream, expired grace on a stale ref) surfaces as a stream error on the bridge’s StreamHandle.Err(), never a hang. Combined with retry-with-jitter resolution that absorbs cluster propagation latency, this keeps cross-node graphs reliable under load.
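The single-subscription rule can be pictured as a small state machine. The sketch below is illustrative only: `oneShotRef`, `subscribe`, and `finish` are hypothetical names, not part of the GoAkt API, and the real endpoint is an actor rather than a mutex-guarded struct. Only the two error messages are taken from the text above.

```go
package main

import (
	"errors"
	"sync"
)

// Error values mirroring the two rejection messages described above.
var (
	errAlreadySubscribed = errors.New("already subscribed")
	errAlreadyConsumed   = errors.New("already consumed")
)

type refState int

const (
	refIdle     refState = iota // published, nobody has connected yet
	refActive                   // first subscriber is streaming
	refConsumed                 // stream finished (success or error)
)

// oneShotRef is a hypothetical stand-in for the endpoint behind a stream ref.
type oneShotRef struct {
	mu    sync.Mutex
	state refState
}

// subscribe admits the first caller and rejects every later one.
func (r *oneShotRef) subscribe() error {
	r.mu.Lock()
	defer r.mu.Unlock()
	switch r.state {
	case refActive:
		return errAlreadySubscribed
	case refConsumed:
		return errAlreadyConsumed
	}
	r.state = refActive
	return nil
}

// finish marks the stream as terminated; the ref can never be reused.
func (r *oneShotRef) finish() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.state = refConsumed
}
```

Calling `subscribe` twice yields `errAlreadySubscribed` on the second call; calling it again after `finish` yields `errAlreadyConsumed`.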

Reference

| Constructor / method | Description |
| --- | --- |
| Source[T].SourceRef(ctx, sys) | Publish a Source[T] as a wire-portable SourceRef[T] |
| (SourceRef[T]).Source(sys) Source[T] | Adapt a ref back into a Source[T] for use in a local graph |
| Sink[T].SinkRef(ctx, sys) | Publish a Sink[T] as a wire-portable SinkRef[T] |
| (SinkRef[T]).Sink(sys) Sink[T] | Adapt a ref back into a Sink[T] for use in a local graph |
| RemoteOptions() remote.Option | Registers the control-plane wire protocol with remote.NewConfig |

Backpressure

GoAkt Streams uses credit-based demand propagation. The sink signals how many elements it can accept; each stage propagates that demand upstream. Sources produce only what is requested.
Sink ──[streamRequest n=224]──► Flow ──[streamRequest n=224]──► Source
     ◄─[streamElement × 224]──       ◄─[streamElement × 224]──
Key parameters (in StageConfig):
| Parameter | Default | Purpose |
| --- | --- | --- |
| InitialDemand | 224 | First batch of demand sent by a sink to its upstream |
| RefillThreshold | 64 | Credit level at which a refill is requested |
| BufferSize | 256 | Mailbox buffer (bounded mailbox capacity) |
The sliding-window watermark means:
  • When credit > RefillThreshold, no refill is sent — the pipeline is healthy.
  • When credit ≤ RefillThreshold, the stage requests InitialDemand - credit more elements from upstream.
  • This keeps in-flight data bounded while keeping the pipeline saturated.
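The watermark arithmetic can be sketched in a few lines. This is a toy model of the rule stated above, not the library's internals: `creditWindow` and its methods are hypothetical names, and the sketch assumes credit is decremented once per received element.

```go
package main

// creditWindow models the sliding-window watermark: credit is spent as
// elements arrive and topped back up to initialDemand once it reaches
// the refill threshold.
type creditWindow struct {
	initialDemand   int64
	refillThreshold int64
	credit          int64
}

func newCreditWindow(initialDemand, refillThreshold int64) *creditWindow {
	return &creditWindow{
		initialDemand:   initialDemand,
		refillThreshold: refillThreshold,
		credit:          initialDemand, // the first demand batch is outstanding
	}
}

// onElement spends one credit and returns how many elements to request
// from upstream (0 while credit is still above the threshold).
func (w *creditWindow) onElement() int64 {
	if w.credit > 0 {
		w.credit--
	}
	if w.credit > w.refillThreshold {
		return 0 // pipeline is healthy, no refill needed
	}
	refill := w.initialDemand - w.credit
	w.credit += refill
	return refill
}
```

With the defaults (InitialDemand 224, RefillThreshold 64), the first 159 elements trigger no request; the 160th brings credit down to 64 and produces a refill of 224 - 64 = 160, so at most 224 elements are ever outstanding.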

Error handling

Each stage can be configured with an independent ErrorStrategy:
| Strategy | Behaviour |
| --- | --- |
| FailFast (default) | Propagate error downstream, shut down the pipeline |
| Resume | Skip the failed element; request a replacement from upstream |
| Retry | Re-process the element up to RetryConfig.MaxAttempts times before failing |
| Supervise | Delegate to actor supervision (currently equivalent to FailFast) |
stream.From(stream.FromChannel(ch)).
    Via(stream.TryMap(parse).
        WithErrorStrategy(stream.Retry).
        WithRetryConfig(stream.RetryConfig{MaxAttempts: 3})).
    To(sink).
    Run(ctx, sys)
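To make the three distinct behaviours concrete, here is a standalone sketch that applies them to a slice instead of a live stream. `applyStage`, `errorStrategy`, and `errBad` are hypothetical names for illustration. The sketch assumes MaxAttempts counts total attempts, including the first; the text above does not settle that.

```go
package main

import "errors"

// errBad is a sample failure used when exercising the sketch.
var errBad = errors.New("bad element")

type errorStrategy int

const (
	failFast errorStrategy = iota // stop at the first failure
	resume                        // skip the failed element and continue
	retry                         // re-run fn up to maxAttempts times, then fail
)

// applyStage runs fn over every element and handles failures according
// to the chosen strategy. It returns the elements emitted so far.
func applyStage[In, Out any](in []In, fn func(In) (Out, error), strategy errorStrategy, maxAttempts int) ([]Out, error) {
	out := make([]Out, 0, len(in))
	for _, v := range in {
		attempts := 1
		if strategy == retry && maxAttempts > 1 {
			attempts = maxAttempts
		}
		var (
			res Out
			err error
		)
		for i := 0; i < attempts; i++ {
			if res, err = fn(v); err == nil {
				break
			}
		}
		switch {
		case err == nil:
			out = append(out, res)
		case strategy == resume:
			continue // drop the element, keep the pipeline alive
		default:
			return out, err // failFast, or retry with attempts exhausted
		}
	}
	return out, nil
}
```

Given inputs 1 through 4 and a function that fails on 3, `failFast` returns the first two results plus the error, while `resume` returns three results and no error.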

Overflow strategies

Overflow strategies apply when a stage’s internal buffer is full:
| Strategy | Behaviour |
| --- | --- |
| DropTail (default) | Discard the newest incoming element |
| DropHead | Discard the oldest buffered element |
| DropBuffer | Discard the entire buffer |
| Backpressure | Block the upstream (apply natural backpressure) |
| Fail | Treat overflow as a fatal error |
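The differences between the dropping strategies are easiest to see on a tiny buffer. The following sketch is a simplified model with hypothetical names (`boundedBuffer`, `push`), not the library's buffer. It assumes that DropBuffer enqueues the incoming element after clearing, as Akka's equivalent does, and it leaves out Backpressure because that strategy needs upstream cooperation rather than a local decision.

```go
package main

import "errors"

var errOverflow = errors.New("buffer overflow")

type overflowStrategy int

const (
	dropTail       overflowStrategy = iota // discard the incoming element
	dropHead                               // discard the oldest buffered element
	dropBuffer                             // discard everything buffered
	failOnOverflow                         // report a fatal error
)

// boundedBuffer holds at most capacity elements (capacity must be >= 1).
type boundedBuffer[T any] struct {
	items    []T
	capacity int
	strategy overflowStrategy
}

// push appends v, applying the overflow strategy when the buffer is full.
func (b *boundedBuffer[T]) push(v T) error {
	if len(b.items) < b.capacity {
		b.items = append(b.items, v)
		return nil
	}
	switch b.strategy {
	case dropHead:
		b.items = append(b.items[1:], v)
	case dropBuffer:
		// Assumption: the new element is kept after the buffer is cleared.
		b.items = append(b.items[:0], v)
	case failOnOverflow:
		return errOverflow
	default:
		// dropTail: the incoming element is silently discarded.
	}
	return nil
}
```

Pushing 1, 2, 3, 4 into a buffer of capacity 3 leaves 1, 2, 3 under `dropTail`, 2, 3, 4 under `dropHead`, and only 4 under `dropBuffer`.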

Stage fusion

Adjacent stateless stages (Map, Filter) are automatically fused into a single actor, eliminating intermediate mailbox enqueues and reducing allocations.
// Without fusion (FuseNone): source → map-actor → filter-actor → sink-actor
// With fusion (FuseStateless, default): source → fused-actor → sink-actor

handle, err := graph.
    WithFusion(stream.FuseStateless). // default; explicit here for clarity
    Run(ctx, sys)
| Mode | Behaviour |
| --- | --- |
| FuseStateless (default) | Fuses adjacent Map and Filter stages into one actor |
| FuseNone | No fusion; each stage is a separate actor (useful for debugging) |
| FuseAggressive | Fuses all fusable adjacent stages |
Fusion is transparent: the external API and semantics are identical. Disable it with FuseNone when profiling individual stage throughput.
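Conceptually, fusing stateless stages amounts to composing their functions so that one element passes through all of them in a single call, with no mailbox hop in between. The sketch below shows that idea with plain functions. `step`, `mapStep`, `filterStep`, and `fuse` are hypothetical names, and the element type is kept fixed for brevity, whereas a real Map can change it.

```go
package main

// step processes one element; ok == false means the element was filtered out.
type step[T any] func(v T) (out T, ok bool)

// mapStep lifts a transformation into a step that always emits.
func mapStep[T any](f func(T) T) step[T] {
	return func(v T) (T, bool) { return f(v), true }
}

// filterStep lifts a predicate into a step that emits only matching elements.
func filterStep[T any](pred func(T) bool) step[T] {
	return func(v T) (T, bool) { return v, pred(v) }
}

// fuse chains several steps into one, short-circuiting as soon as any
// step drops the element.
func fuse[T any](steps ...step[T]) step[T] {
	return func(v T) (T, bool) {
		for _, s := range steps {
			var ok bool
			if v, ok = s(v); !ok {
				return v, false
			}
		}
		return v, true
	}
}
```

A fused "keep even numbers, then multiply by 10" step applied to 1 through 5 emits 20 and 40, the same result the two separate stages would produce.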

StreamHandle lifecycle

RunnableGraph.Run returns a StreamHandle for controlling the pipeline at runtime:
handle, err := graph.Run(ctx, sys)
if err != nil {
    return err
}

// Wait for natural completion
<-handle.Done()
if err := handle.Err(); err != nil {
    log.Printf("stream %s failed: %v", handle.ID(), err)
}

// Orderly shutdown: drains in-flight elements before stopping
if err := handle.Stop(ctx); err != nil {
    return err
}

// Immediate abort: discards all buffered elements
handle.Abort()
| Method | Purpose |
| --- | --- |
| ID() | Unique stream identifier |
| Done() | Channel closed when the stream terminates (naturally or by error) |
| Err() | Terminal error; valid only after Done() is closed |
| Stop(ctx) | Orderly shutdown; drains in-flight elements, then stops all stages |
| Abort() | Immediate termination; discards buffered elements |
| Metrics() | Returns a StreamMetrics snapshot |

Observability

Metrics

StreamHandle.Metrics() returns a live snapshot of element counts:
m := handle.Metrics()
log.Printf("in=%d out=%d errors=%d", m.ElementsIn(), m.ElementsOut(), m.Errors())

Tracer

Attach a Tracer to any Flow or Sink for per-element distributed tracing:
// myTracer forwards stage events to an OpenTelemetry span supplied by the caller.
// Requires go.opentelemetry.io/otel/attribute and go.opentelemetry.io/otel/trace.
type myTracer struct {
    span trace.Span
}

func (t *myTracer) OnElement(stageName string, seqNo uint64, latencyNs int64) {
    t.span.AddEvent(stageName, trace.WithAttributes(
        attribute.Int64("seqNo", int64(seqNo)),
        attribute.Int64("latencyNs", latencyNs),
    ))
}
func (t *myTracer) OnDemand(stageName string, n int64)  {}
func (t *myTracer) OnError(stageName string, err error) { t.span.RecordError(err) }
func (t *myTracer) OnComplete(stageName string)         { t.span.End() }

// span is an already-started trace.Span owned by the caller.
stream.From(src).
    Via(stream.Map(transform).WithTracer(&myTracer{span: span})).
    To(sink).
    Run(ctx, sys)

Configuration reference

StageConfig carries per-stage configuration. Apply it via the builder methods on Flow, Sink, or Source:
stream.Map(fn).
    WithErrorStrategy(stream.Retry).
    WithRetryConfig(stream.RetryConfig{MaxAttempts: 3}).
    WithName("parse-stage").
    WithTracer(tracer).
    WithMailbox(actor.NewBoundedMailbox(512))
| Field | Type | Default | Purpose |
| --- | --- | --- | --- |
| InitialDemand | int64 | 224 | First demand batch from sink to upstream |
| RefillThreshold | int64 | 64 | Credit level that triggers a refill request |
| ErrorStrategy | ErrorStrategy | FailFast | Element-level failure handling |
| RetryConfig.MaxAttempts | int | 1 | Retry attempts when ErrorStrategy is Retry |
| OverflowStrategy | OverflowStrategy | DropTail | Behaviour when internal buffer is full |
| PullTimeout | time.Duration | 5s | Timeout for FromActor pull requests |
| BufferSize | int | 256 | Bounded mailbox capacity (BufferSize × 2 slots) |
| Mailbox | actor.Mailbox | nil | Override the default mailbox implementation |
| Name | string | auto | Custom actor name for this stage |
| Tags | map[string]string | nil | Key/value labels propagated to metrics and traces |
| Tracer | Tracer | nil | Optional distributed tracing hook |
| Fusion | bool | true | Whether this stage may participate in stage fusion |
| MicroBatch | int | 1 | Accumulate this many elements before forwarding (flow stages only) |
| OnDrop | func(any, string) | nil | Called when an element is dropped due to overflow or exhausted retries |

Comparison with other approaches

Go channels and goroutines

Go channels are the native concurrency primitive. They are low-level, composable, and require no dependencies — but they demand that the developer hand-craft every concern that GoAkt Streams handles automatically.
| Concern | Channels + goroutines | GoAkt Streams |
| --- | --- | --- |
| Backpressure | Manual (blocking sends or select/default) | Automatic credit-based demand propagation |
| Error handling | Explicit error channels and coordination logic | ErrorStrategy per stage (FailFast, Resume, Retry) |
| Fan-out | Manual goroutine fan; complex synchronisation | Broadcast, Balance primitives |
| Fan-in | select or sync.WaitGroup coordination | Merge, Combine primitives |
| Windowing | Custom timer goroutines and buffers | Batch[T](n, maxWait) built-in |
| Rate limiting | Custom token bucket / ticker goroutines | Throttle[T](n, per) built-in |
| Cancellation | context.Context plumbing through every goroutine | handle.Stop(ctx) / handle.Abort() |
| Lifecycle | Manual sync.WaitGroup + goroutine tracking | StreamHandle.Done() + supervision tree |
| Observability | Manual counters | Built-in StreamMetrics and Tracer |
When to prefer channels: simple point-to-point pipelines with one producer and one consumer, or when you need the absolute minimum overhead and are comfortable managing error propagation manually.

When to prefer GoAkt Streams: multi-stage transformations, fan-out/fan-in topologies, variable-rate sources, anything requiring windowing, batching, or rate limiting, and whenever you want a managed lifecycle.
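For a sense of what the "custom timer goroutines and buffers" in the Windowing row involve, here is a rough hand-rolled approximation of size-or-time batching using only channels and the standard library. It is a sketch and not how GoAkt implements Batch. The function name `batch` is hypothetical, and the sketch assumes the time window restarts after every flush.

```go
package main

import "time"

// batch groups values from in into slices of up to n elements, flushing
// early whenever maxWait elapses with a partial batch pending. n must be >= 1.
func batch[T any](in <-chan T, n int, maxWait time.Duration) <-chan []T {
	out := make(chan []T)
	go func() {
		defer close(out)
		buf := make([]T, 0, n)
		timer := time.NewTimer(maxWait)
		defer timer.Stop()

		// flush emits the pending batch, if any, and starts a fresh one.
		flush := func() {
			if len(buf) > 0 {
				out <- buf
				buf = make([]T, 0, n)
			}
		}
		for {
			select {
			case v, ok := <-in:
				if !ok {
					flush() // emit the final partial batch
					return
				}
				buf = append(buf, v)
				if len(buf) >= n {
					flush()
					// Restart the window; drain a stale tick if one fired.
					if !timer.Stop() {
						select {
						case <-timer.C:
						default:
						}
					}
					timer.Reset(maxWait)
				}
			case <-timer.C:
				flush()
				timer.Reset(maxWait)
			}
		}
	}()
	return out
}
```

Even this small helper has no cancellation, no error path, and blocks forever if the consumer stops reading, which is the kind of coordination work the table attributes to the channel-based approach.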

RxGo

RxGo is a Go implementation of Reactive Extensions (ReactiveX). It offers a rich operator library and a familiar API for developers coming from RxJS or RxJava.
| Aspect | RxGo | GoAkt Streams |
| --- | --- | --- |
| Backpressure | Not supported in v2; the observable push model can overflow without consumer-side control | Pull-only, credit-based; source never outpaces downstream |
| Concurrency model | Goroutines with optional WithPool options | Actor-per-stage; supervised, lifecycle-managed |
| Type safety | interface{} items in v2; generic operators not yet complete | Full generics: Flow[In, Out], Source[T], Sink[T] |
| Actor integration | None | First-class: FromActor, ToActor, actor supervision |
| Error model | Error items in the stream; OnError handler | Typed ErrorStrategy (FailFast, Resume, Retry) per stage |
| Lifecycle | Dispose pattern | StreamHandle with Stop / Abort / Done |
| Operator richness | Very rich (debounce, zip, skip, take, window…) | Core operators; FlatMap, Batch, Throttle, ParallelMap |
| Fan-out | Connectable observables | Broadcast and Balance with automatic backpressure |
| Supervision | None | GoAkt actor supervision tree |
| Status | Maintenance mode | Actively maintained |
Choose RxGo if you are building standalone Go applications, want the full ReactiveX operator vocabulary, and do not need backpressure or actor integration. Choose GoAkt Streams if you need guaranteed backpressure, are already in a GoAkt system, or want stages that run as supervised GoAkt actors.

Benthos / Redpanda Connect

Benthos (now Redpanda Connect) is a production-grade stream-processing engine focused on connecting data systems. It is configuration-driven and ships with hundreds of connectors.
| Aspect | Benthos / Redpanda Connect | GoAkt Streams |
| --- | --- | --- |
| Primary audience | Platform / DevOps teams building data pipelines | Go application developers integrating stream logic into services |
| Configuration model | YAML configuration files | Code-first Go API |
| Backpressure | Ack-based; built into every connector | Credit-based demand propagation |
| Connectors | 200+ built-in (Kafka, S3, HTTP, databases…) | User-defined via Source, Sink wrappers |
| Type safety | Untyped message blobs | Generic Source[T] / Flow[In,Out] |
| Embedding | Supported via Go library | Native; it is a Go library |
| Processing model | Component DAG configured in YAML | Composable Go values assembled in code |
| Testing | Unit tests via YAML fixtures | Standard Go tests; testkit |
| Error handling | Retry / DLQ per connector | ErrorStrategy + OnDrop callback per stage |
| Actor integration | None | First-class GoAkt actor interop |
Choose Benthos if you are building a standalone data pipeline product, need ready-made connectors to external systems, or want configuration-driven deployments without writing Go code. Choose GoAkt Streams if stream processing is one facet of a larger GoAkt application, you want to express pipelines in idiomatic Go code, or you need deep integration with the actor supervision model.

Apache Kafka Streams

Kafka Streams is a JVM client library for building stateful stream processors on top of Apache Kafka.
| Aspect | Kafka Streams | GoAkt Streams |
| --- | --- | --- |
| Language | Java / Kotlin (JVM) | Go |
| Broker dependency | Requires Apache Kafka | No broker; in-process |
| Durability | Persistent topics; replayable | In-memory within the process; ephemeral |
| State stores | RocksDB-backed state stores; fault-tolerant | Stateful stages via actor state; no persistent store |
| Scalability | Horizontal via Kafka partitions | Vertical + actor location transparency |
| Backpressure | Implicit via Kafka consumer offset | Explicit credit-based demand |
| Exactly-once | Exactly-once semantics with transactions | Exactly-once within a local pipeline |
| Windowing | Tumbling, hopping, sliding, session windows | Batch[T](n, maxWait) for simple windowing |
| Table joins | KTable / KStream joins with changelog topics | Not built-in; compose via actor state |
| Operator richness | Filter, map, flatMap, groupBy, aggregate, join | See flow reference table above |
Choose Kafka Streams when you need durable, replayable, horizontally scalable stream processing on the JVM with Kafka as the backbone. Choose GoAkt Streams for in-process stream processing embedded in a Go service, with no external broker dependency and tight integration with GoAkt actor supervision.

Akka Streams

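Akka Streams (Scala / Java) is the direct inspiration for GoAkt Streams. Akka Streams implements the Reactive Streams specification; GoAkt Streams follows the same demand-driven model with equivalent backpressure semantics in Go.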
| Aspect | Akka Streams | GoAkt Streams |
| --- | --- | --- |
| Language | Scala / Java (JVM) | Go |
| Backpressure | Reactive Streams specification | Credit-based pull; equivalent semantics |
| Graph DSL | Rich typed graph builder (GraphDSL) | Graph named-node builder |
| Stage fusion | Automatic operator fusion | FuseStateless (default), FuseNone, FuseAggressive |
| Materializer | ActorMaterializer | GoAkt ActorSystem |
| Error handling | SupervisionStrategy per flow | ErrorStrategy per stage |
| Fan-out | Broadcast, Balance, Partition | Broadcast, Balance |
| Fan-in | Merge, MergePreferred, Zip, ZipWith | Merge, Combine |
| Substreams | groupBy, splitWhen, splitAfter, mergeSubstreams, concatSubstreams | GroupBy, SplitWhen, SplitAfter, MergeSubstreams (with per-substream buffer and error strategy) |
| Cross-node refs | SourceRef, SinkRef | SourceRef, SinkRef (single-subscription, grace-period reaped, credit-paced) |
| Actor interop | Source.actorRef, Sink.actorRef | FromActor, ToActor, ToActorNamed |
| Type safety | Full (Scala generics) | Full (Go generics) |
| Operator richness | Very rich | Core operators; sufficient for most workloads |
| Runtime | JVM; Akka actor system | Go runtime; GoAkt actor system |
| Licence | BSL 1.1 (Lightbend) | MIT |
GoAkt Streams deliberately mirrors Akka Streams’ core concepts — lazy Source, Flow, Sink, demand-driven backpressure, stage fusion — while adapting them to Go idioms and the GoAkt actor hierarchy. The PullRequest / PullResponse actor source protocol is analogous to Akka’s Source.actorRefWithBackpressure.
  • Observability — Metrics and OpenTelemetry integration
  • Event Streams — Internal system and cluster event bus
  • PubSub — Application-level topic-based pub/sub
  • Actor Scheduling — Timer and cron scheduling used by Tick and Batch
  • Supervision — Actor failure handling that underpins stream stage recovery