The Magic behind RxJS

The observer pattern is a software design pattern in which an object, called the subject, maintains a list of its dependents, called observers, and automatically notifies them of any state changes (usually by calling one of their methods). (Wikipedia)

An operator is simple a function, they are the horse-power behind observables, providing an elegant, declarative and common API to complex tasks in both async/sync world.

Max showing RxJS Library
Observable.subscribe(observer:Observer):Subscription;
const { from } = require(\'rxjs\');
const { map, filter } = require(\'rxjs/operators\');

const observable = from([1, 2, 3, 4, 5])
.pipe(
map ( x => x + 1),
filter( x => x % 2 === 0),
map( x => x - 1)
);

observable.subscribe(
val => console.log(\'odd:\', val),
error => console.error(error),
() => console.log(\'complete\')
);
function from(initialData) {
return {
pipe(...pipeFunction) {
return {
subscribe(onNext, onError, onComplete) {
}
}
}
}
}
function createObservable(operator) {
return {
subscribe(next) {
operator(next);
}
}
}
How Operators Works in RxJS
function from(initialData){
return {
pipe: (...pipeFunctions) => {
return {
subscribe: (onNext, onError, onComplete) => {
const dataObservable = createObservable(x => initialData.forEach(x))

let currentObservable = dataObservable;

pipeFunctions.forEach(pipeFunc => {
currentObservable = pipeFunc(currentObservable)
})

currentObservable.subscribe(onNext);

onComplete();
}
}
}
}
}
function map(mapFunction){
return sourceObservable => {
const currentObservable = createObservable(destinationNext => {
sourceObservable.subscribe(value => {
const newValue = mapFunction(value)
destinationNext(newValue);
})
})
return currentObservable;
}
}

function filter(filterFunction){
return sourceObservable => {
const currentObservable = createObservable(destinationNext => {
sourceObservable.subscribe(value => {
if(filterFunction(value)){
destinationNext(value);
}
})
})
return currentObservable;
}
}
const { interval } = require('rxjs');
const { take, filter } = require('rxjs/operators');

const source = interval(1000)
.pipe(take(3600));

source.subscribe(
console.log,
console.err,
() => console.log('--> Completed!'),
);

console.log('--> Subscribed!')

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store