Angular : Top 20 RxJS Operators for Reactive Programming

Sagarnath S
5 min read · Apr 16, 2023
Angular : RxJS operators

RxJS is a library for reactive programming using Observables that makes it easy to work with asynchronous data streams. There are numerous RxJS operators available that allow us to manipulate, transform and combine observables in various ways.

  1. retry: This operator resubscribes to the source observable when it errors, up to a specified number of times, before passing the error on to the subscriber.
import { of } from 'rxjs';
import { map, tap, retry } from 'rxjs/operators';

// The source deliberately contains a non-numeric value ('a') so the pipeline fails.
const numbers = of(1, 2, 3, 'a', 4, 5);
const mappedNumbers = numbers.pipe(
  map(num => {
    // Throw explicitly: `'a' * 2` would only evaluate to NaN and would never
    // raise the error that retry needs in order to resubscribe.
    if (typeof num !== 'number') {
      throw new Error(`Invalid number: ${num}`);
    }
    return num * 2;
  }),
  tap(value => console.log(`Processing ${value}...`)),
  // Resubscribe to the source up to 2 more times before forwarding the error.
  retry(2)
);
mappedNumbers.subscribe({
  next: value => console.log(`Result: ${value}`),
  error: error => console.log(`Error: ${error}`)
});

2. finalize: This operator performs a specified action when the observable completes, errors out, or is unsubscribed from.

import { of } from 'rxjs';
import { map, tap, finalize } from 'rxjs/operators';

// Doubles each source value, logs it in flight, and runs the finalize
// callback once the stream has terminated.
const numbers = of(1, 2, 3);
const mappedNumbers = numbers.pipe(
  map(n => n * 2),
  tap(doubled => {
    console.log(`Processing ${doubled}...`);
  }),
  finalize(() => {
    console.log('Finished processing.');
  })
);
mappedNumbers.subscribe({
  next: result => console.log(`Result: ${result}`),
  error: err => console.log(`Error: ${err}`)
});

3. delay: This operator delays the emission of source observable values by a specified amount of time. One example use case for this operator is to simulate a slow network response.

import { of } from 'rxjs';
import { delay } from 'rxjs/operators';

// Simulates a slow network call: the single value is held back for
// 5000 ms before it is delivered to the subscriber.
const slowResponse = of('Slow network response');
const response$ = slowResponse.pipe(delay(5000));
response$.subscribe({
  next: message => console.log(message),
  error: err => console.error(err),
  complete: () => console.log('Completed')
});

4. race: This function takes an array of observables and returns an observable that mirrors whichever input observable emits first; the other inputs are unsubscribed.

// `race([...])` is the creation function, which lives in 'rxjs'. The export
// of the same name in 'rxjs/operators' is a pipeable operator and cannot be
// subscribed to directly.
import { interval, race } from 'rxjs';

// Three competing sources; source1$ emits first (after one second).
const source1$ = interval(1000);
const source2$ = interval(2000);
const source3$ = interval(3000);
// Mirrors the first source to emit and unsubscribes from the others.
const race$ = race([source1$, source2$, source3$]);
race$.subscribe({
  next: value => console.log(`Value: ${value}`),
  error: error => console.log(`Error: ${error}`),
  complete: () => console.log('Completed')
});

5. tap: This operator allows you to perform side effects without modifying the emitted values.

import { of } from 'rxjs';
import { tap } from 'rxjs/operators';

// Logs every value as a side effect; the values themselves pass through untouched.
const numbers = of(1, 2, 3);
const logNumbers = numbers.pipe(
  tap(emitted => {
    console.log(`Value emitted: ${emitted}`);
  })
);
// Subscribing without handlers is enough to start the stream and fire the tap.
logNumbers.subscribe();

6. mergeMap: This operator maps each source value to an inner observable and merges all the inner observables into a single observable.

// `from` is needed to wrap the fetch promise and must be imported as well.
import { from, fromEvent } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

const button = document.getElementById('click-button');
// getElementById returns null when the element is absent; fail early with a
// clear message instead of handing null to fromEvent.
if (!button) {
  throw new Error('Element #click-button not found');
}
const clicks$ = fromEvent(button, 'click');
// Every click starts its own request; all in-flight requests run concurrently
// and their results are merged in arrival order.
const source$ = clicks$.pipe(
  mergeMap(() => from(fetch('https://api/posts')))
);
source$.subscribe({
  next: value => console.log(`Value: ${value}`),
  error: error => console.log(`Error: ${error}`),
  complete: () => console.log('Completed')
});

7. switchMap: This operator maps each value to a new observable and switches to the latest observable.

import { fromEvent } from 'rxjs';
import { switchMap } from 'rxjs/operators';

const button = document.getElementById('fetch-button');
// getElementById returns null when the element is absent; fail early with a
// clear message instead of handing null to fromEvent.
if (!button) {
  throw new Error('Element #fetch-button not found');
}
const clicks = fromEvent(button, 'click');
// Each click switches to a fresh request; the result of any earlier,
// still-pending request is ignored.
const data = clicks.pipe(
  switchMap(() => fetch('https://api-call/{api-id}'))
);
data.subscribe(response => {
  console.log(response);
});

8. concatMap: This operator maps each source value to an inner observable and concatenates all the inner observables into a single observable.

// `from` is needed to wrap the fetch promise and must be imported as well.
import { from, fromEvent } from 'rxjs';
import { concatMap } from 'rxjs/operators';

const button = document.getElementById('click-button');
// getElementById returns null when the element is absent; fail early with a
// clear message instead of handing null to fromEvent.
if (!button) {
  throw new Error('Element #click-button not found');
}
const clicks$ = fromEvent(button, 'click');
// Requests are queued: the next one starts only after the previous one has
// completed, so results arrive in click order.
const source$ = clicks$.pipe(
  concatMap(() => from(fetch('https://api/posts')))
);
source$.subscribe({
  next: value => console.log(`Value: ${value}`),
  error: error => console.log(`Error: ${error}`),
  complete: () => console.log('Completed')
});

9. take: This operator takes a specified number of values from an observable.

import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

// One tick per second; take(5) completes the stream after five values.
const numbers = interval(1000);
const firstFiveNumbers = numbers.pipe(take(5));
firstFiveNumbers.subscribe(tick => {
  console.log(tick);
});

10. takeUntil: This operator takes values from an observable until a notifier observable emits.

import { interval, timer } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// One tick per second, cut off as soon as the 5-second notifier emits.
const numbers = interval(1000);
const stop = timer(5000);
const numbersUntilStop = numbers.pipe(takeUntil(stop));
numbersUntilStop.subscribe(tick => {
  console.log(tick);
});

11. takeLast: This operator emits only the last n values emitted by an observable.

import { of } from 'rxjs';
import { takeLast } from 'rxjs/operators';

// takeLast buffers values and can emit only after the source completes;
// here it delivers the final three of the five source values.
const numbers = of(1, 2, 3, 4, 5);
const lastThree = numbers.pipe(takeLast(3));
lastThree.subscribe({
  next: item => console.log(`Value: ${item}`),
  error: err => console.log(`Error: ${err}`),
  complete: () => console.log('Completed')
});

12. takeWhile: This operator takes values from an observable while a predicate function returns true, and completes as soon as the predicate returns false.

import { interval } from 'rxjs';
import { takeWhile } from 'rxjs/operators';

const numbers = interval(1000);
// takeWhile is not a filter: it completes the stream at the FIRST value that
// fails the predicate. An "is even" test would stop after emitting only 0, so
// use a predicate that holds for a run of values (emits 0 through 4).
const smallNumbers = numbers.pipe(
  takeWhile(num => num < 5)
);
smallNumbers.subscribe(num => console.log(num));

13. debounceTime: This operator waits for a specified amount of time after the last emission before emitting a value.

import { fromEvent } from 'rxjs';
import { debounceTime } from 'rxjs/operators';

const input = document.getElementById('search-input');
// getElementById returns null when the element is absent; fail early with a
// clear message instead of handing null to fromEvent.
if (!input) {
  throw new Error('Element #search-input not found');
}
// Emit only after the user has paused typing for 500 ms.
const search = fromEvent(input, 'input').pipe(
  debounceTime(500)
);
search.subscribe(event => {
  // Event.target is typed as EventTarget | null and has no `value`;
  // the listener is attached to the search <input>, so narrow it explicitly.
  const target = event.target as HTMLInputElement;
  console.log(target.value);
});

14. distinctUntilChanged: This operator filters out values that are the same as the previous value.

import { from } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

// Consecutive repeats are dropped, so the subscriber sees 1, 2, 3.
const rawValues = [1, 2, 2, 3, 3, 3];
const numbers = from(rawValues);
const uniqueNumbers = numbers.pipe(distinctUntilChanged());
uniqueNumbers.subscribe(current => {
  console.log(current);
});

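15. pluck: This operator extracts a property value from each emitted object. Note that pluck is deprecated as of RxJS 7; the recommended replacement is map, e.g. map(user => user.name).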

import { from } from 'rxjs';
import { pluck } from 'rxjs/operators';

// Emits each user record in turn, then projects it down to its `name` property.
const userRecords = [
  { name: 'Alice', age: 25 },
  { name: 'Bob', age: 30 },
  { name: 'Charlie', age: 35 }
];
const users = from(userRecords);
const names = users.pipe(pluck('name'));
names.subscribe(userName => {
  console.log(userName);
});

16. shareReplay: This operator shares the same source observable among multiple subscribers and replays a specified number of emissions to new subscribers.

import { Observable } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

// The producer logs once per real subscription and pushes three values;
// it never calls complete.
const source$ = new Observable(subscriber => {
  console.log('Source Observable created.');
  for (const value of [1, 2, 3]) {
    subscriber.next(value);
  }
});
// Share a single subscription to the source and replay the last 2 values
// to any subscriber that joins later.
const sharedSource$ = source$.pipe(shareReplay(2));
sharedSource$.subscribe(received => {
  console.log(`Subscriber A: ${received}`);
});
sharedSource$.subscribe(received => {
  console.log(`Subscriber B: ${received}`);
});

17. exhaustMap: This operator ignores new source emissions while the current inner observable is still active. The ignored emissions are dropped, not queued; once the inner observable completes, the next source emission creates a new inner observable.

import { interval, fromEvent } from 'rxjs';
import { exhaustMap, take } from 'rxjs/operators';

const button = document.getElementById('click-button');
// getElementById returns null when the element is absent; fail early with a
// clear message instead of handing null to fromEvent.
if (!button) {
  throw new Error('Element #click-button not found');
}
const clicks$ = fromEvent(button, 'click');
// A click starts a 3-tick inner interval; clicks that arrive while it is
// still running are dropped.
const source$ = clicks$.pipe(
  exhaustMap(() => interval(1000).pipe(take(3)))
);
source$.subscribe({
  next: value => console.log(`Value: ${value}`),
  error: error => console.log(`Error: ${error}`),
  complete: () => console.log('Completed')
});

18. distinctUntilKeyChanged: This operator filters out consecutive duplicate values emitted by the source observable based on a specified key.

import { from } from 'rxjs';
import { distinctUntilKeyChanged } from 'rxjs/operators';

interface Person {
  name: string;
  age: number;
}
// The data must contain CONSECUTIVE entries with the same `name`, otherwise
// nothing is filtered and the operator's effect is invisible.
const persons: Person[] = [
  { name: 'Alice', age: 25 },
  { name: 'Alice', age: 30 }, // dropped: same name as the previous entry
  { name: 'Bob', age: 30 },
  { name: 'Bob', age: 35 }, // dropped: same name as the previous entry
  { name: 'Alice', age: 30 } // kept: name differs from the previous entry
];
const source$ = from(persons);
// Only `name` is compared, and only against the immediately preceding value.
const distinct$ = source$.pipe(distinctUntilKeyChanged('name'));
distinct$.subscribe({
  next: value => console.log(`Value: ${JSON.stringify(value)}`),
  error: error => console.log(`Error: ${error}`),
  complete: () => console.log('Completed')
});

19. skip: This operator skips a specified number of values emitted by the source observable and emits the rest.

import { interval } from 'rxjs';
import { skip } from 'rxjs/operators';

// One tick per second; the first three ticks (0, 1, 2) are discarded and
// everything from 3 onwards is passed along.
const source$ = interval(1000);
const skip$ = source$.pipe(
  skip(3)
);
skip$.subscribe({
  next: tick => console.log(`Value: ${tick}`),
  error: err => console.log(`Error: ${err}`),
  complete: () => console.log('Completed')
});

20. partition: This operator separates the source observable's values into two separate observables based on a specified condition.

// Use the `partition` creation function from 'rxjs'. The deprecated operator
// form in 'rxjs/operators' returns a tuple rather than an Observable, so it
// does not type-check inside pipe().
import { of, partition } from 'rxjs';

const source$ = of(1, 2, 3, 4, 5, 6);
// The first stream carries values that satisfy the predicate, the second the rest.
const [even$, odd$] = partition(source$, num => num % 2 === 0);
even$.subscribe({
  next: value => console.log(`Even value: ${value}`),
  error: error => console.log(`Error: ${error}`),
  complete: () => console.log('Even completed')
});
odd$.subscribe({
  next: value => console.log(`Odd value: ${value}`),
  error: error => console.log(`Error: ${error}`),
  complete: () => console.log('Odd completed')
});

Thank you for reading! If you have any doubts, drop a message in the comments.

Follow me on Medium or LinkedIn to read more about Angular and TS!

--

--

Sagarnath S

Software Development Engineer - I @CareStack || Web Developer || Angular || ASP.Net Core || C# || TypeScript || HTML || CSS