Introduction to Observables
In Angular, Observables are a core concept for managing asynchronous data streams. They represent sequences of values that are emitted over time, allowing components and services to react to data changes dynamically. Observables are part of the RxJS (Reactive Extensions for JavaScript) library, which provides powerful operators for transforming, combining, and managing data streams efficiently.
Angular’s HttpClient, reactive forms, and event APIs such as EventEmitter and Router events all rely heavily on Observables. Compared with traditional callbacks or Promises, Observables provide a more flexible and composable way to manage asynchronous data.
What is an Observable?
An Observable is a stream of values delivered over time. You can think of it as a data pipeline that your component can “listen” to.
When you subscribe to an Observable, it starts emitting values and continues until it completes, fails with an error, or the subscriber unsubscribes.
Example: Basic Observable
import { Observable } from 'rxjs';
// Emit two values, then signal completion
const data$ = new Observable<string>(observer => {
  observer.next('Hello');
  observer.next('World');
  observer.complete();
});
data$.subscribe(value => console.log(value));
Output:
Hello
World
Here, observer.next() sends data to subscribers, observer.complete() indicates that the stream has ended, and the subscriber logs each emitted value.
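A stream can also end with an error instead of completing. The following is a minimal sketch of that case; the names risky$ and events are made up for this illustration. Once observer.error() is called, the stream is closed, so the later next() call should be ignored and the complete callback never runs.

```typescript
import { Observable } from 'rxjs';

// Hypothetical stream that fails after its first value.
const risky$ = new Observable<number>(observer => {
  observer.next(1);
  observer.error(new Error('boom'));
  observer.next(2); // ignored: the stream already ended with an error
});

const events: string[] = [];

risky$.subscribe({
  next: value => events.push(`next: ${value}`),
  error: (err: Error) => events.push(`error: ${err.message}`),
  complete: () => events.push('complete'),
});

console.log(events); // ['next: 1', 'error: boom']
```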
Observables vs Promises
Although Observables and Promises both handle asynchronous data, there are key differences between them.
Promises:
- Handle one value asynchronously.
- Cannot be cancelled once started.
- Execute immediately when created.
Observables:
- Can emit multiple values over time.
- Are lazy — they only start when subscribed to.
- Can be cancelled or unsubscribed.
- Provide many RxJS operators for data transformation.
Example Comparison:
import { Observable } from 'rxjs';
// Promise: resolves once with a single value
const promise = new Promise<string>(resolve => {
  setTimeout(() => resolve('Promise resolved!'), 1000);
});
promise.then(console.log);
// Observable: emits three values, one per second, then completes
const observable = new Observable<string>(observer => {
  let count = 0;
  const interval = setInterval(() => {
    observer.next(`Observable value: ${++count}`);
    if (count === 3) {
      clearInterval(interval);
      observer.complete();
    }
  }, 1000);
  // Teardown: stop the timer if the subscriber unsubscribes early
  return () => clearInterval(interval);
});
observable.subscribe(console.log);
The Observable emits values multiple times, unlike the Promise which resolves once.
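The "lazy" versus "eager" difference from the lists above can be seen without any timers. This is a small sketch, with log and lazy$ as illustrative names: the Promise executor runs at creation time, while the Observable's producer function runs only on subscribe(), and once per subscriber.

```typescript
import { Observable } from 'rxjs';

const log: string[] = [];

// The Promise executor runs as soon as the Promise is created.
const promise = new Promise<string>(resolve => {
  log.push('promise executor ran');
  resolve('done');
});
promise.then(value => console.log('promise resolved with', value));

// Creating the Observable does not run its producer function yet.
const lazy$ = new Observable<string>(observer => {
  log.push('observable producer ran');
  observer.next('done');
  observer.complete();
});

log.push('before subscribe');

// Each subscribe() call runs the producer again, independently.
lazy$.subscribe();
lazy$.subscribe();

console.log(log);
// ['promise executor ran', 'before subscribe',
//  'observable producer ran', 'observable producer ran']
```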
Creating Custom Observables
You can create your own custom Observables using the Observable constructor.
Example:
import { Observable } from 'rxjs';
const numbers$ = new Observable<number>(observer => {
  for (let i = 1; i <= 5; i++) {
    observer.next(i);
  }
  observer.complete();
});
numbers$.subscribe({
  next: value => console.log(`Value: ${value}`),
  complete: () => console.log('Stream completed.')
});
Output:
Value: 1
Value: 2
Value: 3
Value: 4
Value: 5
Stream completed.
This demonstrates how you can push multiple values to subscribers.
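A custom Observable can also return a teardown function, which RxJS calls when the subscriber unsubscribes or the stream ends. The sketch below assumes a made-up resource$ stream that only records what happens to it; a real one might close a socket or remove an event listener in the same place.

```typescript
import { Observable } from 'rxjs';

const events: string[] = [];

// Hypothetical custom stream that holds a resource while subscribed.
const resource$ = new Observable<string>(observer => {
  events.push('setup');
  observer.next('connected');

  // Teardown: runs on unsubscribe(), or after complete() or error().
  return () => {
    events.push('teardown');
  };
});

const subscription = resource$.subscribe(value => events.push(`value: ${value}`));
subscription.unsubscribe();

console.log(events); // ['setup', 'value: connected', 'teardown']
```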
Subscription and Unsubscription
When you subscribe to an Observable, subscribe() returns a Subscription object. In an Angular component, keep a reference to it and call unsubscribe() when the component is destroyed to prevent memory leaks.
Example:
import { Component, OnDestroy } from '@angular/core';
import { interval, Subscription } from 'rxjs';
@Component({
  selector: 'app-counter',
  template: `<p>{{ counter }}</p>`
})
export class CounterComponent implements OnDestroy {
  counter = 0;
  private subscription: Subscription;
  constructor() {
    // interval(1000) emits 0, 1, 2, ... once per second
    const counter$ = interval(1000);
    this.subscription = counter$.subscribe(value => this.counter = value);
  }
  ngOnDestroy() {
    // Stop the timer when the component is destroyed
    this.subscription.unsubscribe();
  }
}
Here, the interval() function emits increasing numbers every second. When the component is destroyed, we unsubscribe to stop the stream.
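When a component holds several subscriptions, one option is to collect them in a parent Subscription with add() and release them together. The sketch below uses Subjects as stand-ins for real sources so that it runs synchronously; clicks$, keys$, and seen are illustrative names. In a component, the final unsubscribe() call would go in ngOnDestroy().

```typescript
import { Subject, Subscription } from 'rxjs';

// Subjects stand in for real sources so the example runs synchronously.
const clicks$ = new Subject<string>();
const keys$ = new Subject<string>();
const seen: string[] = [];

// A parent Subscription collects children added with add().
const subscriptions = new Subscription();
subscriptions.add(clicks$.subscribe(v => seen.push(`click: ${v}`)));
subscriptions.add(keys$.subscribe(v => seen.push(`key: ${v}`)));

clicks$.next('button');
keys$.next('Enter');

// One call tears down every child subscription.
subscriptions.unsubscribe();

clicks$.next('ignored');
keys$.next('ignored');

console.log(seen); // ['click: button', 'key: Enter']
```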
Common RxJS Creation Functions
RxJS provides various helper functions to create Observables easily:
Function | Description |
---|---|
of() | Emits a fixed sequence of values. |
from() | Converts an array or Promise into an Observable. |
interval() | Emits values at regular intervals. |
timer() | Emits values after a delay or periodically. |
fromEvent() | Creates an Observable from DOM events. |
Example:
import { of, from, interval, timer, fromEvent } from 'rxjs';
const of$ = of(10, 20, 30);
of$.subscribe(v => console.log('of:', v));
const from$ = from([1, 2, 3, 4]);
from$.subscribe(v => console.log('from:', v));
const interval$ = interval(1000);
interval$.subscribe(v => console.log('interval:', v));
const timer$ = timer(2000, 1000);
timer$.subscribe(v => console.log('timer:', v));
const click$ = fromEvent(document, 'click');
click$.subscribe(() => console.log('Clicked!'));
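A common point of confusion is what happens when an array is passed to of() rather than from(). The short sketch below (emitted is an illustrative name) shows the difference: of() emits the array as a single value, while from() emits its elements one by one.

```typescript
import { from, of } from 'rxjs';

const emitted: string[] = [];

// of() emits each argument as-is: the whole array is one value.
of([1, 2, 3]).subscribe(v => emitted.push(`of: ${JSON.stringify(v)}`));

// from() iterates the array and emits each element separately.
from([1, 2, 3]).subscribe(v => emitted.push(`from: ${JSON.stringify(v)}`));

console.log(emitted);
// ['of: [1,2,3]', 'from: 1', 'from: 2', 'from: 3']
```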
Handling Data with Operators
RxJS provides operators to transform and manipulate data streams. Common operators include:
Operator | Description |
---|---|
map() | Transforms emitted values. |
filter() | Filters values based on conditions. |
tap() | Performs side effects like logging. |
mergeMap() | Maps each value to an inner Observable and merges the output of all of them. |
switchMap() | Maps each value to an inner Observable and unsubscribes from the previous one. |
Example with map() and filter():
import { from } from 'rxjs';
import { map, filter } from 'rxjs/operators';
const numbers$ = from([1, 2, 3, 4, 5, 6]);
numbers$
  .pipe(
    filter(num => num % 2 === 0),
    map(num => num * 10)
  )
  .subscribe(result => console.log(result));
Output:
20
40
60
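The remaining operators from the table, tap(), mergeMap(), and switchMap(), can be sketched in the same way. The example below is an illustration under simplifying assumptions: Subjects (query$, resultsForA$, resultsForAb$) stand in for user input and server responses so that everything runs synchronously, and all names are hypothetical.

```typescript
import { of, Subject } from 'rxjs';
import { mergeMap, switchMap, tap } from 'rxjs/operators';

// tap() observes values without changing them; mergeMap() flattens inner streams.
const tapped: number[] = [];
const merged: number[] = [];

of(1, 2, 3)
  .pipe(
    tap(n => tapped.push(n)),
    mergeMap(n => of(n, n * 10)),
  )
  .subscribe(v => merged.push(v));

console.log(tapped); // [1, 2, 3]
console.log(merged); // [1, 10, 2, 20, 3, 30]

// switchMap() keeps only the inner stream for the latest outer value.
const query$ = new Subject<string>();
const resultsForA$ = new Subject<string>();  // hypothetical response for "a"
const resultsForAb$ = new Subject<string>(); // hypothetical response for "ab"
const shown: string[] = [];

query$
  .pipe(switchMap(q => (q === 'a' ? resultsForA$ : resultsForAb$)))
  .subscribe(r => shown.push(r));

query$.next('a');
query$.next('ab');                 // unsubscribes from resultsForA$
resultsForA$.next('stale result'); // dropped
resultsForAb$.next('fresh result');

console.log(shown); // ['fresh result']
```

The switchMap() half mirrors a type-ahead search, where a response for an outdated query should not overwrite the result for the latest one.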
Error Handling in Observables
Errors can occur during data streams, and RxJS provides operators like catchError() and retry() to handle them.
Example:
import { throwError, of } from 'rxjs';
import { catchError } from 'rxjs/operators';
const data$ = throwError(() => new Error('Something went wrong!'));
data$
  .pipe(
    catchError(err => {
      console.error(err.message);
      return of('Fallback data');
    })
  )
  .subscribe(value => console.log(value));
Output:
Something went wrong!
Fallback data
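The retry() operator resubscribes to the source when it fails, which suits transient errors. As a rough sketch, the hypothetical flaky$ source below fails on its first two subscriptions and succeeds on the third; retry(2) allows two extra attempts, so the catchError() fallback is not needed here.

```typescript
import { Observable, of } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';

let attempts = 0;

// Hypothetical flaky source: fails twice, then succeeds.
const flaky$ = new Observable<string>(observer => {
  attempts++;
  if (attempts < 3) {
    observer.error(new Error(`Attempt ${attempts} failed`));
  } else {
    observer.next(`Succeeded on attempt ${attempts}`);
    observer.complete();
  }
});

const output: string[] = [];

flaky$
  .pipe(
    retry(2),                              // resubscribe up to 2 more times
    catchError(() => of('Fallback data')), // used only if all retries fail
  )
  .subscribe(value => output.push(value));

console.log(output); // ['Succeeded on attempt 3']
```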
Combining Multiple Observables
You can merge or combine streams using operators like merge(), combineLatest(), and forkJoin().
Example using combineLatest():
import { combineLatest, interval } from 'rxjs';
import { map } from 'rxjs/operators';
const obs1$ = interval(1000);
const obs2$ = interval(1500);
combineLatest([obs1$, obs2$])
  .pipe(map(([val1, val2]) => `Obs1: ${val1}, Obs2: ${val2}`))
  .subscribe(console.log);
Once both streams have emitted at least one value, this emits a combined result whenever either stream updates.
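For comparison, here is a brief sketch of merge() and forkJoin() using synchronous of() sources; mergedValues and joined are illustrative names. merge() forwards every value from every source, while forkJoin() waits until all sources complete and then emits a single array of their last values.

```typescript
import { forkJoin, merge, of } from 'rxjs';

const mergedValues: string[] = [];
const joined: Array<[number, string]> = [];

// merge(): forwards values from every source as they arrive.
merge(of('a', 'b'), of('c')).subscribe(v => mergedValues.push(v));

// forkJoin(): waits for all sources to complete, then emits
// one array holding the last value of each.
forkJoin([of(1, 2, 3), of('x', 'y')]).subscribe(pair => joined.push(pair));

console.log(mergedValues); // ['a', 'b', 'c']
console.log(joined);       // [[3, 'y']]
```

forkJoin() is often a reasonable fit for parallel one-shot requests, since it emits only after every source has completed.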
AsyncPipe in Angular Templates
Angular’s AsyncPipe automatically subscribes to an Observable and updates the template when new data arrives.
Example:
// data.service.ts
import { Injectable } from '@angular/core';
import { Observable, of } from 'rxjs';
@Injectable({ providedIn: 'root' })
export class DataService {
  getData(): Observable<string[]> {
    return of(['Angular', 'RxJS', 'TypeScript']);
  }
}
// app.component.ts
import { Component } from '@angular/core';
import { Observable } from 'rxjs';
import { DataService } from './data.service';
@Component({
  selector: 'app-root',
  template: `
    <ul>
      <li *ngFor="let item of items$ | async">{{ item }}</li>
    </ul>
  `
})
export class AppComponent {
  // Create the stream once; calling getData() in the template would build a new Observable on every change detection pass
  items$: Observable<string[]>;
  constructor(dataService: DataService) {
    this.items$ = dataService.getData();
  }
}
No manual subscription or unsubscription is required — AsyncPipe handles it automatically.
Unsubscribing Automatically with takeUntil()
Manual unsubscription can be tedious, especially for multiple Observables. The takeUntil() operator helps automatically complete subscriptions.
Example:
import { Component, OnDestroy } from '@angular/core';
import { Subject, interval, takeUntil } from 'rxjs';
@Component({
  selector: 'app-auto-unsubscribe',
  template: `<p>Check console logs for data stream updates</p>`
})
export class AutoUnsubscribeComponent implements OnDestroy {
  // Emits once in ngOnDestroy to signal that subscriptions should end
  private destroy$ = new Subject<void>();
  constructor() {
    interval(1000)
      .pipe(takeUntil(this.destroy$))
      .subscribe(val => console.log('Stream value:', val));
  }
  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}
This ensures that the Observable stops when the component is destroyed.
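The same mechanism can be checked outside a component. In this sketch a Subject named source$ stands in for interval(1000) so that the example runs synchronously, and events is an illustrative name. When destroy$ emits, takeUntil() completes the subscription and later source values are ignored.

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const source$ = new Subject<number>(); // stands in for interval(1000)
const destroy$ = new Subject<void>();  // same role as in the component
const events: string[] = [];

source$
  .pipe(takeUntil(destroy$))
  .subscribe({
    next: v => events.push(`value: ${v}`),
    complete: () => events.push('complete'),
  });

source$.next(0);
source$.next(1);

// What ngOnDestroy() does in the component.
destroy$.next();
destroy$.complete();

source$.next(2); // ignored: the subscription has already completed

console.log(events); // ['value: 0', 'value: 1', 'complete']
```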
Best Practices for Using Observables
- Always unsubscribe manually or use AsyncPipe or takeUntil().
- Prefer using operators over nested subscriptions.
- Use strong typing for Observables to prevent runtime errors.
- Handle errors gracefully with catchError().
- Use BehaviorSubject or ReplaySubject for managing shared state.
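To illustrate the last point, here is a small sketch of how the two Subject types treat late subscribers; count$, recent$, and the array names are made up for the example. A BehaviorSubject always hands a new subscriber its current value, while a ReplaySubject replays a fixed number of past values.

```typescript
import { BehaviorSubject, ReplaySubject } from 'rxjs';

// BehaviorSubject: holds a current value and replays it to new subscribers.
const count$ = new BehaviorSubject<number>(0);
const early: number[] = [];
const late: number[] = [];

count$.subscribe(v => early.push(v)); // receives the initial 0 immediately
count$.next(1);
count$.next(2);
count$.subscribe(v => late.push(v));  // starts from the current value, 2
count$.next(3);

console.log(early);             // [0, 1, 2, 3]
console.log(late);              // [2, 3]
console.log(count$.getValue()); // 3

// ReplaySubject: replays the last N values (here 2) to new subscribers.
const recent$ = new ReplaySubject<string>(2);
const replayed: string[] = [];

recent$.next('a');
recent$.next('b');
recent$.next('c');
recent$.subscribe(v => replayed.push(v));

console.log(replayed); // ['b', 'c']
```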