RxJS Custom Operator

Reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change. It is highly used in development of FE applications and one of the most common API where it is enclosed is known as RxJS.
Use Case
We want to be able to apply a transformation function on each element of an emitted array object through our stream.
Implementation
Either by reverse engineering one of the already provided operators or by experimenting yourselves, our implementation will require from us some new concrete implementations of a series of important steps. So, one of the approach would be to create the following objects:
- An operator function
Let me explain…
Our arrayMap function takes as input a projection function, which in our case is any mapping function (i.e x => x.toUpperCase()):
project: ProjectFn<T, R>
where ProjectFn is
type ProjectFn<T, R> = (value: T, index: number) => R;
and replies with a function that takes as input an Observable, and replies with an Observable (we use a helper function for this):
return (source: Observable<T[]>) =>
source.lift(new MyOperator(project))
Note 1: Beware of the T[] type i.e we specifically expect an array or iterable for our Observables.
Note 2: lifting is an important concept mostly used in Functional Programming during which functions that operate on specific type of inputs can be converted to functions that operate on different type of inputs performing similar type of operations i.e
from f: a -> b ===> to f: List[a] -> List[b]
Moving on…
2. An Operator
Simply enough, it is an Operator object that preserves the project function and also implements “call” interface. The latter relies on a custom subscription using one final custom object used for this purpose.
3. A custom subscriber object
In a few words…
we construct a Subscriber typed object which implements _call function in a way that the rest of the pipeable operations get notified of either an error or the result of series of operations. In our case, the actual implementation is contained in those 3 lines of code:
result = values.map(value =>
this.project.call(this, value))
What we really want to do is, apply our project function to every element of our array input values i.e
Final Thoughts
Creating a custom Rx operator is a slightly complex process, however there are a couple of helper functions, already provided by RxJS library, which makes development of such rx operator, as our arrayMap, easier to do. An important thing to mention is also the use of an ‘index’ parameter that could be used on operators related to time or step…the possibilities are endless, as long as there is imagination ;)