You are a Go engineer who reaches for reactive streams when data flows asynchronously or infinitely. You use samber/ro to build declarative pipelines instead of manual goroutine/channel wiring, but you know when a simple slice + samber/lo is enough.
Thinking mode: Use `ultrathink` when designing advanced reactive pipelines or choosing between cold/hot observables, subjects, and combining operators. Wrong architecture leads to resource leaks or missed events.
samber/ro — Reactive Streams for Go
Go implementation of ReactiveX. Generics-first, type-safe, composable pipelines for asynchronous data streams with automatic backpressure, error propagation, context integration, and resource cleanup. 150+ operators, 5 subject types, 40+ plugins.
Official Resources:
github.com/samber/ro
ro.samber.dev
pkg.go.dev/github.com/samber/ro
This skill is not exhaustive. Please refer to library documentation and code examples for more information. Context7 can help as a discoverability platform.
Why samber/ro (Streams vs Slices)
Go channels + goroutines become unwieldy for complex async pipelines: manual channel closures, verbose goroutine lifecycle, error propagation across nested selects, and no composable operators. `samber/ro` solves this with declarative, chainable stream operators.
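To make the pain point concrete, here is a minimal stdlib-only sketch of the manual wiring for a small generate, filter, and map pipeline. It does not use the samber/ro API; the function names (`generate`, `filterEven`, `label`, `runManualPipeline`) are hypothetical and exist only for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// generate emits 0..n-1 and closes the channel when done or cancelled.
func generate(ctx context.Context, n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out) // manual closure: forgetting this blocks the consumer forever
		for i := 0; i < n; i++ {
			select {
			case out <- i:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// filterEven forwards only even values; each stage repeats the same lifecycle boilerplate.
func filterEven(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			if v%2 != 0 {
				continue
			}
			select {
			case out <- v:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// label converts each value to a string.
func label(ctx context.Context, in <-chan int) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for v := range in {
			select {
			case out <- fmt.Sprintf("even-%d", v):
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// runManualPipeline wires the stages together and collects the results.
func runManualPipeline(n int) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	var got []string
	for s := range label(ctx, filterEven(ctx, generate(ctx, n))) {
		got = append(got, s)
	}
	return got
}

func main() {
	fmt.Println(runManualPipeline(5)) // [even-0 even-2 even-4]
}
```

Every stage needs its own goroutine, channel, `close`, and cancellation check, and this sketch still has no error propagation. That repetition is what chainable operators are meant to remove.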
When to use which tool:
| Scenario | Tool | Why |
| --- | --- | --- |
| Transform a slice (map, filter, reduce) | `samber/lo` | Finite, synchronous, eager; no stream overhead needed |
| Simple goroutine fan-out with error handling | `errgroup` | Lightweight `golang.org/x/sync` package, sufficient for bounded concurrency |
| Declarative pipeline with backpressure, retry, timeout, combine | `samber/ro` | |
| Real-time data enrichment from multiple async sources | `samber/ro` | CombineLatest/Zip compose dependent streams without manual select |
| Pub/sub with multiple consumers sharing one source | `samber/ro` | Hot observables (Share/Subjects) handle multicast natively |
Key differences: lo vs ro
| Aspect | samber/lo | samber/ro |
| --- | --- | --- |
| Data | Finite slices | Infinite streams |
| Execution | Synchronous, blocking | Asynchronous, non-blocking |
| Evaluation | Eager (allocates intermediate slices) | Lazy (processes items as they arrive) |
| Timing | Immediate | Time-aware (delay, throttle, interval, timeout) |
| Error model | Return `(T, error)` per call | Error channel propagates through pipeline |
| Use case | Collection transforms | Event-driven, real-time, async pipelines |
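The eager versus lazy distinction can be sketched with plain Go, without either library. In this hypothetical example, `eagerSquares` transforms the whole slice before the caller sees anything, while `lazySquares` hands over one value at a time and stops as soon as the consumer has enough. The call counters are only there to make the difference observable.

```go
package main

import "fmt"

// eagerSquares maps the whole slice up front, allocating an intermediate
// slice even if the caller only needs the first few results.
func eagerSquares(in []int, calls *int) []int {
	out := make([]int, 0, len(in))
	for _, v := range in {
		*calls++
		out = append(out, v*v)
	}
	return out
}

// lazySquares pushes one squared value at a time to yield and stops
// as soon as yield returns false.
func lazySquares(in []int, calls *int, yield func(int) bool) {
	for _, v := range in {
		*calls++
		if !yield(v * v) {
			return
		}
	}
}

// firstTwo returns the first two squares from each style, plus how many
// input items each style had to process to produce them.
func firstTwo(in []int) (eager []int, eagerCalls int, lazy []int, lazyCalls int) {
	eager = eagerSquares(in, &eagerCalls)[:2]
	lazySquares(in, &lazyCalls, func(v int) bool {
		lazy = append(lazy, v)
		return len(lazy) < 2
	})
	return
}

func main() {
	eager, ec, lazy, lc := firstTwo([]int{1, 2, 3, 4, 5})
	fmt.Println(eager, ec) // [1 4] 5
	fmt.Println(lazy, lc)  // [1 4] 2
}
```

For a five-element slice the difference is negligible, which is why a slice library is the better fit for finite data. The lazy shape matters once the input is unbounded or arrives over time.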
Installation
go get github.com/samber/ro
Core Concepts
Four building blocks:
1. Observable: a data source that emits values over time. Cold by default: each subscriber triggers an independent execution from scratch.
2. Observer: a consumer with three callbacks: `onNext(T)`, `onError(error)`, `onComplete()`.
3. Operator: a function that transforms an observable into another observable, chained via `Pipe`.
4. Subscription: the connection between observable and observer. Call `.Wait()` to block or `.Unsubscribe()` to cancel.
```go
observable := ro.Pipe2(
	ro.RangeWithInterval(0, 5, 1*time.Second),
	ro.Filter(func(x int) bool { return x%2 == 0 }),
	ro.Map(func(x int) string { return fmt.Sprintf("even-%d", x) }),
)

observable.Subscribe(ro.NewObserver(
	func(s string) { fmt.Println(s) },    // onNext
	func(err error) { log.Println(err) }, // onError
	func() { fmt.Println("Done!") },      // onComplete
))
// Output: "even-0", "even-2", "even-4", "Done!"

// Or collect synchronously:
values, err := ro.Collect(observable)
```
Cold vs Hot Observables
Cold (default): each `.Subscribe()` starts a new independent execution. Safe and predictable; use by default.

Hot: multiple subscribers share a single execution. Use when the source is expensive (WebSocket, DB poll) or subscribers must see the same events.
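The difference is easiest to see by counting how often the producer runs. The following is a simplified, synchronous model in plain Go, not the samber/ro implementation; the `source` and `hot` types are hypothetical and skip concurrency, errors, and completion entirely.

```go
package main

import "fmt"

// source models an observable as a function that pushes values to a callback.
type source func(emit func(int))

// coldSource re-runs the producer for every subscriber; runs counts executions.
func coldSource(runs *int) source {
	return func(emit func(int)) {
		*runs++
		for i := 1; i <= 3; i++ {
			emit(i)
		}
	}
}

// hot runs the producer once and fans each value out to all subscribers.
type hot struct {
	subscribers []func(int)
}

func (h *hot) subscribe(fn func(int)) { h.subscribers = append(h.subscribers, fn) }

// connect starts the single shared execution.
func (h *hot) connect(src source) {
	src(func(v int) {
		for _, fn := range h.subscribers {
			fn(v)
		}
	})
}

// demo reports how many times the producer ran in each mode and the sums
// seen by the two hot subscribers.
func demo() (coldRuns, hotRuns, sumA, sumB int) {
	cold := coldSource(&coldRuns)
	cold(func(int) {}) // subscriber A triggers one execution
	cold(func(int) {}) // subscriber B triggers another

	h := &hot{}
	h.subscribe(func(v int) { sumA += v })
	h.subscribe(func(v int) { sumB += v })
	h.connect(coldSource(&hotRuns)) // one execution shared by both
	return
}

func main() {
	coldRuns, hotRuns, sumA, sumB := demo()
	fmt.Println(coldRuns, hotRuns, sumA, sumB) // 2 1 6 6
}
```

If the producer were a WebSocket connection or a database poll, the cold variant would open two of them, while the hot variant opens one and both subscribers see identical events.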
| Convert with | Behavior |
| --- | --- |
| `Share()` | Cold → hot with reference counting. Last unsubscribe tears down |
| `ShareReplay(n)` | Same as `Share`, plus buffers the last N values for late subscribers |
| `Connectable()` | Cold → hot, but waits for an explicit `.Connect()` call |
| Subjects | Natively hot: call `.Send()`, `.Error()`, `.Complete()` directly |
| Subject | Constructor | Replay behavior |
| --- | --- | --- |
| `PublishSubject` | `NewPublishSubject[T]` | None; late subscribers miss past events |
| `BehaviorSubject` | `NewBehaviorSubject[T]` | Replays last value to new subscribers |
| `ReplaySubject` | `NewReplaySubject[T]` | Replays last N values |
| `AsyncSubject` | `NewAsyncSubject[T]` | Emits only last value, only on complete |
| `UnicastSubject` | `NewUnicastSubject[T]` | Single subscriber only |
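The replay column is the part most likely to cause missed events, so here is a small stdlib-only sketch contrasting publish-style and behavior-style delivery for a late subscriber. The `publish` and `behavior` types are hypothetical simplifications, not the library's subjects: they are not safe for concurrent use and have no error or completion signals, and this `behavior` starts without an initial value.

```go
package main

import "fmt"

// publish forwards values only to subscribers registered at emission time.
type publish struct{ subs []func(int) }

func (p *publish) subscribe(fn func(int)) { p.subs = append(p.subs, fn) }

func (p *publish) send(v int) {
	for _, fn := range p.subs {
		fn(v)
	}
}

// behavior additionally remembers the latest value and replays it on subscribe.
type behavior struct {
	publish
	last    int
	hasLast bool
}

func (b *behavior) subscribe(fn func(int)) {
	if b.hasLast {
		fn(b.last) // replay the most recent value to the late subscriber
	}
	b.publish.subscribe(fn)
}

func (b *behavior) send(v int) {
	b.last, b.hasLast = v, true
	b.publish.send(v)
}

// lateSubscriber sends 1 and 2, subscribes, then sends 3, for both kinds.
func lateSubscriber() (fromPublish, fromBehavior []int) {
	p := &publish{}
	p.send(1)
	p.send(2)
	p.subscribe(func(v int) { fromPublish = append(fromPublish, v) })
	p.send(3)

	b := &behavior{}
	b.send(1)
	b.send(2)
	b.subscribe(func(v int) { fromBehavior = append(fromBehavior, v) })
	b.send(3)
	return
}

func main() {
	p, b := lateSubscriber()
	fmt.Println(p) // [3]
	fmt.Println(b) // [2 3]
}
```

The publish-style subscriber never learns about 1 or 2, whereas the behavior-style subscriber starts from the current state. That is usually the deciding factor between the two.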
For subject details and hot observable patterns, see Subjects Guide.
Operator Quick Reference
| Category | Key operators | Purpose |
| --- | --- | --- |
| Creation | `Just`, `FromSlice`, `FromChannel`, `Range`, `Interval`, `Defer`, `Future` | Create observables from various sources |
| Transform | `Map`, `MapErr`, `FlatMap`, `Scan`, `Reduce`, `GroupBy` | Transform or accumulate stream values |
| Filter | `Filter`, `Take`, `TakeLast`, `Skip`, `Distinct`, `Find`, `First`, `Last` | Selectively emit values |
| Combine | `Merge`, `Concat`, `Zip2`–`Zip6`, `CombineLatest2`–`CombineLatest5`, `Race` | Merge multiple observables |
| Error | `Catch`, `OnErrorReturn`, `OnErrorResumeNextWith`, `Retry`, `RetryWithConfig` | Recover from errors |
| Timing | `Delay`, `DelayEach`, `Timeout`, `ThrottleTime`, `SampleTime`, `BufferWithTime` | Control emission timing |
| Side effect | `Tap`/`Do`, `TapOnNext`, `TapOnError`, `TapOnComplete` | Observe without altering stream |
| Terminal | `Collect`, `ToSlice`, `ToChannel`, `ToMap` | Consume stream into Go types |
Use typed `Pipe2`, `Pipe3` ... `Pipe25` for compile-time type safety across operator chains. The untyped `Pipe` uses `any` and loses type checking.
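Why the typed variants matter can be shown with ordinary function composition. The sketch below is an analogy in plain Go generics, not the library's `Pipe` implementation; `pipe2`, `pipeAny`, and `runUntyped` are hypothetical helpers. The generic version rejects a mismatched chain at compile time, while the `any` version only fails once a value flows through it.

```go
package main

import "fmt"

// pipe2 composes two stages; the compiler checks that g accepts what f returns.
func pipe2[A, B, C any](f func(A) B, g func(B) C) func(A) C {
	return func(a A) C { return g(f(a)) }
}

// pipeAny composes stages typed as any; mismatches only show up when the chain runs.
func pipeAny(stages ...func(any) any) func(any) any {
	return func(v any) any {
		for _, s := range stages {
			v = s(v)
		}
		return v
	}
}

// runUntyped executes an untyped chain and reports a failed type assertion as an error.
func runUntyped(chain func(any) any, in any) (out any, err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("runtime type error: %v", r)
		}
	}()
	return chain(in), nil
}

func main() {
	double := func(x int) int { return x * 2 }
	label := func(x int) string { return fmt.Sprintf("n=%d", x) }

	typed := pipe2(double, label)
	fmt.Println(typed(21)) // n=42

	// Swapping the stages, pipe2(label, double), would not compile.
	// The untyped equivalent compiles and fails only when executed:
	untyped := pipeAny(
		func(v any) any { return label(v.(int)) },
		func(v any) any { return double(v.(int)) }, // receives a string: panics
	)
	_, err := runUntyped(untyped, 21)
	fmt.Println(err != nil) // true
}
```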
For the complete operator catalog (150+ operators with signatures), see Operators Guide.
Common Mistakes
| Mistake | Why it fails | Fix |
| --- | --- | --- |
| Using `ro.OnNext()` without error handler | Errors are silently dropped; bugs hide in production | Use `ro.NewObserver(onNext, onError, onComplete)` with all 3 callbacks |
| Using untyped `Pipe()` instead of `Pipe2`/`Pipe3` | Loses compile-time type safety, errors surface at runtime | Use `Pipe2`, `Pipe3` ... `Pipe25` for typed operator chains |
| Forgetting `.Unsubscribe()` on infinite streams | Goroutine leak: the observable runs forever | Use `TakeUntil(signal)`, context cancellation, or explicit `Unsubscribe()` |
| Using `Share()` when cold is sufficient | Unnecessary complexity, harder to reason about lifecycle | Use hot observables only when multiple consumers need the same stream |
| Using `samber/ro` for finite slice transforms | Stream overhead (goroutines, subscriptions) for a synchronous operation | Use `samber/lo`: it's simpler, faster, and purpose-built for slices |
| Not propagating context for cancellation | Streams ignore shutdown signals, causing resource leaks on termination | Chain `ContextWithTimeout` or `ThrowOnContextCancel` in the pipeline |
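The leak described in the unsubscribe and context rows is the same one that plain goroutines have. As a rough stdlib-only illustration (the `ticks` and `takeThenCancel` names are hypothetical, and this is not how samber/ro is implemented), an infinite producer only stops when something tells it to:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// ticks emits an increasing counter every interval until ctx is cancelled,
// then closes the channel so consumers can finish.
func ticks(ctx context.Context, interval time.Duration) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		t := time.NewTicker(interval)
		defer t.Stop()
		for i := 0; ; i++ {
			select {
			case <-ctx.Done():
				return
			case <-t.C:
				select {
				case out <- i:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}

// takeThenCancel reads n values, cancels, and reports whether the producer
// goroutine shut down (observed as the channel being closed).
func takeThenCancel(n int) (got []int, stopped bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ch := ticks(ctx, time.Millisecond)
	for v := range ch {
		got = append(got, v)
		if len(got) == n {
			cancel() // without this, the producer would keep ticking forever
			break
		}
	}
	done := make(chan struct{})
	go func() {
		for range ch { // drain any value sent just before cancellation was seen
		}
		close(done)
	}()
	select {
	case <-done:
		stopped = true
	case <-time.After(time.Second):
	}
	return got, stopped
}

func main() {
	got, stopped := takeThenCancel(3)
	fmt.Println(got, stopped) // [0 1 2] true
}
```

Breaking out of the consumer loop alone is not enough: without the `cancel()` call the producer goroutine stays blocked on its next send. In a reactive pipeline, unsubscribing or cancelling the context plays that role.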
Best Practices
1. Always handle all three events: use `NewObserver(onNext, onError, onComplete)`, not just `OnNext`. Unhandled errors cause silent data loss.
2. Use `Collect()` for synchronous consumption: when the stream is finite and you need `[]T`, `Collect` blocks until complete and returns the slice + error.
3. Prefer typed Pipe functions: `Pipe2`, `Pipe3` ... `Pipe25` catch type mismatches at compile time. Reserve untyped `Pipe` for dynamic operator chains.
4. Bound infinite streams: use `Take(n)`, `TakeUntil(signal)`, `Timeout(d)`, or context cancellation. Unbounded streams leak goroutines.
5. Use `Tap`/`Do` for observability: log, trace, or meter emissions without altering the stream. Chain `TapOnError` for error monitoring.
6. Prefer `samber/lo` for simple transforms: if the data is a finite slice and you need Map/Filter/Reduce, use `lo`. Reach for `ro` when data arrives over time, from multiple sources, or needs retry/timeout/backpressure.
Plugin Ecosystem
40+ plugins extend ro with domain-specific operators:
| Category | Plugins | Import path prefix |
| --- | --- | --- |
| Encoding | JSON, CSV, Base64, Gob | `plugins/encoding/...` |
| Network | HTTP, I/O, FSNotify | `plugins/http`, `plugins/io`, `plugins/fsnotify` |
| Scheduling | Cron, ICS | `plugins/cron`, `plugins/ics` |
| Observability | Zap, Slog, Zerolog, Logrus, Sentry, Oops | `plugins/observability/...`, `plugins/samber/oops` |
| Rate limiting | Native, Ulule | `plugins/ratelimit/...` |
| Data | Bytes, Strings, Sort, Strconv, Regexp, Template | `plugins/bytes`, `plugins/strings`, etc. |
| System | Process, Signal | `plugins/proc`, `plugins/signal` |
For the full plugin catalog with import paths and usage examples, see Plugin Ecosystem.

For real-world reactive patterns (retry+timeout, WebSocket fan-out, graceful shutdown, stream combination), see Patterns.

If you encounter a bug or unexpected behavior in samber/ro, open an issue at github.com/samber/ro/issues.
Cross-References
→ See `samber/cc-skills-golang@golang-samber-lo` skill for finite slice transforms (Map, Filter, Reduce, GroupBy); use lo when data is already in a slice
→ See `samber/cc-skills-golang@golang-samber-mo` skill for monadic types (Option, Result, Either) that compose with ro pipelines
→ See `samber/cc-skills-golang@golang-samber-hot` skill for in-memory caching (also available as an ro plugin)
→ See `samber/cc-skills-golang@golang-concurrency` skill for goroutine/channel patterns when reactive streams are overkill
→ See `samber/cc-skills-golang@golang-observability` skill for monitoring reactive pipelines in production