Demystifying RxJS for Angular Developers

Understand Enough RxJS for your basic Angular needs!

One of the biggest changes that the Angular team introduced with the complete rewrite of Angular was the adoption of RxJS and Functional Reactive Programming as a core part of the framework.

For many, it was the holy grail. For others, it was an obstacle to adopting and learning Angular, and possibly a reason to stay away from the framework.

I want to help you demystify Rx and learn enough about it to have a smooth learning experience with Angular.

Is RxJS difficult?

RxJS is undoubtedly a big shift compared to how we used to write code in AngularJS. At first, it is scary for newcomers to digest, and developers mostly dislike things they don’t know.

On top of that, at the time of writing, RxJS provides 125 operators, which is quite a lot! How the heck can you learn Angular and Rx if you have to learn and remember all these operators?

Well, the good news is that you don’t have to learn all the operators. In fact, the number of operators you will use daily is no more than a dozen. Also, many operators are simple variations of a base operator: they may accept different arguments, or behave slightly differently, but ultimately they share many similarities with one another.

By the end of this article, you won’t be writing observables like a wizard, but you will probably have gained enough knowledge to start writing Angular code with the help of Observables and to keep learning from there.

Is RxJS required to learn and use Angular?

Kind of.

You don’t need to know everything about it to perform common operations such as fetching data over HTTP requests or retrieving the query parameters of the current page, or handling very complex asynchronous code.

With that said, not knowing some crucial parts of how Observables work, and how they differ from Promises, will probably result in your code being inefficient and buggy.

The Basics of RxJS

RxJS is a library that helps to deal with asynchronous code. Alright, that is a simplified statement — but it’s enough for you to wrap your head around it.

The most important concepts you need to know about Rx are:

  • Observables
  • Observers
  • Subscriptions

An Observable is a stream of values emitted over time. In code, a variable holding an observable is often named with the dollar sign $ as a suffix, as in values$.

For example, by using the fromEvent function, we can create an observable that listens to input events on a DOM element.

We will also provide an Observer, which is the handler executed when the observable emits notifications; finally, we create a Subscription, which we will use to unsubscribe the observable after 2 seconds.

import { fromEvent } from 'rxjs';

// values$ is the Observable
const values$ = fromEvent(element, 'input');
// observer is the Observer
const observer = {
    next(value) {
        console.log(value);
    }
};
// subscription is the Subscription
const subscription = values$.subscribe(observer);
// stop listening after 2 seconds
setTimeout(() => subscription.unsubscribe(), 2000);

Notice: an observable won’t start emitting notifications until we subscribe to it! Which means you don’t just have to define your pipeline, but you also need to provide an Observer and pass it to the subscribe function.
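To make the point about subscribing concrete, here is a minimal sketch. It assumes only the rxjs package; the names greeting$ and log are made up for the illustration. The function passed to the Observable constructor does not run when the observable is defined, only when subscribe is called.

```typescript
import { Observable } from 'rxjs';

// Records the order in which things happen.
const log: string[] = [];

// The function passed to the constructor runs once per subscribe() call,
// not when the observable is created.
const greeting$ = new Observable<string>((subscriber) => {
  log.push('producer started');
  subscriber.next('hello');
  subscriber.complete();
});

log.push('observable defined');

// Only now does the producer function run.
greeting$.subscribe({
  next: (value) => log.push(`received ${value}`),
  complete: () => log.push('completed'),
});

console.log(log);
```

The entry "observable defined" is logged before "producer started", which shows that defining the pipeline alone does nothing.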

Also notice: unsubscribing observables is very important to avoid memory leaks and possible bugs. You can do it by calling the .unsubscribe method on the subscription object, but we will soon see another more declarative way of doing the same.

Now, let’s rewrite this with plain DOM JavaScript to see the differences:

const handler = (event) => {
    console.log(event);
};
// addEventListener is called on the element itself and returns nothing
element.addEventListener('input', handler);

Because this is an extremely simple situation, as you can see, the differences are not striking.

Now, let’s make this code slightly more complex, so we can appreciate a little bit more what Observables bring to the table. We want to:

  • read the value of the DOM element on every input event
  • filter some of them, and transform them to another value
  • stop the listeners after 2 seconds

Let’s start with the DOM example this time:

const handler = (event) => {
    const value = event.target.value.toUpperCase();
    if (!value.trim()) {
        return;
    }

    console.log(value);
};
element.addEventListener('input', handler);
// stop listening after 2 seconds
setTimeout(() => element.removeEventListener('input', handler), 2000);

And now, let’s replicate the example with RxJS:

import { fromEvent, timer } from 'rxjs';
import { map, filter, takeUntil } from 'rxjs/operators';

const values$ = fromEvent(element, 'input').pipe(
    map((event) => event.target.value.toUpperCase()),
    filter((value) => Boolean(value.trim())),
    takeUntil(timer(2000))
);
values$.subscribe(console.log);

There’s some magic we need to explain here. First of all, we introduced three operators (map, filter and takeUntil) and one creational operator (timer).

  • Rx allows us to transform and manipulate the data coming from the notifications thanks to the powerful operators that operate like a pipeline
  • every notification goes through each operator we define. Operators can do different things: transform, filter, unsubscribe, etc.

The first two operators should be familiar: you probably use them extensively with JavaScript arrays:

  • map takes a value as a parameter and returns a new value
  • filter takes the value (transformed by the previous operator) and lets it through only if the predicate returns a truthy value

Finally, takeUntil is an operator that accepts another observable: when the observable passed as an argument emits a value, the stream completes and its subscription is closed reactively.

What does timer do? It is a creational operator: timer(2000) creates a stream that emits a single value after 2 seconds and then completes. Once it emits, it notifies the operator takeUntil, which unsubscribes the stream.
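The pipeline can be tried without a browser or a real clock. The sketch below is only an illustration: it replaces fromEvent(element, 'input') and timer(2000) with two Subjects, input$ and stop$ (both names are assumptions), so that the values and the stop signal can be pushed by hand.

```typescript
import { Subject } from 'rxjs';
import { filter, map, takeUntil } from 'rxjs/operators';

// Stand-ins (assumptions for this sketch):
// input$ plays the role of fromEvent(element, 'input'),
// stop$ plays the role of timer(2000).
const input$ = new Subject<string>();
const stop$ = new Subject<void>();

const seen: string[] = [];

const subscription = input$
  .pipe(
    map((text) => text.toUpperCase()),      // transform every value
    filter((text) => Boolean(text.trim())), // drop blank values
    takeUntil(stop$),                       // complete once stop$ emits
  )
  .subscribe((text) => seen.push(text));

input$.next('a');   // passes through as 'A'
input$.next('   '); // dropped by filter
input$.next('b');   // passes through as 'B'
stop$.next();       // takeUntil completes the stream here
input$.next('c');   // ignored: the subscription is already closed

console.log(seen, subscription.closed);
```

Every value travels through the operators in the order they are listed, and nothing reaches the observer after the notifier has emitted.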

RxJS in Angular

The Angular team baked Rx into the very core of the framework. As a result, we will be using observables in common operations you’ll probably perform every day.

Http

One of the most common situations where you’ll be faced with Observables is the HTTP module.

In fact, every HTTP request is an Observable, with one difference: HTTP requests will automatically complete and do not require us to unsubscribe the observable.

class ApiService {
  constructor(private http: HttpClient) {}
  getUsers() {
      return this.http.get('users');
  }
}
// Somewhere in your component
this.api.getUsers().subscribe(users => {
  // do something with users
});

Catching errors

In order to catch errors, which is a very common situation while working with HTTP requests, we can use the operator catchError and handle the response gracefully.

The callback passed to catchError must return a new observable, which means that most of the time we will wrap a fallback value with a creational operator, of. This operator wraps a value in an observable that synchronously emits the value and then completes right away.

this.api.getUsers().pipe(
  catchError((error: HttpErrorResponse) => {
      return of([]);
  })
).subscribe(users => {
    // do something with users
});

In the snippet above, we attempt to retrieve users using an HTTP call. If this fails, we gracefully handle the error by returning an empty list.
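The same fallback behaviour can be observed without a server. In this sketch, getUsers is a hypothetical stand-in for the service call that always fails, and the User interface is likewise made up; only Observable, of and catchError come from RxJS.

```typescript
import { Observable, of } from 'rxjs';
import { catchError } from 'rxjs/operators';

interface User {
  name: string;
}

// Hypothetical stand-in for this.api.getUsers(): it errors immediately.
const getUsers = (): Observable<User[]> =>
  new Observable<User[]>((subscriber) => {
    subscriber.error(new Error('Server unavailable'));
  });

const events: string[] = [];

getUsers()
  .pipe(
    catchError((error: Error) => {
      events.push(`handled: ${error.message}`);
      // Replace the failed stream with one that emits an empty list.
      return of([] as User[]);
    }),
  )
  .subscribe({
    next: (users) => events.push(`users: ${users.length}`),
    error: () => events.push('error callback'),
    complete: () => events.push('complete'),
  });

console.log(events);
```

The error callback of the observer is never called: the observer receives the empty list and then a normal completion.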

Forms

Angular reactive forms are some of the most powerful tools that the framework provides, and their reactive nature is what makes them so great.

Every Angular FormControl emits notifications when its value or status changes, thanks to the observables valueChanges and statusChanges.

const control = new FormControl('');
control.valueChanges.subscribe((value) => {
    console.log('Hey, I changed! My value is:', value);
});
control.statusChanges.subscribe((status) => {
    console.log('Hey, I changed! My status is:', status);
});

You can also use the synchronous properties attached to the FormControl instance, value and status, but there are cases where you should prefer their observable counterparts:

  • you’re using ChangeDetectionStrategy.OnPush and you want your templates to re-render automatically as the value or the status of a form changes: you can leverage the observables and the async pipe to update the view automatically
  • you want the values to be transformed and filtered by creating a new stream
  • you want to merge multiple values into one single stream, for example a stream canSubmitForm$ based on both the value and the status of the form
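A canSubmitForm$ stream like the one mentioned in the last point could be sketched with combineLatest. To keep the example runnable outside Angular, two BehaviorSubjects (value$ and status$, both assumptions) stand in for valueChanges and statusChanges. Note that the real form observables do not emit an initial value, so in a component you would probably seed them, for instance with startWith.

```typescript
import { BehaviorSubject, combineLatest } from 'rxjs';
import { map } from 'rxjs/operators';

type FormStatus = 'VALID' | 'INVALID';

// Stand-ins for control.valueChanges and control.statusChanges
// (assumptions made so the sketch runs without Angular).
const value$ = new BehaviorSubject<string>('');
const status$ = new BehaviorSubject<FormStatus>('INVALID');

// Emits whenever either source emits, using the latest value of both.
const canSubmitForm$ = combineLatest([value$, status$]).pipe(
  map(([value, status]) => value.trim().length > 0 && status === 'VALID'),
);

const states: boolean[] = [];
canSubmitForm$.subscribe((canSubmit) => states.push(canSubmit));

value$.next('Ada');    // value is filled in, status is still INVALID
status$.next('VALID'); // both conditions are now met

console.log(states);
```

In a template, such a stream would typically be consumed with the async pipe, for example to enable or disable a submit button.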

Async Pipe

Since we have just mentioned it, we cannot ignore the magical async pipe. Rather than creating a static property within the component that will be set within the observer, you can reference and subscribe to your observables directly within the templates.

// component
this.value$ = control.valueChanges.pipe(
    map((value) => value.toUpperCase()),
);
// template
<span>{{ value$ | async }}</span>

Of course, this works with any observable: HTTP requests, routing parameters, WebSocket streams, etc.

The other great thing about the async pipe is that it cleans up after itself: it will unsubscribe from the observable for you when the component is destroyed, so you don’t need to do it at the component level.

EventEmitter

In order to pass messages between components in the same view, Angular makes use of EventEmitter, which is an extension of the RxJS Subject.

What is a Subject? A Subject is different from an Observable as its value can also be pushed in from the outside.

That means, you can subscribe to a Subject and retrieve its values over time, but you can also push values into its stream. Subjects can be particularly useful for state management.
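Here is a small sketch of that two-way nature, using a plain RxJS Subject; the names messages$, first and second are made up for the illustration. It also shows that a plain Subject only delivers values to observers that are subscribed at the moment the value is pushed.

```typescript
import { Subject } from 'rxjs';

const messages$ = new Subject<string>();

const first: string[] = [];
const second: string[] = [];

// Reading side: subscribe like any other observable.
messages$.subscribe((message) => first.push(message));

// Writing side: push a value into the stream from the outside.
messages$.next('hello');

// A late subscriber does not receive values emitted before it subscribed.
messages$.subscribe((message) => second.push(message));

messages$.next('world');

console.log(first, second);
```

Here first ends up with both messages, while second only contains the one pushed after it subscribed.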

In the example below, we can see the usage of an EventEmitter used to dispatch outputs from a component.

@Component({
  template: `
    <button (click)="onCancel.emit()">Cancel</button>
 `
})
class CancelButtonComponent {
  @Output() onCancel = new EventEmitter();
}

Unsubscribing Observables

Unsubscribing from observables is one of the most commonly overlooked things I see in code written by beginners. Of course, most of us are simply not used to doing it as Promises don’t need to be unsubscribed.

But, as we said, Observables are values emitted over time — and thus, if we don’t stop them at some point, they will keep emitting notifications, which can lead to memory leaks or even unwanted bugs.

Not all observables, though, need unsubscribing. Some Observables will, in fact, unsubscribe themselves automatically. Let’s see some examples.

of

Already seen in the example above, of is the simplest creational operator. It takes one or more values and wraps them in an observable that emits them one by one. Once it has emitted them all, it completes.

const stream$ = of(0, 1, 2, 3);
const subscription = stream$.subscribe((n) => {
  console.log(n);
});
console.log(subscription.closed);
// Output
// 0
// 1
// 2
// 3
// true

Similarly, HTTP calls, Event Emitters, and Async Pipe do not need to be unsubscribed.

Infinite Observables

Some, probably most, observables will listen to a source of data, such as DOM events, WebSocket streams, timers and intervals, Subjects, NGRX Store selections, etc. that will keep emitting notifications until they are unsubscribed.

These are the sort of observables that can get you in trouble. A common error is selecting a slice of data from an NGRX Store and never unsubscribing from it:

getData() {
    return this.store.pipe(select(myData));
}

Every time we call the method getData and subscribe to the result, we create a new subscription. If we don’t unsubscribe from it, we will pile up subscriptions that keep emitting notifications to their observers. Not only is this a memory leak, but it can also create bugs.

In this sort of scenario, we can use the operator take:

getData() {
    return this.store.pipe(
      select(myData),
      take(1)
   );
}

This operator lets n emissions through and then completes, which closes the subscription. This ensures that subscriptions created through getData never linger.
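The effect of take(1) can be checked on a small scale. In the sketch below, a BehaviorSubject named store$ stands in for the NGRX Store (an assumption, chosen because it also emits its current value on subscription); the other names are illustrative too.

```typescript
import { BehaviorSubject } from 'rxjs';
import { take } from 'rxjs/operators';

// Stand-in for a store slice: it emits its current value on subscribe.
const store$ = new BehaviorSubject<number>(1);

const lingeringValues: number[] = [];
const oneShotValues: number[] = [];

// Without take: the subscription stays open until we unsubscribe.
const lingering = store$.subscribe((value) => lingeringValues.push(value));

// With take(1): the stream completes after the first value.
const oneShot = store$
  .pipe(take(1))
  .subscribe((value) => oneShotValues.push(value));

store$.next(2); // only the lingering subscription sees this value

console.log(lingeringValues, lingering.closed); // still open
console.log(oneShotValues, oneShot.closed);     // already closed

// The open subscription has to be closed by hand.
lingering.unsubscribe();
```

Keep in mind that take(1) only suits cases where a single snapshot of the data is enough; if the view must follow later updates, the subscription has to stay open and be cleaned up in another way.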

Promise Vs Observable

Last but not least, let’s talk about the differences between a Promise and an Observable.

First, let’s mention the one thing they have in common: they both help with asynchronous code.

In what way, though?

  • Promises are a one-time value
  • Observables are a stream of values over time. RxJS is a reactive extension on top of Observables: thanks to its powerful list of operators, Observables can do a lot more than promises (for example, cancellation)
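One more practical difference is worth a quick sketch: a Promise starts its work as soon as it is created, while an Observable created with the constructor starts only when subscribed, and once per subscription. The counters and variable names below are made up for the illustration.

```typescript
import { Observable } from 'rxjs';

let promiseRuns = 0;
let observableRuns = 0;

// The Promise executor runs immediately, at construction time.
const promise = new Promise<number>((resolve) => {
  promiseRuns++;
  resolve(1);
});
promise.then((value) => console.log('promise resolved with', value));

// The Observable producer has not run yet at this point.
const observable$ = new Observable<number>((subscriber) => {
  observableRuns++;
  subscriber.next(1);
  subscriber.complete();
});

const runsBeforeSubscribe = observableRuns;

// Each subscription triggers a fresh run of the producer.
observable$.subscribe();
observable$.subscribe();

console.log({ promiseRuns, runsBeforeSubscribe, observableRuns });
```

This is why calling an HTTP method in Angular without subscribing to the result sends no request at all.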

RxJS helps to work with existing code written with promises by providing a set of utilities. In fact, we can:

  • retrieve a value from a Promise
  • return the value of an observable as a Promise

Transforming a Promise into an Observable

In many cases, for instance if you are upgrading from AngularJS, you may have some code written entirely with Promises.

It can be a pain to mix two different techniques for handling async code: one of the things you could do is to transform a Promise into an Observable so that you can more easily refactor the service when it is upgraded.

import { from } from 'rxjs';

const request = this.legacyService.getUsers();
// from accepts a Promise and returns an Observable of its resolved value
const observable$ = from(request);
observable$.subscribe(console.log);

Transforming an Observable into a Promise

In some other cases, it can be useful to turn Observables into Promises:

  • to adhere to a 3rd party API, or legacy code, that only accepts promises
  • leverage the async/await syntax to resolve multiple values and write them in a synchronous fashion

async submit() {
  const users = await this.http.get('/users').toPromise();
  const result = await this.http.post('/data', ).toPromise();
  console.log(result);
}

Of course, in a real-world scenario, you may want to do some error handling, in which case the snippet would become a bit more verbose.

Final Words

Learning RxJS may take some work, and will probably require you to change your perspective on a few concepts. The number of operators may seem scary and the learning curve steep, but don’t let that keep you from learning it: in reality, you don’t need to learn them all, and once you learn the most common ones, you will have enough knowledge to handle most Angular tasks.

With that said, learning to master RxJS will surely pay off and make you a better Angular developer, by writing more declarative, reactive and efficient code.