Adds an array of methods to the specified target Constructor, passing in the Constructor itself as the last argument to each.
// adding creators to Observable so you can call Observable.fromPromise(), etc
addCreators(Observable, [fromEvent, fromPromise])
Adds an array of operators to the specified target object, modifying each
so that this
is passed in as the first argument.
// a common thing you might want to do so that you can call observable.map(...).filter(...)
addOperators(Observable.prototype, [filter, map, scan])
// if you don't want to modify all Observables
addOperators(myCustomObservable, [reduce, take])
"Catch" an Observable sequence by continuing with another Observable sequence
upon error. Takes a function as an argument that will be passed the error value
and must return an Observable. When assigned to a Constructor this method is called with .catch()
(Observable)
Input Observable.
(Function)
Catching function. Must return an Observable.
Observable
:
new Observable
catchError(
new Observable(observer => observer.error('bad stuff')),
e => Observable.of(`Caught error: ${e}`)
)
.subscribe(console.log) // Caught error: bad stuff
// If available on Observable.prototype
stream
.catch(e => alternateStream)
.subscribe(...)
Subscribe to a list of Observables in order. As one completes, the next is subscribed.
(...Observable)
List of Observables
Observable
:
new Observable with concated values
concat(
Observable.of(1, 2, 3),
Observable.of(4, 5, 6),
Observable.of(7, 8, 9)
) // 1, 2, 3, 4, 5, 6, 7, 8, 9
// if available on Observable.prototype
firstStream
.concat(nextStream)
.subscribe(handle)
Debounces an Observable stream by a specified number of milliseconds.
(Observable)
Input Observable
(number)
Milliseconds to debounce by
Observable
:
Debounced Observable
debounce(clickEvents, 300)
// if available on Observable.prototype
clickEvents
.debounce(300)
.map(handler)
Returns an Observable from the result of a function exectuted upon subscription. Handles async functions, emitting an event upon resolution. Similar to fromPromise but useful for cases where you don't want to do an expensive async thing until something subscribes.
(Function)
A function, potentially async
(Function
= Observable
)
Observable constructor
Observable
:
Observable
async function makeCall() {
const params = buildParams(...)
const response = await fetch(params)
return response.data
}
// call will not happen after this statement
const callResult = Observable
.defer(makeCall)
.map(handleResponse)
// but it will now
callResult.subscribe(...)
// will also work with a function that returns a promise
function doAsyncThing() {
return new Promise(...)
}
Observable.defer(doAsyncThing).subscribe(...)
// will also work with non-async functions
function doExpensiveSynchronousThing() {
...
}
Observable.defer(doExpensiveSynchronousThing).subscribe(...)
Delay stream events by the specified number of milliseconds. Maintains the relative spacing of events.
(Observable)
Input Observable
(number)
Delay time in milliseconds
Observable
:
New Observable
Creates an observable that immediately terminates with the provided error
(any)
Error value
(Function
= Observable
)
Observable constructor
Observable
:
New Observable
error('error message').subscribe({ error: console.log }) // logs 'error message'
// rejecting a certain case
stream
.filter(specificCase)
.flatMap(value => Observable.error(value))
.subscribe(observer)
Returns a new Observable that emits only the values of the input Observable for which a provided function returns truthy.
(Observable)
Input Observable
(Function)
Filtering function
Observable
:
New filtered Observable
filter(Observable.of(1, 2, 3, 4), value ==> value % 2) // 2, 4
// If available on Observable.prototype
Observable.of(1, 2, 3, 4).filter(value => value % 2 === 1) // 1, 3
Applies a mapping function to the items emitted by an Observable, then transforms the result into an Observable whos outputs are flattened into a single Observable.
(Observable)
Input Observable
(Function)
Mapping function. May return a value, another Observable, or iterable.
The result will be transformed into an Observable with Observable.from
Observable
:
New Observable that emits the flattened outputs of all mapped values
// Combining flatMap with fromEvent and fromPromise to create an Observable stream
// of data fetched when a certain element is clicked
function fetchOnClick(element) {
return flatMap(
fromEvent(element, 'click'),
click => fromPromise(fetchData(click))
)
}
// if available on Observable.prototype
// assuming registerSocket returns an Observable, this will merge all new messages
// from a set of websockets
socketUrls
.flatMap(url => registerSocket(url))
.map(parseMessage)
.subscribe(handleEvent)
Calls a function once for each observed value. Returns a Promise that resolves when the input Observable is complete.
(Observable)
Input Observable
(Function)
Function to call
Promise
:
Promise resolving when the Observable is complete
forEach(
Observable.of(1, 2, 3),
value => console.log(`{value} mississippi`)
)
.then(() => console.log('Ready or not here I come!'))
.catch(() => console.log('Something went wrong'))
// if available on Observable.prototype
myObservable
.forEach(doAThing)
.then(wrapItUp)
.catch(panic)
Creates an Observable by adding an eventListener to the specified DOMElement. Removes event listener when the Observable's observer is unsubscribed
(Element)
Dom element to listen on
(String)
Event type
(Function
= Observable
)
Observable constructor
Observable
:
Observable sequence of events
Observable.fromEvent(button, 'click').subscribe(handleClick)
Transforms a Promise in to an Observable that emits a single value upon Promise resolution. The Observable will error if the promise does. This method requires that the Promise has already been created so that it may be passed in as the argument. If you wish to defer Promise creation until the Observable has been subscribed to see defer.
Observable
:
Observable
Observable.fromPromise(myPromise).subscribe(handleCall)
// will also work with the return values of async functions
async function makeCall() {
const response = await fetch(...)
return handleReponse(response)
}
Observable.fromPromise(makeCall()).subscribe(console.log)
Creates an observable that emits an integer count every specified number of milliseconds
Observable
:
Observable stream of integers
Observable.interval(1000) // 0, 1, 2, ...
Observable.interval(500) // 0, 1, 2, ... but twice as fast
Returns a new Observable that emits the result of a provided function applied to each value received from an input observable.
(Observable)
Input Observable
(Function)
Mapping function
Observable
:
New observable with mapped values
map(Observable.of(1, 2, 3), val => val + 1) // 2, 3, 4
// if available on Observable.prototype
Observable.of(1, 2, 3).map(val => `${val} mississippi`)
Combines multiple Observables in to one by passing on each input's emissions. Any error notification from an input will be passed on to the observer immediately. The output stream will terminate when all inputs have terminated.
(...Observable)
Input Observables
Observable
:
Single output Observable
merge( clickEventsFromA, clickEventsFromB, clickEventsFromC ).subscribe(handleEvents)
// if available on Observable.prototype
eventsEvery100ms
.merge(eventsEvery10s)
.forEach(console.log)
Applies a function against an accumulator and each observed value of an input Observable, returning a Promise that resolves with the accumulated value when the input is complete. Similar to scan.
Promise
:
Promise resolving to the accumulated value upon completion
// logs 16
reduce(Observable.of(1, 2, 3), (acc, val) => acc + val, 10)
.then(console.log)
.catch(panic)
// if available on Observable.prototype
// logs 5
Observable.of(1, 5, 2)
.reduce(Math.max, 0)
.then(console.log)
.catch(panic)
Returns a new observable containing incrementally accumulated values, starting with the provided initial value. Similar to reduce.
Observable
:
New Observable of accumulated values
scan(Observable.of(1, 3, 2, 5), Math.max, 0) // 1, 3, 3, 5
// if available on Observable.prototype
Observable.of(1, 2, 3).scan((acc, val) => acc + val, 0) // 1, 3, 6
Skips the first N values of an Observable
(Observable)
Input Observable
(number)
Number of values to skip
Observable
:
New Observable
skip(observable.of(1, 2, 3, 4), 2) // 3, 4
// if available on Observable.prototype
Observable.of('a', 'b', 'c', 'd', 'e').skip(3) // d, e
Leaves off the last N values of an Observable
(Observable)
Input Observable
(number)
Number of values to leave off the end
Observable
:
New Observable
skipLast(observable.of(1, 2, 3, 4), 2) // 1, 2
// if available on Observable.prototype
Observable.of('a', 'b', 'c', 'd', 'e').skipLast(3) // a, b
Subscribes to an input Observable but does not emit values until it receives a value from a signal observable. Emits an empty Observable if the signal terminates without emitting a value. Propagates errors from both input and signal.
(Observable)
Input Observable
(Observable)
Signal Observable
Observable
:
New Observable
// emits consecutive integers until after the user says they care
skipUntil(Observable.interval(100), startEvent)
// if available on Observable.prototype
socketEvents
.skipUntil(startListening)
Start an observable sequence with the provided values
(Observable)
Input Observable
(...any)
Values to start with
Observable
:
new Observable
startWith(Observable.of(1, 2, 3), 0) // 0, 1, 2, 3
// if available on Observable.prototype
Observable.of(1, 2, 3)
.startWith(-1, 0) // -1, 0, 1, 2, 3
Applies a mapping function to the items emitted by an Observable, then transforms the result into an Observable whos outputs are emitted to the subscriber. When a new value of the input stream arrives, the previous Observable is unsubscribed and replaced with the next.
(Observable)
Input Observable
(Function)
Mapping function. May return a value, another Observable, or iterable.
The result will be transformed into an Observable with Observable.from
Observable
:
New Observable that emits the flattened outputs of all mapped values
// using switchMap for an autocomplete fetch stream. When a new fetch is initiated,
// the old result stream will be cancelled
function autocomplete(inputStream) {
return switchMap(
inputStream,
userInput => fromPromise(fetchData(input))
)
}
// same example in fluent style with a debounce thrown in
Observable.fromEvent(input, 'input')
.map(evt => evt.value)
.debounce(300)
.switchMap(input => Observable.fromPromise(autocomplete(input)))
Emits the first N values of an input Observable
(Observable)
Input Observable
(number)
Number of values to take
Observable
:
New Observable
take(observable.of(1, 2, 3, 4), 2) // 1, 2
// if available on Observable.prototype
Observable.of('a', 'b', 'c', 'd', 'e').take(3) // a, b, c
Takes the last N values of an Observable
(Observable)
Input Observable
(number)
Number of values to take at the end
Observable
:
New Observable
takeLast(observable.of(1, 2, 3, 4), 2) // 3, 4
// if available on Observable.prototype
Observable.of('a', 'b', 'c', 'd', 'e').takeLast(3) // c, d, e
Emits values from an Observable sequence until it receives a value from a signal Observable. If the signal emits a value right away, no values will be emitted. If the signal closes before emitting a value, the input Observable will pass through unchanged. Errors from both the signal and input are propagated forward. This operator is particularly useful for stream cancelation by users or other processes.
(Observable)
Input Observable
(Observable)
Signal Observable
Observable
:
New Observable
// emits consecutive integers until the user says to shut up
takeUntil(Observable.interval(100), cancelEvent)
// using takeUtil along with flatMap to create a stream of mouse drag events
// if available on Observable.prototype
const drags = mouseDownEvents
.flatMap(e => mouseMoveEvents(e).takeUntil(mouseUpEvents))
Limits the rate of events to at most one every throttlePeriod. Emits the first event encountered once throttlePeriod milliseconds have passed since the last event.
(Observable)
Input Observable
(number)
Minimum number of milliseconds between events
Observable
:
New Observable
throttle(interval(1000), 3000) // 0 ... 3 ... 6 ...
// limits scroll events, maybe to bookmark scroll position
// If available on Observable.prototype
scrollEvents
.throttle(300)
Returns a promise resolving to an array representation of an input Observable upon completion.
(Observable)
Input Observable
Promise
:
Promise resolving to an array upon completion
toArray(Observable.of(1, 3, 2)) // [1, 3, 2]
// if available on Observable.prototype
Observable.of(2, 3, 4).toArray() // [2, 3, 4]
Returns a new Observable expressed as an operation on the values emitted by a single observable. Useful internally to provide repeated logic for other operations.
(Observable)
Observable stream
(Function)
Operation to perform on input stream
Observable
:
New Observable
// makes an operator that passes along values until a target value is found
const takeUntilValue = (input, targetValue) => transform(input, (observer, value) => {
if (value === targetValue) {
observer.complete(value)
} else {
observer.next(value)
}
})