RxJS is a library that combines the Observer pattern—where Observables or Subjects notify Observers of state changes—with functional programming to handle data changes over time.
When programming with RxJS, you are usually:
- Creating Observables to subscribe to later
- Creating new Observables by piping existing Observables
- Subscribing to Observables to get data
As Observables are “the most basic building block of RxJS,” a solid understanding of them is essential to becoming proficient in RxJS.
In this blog post, you’ll learn what an Observable is, the different types of Observables and how they are helpful.
What is an Observable?
In essence, Observables represent data (or a state) that changes over time. Observables have two critical methods you should know about: subscribe and pipe.
Observable.subscribe()
subscribe starts the execution of an Observable and lets you observe the notifications it emits by passing Observer handlers.
The Observer argument of subscribe lets you react when the Observable emits a new value, errors, or completes, through the next, error, and complete callbacks.
Each subscription to an Observable will create a new execution of the Observable.
import { interval } from 'rxjs';

// emit sequential numbers every second and console.log that number
interval(1000).subscribe({
  next: (value) => console.log(value),
  error: () => {},
  complete: () => {},
});

// if you don't care about error or complete, you can pass only the next function:
interval(1000).subscribe((value) => console.log(value));
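To make the "new execution per subscription" point concrete, here is a minimal sketch. The names `executions`, `counted$`, and `received` are made up for illustration; the only assumption is that the function passed to the `Observable` constructor runs once for each `subscribe` call.

```typescript
import { Observable } from 'rxjs';

// Counts how many times the subscriber function below has run.
let executions = 0;

// Hypothetical Observable: emits the current execution number, then completes.
const counted$ = new Observable<number>((subscriber) => {
  executions += 1;
  subscriber.next(executions);
  subscriber.complete();
});

const received: number[] = [];

// Two subscriptions, so the subscriber function runs twice.
counted$.subscribe((value) => received.push(value));
counted$.subscribe((value) => received.push(value));

console.log(executions); // 2
console.log(received); // [1, 2]
```

Each Observer gets its own run of the producer, which is why the second subscription sees 2 rather than 1.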
Observable.pipe()
The sole reason the pipe method exists is to improve readability.
Without pipe, composing Observable streams with operators would be a nightmare. Operators are pure functions that take a source Observable and return a new Observable.
Imagine writing the following code to compose an Observable Stream:
import { interval } from 'rxjs';
import { map, tap } from 'rxjs/operators';

/**
 * 1. interval - emit sequential numbers every second
 * 2. map - multiply emitted values by 2
 * 3. map - emit a string `Result: ${x}`
 * 4. tap - log every emitted value
 */
tap((value) => console.log(value))(
  map((x) => `Result: ${x}`)(map((x: number) => x * 2)(interval(1000)))
).subscribe();
Although the code above is valid, it is hard to read, so the pipe method is preferred.
Using the pipe method, the code becomes more readable:
/**
 * 1. interval - emit sequential numbers every second
 * 2. map - multiply emitted values by 2
 * 3. map - emit a string `Result: ${x}`
 * 4. tap - log every emitted value
 */
interval(1000)
  .pipe(
    map((x) => x * 2),
    map((x) => `Result: ${x}`),
    tap((value) => console.log(value))
  )
  .subscribe();
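Because an operator is just a function from one Observable to another, you can write your own and hand it to pipe. The sketch below uses two hypothetical operators, `double` and `label`, and the synchronous `of` creator so the result is easy to inspect. It also shows that piping and nesting the same operators produce the same output.

```typescript
import { Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical custom operator: takes a source Observable, returns a new one.
const double = (source: Observable<number>): Observable<number> =>
  source.pipe(map((x) => x * 2));

// Hypothetical custom operator: formats each number as a string.
const label = (source: Observable<number>): Observable<string> =>
  source.pipe(map((x) => `Result: ${x}`));

// Composed with pipe: reads top to bottom.
const piped: string[] = [];
of(1, 2, 3)
  .pipe(double, label)
  .subscribe((value) => piped.push(value));

// Composed by nesting: same operators, read inside out.
const nested: string[] = [];
label(double(of(1, 2, 3))).subscribe((value) => nested.push(value));

console.log(piped); // ['Result: 2', 'Result: 4', 'Result: 6']
console.log(nested); // ['Result: 2', 'Result: 4', 'Result: 6']
```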
What is a Subject?
The Subject class extends the Observable class to multicast emissions to all of its Observers. In other words, a new subscription to a Subject does not create a new execution; every Observer shares the same one.
Subjects keep track of all their Observers.
Subject.next(), Subject.error() and Subject.complete()
A Subject's next, error, and complete methods will call all of its Observers' next, error, and complete methods.
import { Subject } from 'rxjs';

const mySubject1 = new Subject();
const mySubject2 = new Subject();

mySubject1.subscribe({
  next: (value) => console.log(`Subject 1 Next: ${value}`),
  error: (error) => console.log(`Subject 1 Error: ${error}`),
});
mySubject1.next('A Value');
mySubject1.error('Something bad happened');

mySubject2.subscribe({
  next: (value) => console.log(`Subject 2 Next: ${value}`),
  complete: () => console.log(`Subject 2 Complete`),
});
mySubject2.complete();
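The multicasting behavior is easiest to see with more than one Observer. In this small sketch (the variable names are illustrative only), two early Observers receive every value, while a late Observer receives only what is emitted after it subscribes.

```typescript
import { Subject } from 'rxjs';

const subject = new Subject<string>();

const first: string[] = [];
const second: string[] = [];
const late: string[] = [];

// Both Observers share the same Subject; no new execution is created.
subject.subscribe((value) => first.push(value));
subject.subscribe((value) => second.push(value));

subject.next('A');

// This Observer subscribes after 'A' was emitted, so it never sees 'A'.
subject.subscribe((value) => late.push(value));

subject.next('B');

console.log(first); // ['A', 'B']
console.log(second); // ['A', 'B']
console.log(late); // ['B']
```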
Subject.asObservable()
The asObservable method is useful when you want to hide the Subject’s methods when exposing it to consumers that should only use it as an Observable.
For example, you might expose the Subject in a service as an Observable, so components using that service will not be able to call the next method.
// MyService file
class MyService {
  private readonly mySubject = new Subject<MyType>();

  public getSubject(): Observable<MyType> {
    return this.mySubject.asObservable();
  }
}

// MyComponent file
myService.getSubject().next();
// Property 'next' does not exist on type 'Observable<MyType>'.
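For a version you can run end to end, here is a sketch of the same idea with a hypothetical `CounterService`. The class, its `count$` property, and `increment` are invented for this example. The service keeps the Subject private and only the service itself can push values.

```typescript
import { Observable, Subject } from 'rxjs';

// Hypothetical service: owns the Subject, exposes only an Observable.
class CounterService {
  private readonly countSubject = new Subject<number>();
  private count = 0;

  // Consumers can subscribe to this, but cannot call next on it.
  readonly count$: Observable<number> = this.countSubject.asObservable();

  increment(): void {
    this.count += 1;
    this.countSubject.next(this.count);
  }
}

const service = new CounterService();
const seen: number[] = [];

service.count$.subscribe((value) => seen.push(value));
service.increment();
service.increment();

console.log(seen); // [1, 2]

// The exposed object is a plain Observable, so it has no next method at runtime either.
const exposesNext = 'next' in service.count$;
console.log(exposesNext); // false
```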
What is a BehaviorSubject?
constructor(value: T)
BehaviorSubject extends Subject to ensure late subscribers are notified with the latest emitted value.
A BehaviorSubject differs from a Subject in two ways.
- BehaviorSubject requires an initial value passed to its constructor.
- Observers receive the latest emitted value as soon as they subscribe to a BehaviorSubject.
import { BehaviorSubject } from 'rxjs';

const myBehaviorSubject = new BehaviorSubject('Initial Value');

myBehaviorSubject.subscribe((value) =>
  console.log(`First Subscription: ${value}`)
);
myBehaviorSubject.next('Second Value');

// a late subscription will receive the latest emitted value
myBehaviorSubject.subscribe((value) =>
  console.log(`Late Subscription: ${value}`)
);
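To see the difference from a plain Subject side by side, the following sketch feeds the same values to both and then subscribes late. The variable names are illustrative only.

```typescript
import { BehaviorSubject, Subject } from 'rxjs';

const plain = new Subject<string>();
const behavior = new BehaviorSubject<string>('Initial Value');

// Emit before anyone has subscribed.
plain.next('Second Value');
behavior.next('Second Value');

const fromPlain: string[] = [];
const fromBehavior: string[] = [];

// Late subscriptions: only the BehaviorSubject delivers the latest value right away.
plain.subscribe((value) => fromPlain.push(value));
behavior.subscribe((value) => fromBehavior.push(value));

plain.next('Third Value');
behavior.next('Third Value');

console.log(fromPlain); // ['Third Value']
console.log(fromBehavior); // ['Second Value', 'Third Value']
```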
What is a ReplaySubject?
constructor(_bufferSize: number = Infinity, _windowTime: number = Infinity, _timestampProvider: TimestampProvider = dateTimestampProvider)
ReplaySubject also extends Subject. ReplaySubject behaves similarly to BehaviorSubject but is capable of emitting multiple values when subscribed to.
ReplaySubject differs from BehaviorSubject in three ways:
- A ReplaySubject does not have an initial value.
- A ReplaySubject emits the last bufferSize emissions when Observers subscribe to it. Values are held in the buffer for windowTime milliseconds.
- A ReplaySubject still replays its buffered values to late Observers even after it has terminated with an error.
import { ReplaySubject } from 'rxjs';

const myReplaySubject = new ReplaySubject<number>();

myReplaySubject.subscribe({
  next: (value) => console.log(`ReplaySubject Next: ${value}`),
  error: (error) => console.log(`ReplaySubject Error: ${error}`),
  complete: () => console.log(`ReplaySubject Complete`),
});

myReplaySubject.next(1);
myReplaySubject.next(2);
myReplaySubject.next(3);
myReplaySubject.error('Something bad happened');

// late subscription
myReplaySubject.subscribe({
  next: (value) => console.log(`ReplaySubject Next: ${value}`),
  error: (error) => console.log(`ReplaySubject Error: ${error}`),
  complete: () => console.log(`ReplaySubject Complete`),
});
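The example above uses the default, unbounded buffer. As a complementary sketch, here is what a late subscription looks like with a bufferSize of 2: only the last two values are replayed, followed by the error. The `log` array is just a convenient way to inspect the order of notifications.

```typescript
import { ReplaySubject } from 'rxjs';

// Keep only the last 2 values in the buffer.
const replay = new ReplaySubject<number>(2);

replay.next(1);
replay.next(2);
replay.next(3);
replay.error('Something bad happened');

const log: string[] = [];

// Late subscription: receives the buffered values first, then the error.
replay.subscribe({
  next: (value) => log.push(`next: ${value}`),
  error: (error) => log.push(`error: ${error}`),
});

console.log(log);
// ['next: 2', 'next: 3', 'error: Something bad happened']
```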
What is an AsyncSubject?
An AsyncSubject emits only its last value to its Observers, and only when it completes. That’s it.
import { AsyncSubject } from 'rxjs';

const myAsyncSubject = new AsyncSubject();

myAsyncSubject.subscribe({
  next: (value) => console.log(`AsyncSubject Next: ${value}`),
  complete: () => console.log(`AsyncSubject Complete`),
});

myAsyncSubject.next('First Value');
myAsyncSubject.next('Second Value');
setTimeout(() => myAsyncSubject.complete(), 3000);
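If you want to check this behavior without waiting on a timer, a synchronous sketch like the one below may help. It completes the AsyncSubject immediately and records what each Observer saw; the names `early`, `beforeComplete`, and `late` are illustrative only. It also shows that an Observer subscribing after completion still receives the last value.

```typescript
import { AsyncSubject } from 'rxjs';

const asyncSubject = new AsyncSubject<string>();

const early: string[] = [];
asyncSubject.subscribe((value) => early.push(value));

asyncSubject.next('First Value');
asyncSubject.next('Second Value');

// Nothing has been delivered yet because the subject has not completed.
const beforeComplete = [...early];

asyncSubject.complete();

// Subscribing after completion still yields the last value.
const late: string[] = [];
asyncSubject.subscribe((value) => late.push(value));

console.log(beforeComplete); // []
console.log(early); // ['Second Value']
console.log(late); // ['Second Value']
```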
On The Road to Mastering RxJS
Now that you know how to use all the Observable types (Observable, Subject, BehaviorSubject, ReplaySubject, and AsyncSubject), we highly recommend looking through the list of operators in the official RxJS documentation. Being at least exposed to most operators means you’ll know where to look when you face a new problem.
Finally, Bitovi offers a free RxJS course with great introductory and advanced content. You don't want to miss it. 👀