Creating Observables

DavidMGross edited this page Nov 19, 2013 · 62 revisions

This section explains methods that create Observables.

  • from( ) — convert an Iterable or a Future into an Observable
  • just( ) — convert an object into an Observable that emits that object
  • repeat( ) — create an Observable that emits a particular item or sequence of items repeatedly
  • create( ) — create an Observable from scratch by means of a function
  • defer( ) — do not create the Observable until an Observer subscribes; create a fresh Observable on each subscription
  • range( ) — create an Observable that emits a range of sequential integers
  • interval( ) — create an Observable that emits a sequence of integers spaced by a given time interval
  • empty( ) — create an Observable that emits nothing and then completes
  • error( ) — create an Observable that emits nothing and then signals an error
  • never( ) — create an Observable that emits nothing at all

from( )

convert an Iterable or a Future into an Observable

You can convert an object that implements Iterable into an Observable that emits each item in the iterable, or an object that implements Future into an Observable that emits the result of the Future's get call, by passing the object to the from( ) method. For example:

myObservable = Observable.from(myIterable);

You can also do this with arrays, for example:

myArray = [1, 2, 3, 4, 5];
myArrayObservable = Observable.from(myArray);

This converts the sequence of values in the iterable object or array into a sequence of items emitted, one at a time, by an Observable.

An empty iterable (or array) can be converted to an Observable in this way. The resulting Observable will invoke onCompleted() without first invoking onNext().

Note that when the from( ) method transforms a Future into an Observable, such an Observable will be effectively blocking, as its underlying Future blocks.
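The two edge cases described above can be sketched as follows. This is a minimal illustration rather than a definitive recipe: it assumes RxJava and its Groovy language adaptor are on the classpath (so that closures are accepted as observer callbacks), and the names events, results, executor, and future are made up for the example.

```groovy
import rx.Observable
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// An empty Iterable: only onCompleted fires; onNext is never called.
def events = [];
Observable.from([]).subscribe(
  { events << ("next: " + it) },  // onNext
  { events << "error" },          // onError
  { events << "completed" }       // onCompleted
);
println(events);   // [completed]

// A Future: subscribing blocks until get() returns, then emits the result.
def executor = Executors.newSingleThreadExecutor();
def future = executor.submit({ "result" } as Callable);
def results = [];
Observable.from(future).subscribe({ results << it });
executor.shutdown();
println(results);  // [result]
```

Because the call to subscribe( ) on the Future-backed Observable does not return until the Future has produced its value, results is already populated when the final println runs.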


just( )

convert an object into an Observable that emits that object

To convert any object into an Observable that emits that object, pass that object into the just( ) method.

// Observable emits "some string" as a single item
def observableThatEmitsAString = Observable.just("some string"); 
// Observable emits the list [1, 2, 3, 4, 5] as a single item
def observableThatEmitsAList = Observable.just([1, 2, 3, 4, 5]); 

This is similar to the from( ) method, with one difference: if you pass an iterable to from( ), the resulting Observable emits each item in the iterable, one at a time, whereas just( ) converts the iterable into an Observable that emits the entire iterable as a single item.

If you pass nothing or null to just( ), the resulting Observable will not merely call onCompleted( ) without calling onNext( ). It will instead call onNext( null ) before calling onCompleted( ).
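To make the contrast between from( ) and just( ) concrete, here is a small sketch that passes the same list to both. It assumes RxJava and its Groovy adaptor are on the classpath; letters, fromItems, and justItems are illustrative names only.

```groovy
import rx.Observable

def letters = ["a", "b", "c"];

// from( ) emits each element of the list as its own item.
def fromItems = [];
Observable.from(letters).subscribe({ fromItems << it });

// just( ) emits the whole list as one item.
def justItems = [];
Observable.just(letters).subscribe({ justItems << it });

println(fromItems.size());  // 3
println(justItems.size());  // 1
```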


repeat( )

create an Observable that emits a particular item or sequence of items repeatedly

There are versions of repeat( ) that operate on a particular scheduler, that repeat only a certain number of times before terminating, and that repeat the sequence of items emitted by a source Observable rather than one particular item.
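As a rough sketch of the count-limited form, the example below repeats a two-item source three times. It assumes an RxJava version in which repeat( ) is an instance method with a repeat(count) overload, and that the Groovy adaptor is on the classpath; older releases may offer only the unbounded form. The names emitted and done are illustrative.

```groovy
import rx.Observable
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

def emitted = [];
def done = new CountDownLatch(1);

// Subscribe to the source three times in a row, then complete.
Observable.from([1, 2]).repeat(3).subscribe(
  { emitted << it },      // onNext
  { done.countDown() },   // onError
  { done.countDown() }    // onCompleted
);

// Wait for termination in case repetition runs on a scheduler.
done.await(5, TimeUnit.SECONDS);
println(emitted);  // [1, 2, 1, 2, 1, 2]
```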


create( )

create an Observable from scratch by means of a function

You can create an Observable from scratch by using the create( ) method. You pass this method a function that accepts as its parameter the Observer that is passed to an Observable’s subscribe( ) method. Write the function you pass to create( ) so that it behaves as an Observable — calling the passed-in Observer’s onNext( ), onError( ), and onCompleted( ) methods appropriately. For example:

def myObservable = Observable.create({ anObserver ->
  try {
    anObserver.onNext('One');
    anObserver.onNext('Two');
    anObserver.onNext('Three');
    anObserver.onNext('Four');
    anObserver.onCompleted();
  } catch(Throwable t) {
    anObserver.onError(t);
  }
})

NOTE: A well-formed Observable must call either the observer’s onCompleted( ) method exactly once or its onError( ) method exactly once, and must not thereafter call any of the observer’s other methods.


defer( )

do not create the Observable until an Observer subscribes; create a fresh Observable on each subscription

Pass defer( ) an Observable factory function (a function that generates Observables), and defer( ) will return an Observable that will call this function to generate its Observable sequence afresh each time a new Observer subscribes.
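The per-subscription behavior can be seen in a short sketch like the one below, in which the factory closure increments a counter each time it runs. It assumes RxJava and its Groovy adaptor are on the classpath, so that a closure is accepted as the factory function; counter, seen, and deferred are illustrative names.

```groovy
import rx.Observable

def counter = 0;
def seen = [];

// The closure is the Observable factory; it runs once per subscription.
def deferred = Observable.defer({ Observable.just(++counter) });

deferred.subscribe({ seen << it });  // factory runs, emits 1
deferred.subscribe({ seen << it });  // factory runs again, emits 2
println(seen);  // [1, 2]
```

Had the Observable been built eagerly with Observable.just(++counter), the counter would have been incremented once, at creation time, and both subscribers would have received the same value.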


range( )

create an Observable that emits a range of sequential integers

To create an Observable that emits a range of sequential integers, pass the starting integer and the number of integers to emit to the range( ) method.

// myObservable emits the integers 5, 6, and 7 before completing:
def myObservable = Observable.range(5, 3);

In calls to range(n,m), values less than 1 for m will result in no numbers being emitted. n may be any integer that can be represented as a BigDecimal: positive, negative, or zero.


interval( )

create an Observable that emits a sequence of integers spaced by a given time interval

To create an Observable that emits items spaced by a particular interval of time, pass the time interval and the units of time that interval is measured in (and, optionally, a scheduler) to the interval( ) method.
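A minimal sketch of interval( ) follows. Because the sequence never ends on its own, the example limits it with take(3), and because items arrive on a scheduler thread, it waits on a latch before reading the result. It assumes RxJava and its Groovy adaptor are on the classpath; ticks and done are illustrative names.

```groovy
import rx.Observable
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

def ticks = [];
def done = new CountDownLatch(1);

// Emit one number every 100 milliseconds and stop after three of them.
Observable.interval(100, TimeUnit.MILLISECONDS).take(3).subscribe(
  { ticks << it },        // onNext
  { done.countDown() },   // onError
  { done.countDown() }    // onCompleted
);

// interval( ) emits on a scheduler thread, so wait here for completion.
done.await(5, TimeUnit.SECONDS);
println(ticks);  // [0, 1, 2]
```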


empty( ), error( ), and never( )

Observables that can be useful for testing purposes

  • empty( ) creates an Observable that does not emit any items but instead immediately calls the observer’s onCompleted( ) method.
  • error( ) creates an Observable that does not emit any items but instead immediately calls the observer’s onError( ) method.
  • never( ) creates an Observable that does not emit any items, nor does it call either the observer’s onCompleted( ) or onError( ) methods.

For example:

println("*** empty() ***");
Observable.empty().subscribe(
  { println("empty: " + it); },             // onNext
  { println("empty: Error encountered"); }, // onError
  { println("empty: Sequence complete"); }  // onCompleted
);

println("*** error() ***");
// error( ) needs the Throwable that it will pass to onError
Observable.error(new Throwable("badness")).subscribe(
  { println("error: " + it); },             // onNext
  { println("error: Error encountered"); }, // onError
  { println("error: Sequence complete"); }  // onCompleted
);

println("*** never() ***");
Observable.never().subscribe(
  { println("never: " + it); },             // onNext
  { println("never: Error encountered"); }, // onError
  { println("never: Sequence complete"); }  // onCompleted
);
println("*** END ***");

Output:

*** empty() ***
empty: Sequence complete
*** error() ***
error: Error encountered
*** never() ***
*** END ***
