Custom RxJS Operators to Improve your Angular Apps

tunjiadeyeri

Tunji Adeyeri

Posted on September 1, 2024

Custom RxJS Operators to Improve your Angular Apps

RxJS is a powerful library for reactive programming, and one of its key features being the ability to create custom operators. In this guide, we'll look at some handy custom operators and show how to implement them.

1. withLoading

Create a custom switchMap that seamlessly manages loading states for each stream.

import { Observable, of } from 'rxjs';
import { map, catchError, startWith, finalize } from 'rxjs/operators';

interface WithLoadingResult<T> {
  loading: boolean;
  data?: T;
  error?: any;
}

export function withLoading<T>() {
  return (source: Observable<T>) =>
    source.pipe(
      startWith({ loading: true }),
      map((data) => ({ loading: false, data })),
      catchError((error) => of({ loading: false, error })),
      finalize(() => ({}))
    );
}

// Usage
someObservable$.pipe(withLoading()).subscribe(({ loading, data, error }) => {
  if (loading) {
    console.log('Loading...');
  } else if (error) {
    console.error('Error:', error);
  } else {
    console.log('Data:', data);
  }
});
Enter fullscreen mode Exit fullscreen mode

2. debounceAndDistinct

Effectively manage user input, minimizing unnecessary API requests. We can achieve this by combining debounce and distinctUntilChanged

import { Observable } from 'rxjs';
import { debounceTime, distinctUntilChanged } from 'rxjs/operators';

export function debounceAndDistinct<T>(time: number = 300) {
  return (source: Observable<T>) =>
    source.pipe(
      debounceTime(time),
      distinctUntilChanged()
    );
}

// Usage
searchInput$.pipe(debounceAndDistinct()).subscribe((value) => {
  // Perform search with debounced and distinct value
});
Enter fullscreen mode Exit fullscreen mode

3. retryWithBackoff

Apply a backoff strategy for retrying failed requests more intelligently.

import { Observable, throwError, timer } from 'rxjs';
import { mergeMap, retryWhen } from 'rxjs/operators';

export function retryWithBackoff(
  maxRetries: number = 3,
  backoffTime: number = 1000
) {
  return (source: Observable<any>) =>
    source.pipe(
      retryWhen((errors) =>
        errors.pipe(
          mergeMap((error, index) => {
            const retryAttempt = index + 1;
            if (retryAttempt > maxRetries) {
              return throwError(error);
            }
            console.log(`Retry attempt ${retryAttempt}: retrying in ${backoffTime}ms`);
            return timer(backoffTime * retryAttempt);
          })
        )
      )
    );
}

// Usage
apiCall$.pipe(retryWithBackoff()).subscribe(
  (data) => console.log('Success:', data),
  (error) => console.error('Error:', error)
);
Enter fullscreen mode Exit fullscreen mode

4. cachingOperator

Cache API responses to reduce server load and improve efficiency.

import { Observable, of } from 'rxjs';
import { tap, shareReplay } from 'rxjs/operators';

export function cachingOperator<T>(cacheTime: number = 60000) {
  let cachedData: T;
  let cachedTime: number;

  return (source: Observable<T>) =>
    new Observable<T>((observer) => {
      if (cachedData && Date.now() - cachedTime < cacheTime) {
        observer.next(cachedData);
        observer.complete();
      } else {
        source
          .pipe(
            tap((data) => {
              cachedData = data;
              cachedTime = Date.now();
            }),
            shareReplay(1)
          )
          .subscribe(observer);
      }
    });
}

// Usage
apiCall$.pipe(cachingOperator()).subscribe((data) => console.log('Data:', data));
Enter fullscreen mode Exit fullscreen mode

5. progressiveLoading

Load data incrementally, emitting partial results to enhance perceived performance.

import { Observable } from 'rxjs';
import { expand, take, map } from 'rxjs/operators';

export function progressiveLoading<T>(
  pageSize: number = 10,
  maxItems: number = 100
) {
  return (source: Observable<T[]>) =>
    source.pipe(
      expand((items, index) =>
        items.length < maxItems
          ? source.pipe(
              map((newItems) => [...items, ...newItems.slice(0, pageSize)])
            )
          : []
      ),
      take(Math.ceil(maxItems / pageSize))
    );
}

// Usage
apiCall$.pipe(progressiveLoading()).subscribe((partialData) => {
  console.log('Partial data:', partialData);
});
Enter fullscreen mode Exit fullscreen mode

6. errorHandlingOperator

Centralize error management for consistent and streamlined error handling.

