Creating Observables
This section explains methods that create Observables.
- from( ): convert an Iterable or a Future into an Observable
- fromFuture( ): convert a Future into an Observable
- startFuture( ): convert a Future into an Observable
- deferFuture( ): convert a Future into an Observable, but do not attempt to get the Future's value until an Observer subscribes
- fromCancellableFuture( ), startCancellableFuture( ), and deferCancellableFuture( ): versions of Future-to-Observable converters that monitor the subscription status of the Observable to determine whether to halt work on the Future
- forIterable( ): apply a function to the elements of an Iterable to create Observables which are then concatenated
- toAsync( ): convert a function into an Observable that executes the function and emits its return value
- just( ): convert an object into an Observable that emits that object
- repeat( ): create an Observable that emits a particular item or sequence of items repeatedly
- create( ): create an Observable from scratch by means of a function
- start( ): create an Observable that emits the return value of a function
- defer( ): do not create the Observable until an Observer subscribes; create a fresh Observable on each subscription
- range( ): create an Observable that emits a range of sequential integers
- interval( ): create an Observable that emits a sequence of integers spaced by a given time interval
- timer( ): create an Observable that emits a single item after a given delay
- generate( ) and generateAbsoluteTime( ): create an Observable that emits a sequence of items as generated by a function of your choosing
- empty( ): create an Observable that emits nothing and then completes
- error( ): create an Observable that emits nothing and then signals an error
- never( ): create an Observable that emits nothing at all
You can convert an object that supports Iterable into an Observable that emits each item in the iterable, or an object that supports Future into an Observable that emits the result of the Future's get call, simply by passing the object to the from( ) method. For example:
myObservable = Observable.from(myIterable);
You can also do this with arrays, for example:
myArray = [1, 2, 3, 4, 5];
myArrayObservable = Observable.from(myArray);
This converts the sequence of values in the iterable object or array into a sequence of items emitted, one at a time, by an Observable.
An empty iterable (or array) can be converted to an Observable in this way. The resulting Observable will invoke onCompleted() without first invoking onNext().
Note that when the from( ) method converts a Future into an Observable, the resulting Observable is effectively blocking, because the underlying Future blocks.
- javadoc: from(future)
- javadoc: from(future, timeout, unit)
- javadoc: from(future, scheduler)
- javadoc: from(iterable)
- javadoc: from(array)
- RxJS: fromArray
- RxJS: fromPromise
- Linq: ToObservable
versions of Future-to-Observable converters that monitor the subscription status of the Observable to determine whether to halt work on the Future
If a subscriber to an Observable that was created from a Future later unsubscribes from that Observable, it can be useful to stop attempting to retrieve items from the Future. The "cancellable" Future converters enable you to do this. These three methods return Observables that, when unsubscribed from, also "unsubscribe" from the underlying Futures.
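The idea of tying unsubscription to cancellation can be sketched with the JDK alone. The following is not RxJava code: `CancellableFutureSketch` and `unsubscribeAction` are hypothetical names invented for this illustration, and the only real API used is `java.util.concurrent.Future.cancel`. It assumes that "halting work on the Future" amounts to cancelling it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class CancellableFutureSketch {
    // Hypothetical helper: returns the action a subscription would run on unsubscribe.
    // Running it cancels the Future, interrupting the worker if it is already running.
    public static Runnable unsubscribeAction(Future<?> future) {
        return () -> future.cancel(true);
    }

    public static void main(String[] args) {
        // A Future that never completes on its own, standing in for slow work.
        CompletableFuture<String> slowWork = new CompletableFuture<>();
        Runnable unsubscribe = unsubscribeAction(slowWork);

        System.out.println("cancelled before unsubscribe: " + slowWork.isCancelled());
        unsubscribe.run();
        System.out.println("cancelled after unsubscribe: " + slowWork.isCancelled());
    }
}
```

The non-cancellable converters, by contrast, would leave the Future running after the observer has lost interest in its result.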
convert a Future into an Observable, but do not attempt to get the Future's value until an Observer subscribes
You can also convert a Future into an Observable in such a way that the Future's value is not requested until an observer subscribes to the resulting Observable. To do this, use the deferFuture( ) operator.
forIterable( ) is similar to from(Iterable), but instead of the resulting Observable emitting the elements of the Iterable as its own emitted items, it applies a specified function to each of these elements to generate one Observable per element, and then concatenates the emissions of these Observables to form its own sequence of emitted items.
With toAsync( ) you can create an Observable that, when it is subscribed to, executes a function of your choosing and emits its return value before completing. If you pass an Action rather than a function, the Observable emits null before completing. Note that even if the resulting Observable is subscribed to more than once, the function is executed only once, and its sole return value is emitted to all later observers.
To convert any object into an Observable that emits that object, pass that object into the just( ) method.
// Observable emits "some string" as a single item
def observableThatEmitsAString = Observable.just("some string");
// Observable emits the list [1, 2, 3, 4, 5] as a single item
def observableThatEmitsAList = Observable.just([1, 2, 3, 4, 5]);
This is similar to the from( ) method, with one important difference: if you pass an iterable to from( ), the resulting Observable emits each of the items in the iterable, one at a time, whereas just( ) produces an Observable that emits the entire iterable as a single item.
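The difference in emission counts can be illustrated without RxJava. In this minimal sketch, `emitFrom` and `emitJust` are hypothetical helpers (not library methods) that model an emission as one call to a `Consumer`, standing in for the observer's onNext( ).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class FromVersusJustSketch {
    // Models from(iterable): one onNext call per element of the iterable.
    public static <T> void emitFrom(Iterable<T> source, Consumer<? super T> onNext) {
        for (T item : source) {
            onNext.accept(item);
        }
    }

    // Models just(value): exactly one onNext call carrying the whole value.
    public static <T> void emitJust(T value, Consumer<? super T> onNext) {
        onNext.accept(value);
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

        List<Object> fromEmissions = new ArrayList<>();
        emitFrom(numbers, fromEmissions::add);

        List<Object> justEmissions = new ArrayList<>();
        emitJust(numbers, justEmissions::add);

        System.out.println("from-style emissions: " + fromEmissions.size()); // 5
        System.out.println("just-style emissions: " + justEmissions.size()); // 1
    }
}
```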
If you pass nothing or null to just( ), the resulting Observable will not merely call onCompleted( ) without calling onNext( ). It will instead call onNext( null ) before calling onCompleted( ).
- javadoc: just(value)
There are also versions of repeat( ) that operate on a particular scheduler, that repeat only a certain number of times before terminating, and that repeat sequences of items emitted by a source Observable rather than one particular item.
You can create an Observable from scratch by using the create( ) method. You pass this method a function that accepts as its parameter the Observer that is passed to an Observable’s subscribe( ) method. Write the function you pass to create( ) so that it behaves as an Observable, calling the passed-in Observer’s onNext( ), onError( ), and onCompleted( ) methods appropriately. For example:
def myObservable = Observable.create({ anObserver ->
  try {
    anObserver.onNext('One');
    anObserver.onNext('Two');
    anObserver.onNext('Three');
    anObserver.onNext('Four');
    anObserver.onCompleted();
  } catch(Throwable t) {
    anObserver.onError(t);
  }
})
NOTE: A well-formed Observable must call either the observer’s onCompleted( ) method exactly once or its onError( ) method exactly once, and must not thereafter call any of the observer’s other methods.
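One way to check a hand-written emitter against this rule is to record the calls it makes. The sketch below uses only the JDK; `ContractChecker` is a hypothetical helper made up for this illustration and is not an RxJava class. It flags any call that arrives after a terminal notification.

```java
import java.util.ArrayList;
import java.util.List;

public class ContractCheckSketch {
    // Hypothetical recorder: accepts the three observer calls and notes rule violations.
    public static class ContractChecker {
        private boolean terminated = false;
        private final List<String> violations = new ArrayList<>();

        public void onNext(Object item) { check("onNext"); }

        public void onError(Throwable t) { check("onError"); terminated = true; }

        public void onCompleted() { check("onCompleted"); terminated = true; }

        // Any call made after onCompleted or onError breaks the rule.
        private void check(String call) {
            if (terminated) {
                violations.add(call + " called after termination");
            }
        }

        public boolean isTerminated() { return terminated; }

        public List<String> violations() { return violations; }
    }

    public static void main(String[] args) {
        ContractChecker checker = new ContractChecker();
        checker.onNext("One");
        checker.onCompleted();
        checker.onNext("Two"); // ill-formed: emission after the terminal call
        System.out.println(checker.violations());
    }
}
```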
- javadoc: create(func)
- RxJS: create
- Linq: Create
Pass the start( ) method a function that returns a value, and start( ) will execute that function asynchronously and return an Observable that will emit that value to any subsequent Observers.
do not create the Observable until an Observer subscribes; create a fresh Observable on each subscription
Pass defer( ) an Observable factory function (a function that generates Observables), and defer( ) will return an Observable that will call this function to generate its Observable sequence afresh each time a new Observer subscribes.
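The "fresh sequence per subscription" behavior can be modeled in plain Java. This is a sketch under assumptions, not RxJava's implementation: `Source` is a hypothetical one-method stand-in for an Observable, and `defer` here simply postpones the factory call until subscribe time.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class DeferSketch {
    // Hypothetical stand-in for an Observable: something an observer can subscribe to.
    public interface Source<T> {
        void subscribe(Consumer<T> onNext);
    }

    // The factory is not called here; it is called once per subscription.
    public static <T> Source<T> defer(Supplier<Source<T>> factory) {
        return onNext -> factory.get().subscribe(onNext);
    }

    public static void main(String[] args) {
        AtomicInteger factoryCalls = new AtomicInteger();

        Source<Integer> deferred = DeferSketch.<Integer>defer(() -> {
            int n = factoryCalls.incrementAndGet();
            return onNext -> onNext.accept(n); // each new source emits its own number
        });

        System.out.println("factory calls before subscribing: " + factoryCalls.get());
        deferred.subscribe(item -> System.out.println("first observer got " + item));
        deferred.subscribe(item -> System.out.println("second observer got " + item));
    }
}
```

Because the factory runs at subscription time, each observer can see state that did not exist when the deferred source was declared.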
- javadoc: defer(observableFactory)
- RxJS: defer
- Linq: Defer
To create an Observable that emits a range of sequential integers, pass the starting integer and the number of integers to emit to the range( ) method.
// myObservable emits the integers 5, 6, and 7 before completing:
def myObservable = Observable.range(5, 3);
In calls to range(n,m), values less than 1 for m will result in no numbers being emitted. n may be any integer that can be represented as a BigDecimal (positive, negative, or zero).
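The start-and-count semantics described above can be written out as a plain loop. This sketch does not call RxJava; `RangeSketch.range` is a hypothetical helper that returns the values as a list instead of emitting them, and it follows the rule stated here that a count below 1 yields nothing.

```java
import java.util.ArrayList;
import java.util.List;

public class RangeSketch {
    // Returns the integers that range(start, count) is described as emitting, in order.
    public static List<Integer> range(int start, int count) {
        List<Integer> values = new ArrayList<>();
        for (int i = 0; i < count; i++) { // the loop body never runs when count < 1
            values.add(start + i);
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(range(5, 3));  // [5, 6, 7]
        System.out.println(range(-2, 3)); // [-2, -1, 0]
        System.out.println(range(5, 0));  // []
    }
}
```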
- javadoc: range(start, count)
- RxJS: range
- Linq: Range
- Introduction to Rx: Range
To create an Observable that emits items spaced by a particular interval of time, pass the time interval and the units of time that interval is measured in (and, optionally, a scheduler) to the interval( ) method.
- javadoc: interval(interval,unit)
- javadoc: interval(interval,unit,scheduler)
- RxJS: interval
- Linq: Interval
- Introduction to Rx: Interval
The timer( ) method returns an Observable that, when subscribed to, waits for a span of time that you have defined, then emits a single zero and completes.
There is also a version of timer( ) that emits a single zero after a specified delay, and then emits incrementally increasing numbers at a specified period thereafter.
For both of these versions of timer( ) you can optionally specify a Scheduler on which the timing will take place.
- RxJS: timer
- Linq: Timer
- Introduction to Rx: Timer
The basic form of generate( ) takes four parameters: initialState and three functions, iterate( ), condition( ), and resultSelector( ). generate( ) uses these four parameters to generate an Observable sequence, which is its return value, in the following way.
generate( ) creates each emission from the sequence by applying the resultSelector( ) function to the current state and emitting the resulting item. The first state, which determines the first emitted item, is initialState. generate( ) determines each subsequent state by applying iterate( ) to the current state. Before emitting an item, generate( ) tests the result of condition( ) applied to the current state. If the result of this test is false, instead of calling resultSelector( ) and emitting the resulting value, generate( ) terminates the sequence and stops iterating the state.
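The state machine described above maps directly onto a for loop. The following JDK-only sketch is an illustration of that description rather than the library's implementation: `GenerateSketch.generate` is a hypothetical helper that collects the items into a list instead of emitting them to an observer, and it ignores scheduling and timing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

public class GenerateSketch {
    // Collects the items that the described generate( ) loop would emit, in order.
    public static <S, R> List<R> generate(S initialState,
                                          Predicate<S> condition,
                                          UnaryOperator<S> iterate,
                                          Function<S, R> resultSelector) {
        List<R> emitted = new ArrayList<>();
        // Test the condition before each emission; stop as soon as it is false.
        for (S state = initialState; condition.test(state); state = iterate.apply(state)) {
            emitted.add(resultSelector.apply(state));
        }
        return emitted;
    }

    public static void main(String[] args) {
        // States are 1, 2, 4; the next state (8) fails the condition, ending the sequence.
        List<String> items = GenerateSketch.<Integer, String>generate(
                1,
                state -> state <= 5,
                state -> state * 2,
                state -> "item " + state);
        System.out.println(items); // [item 1, item 2, item 4]
    }
}
```

Note that if condition( ) is false for initialState, nothing is emitted at all.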
There are also versions of generate( ) that allow you to do the work of generating the sequence on a particular Scheduler and that allow you to set the time interval between emissions by applying a function to the current state. generateAbsoluteTime( ) allows you to control the time at which an item is emitted by applying a function to the state to get an absolute system clock time (rather than an interval from the previous emission).
- Introduction to Rx: Generate
- Linq: Generate
- RxJS: generate, generateWithAbsoluteTime, and generateWithRelativeTime
- empty( ) creates an Observable that does not emit any items but instead immediately calls the observer’s onCompleted( ) method.
- error( ) creates an Observable that does not emit any items but instead immediately calls the observer’s onError( ) method.
- never( ) creates an Observable that does not emit any items, nor does it call either the observer’s onCompleted( ) or onError( ) methods.
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func1;
println("*** empty() ***");
Observable.empty().subscribe(
{ println("empty: " + it); }, // onNext
{ println("empty: error - " + it.getMessage()); }, // onError
{ println("empty: Sequence complete"); } // onCompleted
);
println("*** error() ***");
Observable.error(new Throwable("badness")).subscribe(
{ println("error: " + it); }, // onNext
{ println("error: error - " + it.getMessage()); }, // onError
{ println("error: Sequence complete"); } // onCompleted
);
println("*** never() ***");
Observable.never().subscribe(
{ println("never: " + it); }, // onNext
{ println("never: error - " + it.getMessage()); }, // onError
{ println("never: Sequence complete"); } // onCompleted
);
println("*** END ***");
*** empty() ***
empty: Sequence complete
*** error() ***
error: error - badness
*** never() ***
*** END ***
- javadoc: empty()
- javadoc: error(exception)
- javadoc: never()
- RxJS: empty and never
- Linq: Empty and Never
- Introduction to Rx: Simple factory methods
A Netflix Original Production