RxJS Implementation Skill
Quick Start
Observable Basics
import { Observable } from 'rxjs';
// Create observable const observable = new Observable((observer) => { observer.next(1); observer.next(2); observer.next(3); observer.complete(); });
// Subscribe const subscription = observable.subscribe({ next: (value) => console.log(value), error: (error) => console.error(error), complete: () => console.log('Done') });
// Unsubscribe subscription.unsubscribe();
Common Operators
import { map, filter, switchMap, takeUntil } from 'rxjs/operators';
// Transformation data$.pipe( map(user => user.name), filter(name => name.length > 0) ).subscribe(name => console.log(name));
// Higher-order userId$.pipe( switchMap(id => this.userService.getUser(id)) ).subscribe(user => console.log(user));
Subjects
Subject Types
import { Subject, BehaviorSubject, ReplaySubject } from 'rxjs';
// Subject - No initial value const subject = new Subject<string>(); subject.next('hello');
// BehaviorSubject - Has initial value const behavior = new BehaviorSubject<string>('initial'); behavior.next('new value');
// ReplaySubject - Replays N values const replay = new ReplaySubject<string>(3); replay.next('one'); replay.next('two');
Service with Subject
@Injectable() export class NotificationService { private messageSubject = new Subject<string>(); public message$ = this.messageSubject.asObservable();
notify(message: string) { this.messageSubject.next(message); } }
// Usage constructor(private notification: NotificationService) { this.notification.message$.subscribe(msg => { console.log('Notification:', msg); }); }
Transformation Operators
// map - Transform values source$.pipe( map(user => user.name) )
// switchMap - Switch to new observable (cancel previous) userId$.pipe( switchMap(id => this.userService.getUser(id)) )
// mergeMap - Merge all results fileIds$.pipe( mergeMap(id => this.downloadFile(id)) )
// concatMap - Sequential processing tasks$.pipe( concatMap(task => this.processTask(task)) )
// exhaustMap - Ignore new while processing clicks$.pipe( exhaustMap(() => this.longRequest()) )
Filtering Operators
// filter - Only pass matching values data$.pipe( filter(item => item.active) )
// first - Take first value data$.pipe(first())
// take - Take N values data$.pipe(take(5))
// takeUntil - Take until condition data$.pipe( takeUntil(this.destroy$) )
// distinct - Filter duplicates data$.pipe( distinct(), distinctUntilChanged() )
// debounceTime - Wait N ms input$.pipe( debounceTime(300), distinctUntilChanged() )
Combination Operators
import { combineLatest, merge, concat, zip } from 'rxjs';
// combineLatest - Latest from all combineLatest([user$, settings$, theme$]).pipe( map(([user, settings, theme]) => ({ user, settings, theme })) )
// merge - Values from any merge(click$, hover$, input$)
// concat - Sequential concat(request1$, request2$, request3$)
// zip - Wait for all zip(form1$, form2$, form3$)
// withLatestFrom - Combine with latest click$.pipe( withLatestFrom(user$), map(([click, user]) => ({ click, user })) )
Error Handling
// catchError - Handle errors data$.pipe( catchError(error => { console.error('Error:', error); return of(defaultValue); }) )
// retry - Retry on error request$.pipe( retry(3), catchError(error => throwError(error)) )
// timeout - Timeout if no value request$.pipe( timeout(5000), catchError(error => of(null)) )
Memory Leak Prevention
Unsubscribe Pattern
private destroy$ = new Subject<void>();
ngOnInit() { this.data$.pipe( takeUntil(this.destroy$) ).subscribe(data => { this.processData(data); }); }
ngOnDestroy() { this.destroy$.next(); this.destroy$.complete(); }
Async Pipe (Preferred)
// Component export class UserComponent { user$ = this.userService.getUser(1);
constructor(private userService: UserService) {} }
// Template - Async pipe handles unsubscribe <div>{{ user$ | async as user }} <p>{{ user.name }}</p> </div>
Advanced Patterns
Share Operator
// Hot observable - Share single subscription readonly users$ = this.http.get('/api/users').pipe( shareReplay(1) // Cache last result );
// Now multiple subscriptions use same HTTP request this.users$.subscribe(users => {...}); this.users$.subscribe(users => {...}); // Reuses cached
Scan for State
// Accumulate state const counter$ = clicks$.pipe( scan((count) => count + 1, 0) )
// Complex state const appState$ = actions$.pipe( scan((state, action) => { switch(action.type) { case 'ADD_USER': return { ...state, users: [...state.users, action.user] }; case 'DELETE_USER': return { ...state, users: state.users.filter(u => u.id !== action.id) }; default: return state; } }, initialState) )
Forkjoin for Multiple Requests
// Parallel requests forkJoin({ users: this.userService.getUsers(), settings: this.settingService.getSettings(), themes: this.themeService.getThemes() }).subscribe(({ users, settings, themes }) => { console.log('All loaded:', users, settings, themes); })
Testing Observables
import { marbles } from 'rxjs-marbles';
it('should map values correctly', marbles((m) => { const source = m.hot('a-b-|', { a: 1, b: 2 }); const expected = m.cold('x-y-|', { x: 2, y: 4 });
const result = source.pipe( map(x => x * 2) );
m.expect(result).toBeObservable(expected); }));
Best Practices
-
Always unsubscribe: Use takeUntil or async pipe
-
Use higher-order operators: switchMap, mergeMap, etc.
-
Avoid nested subscriptions: Use operators instead
-
Share subscriptions: Use share/shareReplay for expensive operations
-
Handle errors: Always include catchError
-
Type your observables: Observable<User> not just Observable
Common Mistakes to Avoid
// ❌ Wrong - Creates multiple subscriptions this.data$.subscribe(d => { this.data$.subscribe(d2 => { // nested subscriptions! }); });
// ✅ Correct - Use switchMap this.data$.pipe( switchMap(d => this.otherService.fetch(d)) ).subscribe(result => { // handled });
// ❌ Wrong - Memory leak ngOnInit() { this.data$.subscribe(data => this.data = data); }
// ✅ Correct - Unsubscribe or async ngOnInit() { this.data$ = this.service.getData(); } // In template: {{ data$ | async }}
Resources
-
RxJS Documentation
-
Interactive Diagrams
-
RxJS Operators