import { Observable, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';

export function errorHandlingOperator<T>() {
  return (source: Observable<T>) =>
    source.pipe(
      catchError((error) => {
        console.error('An error occurred:', error);
        // You can add custom error handling logic here
        return throwError('Something went wrong. Please try again later.');
      })
    );
}

// Usage
apiCall$.pipe(errorHandlingOperator()).subscribe(
  (data) => console.log('Data:', data),
  (error) => console.error('Handled error:', error)
);
Enter fullscreen mode Exit fullscreen mode

7. optimisticUpdate

Immediately update the UI before API confirmation, with a rollback in case of failure.

import { Observable, of } from 'rxjs';
import { map, catchError, switchMap } from 'rxjs/operators';

export function optimisticUpdate<T, R>(
  updateFn: (data: T) => T,
  apiCall: (data: T) => Observable<R>
) {
  return (source: Observable<T>) =>
    source.pipe(
      map((data) => ({ optimistic: updateFn(data), original: data })),
      switchMap(({ optimistic, original }) =>
        apiCall(optimistic).pipe(
          map(() => optimistic),
          catchError(() => {
            console.warn('API call failed. Reverting to original data.');
            return of(original);
          })
        )
      )
    );
}

// Usage
const updateTodo = (todo: Todo): Todo => ({ ...todo, completed: true });
const apiUpdateTodo = (todo: Todo): Observable<Todo> => // API call implementation

todoStream$.pipe(optimisticUpdate(updateTodo, apiUpdateTodo))
  .subscribe((updatedTodo) => console.log('Updated todo:', updatedTodo));
Enter fullscreen mode Exit fullscreen mode

8. throttleAndBuffer

Use throttle combined with buffer to batch updates and manage data streams effectively.

import { Observable } from 'rxjs';
import { buffer, throttleTime } from 'rxjs/operators';

export function throttleAndBuffer<T>(time: number = 1000) {
  return (source: Observable<T>) =>
    source.pipe(
      buffer(source.pipe(throttleTime(time))),
      filter((batch) => batch.length > 0)
    );
}

// Usage
dataStream$.pipe(throttleAndBuffer()).subscribe((batch) => {
  console.log('Batched updates:', batch);
});
Enter fullscreen mode Exit fullscreen mode

9. conditionalMerge

Merge multiple observables based on dynamic conditions for flexible data combination.

import { Observable, merge } from 'rxjs';
import { filter } from 'rxjs/operators';

export function conditionalMerge<T>(
  ...sources: Array<[Observable<T>, (value: T) => boolean]>
) {
  return merge(
    ...sources.map(([source, condition]) =>
      source.pipe(filter(condition))
    )
  );
}

// Usage
const source1$ = of(1, 2, 3, 4);
const source2$ = of('a', 'b', 'c', 'd');

conditionalMerge(
  [source1$, (value) => value % 2 === 0],
  [source2$, (value) => ['a', 'c'].includes(value)]
).subscribe((value) => console.log('Merged value:', value));
Enter fullscreen mode Exit fullscreen mode

10. smartPolling

Implement adaptive polling intervals that respond to data changes or user activity.

import { Observable, timer } from 'rxjs';
import { switchMap, tap, distinctUntilChanged } from 'rxjs/operators';

export function smartPolling<T>(
  pollFn: () => Observable<T>,
  baseInterval: number = 5000,
  maxInterval: number = 60000
) {
  let currentInterval = baseInterval;
  let lastValue: T;

  return new Observable<T>((observer) => {
    const subscription = timer(0, baseInterval)
      .pipe(
        switchMap(() => pollFn()),
        tap((value) => {
          if (JSON.stringify(value) !== JSON.stringify(lastValue)) {
            currentInterval = baseInterval;
          } else {
            currentInterval = Math.min(currentInterval * 2, maxInterval);
          }
          lastValue = value;
        }),
        distinctUntilChanged()
      )
      .subscribe(observer);

    return () => subscription.unsubscribe();
  });
}

// Usage
const pollApi = (): Observable<Data> => // API call implementation

smartPolling(pollApi).subscribe((data) => console.log('Polled data:', data));
Enter fullscreen mode Exit fullscreen mode

11. filterOnlyPresent

Filters out null or undefined values from the stream, ensuring only valid data is emitted.

import { Observable } from 'rxjs';
import { filter } from 'rxjs/operators';

export function filterOnlyPresent<T>() {
  return (source: Observable<T>) =>
    source.pipe(
      filter((value): value is NonNullable<T> => value !== null && value !== undefined)
    );
}

// Usage
someStream$.pipe(filterOnlyPresent()).subscribe((value) => {
  console.log('Non-null value:', value);
});
Enter fullscreen mode Exit fullscreen mode

12. filterOnlyPropertyPresent

Emits only when a specified property in the returned object is neither null nor undefined.

import { Observable } from 'rxjs';
import { filter } from 'rxjs/operators';

export function filterOnlyPropertyPresent<T>(prop: keyof T) {
  return (source: Observable<T>) =>
    source.pipe(
      filter((value) => value[prop] !== null && value[prop] !== undefined)
    );
}

// Usage
interface User {
  id: number;
  name: string | null;
}

userStream$.pipe(filterOnlyPropertyPresent('name')).subscribe((user) => {
  console.log('User with name:', user);
});
Enter fullscreen mode Exit fullscreen mode

These custom operators can greatly improve your RxJS workflows, making your code more efficient and cleaner. Be sure to thoroughly test these operators in your specific scenarios and adjust them as needed.

💖 💪 🙅 🚩
tunjiadeyeri
Tunji Adeyeri

Posted on September 1, 2024

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related