Documentation Index
Fetch the complete documentation index at: https://docs.goakt.dev/llms.txt
Use this file to discover all available pages before exploring further.
GoAkt Streams is a reactive, demand-driven data-processing library built directly on top of the GoAkt actor model.
Every pipeline stage runs inside a GoAkt actor, which means it inherits supervision, lifecycle management, and location
transparency for free. Pipelines are lazy: nothing executes until RunnableGraph.Run is called against a live
ActorSystem.
Overview
| Concept | What it means in GoAkt Streams |
|---|---|
| Correctness | Exactly-once delivery semantics within a local pipeline |
| Backpressure | Credit-based demand pull prevents unbounded buffering |
| Composability | Stages are plain Go values; pipelines are assembled declaratively |
| Actor-native | Every stage is an actor; supervision, scheduling, and routing apply naturally |
| Type-safety | Source[T], Flow[In, Out], Sink[T] carry type parameters through the pipeline |
Core abstractions
| Type | Role |
|---|---|
| Source[T] | Origin of stream elements; produces values of type T |
| Flow[In, Out] | Transformation stage; consumes In, emits Out |
| Sink[T] | Terminal consumer; processes elements and drives backpressure |
| RunnableGraph | Fully assembled pipeline; a value type that can be Run multiple times |
| StreamHandle | Live handle returned by Run; controls lifecycle and exposes metrics |
A minimal pipeline looks like this:
```go
import "github.com/tochemey/goakt/v4/stream"

collector, sink := stream.Collect[int]()
handle, err := stream.From(stream.Of(1, 2, 3, 4, 5)).
    Via(stream.Filter[int](func(n int) bool { return n%2 == 0 })).
    Via(stream.Map(func(n int) int { return n * 10 })).
    To(sink).
    Run(ctx, sys)
if err != nil {
    return err
}
<-handle.Done()
fmt.Println(collector.Items()) // [20 40]
```
Quick start
Linear pipeline
Use the fluent From / Via / To builder for straight-line pipelines:
```go
foldResult, sink := stream.Fold(0, func(acc, n int) int { return acc + n })
handle, err := stream.From(stream.Range(1, 101)). // 1..100
    Via(stream.Filter[int64](func(n int64) bool { return n%2 == 0 })).
    Via(stream.Map(func(n int64) int { return int(n) })).
    To(sink).
    Run(ctx, sys)
if err != nil {
    return err
}
<-handle.Done()
fmt.Println(foldResult.Value()) // 2550
```
Type-changing flows
When a Flow changes the element type, use the package-level Via free function or ViaLinear:
```go
// Via free function
strSrc := stream.Via(
    stream.From(stream.Of("1", "2", "3")).Source(),
    stream.Map(func(s string) int { n, _ := strconv.Atoi(s); return n }),
)

// ViaLinear on a LinearGraph
g := stream.ViaLinear(
    stream.From(stream.Of("1", "2", "3")),
    stream.Map(func(s string) int { n, _ := strconv.Atoi(s); return n }),
)
```
Sources
Sources are the origin of stream data. All sources apply backpressure: they produce only as many elements as downstream demands.
Finite sources
```go
// Fixed values
stream.Of(1, 2, 3)

// Integer range [start, end)
stream.Range(0, 1000)

// Seed + step function (Fibonacci)
stream.Unfold([]int{0, 1}, func(s []int) ([]int, int, bool) {
    return []int{s[1], s[0] + s[1]}, s[0], true
})
```
Channel source
Bridges an external Go channel into the pipeline. Backpressure naturally limits how fast the goroutine sends:
```go
ch := make(chan string, 100)
go feedChannel(ch) // your producer

handle, err := stream.From(stream.FromChannel(ch)).
    To(stream.ForEach(func(s string) { log.Println(s) })).
    Run(ctx, sys)
```
Actor source
Pulls elements from a GoAkt actor using the PullRequest / PullResponse[T] protocol. Useful for integrating existing
actor-based producers:
```go
// Your actor must handle *stream.PullRequest and reply with *stream.PullResponse[T].
handle, err := stream.From(stream.FromActor[Order](orderActorPID)).
    To(stream.ForEach(func(o Order) { process(o) })).
    Run(ctx, sys)
```
Tick source
Emits the current time on a fixed interval. Runs indefinitely until Stop or Abort is called:
```go
handle, err := stream.From(stream.Tick(time.Second)).
    Via(stream.Map(func(t time.Time) string { return t.Format(time.RFC3339) })).
    To(stream.ForEach(func(s string) { log.Println(s) })).
    Run(ctx, sys)

time.Sleep(10 * time.Second)
handle.Stop(ctx)
```
Network source
Reads []byte frames from a net.Conn. Each Read call produces one element; demand controls how many reads
are batched per upstream request:
```go
conn, _ := net.Dial("tcp", "host:port")
handle, err := stream.From(stream.FromConn(conn, 4096)).
    Via(stream.Map(func(b []byte) string { return string(b) })).
    To(stream.ForEach(func(s string) { log.Println(s) })).
    Run(ctx, sys)
```
Reference
| Constructor | Description |
|---|---|
| Of[T](values...) | Finite source from a fixed set of values |
| Range(start, end) | Integer range [start, end) |
| Unfold[S, T](seed, step) | Generates values from a seed with a stateful step function |
| FromChannel[T](ch) | Reads from a Go channel; completes when the channel closes |
| FromActor[T](pid) | Pulls from a GoAkt actor via PullRequest / PullResponse[T] |
| Tick(interval) | Emits time.Time on a fixed interval; runs until cancelled |
| Merge[T](sources...) | Fans N sources into one; completes when all inputs complete |
| Combine[T, U, V](left, right, fn) | Zips two sources pairwise via fn (zip semantics) |
| Broadcast[T](src, n) | Fans one source out to N independent branches |
| Balance[T](src, n) | Distributes one source across N branches with round-robin backpressure |
| FromConn(conn, bufSize) | Reads []byte frames from a net.Conn |
Flows
Flows are lazy transformation stages. Each flow operator returns a new Flow value; multiple flows can be chained without materializing anything.
Mapping and filtering
```go
// Map: type-changing, no error path
stream.Map(func(n int) string { return strconv.Itoa(n) })

// TryMap: transformation that may return an error
stream.TryMap(func(s string) (int, error) { return strconv.Atoi(s) })

// Filter: keeps elements where predicate returns true
stream.Filter(func(n int) bool { return n > 0 })

// FlatMap: expands each element into a slice
stream.FlatMap(func(s string) []string { return strings.Split(s, ",") })

// Flatten: unwraps []T elements into individual elements
stream.Flatten[string]()
```
Windowing and rate control
```go
// Batch: groups into slices of at most n, flushing early after maxWait
stream.Batch[Event](100, 500*time.Millisecond)

// Throttle: limits throughput to n elements per duration
stream.Throttle[Request](100, time.Second)

// Buffer: asynchronous decoupling buffer with overflow strategy
stream.Buffer[int](256, stream.DropTail)
```
Stateful flows
```go
// Deduplicate: suppresses consecutive duplicates (T must be comparable)
stream.Deduplicate[string]()

// Scan: running accumulation; emits each intermediate state
stream.Scan(0, func(acc, n int) int { return acc + n })
```
Parallel processing
ParallelMap applies a function concurrently with up to n goroutines. Use OrderedParallelMap when output order
must match input order:
```go
// Unordered: results arrive as goroutines complete
stream.ParallelMap(8, func(req Request) Response { return callAPI(req) })

// Ordered: a min-heap resequences results to preserve input order
stream.OrderedParallelMap(8, func(req Request) Response { return callAPI(req) })
```
Reference
| Constructor | Description |
|---|---|
| Map[In, Out](fn) | Type-changing transformation; no error path |
| TryMap[In, Out](fn) | Transformation with error; ErrorStrategy controls failure handling |
| Filter[T](predicate) | Keeps only elements where predicate returns true |
| FlatMap[In, Out](fn) | Expands each element into a slice of outputs |
| Flatten[T]() | Unwraps []T elements into individual elements |
| Batch[T](n, maxWait) | Groups into []T slices of at most n; flushes early after maxWait |
| Buffer[T](size, strategy) | Asynchronous buffer with configurable overflow strategy |
| Throttle[T](n, per) | Limits throughput to at most n elements per per duration |
| Deduplicate[T]() | Suppresses consecutive duplicate elements (T must be comparable) |
| Scan[In, State](zero, fn) | Running accumulation; emits each intermediate state |
| WithContext[T](key, value) | Marks a named tracing boundary; passes elements through unchanged |
| ParallelMap[In, Out](n, fn) | Applies fn concurrently with up to n goroutines; unordered output |
| OrderedParallelMap[In, Out](n, fn) | Like ParallelMap but preserves input order via min-heap resequencing |
Sinks
Sinks are the terminal consumers of a pipeline. They drive backpressure by signalling demand upstream; the pipeline does not produce faster than the sink can consume.
Common sinks
```go
// ForEach: side-effect per element
stream.ForEach(func(n int) { log.Println(n) })

// Collect: accumulates all elements; read results after completion
collector, sink := stream.Collect[int]()
// ... run pipeline ...
<-handle.Done()
items := collector.Items()

// Fold: reduces to a single accumulated value
result, sink := stream.Fold(0, func(acc, n int) int { return acc + n })
// ... run pipeline ...
<-handle.Done()
total := result.Value()

// First: captures only the first element then cancels upstream
first, sink := stream.First[int]()
// ... run pipeline ...
<-handle.Done()
val := first.Value()

// Ignore: discards all elements (useful when side effects are in upstream flows)
stream.Ignore[int]()
```
Actor and channel sinks
```go
// ToActor: forwards each element to a GoAkt actor via Tell
stream.ToActor[Event](handlerPID)

// ToActorNamed: resolves actor by name on each element and forwards
stream.ToActorNamed[Event](sys, "handler")

// Chan: writes to a Go channel; blocks when full (natural backpressure)
ch := make(chan int, 64)
stream.Chan(ch)
```
Reference
| Constructor | Description |
|---|---|
| ForEach[T](fn) | Calls fn for each element |
| Collect[T]() | Accumulates all elements; retrieve via Collector[T].Items() |
| Fold[T, U](zero, fn) | Reduces to a single value; retrieve via FoldResult[U].Value() |
| First[T]() | Captures the first element then cancels upstream |
| Ignore[T]() | Discards all elements |
| Chan[T](ch) | Writes each element to a Go channel; applies backpressure when full |
| ToActor[T](pid) | Forwards each element to a GoAkt actor via Tell |
| ToActorNamed[T](system, name) | Resolves actor by name and forwards via Tell |
Fan-out and fan-in
Broadcast
Broadcast fans a single source out to N independent branches. Every branch receives every element. Backpressure is
enforced by the slowest branch: the hub pulls from upstream only when all active branches have outstanding demand.
```go
branches := stream.Broadcast[Event](stream.FromChannel(eventCh), 2)

// Branch 1: write to database
handle1, _ := stream.From(branches[0]).
    To(stream.ForEach(func(e Event) { db.Insert(e) })).
    Run(ctx, sys)

// Branch 2: publish to Kafka
handle2, _ := stream.From(branches[1]).
    To(stream.ForEach(func(e Event) { kafka.Publish(e) })).
    Run(ctx, sys)
```
Both pipelines must be materialized. The upstream sub-pipeline starts once the last branch materializes.
Balance
Balance distributes elements across N branches using round-robin routing with backpressure. Each element goes to exactly one branch: the next one with available demand. Use this to parallelise work across independent consumers:
```go
workers := stream.Balance[Job](stream.FromChannel(jobCh), 4)
for i, branch := range workers {
    i := i // capture the loop variable (not needed from Go 1.22 onward)
    stream.From(branch).
        To(stream.ForEach(func(j Job) { process(i, j) })).
        Run(ctx, sys)
}
```
Merge
Merge fans N independent sources into a single downstream. Elements arrive in non-deterministic order; completion happens when all inputs have completed:
```go
merged := stream.Merge(
    stream.FromChannel(highPriorityCh),
    stream.FromChannel(lowPriorityCh),
)
stream.From(merged).
    To(stream.ForEach(func(e Event) { handle(e) })).
    Run(ctx, sys)
```
Combine (zip)
Combine pairs elements from two sources with a combine function (zip semantics). It completes when either source
is exhausted:
```go
prices := stream.FromChannel(priceCh) // Source[Price]
orders := stream.FromChannel(orderCh) // Source[Order]

stream.From(stream.Combine(prices, orders, func(p Price, o Order) Quote {
    return Quote{Price: p, Order: o}
})).To(stream.ForEach(func(q Quote) { submit(q) })).
    Run(ctx, sys)
```
Pipeline DSL (Graph builder)
For non-linear topologies (fan-out branches that later merge, or multiple independent pipelines sharing a source), use the Graph DSL:
```go
g := stream.NewGraph()
g.Source("in", stream.FromChannel(eventCh))
g.Flow("validate", stream.From("in"), stream.Filter[Event](isValid))
g.Flow("enrich", stream.From("validate"), stream.Map(enrich))
g.Sink("db", stream.From("enrich"), stream.ForEach(func(e Event) { db.Insert(e) }))
g.Sink("metrics", stream.From("enrich"), stream.ForEach(func(e Event) { metrics.Record(e) }))

handle, err := g.Build().Run(ctx, sys)
```
Backpressure
GoAkt Streams uses credit-based demand propagation. The sink signals how many elements it can accept; each stage
propagates that demand upstream. Sources produce only what is requested.
```
Sink ──[streamRequest n=224]──► Flow ──[streamRequest n=224]──► Source
     ◄──[streamElement × 224]──      ◄──[streamElement × 224]──
```
Key parameters (in StageConfig):
| Parameter | Default | Purpose |
|---|---|---|
| InitialDemand | 224 | First batch of demand sent by a sink to its upstream |
| RefillThreshold | 64 | Credit level at which a refill is requested |
| BufferSize | 256 | Mailbox buffer (bounded mailbox capacity) |
The sliding-window watermark means:
- When credit > RefillThreshold, no refill is sent; the pipeline is healthy.
- When credit ≤ RefillThreshold, the stage requests InitialDemand - credit more elements from upstream.
- This keeps in-flight data bounded while keeping the pipeline saturated.
Error handling
Each stage can be configured with an independent ErrorStrategy:
| Strategy | Behaviour |
|---|---|
| FailFast (default) | Propagate error downstream, shut down the pipeline |
| Resume | Skip the failed element; request a replacement from upstream |
| Retry | Re-process the element up to RetryConfig.MaxAttempts times before failing |
| Supervise | Delegate to actor supervision (currently equivalent to FailFast) |
```go
stream.From(stream.FromChannel(ch)).
    Via(stream.TryMap(parse).
        WithErrorStrategy(stream.Retry).
        WithRetryConfig(stream.RetryConfig{MaxAttempts: 3})).
    To(sink).
    Run(ctx, sys)
```
Overflow strategies
Overflow strategies apply when a stage's internal buffer is full:
| Strategy | Behaviour |
|---|---|
| DropTail (default) | Discard the newest incoming element |
| DropHead | Discard the oldest buffered element |
| DropBuffer | Discard the entire buffer |
| Backpressure | Block the upstream (apply natural backpressure) |
| Fail | Treat overflow as a fatal error |
Stage fusion
Adjacent stateless stages (Map, Filter) are automatically fused into a single actor, eliminating intermediate
mailbox enqueues and reducing allocations.
```go
// Without fusion (FuseNone): source → map-actor → filter-actor → sink-actor
// With fusion (FuseStateless, default): source → fused-actor → sink-actor
handle, err := graph.
    WithFusion(stream.FuseStateless). // default; explicit here for clarity
    Run(ctx, sys)
```
| Mode | Behaviour |
|---|---|
| FuseStateless (default) | Fuses adjacent Map and Filter stages into one actor |
| FuseNone | No fusion; each stage is a separate actor (useful for debugging) |
| FuseAggressive | Fuses all fusable adjacent stages |
Fusion is transparent: the external API and semantics are identical. Disable it with FuseNone when profiling
individual stage throughput.
StreamHandle lifecycle
RunnableGraph.Run returns a StreamHandle for controlling the pipeline at runtime:
```go
handle, err := graph.Run(ctx, sys)
if err != nil {
    return err
}

// Wait for natural completion
<-handle.Done()
if err := handle.Err(); err != nil {
    log.Printf("stream %s failed: %v", handle.ID(), err)
}

// Orderly shutdown: drains in-flight elements before stopping
if err := handle.Stop(ctx); err != nil {
    return err
}

// Immediate abort: discards all buffered elements
handle.Abort()
```
| Method | Purpose |
|---|---|
| ID() | Unique stream identifier |
| Done() | Channel closed when the stream terminates (naturally or by error) |
| Err() | Terminal error; valid only after Done() is closed |
| Stop(ctx) | Orderly shutdown; drains in-flight elements, then stops all stages |
| Abort() | Immediate termination; discards buffered elements |
| Metrics() | Returns a StreamMetrics snapshot |
Observability
Metrics
StreamHandle.Metrics() returns a live snapshot of element counts:
```go
m := handle.Metrics()
log.Printf("in=%d out=%d errors=%d", m.ElementsIn(), m.ElementsOut(), m.Errors())
```
Tracing
Attach a Tracer to any Flow or Sink for per-element distributed tracing:
```go
type myTracer struct{}

func (t *myTracer) OnElement(stageName string, seqNo uint64, latencyNs int64) {
    span.AddEvent(stageName, trace.WithAttributes(
        attribute.Int64("seqNo", int64(seqNo)),
        attribute.Int64("latencyNs", latencyNs),
    ))
}
func (t *myTracer) OnDemand(stageName string, n int64)  {}
func (t *myTracer) OnError(stageName string, err error) { span.RecordError(err) }
func (t *myTracer) OnComplete(stageName string)         { span.End() }

stream.From(src).
    Via(stream.Map(transform).WithTracer(&myTracer{})).
    To(sink).
    Run(ctx, sys)
```
Configuration reference
StageConfig carries per-stage configuration. Apply it via the builder methods on Flow, Sink, or Source:
```go
stream.Map(fn).
    WithErrorStrategy(stream.Retry).
    WithRetryConfig(stream.RetryConfig{MaxAttempts: 3}).
    WithName("parse-stage").
    WithTracer(tracer).
    WithMailbox(actor.NewBoundedMailbox(512))
```
| Field | Type | Default | Purpose |
|---|---|---|---|
| InitialDemand | int64 | 224 | First demand batch from sink to upstream |
| RefillThreshold | int64 | 64 | Credit level that triggers a refill request |
| ErrorStrategy | ErrorStrategy | FailFast | Element-level failure handling |
| RetryConfig.MaxAttempts | int | 1 | Retry attempts when ErrorStrategy is Retry |
| OverflowStrategy | OverflowStrategy | DropTail | Behaviour when internal buffer is full |
| PullTimeout | time.Duration | 5s | Timeout for FromActor pull requests |
| BufferSize | int | 256 | Bounded mailbox capacity (BufferSize × 2 slots) |
| Mailbox | actor.Mailbox | nil | Override the default mailbox implementation |
| Name | string | auto | Custom actor name for this stage |
| Tags | map[string]string | nil | Key/value labels propagated to metrics and traces |
| Tracer | Tracer | nil | Optional distributed tracing hook |
| Fusion | bool | true | Whether this stage may participate in stage fusion |
| MicroBatch | int | 1 | Accumulate this many elements before forwarding (flow stages only) |
| OnDrop | func(any, string) | nil | Called when an element is dropped due to overflow or exhausted retries |
Comparison with other approaches
Go channels and goroutines
Go channels are the native concurrency primitive. They are low-level, composable, and require no dependencies, but they demand that the developer hand-craft every concern that GoAkt Streams handles automatically.
| Concern | Channels + goroutines | GoAkt Streams |
|---|---|---|
| Backpressure | Manual (blocking sends or select/default) | Automatic credit-based demand propagation |
| Error handling | Explicit error channels and coordination logic | ErrorStrategy per stage (FailFast, Resume, Retry) |
| Fan-out | Manual goroutine fan; complex synchronisation | Broadcast, Balance primitives |
| Fan-in | select or sync.WaitGroup coordination | Merge, Combine primitives |
| Windowing | Custom timer goroutines and buffers | Batch[T](n, maxWait) built-in |
| Rate limiting | Custom token bucket / ticker goroutines | Throttle[T](n, per) built-in |
| Cancellation | context.Context plumbing through every goroutine | handle.Stop(ctx) / handle.Abort() |
| Lifecycle | Manual sync.WaitGroup + goroutine tracking | StreamHandle.Done() + supervision tree |
| Observability | Manual counters | Built-in StreamMetrics and Tracer |
When to prefer channels: simple point-to-point pipelines with one producer and one consumer, or when you need
the absolute minimum overhead and are comfortable managing error propagation manually.
When to prefer GoAkt Streams: multi-stage transformations, fan-out/fan-in topologies, variable-rate sources,
anything requiring windowing, batching, or rate limiting, and whenever you want a managed lifecycle.
RxGo
RxGo is a Go implementation of Reactive Extensions (ReactiveX). It offers a rich operator library and a familiar API for developers coming from RxJS or RxJava.
| Aspect | RxGo | GoAkt Streams |
|---|---|---|
| Backpressure | Not supported in v2; the observable push model can overflow without consumer-side control | Pull-only, credit-based; source never outpaces downstream |
| Concurrency model | Goroutines with optional WithPool options | Actor-per-stage; supervised, lifecycle-managed |
| Type safety | interface{} items in v2; generic operators not yet complete | Full generics: Flow[In, Out], Source[T], Sink[T] |
| Actor integration | None | First-class: FromActor, ToActor, actor supervision |
| Error model | Error items in the stream; OnError handler | Typed ErrorStrategy (FailFast, Resume, Retry) per stage |
| Lifecycle | Dispose pattern | StreamHandle with Stop / Abort / Done |
| Operator richness | Very rich (debounce, zip, skip, take, window…) | Core operators; FlatMap, Batch, Throttle, ParallelMap |
| Fan-out | Connectable observables | Broadcast and Balance with automatic backpressure |
| Supervision | None | GoAkt actor supervision tree |
| Status | Maintenance mode | Actively maintained |
Choose RxGo if you are building standalone Go applications, want the full ReactiveX operator vocabulary, and do
not need backpressure or actor integration.
Choose GoAkt Streams if you need guaranteed backpressure, are already in a GoAkt system, or require supervised
stages with automatic restart.
Benthos / Redpanda Connect
Benthos (now Redpanda Connect) is a production-grade stream-processing engine focused on
connecting data systems. It is configuration-driven and ships with hundreds of connectors.
| Aspect | Benthos / Redpanda Connect | GoAkt Streams |
|---|---|---|
| Primary audience | Platform / DevOps teams building data pipelines | Go application developers integrating stream logic into services |
| Configuration model | YAML configuration files | Code-first Go API |
| Backpressure | Ack-based; built into every connector | Credit-based demand propagation |
| Connectors | 200+ built-in (Kafka, S3, HTTP, databases…) | User-defined via Source, Sink wrappers |
| Type safety | Untyped message blobs | Generic Source[T] / Flow[In,Out] |
| Embedding | Supported via Go library | Native; it is a Go library |
| Processing model | Component DAG configured in YAML | Composable Go values assembled in code |
| Testing | Unit tests via YAML fixtures | Standard Go tests; testkit |
| Error handling | Retry / DLQ per connector | ErrorStrategy + OnDrop callback per stage |
| Actor integration | None | First-class GoAkt actor interop |
Choose Benthos if you are building a standalone data pipeline product, need ready-made connectors to external
systems, or want configuration-driven deployments without writing Go code.
Choose GoAkt Streams if stream processing is one facet of a larger GoAkt application, you want to express pipelines
in idiomatic Go code, or you need deep integration with the actor supervision model.
Apache Kafka Streams
Kafka Streams is a JVM client library for building stateful stream
processors on top of Apache Kafka.
| Aspect | Kafka Streams | GoAkt Streams |
|---|---|---|
| Language | Java / Kotlin (JVM) | Go |
| Broker dependency | Requires Apache Kafka | No broker; in-process |
| Durability | Persistent topics; replayable | In-memory within the process; ephemeral |
| State stores | RocksDB-backed state stores; fault-tolerant | Stateful stages via actor state; no persistent store |
| Scalability | Horizontal via Kafka partitions | Vertical + actor location transparency |
| Backpressure | Implicit via Kafka consumer offset | Explicit credit-based demand |
| Exactly-once | Exactly-once semantics with transactions | Exactly-once within a local pipeline |
| Windowing | Tumbling, hopping, sliding, session windows | Batch[T](n, maxWait) for simple windowing |
| Table joins | KTable / KStream joins with changelog topics | Not built-in; compose via actor state |
| Operator richness | Filter, map, flatMap, groupBy, aggregate, join | See flow reference table above |
Choose Kafka Streams when you need durable, replayable, horizontally scalable stream processing on the JVM with
Kafka as the backbone.
Choose GoAkt Streams for in-process stream processing embedded in a Go service, with no external broker dependency
and tight integration with GoAkt actor supervision.
Akka Streams
Akka Streams (Scala / Java) is the direct inspiration for
GoAkt Streams. Both implement the Reactive Streams specification.
| Aspect | Akka Streams | GoAkt Streams |
|---|---|---|
| Language | Scala / Java (JVM) | Go |
| Backpressure | Reactive Streams specification | Credit-based pull; equivalent semantics |
| Graph DSL | Rich typed graph builder (GraphDSL) | Graph named-node builder |
| Stage fusion | Automatic operator fusion | FuseStateless (default), FuseNone, FuseAggressive |
| Materializer | ActorMaterializer | GoAkt ActorSystem |
| Error handling | SupervisionStrategy per flow | ErrorStrategy per stage |
| Fan-out | Broadcast, Balance, Partition | Broadcast, Balance |
| Fan-in | Merge, MergePreferred, Zip, ZipWith | Merge, Combine |
| Actor interop | Source.actorRef, Sink.actorRef | FromActor, ToActor, ToActorNamed |
| Type safety | Full (Scala generics) | Full (Go generics) |
| Operator richness | Very rich | Core operators; sufficient for most workloads |
| Runtime | JVM; Akka actor system | Go runtime; GoAkt actor system |
| Licence | BSL 1.1 (Lightbend) | MIT |
GoAkt Streams deliberately mirrors Akka Streams' core concepts (lazy Source, Flow, Sink, demand-driven backpressure, stage fusion) while adapting them to Go idioms and the GoAkt actor hierarchy. The PullRequest / PullResponse actor source protocol is analogous to Akka's Source.actorRefWithBackpressure.
Related topics
- Observability: Metrics and OpenTelemetry integration
- Event Streams: Internal system and cluster event bus
- PubSub: Application-level topic-based pub/sub
- Actor Scheduling: Timer and cron scheduling used by Tick and Batch
- Supervision: Actor failure handling that underpins stream stage recovery