-
Notifications
You must be signed in to change notification settings - Fork 0
Transforming Observables
This section explains operators with which you can transform items that are emitted by an Observable.
-
map( )
— transform the items emitted by an Observable by applying a function to each of them -
mapMany( )
orflatMap( )
— transform the items emitted by an Observable into Observables, then flatten this into a single Observable -
mapManyDelayError( )
— transform the items emitted by an Observable into Observables, then flatten this into a single Observable, waiting to report errors until all error-free observables have a chance to complete -
scan( )
— apply a function to each item emitted by an Observable, sequentially, and emit each successive value -
groupBy( )
andgroupByUntil( )
— divide an Observable into a set of Observables that emit groups of items from the original Observable, organized by key -
buffer( )
— periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time -
window( )
— periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time -
cast( )
— cast all items from the source Observable into a particular type before reemitting them
The map( )
method applies a function of your choosing to every item emitted by an Observable, and returns this transformation as a new Observable. For example, the following code maps a function that squares the incoming value onto the values in numbers
:
numbers = Observable.from([1, 2, 3, 4, 5]);
numbers.map({it * it}).subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
1
4
9
16
25
Sequence complete
- javadoc:
map(func)
- RxJS:
map
- Linq:
Select
- Introduction to Rx: Select
Transform the items emitted by an Observable into Observables, then flatten this into a single Observable
The mapMany( )
method (or flatMap( )
, which has identical behavior) creates a new Observable by applying a function that you supply to each item emitted by the original Observable, where that function is itself an Observable that emits items, and then merges the results of that function applied to every item emitted by the original Observable, emitting these merged results.
This method is useful, for example, when you have an Observable that emits a series of items that themselves have Observable members or are in other ways transformable into Observables, so that you can create a new Observable that emits the complete collection of items emitted by the sub-Observables of these items.
// this closure is an Observable that emits three numbers
numbers = Observable.from([1, 2, 3]);
// this closure is an Observable that emits two numbers based on what number it is passed
multiples = { n -> Observable.from([ n*2, n*3 ]) };
numbers.mapMany(multiples).subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
2
3
4
6
6
9
Sequence complete
If any of the individual Observables mapped to the items from the source Observable in mapMany( )
aborts by invoking onError
, the mapMany( )
call itself will immediately abort and invoke onError
. If you would prefer that the map-many operation continue emitting the results of the remaining, error-free Observables before reporting the error, use mapManyDelayError( )
instead.
Because it is possible for more than one of the individual Observables to encounter an error, mapManyDelayError( )
may pass information about multiple errors to the onError
method of its Subscribers (which it will never invoke more than once). For this reason, if you want to know the nature of these errors, you should write your onError
method so that it accepts a parameter of the class CompositeException
.
- javadoc:
mapMany(func)
(and itsflatMap
clone) - RxJS:
selectMany
- Linq:
SelectMany
- Introduction to Rx: SelectMany
The scan( )
method returns an Observable that applies a function of your choosing to the first item emitted by a source Observable, then feeds the result of that function along with the second item emitted by the source Observable into the same function, then feeds the result of that function along with the third item into the same function, and so on until all items have been emitted by the source Observable. It emits the result of each of these iterations from the returned Observable. This sort of function is sometimes called an “accumulator.”
For example, the following code takes an Observable that emits a consecutive sequence of n integers starting with 1 and converts it into an Observable that emits the first n triangular numbers:
numbers = Observable.from([1, 2, 3, 4, 5]);
numbers.scan({ a, b -> a+b }).subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
1
3
6
10
15
Sequence complete
There is also a version of scan( )
to which you can pass a seed item in addition to an accumulator function:
my_observable.scan(initial_seed, accumulator_closure)
Note: if you pass a seed item to scan( )
, it will emit the seed itself as its first item.
Note also that passing a null
seed is not the same as not passing a seed. The behavior will be different. If you pass a seed of null
, you will be seeding your scan with null
, and scan( )
will emit null
as its first item.
- javadoc:
scan(accumulator)
- javadoc:
scan(initialValue, accumulator)
- RxJS:
scan
- Linq:
Scan
- Introduction to Rx: Scan
divide an Observable into a set of Observables that emit groups of items from the original Observable, organized by key
The groupBy( )
method creates or extracts a key from all of the items emitted by a source Observable. For each unique key created in this way, groupBy( )
creates a GroupedObservable
that emits all of the items emitted by the source Observable that match that key. groupBy( )
then emits each of these Observables, as an Observable. A GroupedObservable
has a method, getKey( )
with which you can retrieve the key that defines the GroupedObservable
.
The following sample code uses groupBy( )
to transform a list of numbers into two lists, grouped by whether or not the numbers are even:
def numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);
def groupFunc = { return(0 == (it % 2)); };
numbers.groupBy(groupFunc).mapMany({ it.reduce([it.getKey()], {a, b -> a << b}) }).subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
[false, 1, 3, 5, 7, 9]
[true, 2, 4, 6, 8]
Sequence complete
There is also a groupByUntil( )
operator. It adds another parameter: an Observable that emits duration markers. When a duration marker is emitted by this Observable, any grouped Observables that have been opened are closed, and groupByUntil( )
will create new grouped Observables for any subsequent emissions by the source Observable.
Another variety of groupByUntil( )
limits the number of groups that can be active at any particular time. If an item is emitted by the source Observable that would cause the number of groups to exceed this maximum, before the new group is emitted, one of the existing groups is closed (that is, the Observable it represents terminates by calling its Subscribers' onCompleted
methods and then expires).
Note that when groupBy( )
or groupByUntil( )
splits up the source Observable into an Observable that emits Observables, it begins to emit items from the source Observable onto these emitted Observables immediately. That is to say, it does not wait for any Subscribers to subscribe. So if you want to ensure that you see all of the items that are emitted on these new Observables, you should take care to subscribe to them right away.
The following illustration shows how this can cause unexpected behavior:
In this illustration, groupBy( )
is used to separate a source Observable that emits the numbers 1 through 6 into an Observable (in red) that emits two Observables: one that emits the odd numbers from the source Observable, and the other that emits the even numbers. Then, this Observable of Observables, shown in red, is zipped with another Observable (shown in blue) that emits the strings "odd" and "even", and in the zip function, it applies this string label to all of the items emitted by the associated Observable emitted by the Observable shown in red.
However, zip( )
does not apply this zip function until it observes an item emitted by each of the red and the blue source Observables. Since "odd" arrives after the first of these Observables has already emitted "1", when zip( )
has not yet subscribed to this Observable, it never observes this "1" and does not apply the zip function to it. A similar thing happens with "even", which arrives after both "2" and "4" are emitted by the Observable emission it is paired with from the red-colored Observable. For this reason, the transformed Observable emitted by zip( )
is missing some of the data from the original Observables.
- javadoc:
groupBy(keySelector)
- javadoc:
groupBy(keySelector, elementSelector)
- javadoc:
groupByUntil(keySelector, durationSelector)
- javadoc:
groupByUntil(keySelector, valueSelector, durationSelector)
- RxJS:
groupBy
- RxJS:
groupByUntil
- Linq:
GroupBy
- Linq:
GroupByUntil
- Introduction to Rx: GroupBy
periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time
The buffer( )
method periodically gathers items emitted by a source Observable
into bundles, and emits these bundles as its own emissions. There are a number of ways with which you can regulate how buffer( )
gathers items from the source Observable
into bundles:
-
buffer(bufferOpenings, closingSelector)
This version of
buffer( )
monitors anObservable
, bufferOpenings, that emitsBufferOpening
objects. Each time it observes such an emitted object, it creates a new bundle to begin collecting items emitted by the sourceObservable
and it passes the bufferOpeningsObservable
into the closingSelector function. That function returns anObservable
.buffer( )
monitors thatObservable
and when it detects an emitted object, it closes its bundle and emits it as its own emission.
-
buffer(count)
This version of
buffer( )
emits a new bundle of items for every count items emitted by the sourceObservable
.
-
buffer(count, skip)
This version of
buffer( )
create a new bundle of items for every skip item(s) emitted by the sourceObservable
, each containing count elements. If skip is less than count this means that the bundles will overlap and contain duplicate items. For example:from([1, 2, 3, 4, 5]).buffer(3, 1)
will emit the following bundles:[1, 2, 3]
,[2, 3, 4]
,[3, 4, 5]
.
-
buffer(timespan)
andbuffer(timespan, scheduler)
This version of
buffer( )
emits a new bundle of items periodically, every timespan amount of time, containing all items emitted by the sourceObservable
since the previous bundle emission.
-
buffer(timespan, count)
andbuffer(timespan, count, scheduler)
This version of
buffer( )
emits a new bundle of items for every count items emitted by the sourceObservable
, or, if timespan has elapsed since its last bundle emission, it emits a bundle of however many items the sourceObservable
has emitted in that span, even if this is less than count.
-
buffer(timespan, timeshift)
andbuffer(timespan, timeshift, scheduler)
This version of
buffer( )
creates a new bundle of items every timeshift, and fills this bundle with every item emitted by the sourceObservable
from that time until timespan time has passed since the bundle's creation, before emitting the bundle as its own emission. If timespan is longer than timeshift, the emitted bundles will represent time periods that overlap and so they may contain duplicate items.
- javadoc:
buffer(closingSelector)
- javadoc:
buffer(count)
- javadoc:
buffer(count, skip)
- javadoc:
buffer(timespan, timeshift, unit)
- javadoc:
buffer(timespan, timeshift, unit, scheduler)
- javadoc:
buffer(timespan, unit)
- javadoc:
buffer(timespan, unit, count)
- javadoc:
buffer(timespan, unit, count, scheduler)
- javadoc:
buffer(timespan, unit, scheduler)
- javadoc:
buffer(bufferOpenings, closingSelector)
- RxJS:
buffer
,bufferWithCount
,bufferWithTime
, andbufferWithTimeOrCount
- Linq:
Buffer
- Introduction to Rx: Buffer
- Introduction to Rx: Buffer revisited
periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time
Window is similar to buffer( )
, but rather than emitting packets of items from the original Observable
, it emits Observable
s, each one of which emits a subset of items from the original Observable
and then terminates with an onCompleted( )
call.
Like buffer( )
, window( )
has many varieties, each with its own way of subdividing the original Observable
into the resulting Observable
emissions, each one of which contains a "window" onto the original emitted items. In the terminology of the window( )
method, when a window "opens," this means that a new Observable
is emitted and that Observable
will begin emitting items emitted by the source Observable
. When a window "closes," this means that the emitted Observable
stops emitting items from the source Observable
and calls its Subscribers' onCompleted( )
method and terminates.
-
window(source, closingSelector)
This version of
window( )
opens its first window immediately. It closes the currently open window and immediately opens a new one each time it observes an object emitted by theObservable
that is returned from closingSelector. In this way, this version ofwindow( )
emits a series of non-overlapping windows whose collectiveonNext( )
emissions correspond one-to-one with those of the sourceObservable
.
-
window(source, windowOpenings, closingSelector)
This version of
window( )
opens a window whenever it observes the windowOpeningsObservable
emit anOpening
object and at the same time calls closingSelector to generate a closingObservable
associated with that window. When that closingObservable
emits an object,window( )
closes that window. Since the closing of currently open windows and the opening of new windows are activities that are regulated by independentObservable
s, this version ofwindow( )
may create windows that overlap (duplicating items from the sourceObservable
) or that leave gaps (discarding items from the sourceObservable
).
-
window(source, count)
This version of
window( )
opens its first window immediately. It closes the currently open window and immediately opens a new one whenever the current window has emitted count items. It will also close the currently open window if it receives anonCompleted( )
oronError( )
call from the sourceObservable
. This version ofwindow( )
emits a series of non-overlapping windows whose collectiveonNext( )
emissions correspond one-to-one with those of the sourceObservable
.
-
window(source, count, skip)
This version of
window( )
opens its first window immediately. It opens a new window beginning with every skip item from the sourceObservable
(e.g. if skip is 3, then it opens a new window starting with every third item). It closes each window when that window has emitted count items or if it receives anonCompleted( )
oronError( )
call from the sourceObservable
. If skip = count then this behaves the same aswindow(source, count)
; if skip < count this will emit windows that overlap by count - skip items; if skip > count this will emit windows that drop skip - count items from the sourceObservable
between every window.
-
window(source, timespan, unit)
andwindow(source, timespan, unit, scheduler)
This version of
window( )
opens its first window immediately. It closes the currently open window and opens another one every timespan period of time (measured in unit, and optionally on a particular scheduler). It will also close the currently open window if it receives anonCompleted( )
oronError( )
call from the sourceObservable
. This version ofwindow( )
emits a series of non-overlapping windows whose collectiveonNext( )
emissions correspond one-to-one with those of the sourceObservable
.
-
window(source, timespan, unit, count)
andwindow(source, timespan, unit, count, scheduler)
This version of
window( )
opens its first window immediately. It closes the currently open window and opens another one every timespan period of time (measured in unit, and optionally on a particular scheduler) or whenever the currently open window has emitted count items. It will also close the currently open window if it receives anonCompleted( )
oronError( )
call from the sourceObservable
. This version ofwindow( )
emits a series of non-overlapping windows whose collectiveonNext( )
emissions correspond one-to-one with those of the sourceObservable
.
-
window(source, timespan, timeshift, unit)
andwindow(source, timespan, timeshift, unit, scheduler)
This version of
window( )
opens its first window immediately, and thereafter opens a new window every timeshift period of time (measured in unit, and optionally on a particular scheduler). It closes a currently open window after timespan period of time has passed since that window was opened. It will also close any currently open window if it receives anonCompleted( )
oronError( )
call from the sourceObservable
. Depending on how you set timespan and timeshift the windows that result from this operation may overlap or have gaps.
- javadoc:
window(closingSelector)
- javadoc:
window(count)
- javadoc:
window(count, skip)
- javadoc:
window(timespan, timeshift, unit)
- javadoc:
window(timespan, timeshift, unit, scheduler)
- javadoc:
window(timespan, unit)
- javadoc:
window(timespan, unit, count)
- javadoc:
window(timespan, unit, count, scheduler)
- javadoc:
window(timespan, unit, scheduler)
- javadoc:
window(windowOpenings, closingSelector)
- RxJS:
window
,windowWithCount
,windowWithTime
, andwindowWithTimeOrCount
- Linq:
Window
- Introduction to Rx: Window
- javadoc:
cast(class)
- Linq:
Cast
- Introduction to Rx: Cast and OfType
A Netflix Original Production
Tech Blog | Twitter @NetflixOSS | Twitter @RxJava | Jobs