reactive-programming

安装量: 148
排名: #5828

安装

npx skills add https://github.com/aj-geddes/useful-ai-prompts --skill reactive-programming

Reactive Programming Overview

Build responsive applications using reactive streams and observables for handling asynchronous data flows.

When to Use

- Complex async data flows
- Real-time data updates
- Event-driven architectures
- UI state management
- WebSocket/SSE handling
- Combining multiple data sources

Implementation Examples

1. RxJS Basics

import { Observable, Subject, BehaviorSubject, fromEvent, interval } from 'rxjs';
import { map, filter, debounceTime, distinctUntilChanged, switchMap, take } from 'rxjs/operators';

// Create an observable by hand: emits 1, 2, 3 synchronously, then completes.
const numbers$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

numbers$.subscribe({
  next: value => console.log(value),
  error: err => console.error(err),
  complete: () => console.log('Done')
});

// Subject (multicast): one next() call is delivered to every current subscriber.
const subject = new Subject<number>();

subject.subscribe(value => console.log('Sub 1:', value));
subject.subscribe(value => console.log('Sub 2:', value));

subject.next(1); // Both subscribers receive

// BehaviorSubject (with initial value)
const state$ = new BehaviorSubject({ count: 0 });

state$.subscribe(state => console.log('State:', state));

state$.next({ count: 1 });
state$.next({ count: 2 });

// Operators: double each tick, keep values above 5, stop after five of them.
// NOTE: take must be imported from 'rxjs/operators' alongside map and filter.
const source$ = interval(1000);

source$.pipe(
  map(n => n * 2),
  filter(n => n > 5),
  take(5)
).subscribe(value => console.log(value));

2. Search with Debounce

import { fromEvent } from 'rxjs';
import { map, debounceTime, distinctUntilChanged, switchMap, catchError } from 'rxjs/operators';
import { of } from 'rxjs';

// The cast assumes an element with id "search" exists; querySelector returns null otherwise.
const searchInput = document.querySelector('#search') as HTMLInputElement;

// Stream of search results driven by the input's 'input' events.
const search$ = fromEvent(searchInput, 'input').pipe(
  map((event: Event) => (event.target as HTMLInputElement).value),
  debounceTime(300), // Wait 300ms after typing
  distinctUntilChanged(), // Only if value changed
  // switchMap drops the previous in-flight lookup's result when a newer query arrives.
  switchMap(query => {
    if (!query) return of([]);

    // Encode the query so characters such as '&' or '#' cannot corrupt the URL.
    return fetch(`/api/search?q=${encodeURIComponent(query)}`)
      .then(res => res.json())
      // Resolve to a plain empty array on failure. Returning of([]) from a promise
      // handler would make the promise resolve to an Observable object, which
      // subscribers would then receive in place of the results array.
      .catch(() => []);
  }),
  catchError(error => {
    console.error('Search error:', error);
    return of([]);
  })
);

search$.subscribe(results => {
  console.log('Search results:', results);
  displayResults(results);
});

/**
 * Placeholder for rendering search results; the example leaves the body empty.
 *
 * @param results - Result items to display.
 */
function displayResults(results: unknown[]): void {
  // Update UI
}

3. State Management

import { BehaviorSubject } from 'rxjs';
import { map, distinctUntilChanged } from 'rxjs/operators';

/** Shape of the application state held by the store. */
interface AppState {
  /** Current user, or null when none has been set. */
  user: { id: string; name: string } | null;
  /** Cart entries, each pairing an item id with a quantity. */
  cart: { id: string; quantity: number }[];
  /** Loading flag. */
  loading: boolean;
}

/**
 * Minimal observable store: the whole AppState lives in one BehaviorSubject,
 * selectors expose derived streams, and actions publish a new state object.
 *
 * Requires BehaviorSubject from 'rxjs' and map, distinctUntilChanged from
 * 'rxjs/operators'.
 */
class StateManager {
  // The explicit type argument matters: without it the state type is inferred
  // from the initial literal (user: null, cart: never[]), and the actions below
  // would not type-check.
  private readonly state$ = new BehaviorSubject<AppState>({
    user: null,
    cart: [],
    loading: false
  });

