GoAkt Streams pipelines are lazy blueprints: nothing executes until RunnableGraph.Run is called against a live ActorSystem.
Overview
| Concept | What it means in GoAkt Streams |
|---|---|
| Correctness | Exactly-once delivery semantics within a local pipeline |
| Backpressure | Credit-based demand pull prevents unbounded buffering |
| Composability | Stages are plain Go values; pipelines are assembled declaratively |
| Actor-native | Every stage is an actor; supervision, scheduling, and routing apply naturally |
| Type-safety | Source[T], Flow[In, Out], Sink[T] carry type parameters through the pipeline |
Core abstractions
| Type | Role |
|---|---|
| Source[T] | Origin of stream elements; produces values of type T |
| Flow[In, Out] | Transformation stage; consumes In, emits Out |
| Sink[T] | Terminal consumer; processes elements and drives backpressure |
| RunnableGraph | Fully assembled pipeline; a value type that can be Run multiple times |
| StreamHandle | Live handle returned by Run; controls lifecycle and exposes metrics |
Quick start
Linear pipeline
Use the fluent From / Via / To builder for straight-line pipelines:
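A sketch of what such a pipeline might look like. The constructor names (From, Of, Filter, ForEach) come from this document; the import path, builder receivers, and Run signature are assumptions:

```go
// "streams" is an assumed alias for the library package; the path is hypothetical.
import streams "example.com/goakt/streams"

// Nothing executes at assembly time: the graph is a plain value.
graph := streams.From(streams.Of(1, 2, 3, 4, 5)).
	Via(streams.Filter(func(n int) bool { return n%2 == 1 })).
	To(streams.ForEach(func(n int) { fmt.Println(n) }))

// Materialize against a live ActorSystem.
handle, err := graph.Run(ctx, system)
if err != nil {
	log.Fatal(err)
}
<-handle.Done() // finite source: the stream completes on its own
```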
Type-changing flows
When a Flow changes the element type, use the package-level Via free function or ViaLinear:
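A possible sketch, assuming `streams` as the package alias. A method-style Via cannot introduce a new output type parameter in Go, which is presumably why the free function exists:

```go
// Flow[int, string]: changes the element type.
toLabel := streams.Map(func(n int) string { return fmt.Sprintf("item-%d", n) })

// Attach a type-changing Flow with the package-level Via free function.
labelled := streams.Via(streams.Range(0, 3), toLabel) // Source[int] -> Source[string]
graph := streams.From(labelled).
	To(streams.ForEach(func(s string) { fmt.Println(s) }))
```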
Sources
Sources are the origin of stream data. All sources apply backpressure — they produce only as many elements as downstream demands.
Finite sources
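A sketch of the finite constructors named in the reference table below; the Unfold step-function signature is a guess (next state, emitted value, continue):

```go
nums := streams.Of(1, 2, 3)  // fixed values, then complete
tens := streams.Range(0, 10) // 0..9 (end-exclusive)

// Hypothetical Unfold shape: emit powers of two below 1024.
powers := streams.Unfold(1, func(s int) (int, int, bool) {
	return s * 2, s, s < 1024 // next seed, emitted value, keep going
})
```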
Channel source
Bridges an external Go channel into the pipeline. Backpressure naturally limits how fast the goroutine sends:
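A minimal sketch, assuming `streams` as the package alias:

```go
lines := make(chan string, 8)
go func() {
	defer close(lines) // closing the channel completes the source
	for _, l := range []string{"a", "b", "c"} {
		lines <- l // the send blocks once the buffer fills: natural backpressure
	}
}()
src := streams.FromChannel(lines)
```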
Actor source
Pulls elements from a GoAkt actor using the PullRequest / PullResponse[T] protocol. Useful for integrating existing actor-based producers:
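A sketch of the integration. The protocol message names come from this document; the Spawn call and producer actor are illustrative:

```go
// The producer actor must answer PullRequest messages with PullResponse[T].
pid, err := system.Spawn(ctx, "producer", NewOrderProducer()) // illustrative
if err != nil {
	log.Fatal(err)
}
src := streams.FromActor[string](pid) // one pull per unit of downstream demand
```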
Tick source
Emits the current time on a fixed interval. Runs indefinitely until Stop or Abort is called:
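A sketch, assuming `streams` as the package alias and the StreamHandle methods described later in this page:

```go
graph := streams.From(streams.Tick(time.Second)).
	To(streams.ForEach(func(t time.Time) { fmt.Println("tick:", t) }))

handle, _ := graph.Run(ctx, system)
time.Sleep(5 * time.Second)
_ = handle.Stop(ctx) // Tick never completes on its own
```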
Network source
Reads []byte frames from a net.Conn. Each Read call produces one element; demand controls how many reads are batched per upstream request:
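A sketch, assuming `streams` as the package alias; the address is illustrative:

```go
conn, err := net.Dial("tcp", "localhost:9000")
if err != nil {
	log.Fatal(err)
}
src := streams.FromConn(conn, 4096) // each Read yields one []byte frame of up to bufSize bytes
graph := streams.From(src).
	To(streams.ForEach(func(frame []byte) { fmt.Printf("%d bytes\n", len(frame)) }))
```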
Reference
| Constructor | Description |
|---|---|
| Of[T](values...) | Finite source from a fixed set of values |
| Range(start, end) | Integer range [start, end) |
| Unfold[S, T](seed, step) | Generates values from a seed with a stateful step function |
| FromChannel[T](ch) | Reads from a Go channel; completes when the channel closes |
| FromActor[T](pid) | Pulls from a GoAkt actor via PullRequest / PullResponse[T] |
| Tick(interval) | Emits time.Time on a fixed interval; runs until cancelled |
| Merge[T](sources...) | Fans N sources into one; completes when all inputs complete |
| Combine[T, U, V](left, right, fn) | Zips two sources pairwise via fn (zip semantics) |
| Broadcast[T](src, n) | Fans one source out to N independent branches |
| Balance[T](src, n) | Distributes one source across N branches with round-robin backpressure |
| FromConn(conn, bufSize) | Reads []byte frames from a net.Conn |
Flows
Flows are lazy transformation stages. Each flow operator returns a new Flow value; multiple flows can be chained without materializing anything.
Mapping and filtering
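A sketch of the core operators, assuming `streams` as the package alias and signatures inferred from the reference table below:

```go
double := streams.Map(func(n int) int { return n * 2 })
evens := streams.Filter(func(n int) bool { return n%2 == 0 })

// TryMap carries an error path; failures are handled per the stage's ErrorStrategy.
parse := streams.TryMap(func(s string) (int, error) { return strconv.Atoi(s) })

// FlatMap expands one element into many.
words := streams.FlatMap(func(line string) []string { return strings.Fields(line) })
```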
Windowing and rate control
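A sketch of the windowing and rate operators, assuming `streams` as the package alias; the OverflowStrategy constant placement is a guess:

```go
batches := streams.Batch[int](100, 250*time.Millisecond) // []int of up to 100; early flush after 250ms
limited := streams.Throttle[int](10, time.Second)        // at most 10 elements per second
buffered := streams.Buffer[int](1024, streams.DropHead)  // async buffer; drop oldest on overflow
```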
Stateful flows
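A sketch of the stateful operators, assuming `streams` as the package alias:

```go
// Scan emits every intermediate state: 1 -> 1, 2 -> 3, 3 -> 6, ...
runningSum := streams.Scan(0, func(acc, n int) int { return acc + n })

// Deduplicate suppresses consecutive repeats: a a b a -> a b a
noRepeats := streams.Deduplicate[int]()
```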
Parallel processing
ParallelMap applies a function concurrently with up to n goroutines. Use OrderedParallelMap when output order
must match input order:
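A sketch, assuming `streams` as the package alias:

```go
hash := func(s string) string {
	sum := sha256.Sum256([]byte(s))
	return hex.EncodeToString(sum[:])
}
unordered := streams.ParallelMap(8, hash)      // up to 8 goroutines; output order not guaranteed
ordered := streams.OrderedParallelMap(8, hash) // same concurrency; output order matches input
```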
Reference
| Constructor | Description |
|---|---|
| Map[In, Out](fn) | Type-changing transformation; no error path |
| TryMap[In, Out](fn) | Transformation with error; ErrorStrategy controls failure handling |
| Filter[T](predicate) | Keeps only elements where predicate returns true |
| FlatMap[In, Out](fn) | Expands each element into a slice of outputs |
| Flatten[T]() | Unwraps []T elements into individual elements |
| Batch[T](n, maxWait) | Groups into []T slices of at most n; flushes early after maxWait |
| Buffer[T](size, strategy) | Asynchronous buffer with configurable overflow strategy |
| Throttle[T](n, per) | Limits throughput to at most n elements per per duration |
| Deduplicate[T]() | Suppresses consecutive duplicate elements (T must be comparable) |
| Scan[In, State](zero, fn) | Running accumulation; emits each intermediate state |
| WithContext[T](key, value) | Marks a named tracing boundary; passes elements through unchanged |
| ParallelMap[In, Out](n, fn) | Applies fn concurrently with up to n goroutines; unordered output |
| OrderedParallelMap[In, Out](n, fn) | Like ParallelMap but preserves input order via min-heap resequencing |
Sinks
Sinks are the terminal consumers of a pipeline. They drive backpressure by signalling demand upstream; the pipeline does not produce faster than the sink can consume.
Common sinks
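A sketch, assuming `streams` as the package alias. How the Collector and FoldResult are obtained from the sink constructors is a guess based on the reference table below:

```go
collector := streams.Collect[int]()
handle, _ := streams.From(streams.Range(1, 6)).To(collector).Run(ctx, system)
<-handle.Done()
items := collector.Items() // the accumulated elements

summed := streams.Fold(0, func(acc, n int) int { return acc + n })
handle2, _ := streams.From(streams.Range(1, 6)).To(summed).Run(ctx, system)
<-handle2.Done()
total := summed.Value() // the final accumulator
```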
Actor and channel sinks
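A sketch, assuming `streams` as the package alias; the Spawn call and actor names are illustrative:

```go
out := make(chan string, 16)
toChan := streams.Chan(out) // blocks upstream (backpressure) when out is full

pid, _ := system.Spawn(ctx, "consumer", NewConsumerActor()) // illustrative
toActor := streams.ToActor[string](pid)                     // one Tell per element
byName := streams.ToActorNamed[string](system, "consumer")  // resolve by name, then Tell
```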
Reference
| Constructor | Description |
|---|---|
| ForEach[T](fn) | Calls fn for each element |
| Collect[T]() | Accumulates all elements; retrieve via Collector[T].Items() |
| Fold[T, U](zero, fn) | Reduces to a single value; retrieve via FoldResult[U].Value() |
| First[T]() | Captures the first element then cancels upstream |
| Ignore[T]() | Discards all elements |
| Chan[T](ch) | Writes each element to a Go channel; applies backpressure when full |
| ToActor[T](pid) | Forwards each element to a GoAkt actor via Tell |
| ToActorNamed[T](system, name) | Resolves actor by name and forwards via Tell |
Fan-out and fan-in
Broadcast
Broadcast fans a single source out to N independent branches. Every branch receives every element. Backpressure is
enforced by the slowest branch: the hub pulls from upstream only when all active branches have outstanding demand.
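A sketch, assuming Broadcast returns one independent source per branch (as its listing under the source constructors suggests); names and shapes are assumptions:

```go
branches := streams.Broadcast(streams.Of("a", "b", "c"), 2)
streams.From(branches[0]).To(streams.ForEach(audit)).Run(ctx, system)
streams.From(branches[1]).To(streams.ForEach(persist)).Run(ctx, system)
// Both sinks receive every element; upstream advances at the slower sink's pace.
```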
Balance
Balance distributes elements across N branches using round-robin routing with backpressure. Each element goes to
exactly one branch — the next one with available demand. Use this to parallelise work across independent consumers:
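A sketch under the same assumption as Broadcast: that Balance returns one source per branch:

```go
branches := streams.Balance(streams.Range(0, 10000), 4)
for i, br := range branches {
	worker := i
	streams.From(br).
		To(streams.ForEach(func(n int) { process(worker, n) })). // process is illustrative
		Run(ctx, system)
}
// Each element is routed to exactly one of the four workers.
```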
Merge
Merge fans N independent sources into a single downstream. Elements arrive in non-deterministic order; completion
happens when all inputs have completed:
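A sketch, assuming `streams` as the package alias:

```go
a := streams.Of(1, 2, 3)
b := streams.Of(4, 5, 6)
merged := streams.Merge(a, b) // interleaving order is non-deterministic
graph := streams.From(merged).
	To(streams.ForEach(func(n int) { fmt.Println(n) }))
// Completes only after both a and b have completed.
```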
Combine (zip)
Combine pairs elements from two sources with a combine function (zip semantics). It completes when either source
is exhausted:
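A sketch, assuming `streams` as the package alias:

```go
ids := streams.Range(1, 100)
names := streams.Of("alice", "bob", "carol")
pairs := streams.Combine(ids, names, func(id int, name string) string {
	return fmt.Sprintf("%d:%s", id, name)
})
// Emits three pairs, then completes: names is exhausted first.
```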
Pipeline DSL (Graph builder)
For non-linear topologies — fan-out branches that later merge, or multiple independent pipelines sharing a source — use the Graph DSL:
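A hypothetical sketch of the named-node builder; every method name here is an assumption, shown only to convey the shape of a non-linear topology:

```go
g := streams.NewGraph(system)                  // hypothetical constructor
g.AddSource("events", streams.Range(0, 1000))  // named source node
g.AddBroadcast("events", "metrics", "storage") // fan the source out to two branches
g.AddSink("metrics", streams.ForEach(recordMetric)) // recordMetric is illustrative
g.AddSink("storage", streams.ForEach(persist))      // persist is illustrative
handle, err := g.Run(ctx)
```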
Backpressure
GoAkt Streams uses credit-based demand propagation. The sink signals how many elements it can accept; each stage propagates that demand upstream. Sources produce only what is requested. The demand parameters are configured per stage (StageConfig):
| Parameter | Default | Purpose |
|---|---|---|
| InitialDemand | 224 | First batch of demand sent by a sink to its upstream |
| RefillThreshold | 64 | Credit level at which a refill is requested |
| BufferSize | 256 | Mailbox buffer (bounded mailbox capacity) |
- When credit > RefillThreshold, no refill is sent — the pipeline is healthy.
- When credit ≤ RefillThreshold, the stage requests InitialDemand - credit more elements from upstream.
- This keeps in-flight data bounded while keeping the pipeline saturated.
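The refill rule can be expressed in a few lines of plain Go. This is a sketch of the logic described above using the default values, not the library's actual implementation:

```go
package main

import "fmt"

const (
	initialDemand   int64 = 224 // InitialDemand default
	refillThreshold int64 = 64  // RefillThreshold default
)

// refill returns how many elements a stage requests from upstream
// for a given credit level.
func refill(credit int64) int64 {
	if credit > refillThreshold {
		return 0 // healthy: no refill needed
	}
	return initialDemand - credit // top credit back up to InitialDemand
}

func main() {
	fmt.Println(refill(200)) // healthy: 0
	fmt.Println(refill(64))  // at threshold: 160
	fmt.Println(refill(0))   // empty: a full batch of 224
}
```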
Error handling
Each stage can be configured with an independent ErrorStrategy:
| Strategy | Behaviour |
|---|---|
| FailFast (default) | Propagate error downstream, shut down the pipeline |
| Resume | Skip the failed element; request a replacement from upstream |
| Retry | Re-process the element up to RetryConfig.MaxAttempts times before failing |
| Supervise | Delegate to actor supervision (currently equivalent to FailFast) |
Overflow strategies
Overflow strategies apply when a stage's internal buffer is full:
| Strategy | Behaviour |
|---|---|
| DropTail (default) | Discard the newest incoming element |
| DropHead | Discard the oldest buffered element |
| DropBuffer | Discard the entire buffer |
| Backpressure | Block the upstream (apply natural backpressure) |
| Fail | Treat overflow as a fatal error |
Stage fusion
Adjacent stateless stages (Map, Filter) are automatically fused into a single actor, eliminating intermediate
mailbox enqueues and reducing allocations.
| Mode | Behaviour |
|---|---|
| FuseStateless (default) | Fuses adjacent Map and Filter stages into one actor |
| FuseNone | No fusion; each stage is a separate actor (useful for debugging) |
| FuseAggressive | Fuses all fusable adjacent stages |
Use FuseNone when profiling individual stage throughput.
StreamHandle lifecycle
RunnableGraph.Run returns a StreamHandle for controlling the pipeline at runtime:
| Method | Purpose |
|---|---|
| ID() | Unique stream identifier |
| Done() | Channel closed when the stream terminates (naturally or by error) |
| Err() | Terminal error; valid only after Done() is closed |
| Stop(ctx) | Orderly shutdown; drains in-flight elements, then stops all stages |
| Abort() | Immediate termination; discards buffered elements |
| Metrics() | Returns a StreamMetrics snapshot |
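The lifecycle methods above might be combined as follows (a sketch; the graph, shutdown channel, and timeout are illustrative):

```go
handle, err := graph.Run(ctx, system)
if err != nil {
	log.Fatal(err)
}
select {
case <-handle.Done(): // natural completion or failure
	if err := handle.Err(); err != nil {
		log.Printf("stream %s failed: %v", handle.ID(), err)
	}
case <-shutdownSignal: // e.g. SIGTERM
	stopCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := handle.Stop(stopCtx); err != nil {
		handle.Abort() // draining failed; terminate immediately
	}
}
```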
Observability
Metrics
StreamHandle.Metrics() returns a live snapshot of element counts:
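A sketch of reading the snapshot; the StreamMetrics field names shown here are illustrative, not verified:

```go
m := handle.Metrics()
fmt.Printf("in=%d out=%d dropped=%d\n", m.ElementsIn, m.ElementsOut, m.Dropped)
```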
Tracer
Attach a Tracer to any Flow or Sink for per-element distributed tracing:
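A hypothetical sketch: the Tracer field exists in StageConfig (see the configuration reference below), but the WithConfig builder method and the otelTracer value are assumptions:

```go
flow := streams.Map(parseOrder).WithConfig(streams.StageConfig{ // WithConfig is assumed
	Name:   "parse-order",
	Tracer: otelTracer, // your Tracer implementation
})
```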
Configuration reference
StageConfig carries per-stage configuration. Apply it via the builder methods on Flow, Sink, or Source:
| Field | Type | Default | Purpose |
|---|---|---|---|
| InitialDemand | int64 | 224 | First demand batch from sink to upstream |
| RefillThreshold | int64 | 64 | Credit level that triggers a refill request |
| ErrorStrategy | ErrorStrategy | FailFast | Element-level failure handling |
| RetryConfig.MaxAttempts | int | 1 | Retry attempts when ErrorStrategy is Retry |
| OverflowStrategy | OverflowStrategy | DropTail | Behaviour when internal buffer is full |
| PullTimeout | time.Duration | 5s | Timeout for FromActor pull requests |
| BufferSize | int | 256 | Bounded mailbox capacity (BufferSize × 2 slots) |
| Mailbox | actor.Mailbox | nil | Override the default mailbox implementation |
| Name | string | auto | Custom actor name for this stage |
| Tags | map[string]string | nil | Key/value labels propagated to metrics and traces |
| Tracer | Tracer | nil | Optional distributed tracing hook |
| Fusion | bool | true | Whether this stage may participate in stage fusion |
| MicroBatch | int | 1 | Accumulate this many elements before forwarding (flow stages only) |
| OnDrop | func(any, string) | nil | Called when an element is dropped due to overflow or exhausted retries |
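A sketch of populating the struct; the field names follow the table above, while the WithConfig builder method and normalize function are assumptions:

```go
cfg := streams.StageConfig{
	InitialDemand:    512,
	RefillThreshold:  128,
	ErrorStrategy:    streams.Resume,
	OverflowStrategy: streams.Backpressure,
	BufferSize:       1024,
	Tags:             map[string]string{"team": "payments"},
	OnDrop: func(elem any, reason string) {
		log.Printf("dropped %v: %s", elem, reason)
	},
}
flow := streams.Map(normalize).WithConfig(cfg) // WithConfig is assumed
```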
Comparison with other approaches
Go channels and goroutines
Go channels are the native concurrency primitive. They are low-level, composable, and require no dependencies — but they demand that the developer hand-craft every concern that GoAkt Streams handles automatically.
| Concern | Channels + goroutines | GoAkt Streams |
|---|---|---|
| Backpressure | Manual (blocking sends or select/default) | Automatic credit-based demand propagation |
| Error handling | Explicit error channels and coordination logic | ErrorStrategy per stage (FailFast, Resume, Retry) |
| Fan-out | Manual goroutine fan; complex synchronisation | Broadcast, Balance primitives |
| Fan-in | select or sync.WaitGroup coordination | Merge, Combine primitives |
| Windowing | Custom timer goroutines and buffers | Batch[T](n, maxWait) built-in |
| Rate limiting | Custom token bucket / ticker goroutines | Throttle[T](n, per) built-in |
| Cancellation | context.Context plumbing through every goroutine | handle.Stop(ctx) / handle.Abort() |
| Lifecycle | Manual sync.WaitGroup + goroutine tracking | StreamHandle.Done() + supervision tree |
| Observability | Manual counters | Built-in StreamMetrics and Tracer |
RxGo
RxGo is a Go implementation of Reactive Extensions (ReactiveX). It offers a rich operator library and a familiar API for developers coming from RxJS or RxJava.
| Aspect | RxGo | GoAkt Streams |
|---|---|---|
| Backpressure | Not supported in v2; the observable push model can overflow without consumer-side control | Pull-only, credit-based; source never outpaces downstream |
| Concurrency model | Goroutines with optional WithPool options | Actor-per-stage; supervised, lifecycle-managed |
| Type safety | interface{} items in v2; generic operators not yet complete | Full generics: Flow[In, Out], Source[T], Sink[T] |
| Actor integration | None | First-class: FromActor, ToActor, actor supervision |
| Error model | Error items in the stream; OnError handler | Typed ErrorStrategy (FailFast, Resume, Retry) per stage |
| Lifecycle | Dispose pattern | StreamHandle with Stop / Abort / Done |
| Operator richness | Very rich (debounce, zip, skip, take, window…) | Core operators; FlatMap, Batch, Throttle, ParallelMap |
| Fan-out | Connectable observables | Broadcast and Balance with automatic backpressure |
| Supervision | None | GoAkt actor supervision tree |
| Status | Maintenance mode | Actively maintained |
Benthos / Redpanda Connect
Benthos (now Redpanda Connect) is a production-grade stream-processing engine focused on connecting data systems. It is configuration-driven and ships with hundreds of connectors.
| Aspect | Benthos / Redpanda Connect | GoAkt Streams |
|---|---|---|
| Primary audience | Platform / DevOps teams building data pipelines | Go application developers integrating stream logic into services |
| Configuration model | YAML configuration files | Code-first Go API |
| Backpressure | Ack-based; built into every connector | Credit-based demand propagation |
| Connectors | 200+ built-in (Kafka, S3, HTTP, databases…) | User-defined via Source, Sink wrappers |
| Type safety | Untyped message blobs | Generic Source[T] / Flow[In,Out] |
| Embedding | Supported via Go library | Native — it is a Go library |
| Processing model | Component DAG configured in YAML | Composable Go values assembled in code |
| Testing | Unit tests via YAML fixtures | Standard Go tests; testkit |
| Error handling | Retry / DLQ per connector | ErrorStrategy + OnDrop callback per stage |
| Actor integration | None | First-class GoAkt actor interop |
Apache Kafka Streams
Kafka Streams is a JVM client library for building stateful stream processors on top of Apache Kafka.
| Aspect | Kafka Streams | GoAkt Streams |
|---|---|---|
| Language | Java / Kotlin (JVM) | Go |
| Broker dependency | Requires Apache Kafka | No broker; in-process |
| Durability | Persistent topics; replayable | In-memory within the process; ephemeral |
| State stores | RocksDB-backed state stores; fault-tolerant | Stateful stages via actor state; no persistent store |
| Scalability | Horizontal via Kafka partitions | Vertical + actor location transparency |
| Backpressure | Implicit via Kafka consumer offset | Explicit credit-based demand |
| Exactly-once | Exactly-once semantics with transactions | Exactly-once within a local pipeline |
| Windowing | Tumbling, hopping, sliding, session windows | Batch[T](n, maxWait) for simple windowing |
| Table joins | KTable / KStream joins with changelog topics | Not built-in; compose via actor state |
| Operator richness | Filter, map, flatMap, groupBy, aggregate, join | See flow reference table above |
Akka Streams
Akka Streams (Scala / Java) is the direct inspiration for GoAkt Streams. Both implement the Reactive Streams specification.
| Aspect | Akka Streams | GoAkt Streams |
|---|---|---|
| Language | Scala / Java (JVM) | Go |
| Backpressure | Reactive Streams specification | Credit-based pull; equivalent semantics |
| Graph DSL | Rich typed graph builder (GraphDSL) | Graph named-node builder |
| Stage fusion | Automatic operator fusion | FuseStateless (default), FuseNone, FuseAggressive |
| Materializer | ActorMaterializer | GoAkt ActorSystem |
| Error handling | SupervisionStrategy per flow | ErrorStrategy per stage |
| Fan-out | Broadcast, Balance, Partition | Broadcast, Balance |
| Fan-in | Merge, MergePreferred, Zip, ZipWith | Merge, Combine |
| Actor interop | Source.actorRef, Sink.actorRef | FromActor, ToActor, ToActorNamed |
| Type safety | Full (Scala generics) | Full (Go generics) |
| Operator richness | Very rich | Core operators; sufficient for most workloads |
| Runtime | JVM; Akka actor system | Go runtime; GoAkt actor system |
| Licence | BSL 1.1 (Lightbend) | MIT |
GoAkt Streams adopts Akka's core model — Source, Flow, Sink, demand-driven backpressure, stage fusion — while adapting them to Go idioms and the GoAkt actor hierarchy. The PullRequest / PullResponse actor source protocol is analogous to Akka's Source.actorRefWithBackpressure.
Related
- Observability — Metrics and OpenTelemetry integration
- Event Streams — Internal system and cluster event bus
- PubSub — Application-level topic-based pub/sub
- Actor Scheduling — Timer and cron scheduling used by Tick and Batch
- Supervision — Actor failure handling that underpins stream stage recovery