golang-samber-ro

安装量: 2.3K
排名: #2307

安装

npx skills add https://github.com/samber/cc-skills-golang --skill golang-samber-ro
Persona:
You are a Go engineer who reaches for reactive streams when data flows asynchronously or infinitely. You use samber/ro to build declarative pipelines instead of manual goroutine/channel wiring, but you know when a simple slice + samber/lo is enough.
Thinking mode:
Use
ultrathink
when designing advanced reactive pipelines or choosing between cold/hot observables, subjects, and combining operators. Wrong architecture leads to resource leaks or missed events.
samber/ro — Reactive Streams for Go
Go implementation of
ReactiveX
. Generics-first, type-safe, composable pipelines for asynchronous data streams with automatic backpressure, error propagation, context integration, and resource cleanup. 150+ operators, 5 subject types, 40+ plugins.
Official Resources:
github.com/samber/ro
ro.samber.dev
pkg.go.dev/github.com/samber/ro
This skill is not exhaustive. Please refer to library documentation and code examples for more information. Context7 can help as a discoverability platform.
Why samber/ro (Streams vs Slices)
Go channels + goroutines become unwieldy for complex async pipelines: manual channel closures, verbose goroutine lifecycle, error propagation across nested selects, and no composable operators.
samber/ro
solves this with declarative, chainable stream operators.
When to use which tool:
Scenario
Tool
Why
Transform a slice (map, filter, reduce)
samber/lo
Finite, synchronous, eager — no stream overhead needed
Simple goroutine fan-out with error handling
errgroup
Standard lib, lightweight, sufficient for bounded concurrency
Infinite event stream (WebSocket, tickers, file watcher)
samber/ro
Declarative pipeline with backpressure, retry, timeout, combine
Real-time data enrichment from multiple async sources
samber/ro
CombineLatest/Zip compose dependent streams without manual select
Pub/sub with multiple consumers sharing one source
samber/ro
Hot observables (Share/Subjects) handle multicast natively
Key differences: lo vs ro
Aspect
samber/lo
samber/ro
Data
Finite slices
Infinite streams
Execution
Synchronous, blocking
Asynchronous, non-blocking
Evaluation
Eager (allocates intermediate slices)
Lazy (processes items as they arrive)
Timing
Immediate
Time-aware (delay, throttle, interval, timeout)
Error model
Return
(T, error)
per call
Error channel propagates through pipeline
Use case
Collection transforms
Event-driven, real-time, async pipelines
Installation
go get github.com/samber/ro
Core Concepts
Four building blocks:
Observable
— a data source that emits values over time. Cold by default: each subscriber triggers independent execution from scratch
Observer
— a consumer with three callbacks:
onNext(T)
,
onError(error)
,
onComplete()
Operator
— a function that transforms an observable into another observable, chained via
Pipe
Subscription
— the connection between observable and observer. Call
.Wait()
to block or
.Unsubscribe()
to cancel
observable
:=
ro
.
Pipe2
(
ro
.
RangeWithInterval
(
0
,
5
,
1
*
time
.
Second
)
,
ro
.
Filter
(
func
(
x
int
)
bool
{
return
x
%
2
==
0
}
)
,
ro
.
Map
(
func
(
x
int
)
string
{
return
fmt
.
Sprintf
(
"even-%d"
,
x
)
}
)
,
)
observable
.
Subscribe
(
ro
.
NewObserver
(
func
(
s
string
)
{
fmt
.
Println
(
s
)
}
,
// onNext
func
(
err
error
)
{
log
.
Println
(
err
)
}
,
// onError
func
(
)
{
fmt
.
Println
(
"Done!"
)
}
,
// onComplete
)
)
// Output: "even-0", "even-2", "even-4", "Done!"
// Or collect synchronously:
values
,
err
:=
ro
.
Collect
(
observable
)
Cold vs Hot Observables
Cold
(default): each
.Subscribe()
starts a new independent execution. Safe and predictable — use by default.
Hot
multiple subscribers share a single execution. Use when the source is expensive (WebSocket, DB poll) or subscribers must see the same events. Convert with Behavior Share() Cold → hot with reference counting. Last unsubscribe tears down ShareReplay(n) Same as Share + buffers last N values for late subscribers Connectable() Cold → hot, but waits for explicit .Connect() call Subjects Natively hot — call .Send() , .Error() , .Complete() directly Subject Constructor Replay behavior PublishSubject NewPublishSubjectT None — late subscribers miss past events BehaviorSubject NewBehaviorSubjectT Replays last value to new subscribers ReplaySubject NewReplaySubjectT Replays last N values AsyncSubject NewAsyncSubjectT Emits only last value, only on complete UnicastSubject NewUnicastSubjectT Single subscriber only For subject details and hot observable patterns, see Subjects Guide . Operator Quick Reference Category Key operators Purpose Creation Just , FromSlice , FromChannel , Range , Interval , Defer , Future Create observables from various sources Transform Map , MapErr , FlatMap , Scan , Reduce , GroupBy Transform or accumulate stream values Filter Filter , Take , TakeLast , Skip , Distinct , Find , First , Last Selectively emit values Combine Merge , Concat , Zip2 – Zip6 , CombineLatest2 – CombineLatest5 , Race Merge multiple observables Error Catch , OnErrorReturn , OnErrorResumeNextWith , Retry , RetryWithConfig Recover from errors Timing Delay , DelayEach , Timeout , ThrottleTime , SampleTime , BufferWithTime Control emission timing Side effect Tap / Do , TapOnNext , TapOnError , TapOnComplete Observe without altering stream Terminal Collect , ToSlice , ToChannel , ToMap Consume stream into Go types Use typed Pipe2 , Pipe3 ... Pipe25 for compile-time type safety across operator chains. The untyped Pipe uses any and loses type checking. For the complete operator catalog (150+ operators with signatures), see Operators Guide . Common Mistakes Mistake Why it fails Fix Using ro.OnNext() without error handler Errors are silently dropped — bugs hide in production Use ro.NewObserver(onNext, onError, onComplete) with all 3 callbacks Using untyped Pipe() instead of Pipe2 / Pipe3 Loses compile-time type safety, errors surface at runtime Use Pipe2 , Pipe3 ... Pipe25 for typed operator chains Forgetting .Unsubscribe() on infinite streams Goroutine leak — the observable runs forever Use TakeUntil(signal) , context cancellation, or explicit Unsubscribe() Using Share() when cold is sufficient Unnecessary complexity, harder to reason about lifecycle Use hot observables only when multiple consumers need the same stream Using samber/ro for finite slice transforms Stream overhead (goroutines, subscriptions) for a synchronous operation Use samber/lo — it's simpler, faster, and purpose-built for slices Not propagating context for cancellation Streams ignore shutdown signals, causing resource leaks on termination Chain ContextWithTimeout or ThrowOnContextCancel in the pipeline Best Practices Always handle all three events — use NewObserver(onNext, onError, onComplete) , not just OnNext . Unhandled errors cause silent data loss Use Collect() for synchronous consumption — when the stream is finite and you need []T , Collect blocks until complete and returns the slice + error Prefer typed Pipe functions — Pipe2 , Pipe3 ... Pipe25 catch type mismatches at compile time. Reserve untyped Pipe for dynamic operator chains Bound infinite streams — use Take(n) , TakeUntil(signal) , Timeout(d) , or context cancellation. Unbounded streams leak goroutines Use Tap / Do for observability — log, trace, or meter emissions without altering the stream. Chain TapOnError for error monitoring Prefer samber/lo for simple transforms — if the data is a finite slice and you need Map/Filter/Reduce, use lo . Reach for ro when data arrives over time, from multiple sources, or needs retry/timeout/backpressure Plugin Ecosystem 40+ plugins extend ro with domain-specific operators: Category Plugins Import path prefix Encoding JSON, CSV, Base64, Gob plugins/encoding/... Network HTTP, I/O, FSNotify plugins/http , plugins/io , plugins/fsnotify Scheduling Cron, ICS plugins/cron , plugins/ics Observability Zap, Slog, Zerolog, Logrus, Sentry, Oops plugins/observability/... , plugins/samber/oops Rate limiting Native, Ulule plugins/ratelimit/... Data Bytes, Strings, Sort, Strconv, Regexp, Template plugins/bytes , plugins/strings , etc. System Process, Signal plugins/proc , plugins/signal For the full plugin catalog with import paths and usage examples, see Plugin Ecosystem . For real-world reactive patterns (retry+timeout, WebSocket fan-out, graceful shutdown, stream combination), see Patterns . If you encounter a bug or unexpected behavior in samber/ro, open an issue at github.com/samber/ro/issues . Cross-References → See samber/cc-skills-golang@golang-samber-lo skill for finite slice transforms (Map, Filter, Reduce, GroupBy) — use lo when data is already in a slice → See samber/cc-skills-golang@golang-samber-mo skill for monadic types (Option, Result, Either) that compose with ro pipelines → See samber/cc-skills-golang@golang-samber-hot skill for in-memory caching (also available as an ro plugin) → See samber/cc-skills-golang@golang-concurrency skill for goroutine/channel patterns when reactive streams are overkill → See samber/cc-skills-golang@golang-observability skill for monitoring reactive pipelines in production
返回排行榜