  // Selectors
  user$ = this.state$.pipe(
    map(state => state.user),
    distinctUntilChanged()
  );

  cart$ = this.state$.pipe(
    map(state => state.cart),
    distinctUntilChanged()
  );

  /** Sum of the quantities of all cart entries. */
  cartTotal$ = this.cart$.pipe(
    map(cart => cart.reduce((sum, item) => sum + item.quantity, 0))
  );

  loading$ = this.state$.pipe(
    map(state => state.loading)
  );

  // Actions
  /** Replaces the current user (pass null to clear it). */
  setUser(user: AppState['user']): void {
    this.state$.next({ ...this.state$.value, user });
  }

  /**
   * Adds an item to the cart; if an entry with the same id exists, its
   * quantity is increased by the item's quantity instead.
   */
  addToCart(item: { id: string; quantity: number }): void {
    const current = this.state$.value.cart;
    const alreadyInCart = current.some(entry => entry.id === item.id);

    // Build new entry objects rather than mutating the existing one, so state
    // objects that were already emitted are never changed after the fact.
    const cart = alreadyInCart
      ? current.map(entry =>
          entry.id === item.id
            ? { ...entry, quantity: entry.quantity + item.quantity }
            : entry
        )
      : [...current, { ...item }];

    this.state$.next({ ...this.state$.value, cart });
  }

  /** Sets the loading flag. */
  setLoading(loading: boolean): void {
    this.state$.next({ ...this.state$.value, loading });
  }

  /** Returns the current state snapshot synchronously. */
  getState(): AppState {
    return this.state$.value;
  }
}

// Usage
const store = new StateManager();

// Log every change of the current user.
store.user$.subscribe(user => {
  console.log('User:', user);
});

// Log the total item quantity whenever the cart changes.
store.cartTotal$.subscribe(total => {
  console.log('Cart items:', total);
});

store.setUser({ id: '123', name: 'John' });
store.addToCart({ id: 'item1', quantity: 2 });

4. WebSocket with Reconnection

import { Observable, timer } from 'rxjs';
import { retryWhen, tap, delayWhen } from 'rxjs/operators';

/**
 * Wraps a WebSocket in an Observable that emits each JSON-parsed message and
 * reconnects with exponential backoff when the connection errors or closes.
 *
 * Each (re)subscription opens a new socket; unsubscribing closes it.
 *
 * @param url - WebSocket endpoint to connect to.
 * @returns Observable of parsed message payloads. The payload shape is not
 *          validated here, hence `unknown`; narrow it at the call site.
 */
function createWebSocketObservable(url: string): Observable<unknown> {
  return new Observable<unknown>(subscriber => {
    const ws = new WebSocket(url);

    ws.onopen = () => {
      console.log('WebSocket connected');
    };

    ws.onmessage = (event) => {
      try {
        const data: unknown = JSON.parse(event.data);
        subscriber.next(data);
      } catch (error) {
        // A malformed message is logged and skipped; the stream stays open.
        console.error('Parse error:', error);
      }
    };

    ws.onerror = (error) => {
      console.error('WebSocket error:', error);
      subscriber.error(error);
    };

    // A close is reported as an error so that the retry logic below reconnects.
    ws.onclose = () => {
      console.log('WebSocket closed');
      subscriber.error(new Error('Connection closed'));
    };

    // Teardown: runs on unsubscribe and before each retry's resubscription.
    return () => {
      ws.close();
    };
  }).pipe(
    retryWhen(errors =>
      errors.pipe(
        tap(err => console.log('Retrying connection...', err)),
        // Backoff: 1s, 2s, 4s, ... capped at 30s. `i` is the index of the error
        // within this subscription and is not reset after a successful reconnect.
        // NOTE(review): retryWhen is marked deprecated in RxJS 7 in favour of
        // retry({ delay }) — confirm against the RxJS version in use.
        delayWhen((_, i) => timer(Math.min(1000 * Math.pow(2, i), 30000)))
      )
    )
  );
}

// Usage
const ws$ = createWebSocketObservable('wss://api.example.com/ws');

ws$.subscribe({
  next: data => console.log('Received:', data),
  error: err => console.error('Error:', err)
});

5. Combining Multiple Streams

import { combineLatest, merge, forkJoin, zip, fromEvent, of } from 'rxjs';

// combineLatest - once every input has emitted, emits the latest values whenever any input emits
const users$ = fetchUsers();
const settings$ = fetchSettings();

combineLatest([users$, settings$]).subscribe(([users, settings]) => {
  console.log('Users:', users);
  console.log('Settings:', settings);
});

// merge - combine multiple observables
const clicks$ = fromEvent(button1, 'click');
const hovers$ = fromEvent(button2, 'mouseover');

merge(clicks$, hovers$).subscribe(event => {
  console.log('Event:', event.type);
});

// forkJoin - wait for all to complete (like Promise.all)
forkJoin({
  users: fetchUsers(),
  posts: fetchPosts(),
  comments: fetchComments()
}).subscribe(({ users, posts, comments }) => {
  console.log('All data loaded:', { users, posts, comments });
});

// zip - combine corresponding values
const names$ = of('Alice', 'Bob', 'Charlie');
const ages$ = of(25, 30, 35);

zip(names$, ages$).subscribe(([name, age]) => {
  // Backticks are required for the ${...} placeholders to be interpolated.
  console.log(`${name} is ${age} years old`);
});

6. Backpressure Handling

import { Subject, fromEvent } from 'rxjs';
import { bufferTime, filter, throttleTime } from 'rxjs/operators';

// Buffer events
// Typed as string so that the buffered batches match processBatch(events: string[]).
const events$ = new Subject<string>();

events$.pipe(
  bufferTime(1000), // Collect events for 1 second
  filter(buffer => buffer.length > 0) // Skip windows in which nothing arrived
).subscribe(events => {
  console.log('Batch:', events);
  processBatch(events);
});

// Throttle events
const clicks$ = fromEvent(button, 'click');

clicks$.pipe(
  throttleTime(1000) // Only allow one every second
).subscribe(() => {
  console.log('Click processed');
});

/**
 * Placeholder for handling one buffered batch of events; the example leaves
 * the body empty.
 *
 * @param events - Events collected during one buffer window.
 */
function processBatch(events: string[]): void {
  // Process batch
}

7. Custom Operators

import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';

/**
 * Custom pipeable operator that logs every value together with a label and
 * passes values, errors, and completion through unchanged.
 *
 * @param message - Label printed before each logged value.
 * @returns A function mapping a source Observable to a mirrored Observable.
 */
function tapLog<T>(message: string) {
  return (source: Observable<T>): Observable<T> =>
    new Observable<T>(subscriber =>
      // Returning the inner subscription ties its teardown to the outer one.
      source.subscribe({
        next: value => {
          console.log(message, value);
          subscriber.next(value);
        },
        error: err => subscriber.error(err),
        complete: () => subscriber.complete()
      })
    );
}

// Usage
source$.pipe(
  tapLog('Before map:'),
  map(x => x * 2),
  tapLog('After map:')
).subscribe();

Best Practices

✅ DO

- Unsubscribe to prevent memory leaks
- Use operators to transform data
- Handle errors properly
- Use shareReplay for expensive operations
- Combine streams when needed
- Test reactive code

❌ DON'T

- Subscribe multiple times to the same observable
- Forget to unsubscribe
- Use nested subscriptions
- Ignore error handling
- Make observables stateful

Common Operators

| Operator | Purpose |
| --- | --- |
| map | Transform values |
| filter | Filter values |
| debounceTime | Wait before emitting |
| distinctUntilChanged | Only emit if changed |
| switchMap | Switch to new observable |
| mergeMap | Merge multiple observables |
| catchError | Handle errors |
| tap | Side effects |
| take | Take n values |
| takeUntil | Take until a notifier emits |

Resources

- RxJS Documentation
- Learn RxJS
- RxJS Marbles

返回排行榜