Introduction to raix.reactive

richardszalay edited this page May 20, 2011 · 6 revisions

raix.reactive revolves around IObservable, a collection of data that is asynchronous, meaning some or all of its values may not have arrived yet. This abstraction lets IObservable represent a wide range of asynchronous data types:

  • A future value coming from a remote source (or even representing the end of a UI sequence)
  • Events
  • Arrays that are distributed over time
  • Any other data (like timers) that occurs over time, even conceptually

An observer (raix.reactive.IObserver) is the “subscriber” of messages and receives them through three methods: onNext(value), onCompleted() and onError(error).

An observable sequence (raix.reactive.IObservable) is a “publisher” of messages. It exposes a subscribeWith method to accept new observers and calls each observer’s onNext, onCompleted and onError methods at some future time (which might be immediately).

You’ll probably find yourself using the raix.reactive.IObservable.subscribe method, which accepts the onNext, onCompleted and onError methods as Function references. Using subscribe, rather than subscribeWith, allows for closure-oriented programming.

The observable-observer usage contract is (onNext)*(onCompleted|onError)? and operators will maintain this contract, even when applied to a source that does not.

To expand on the contract: onNext may be called zero or more times, followed by a single call to either onCompleted or onError (or neither, if the sequence never completes). Once onCompleted or onError has been called, no more calls will be received.

IObservable.subscribe (and subscribeWith) returns a raix.reactive.ICancelable, which can be used to cancel the subscription and clean up any resources it created (event listeners, for example). However, if the sequence completes or raises an error, the built-in operators will clean up automatically.
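The subscription life cycle described above might look like the following minimal sketch. It assumes the code runs somewhere `stage` is available (a document class or frame script, for example) and that subscribe takes its callbacks in the order they are listed earlier: onNext, onCompleted, onError.

```actionscript
import flash.events.MouseEvent;
import raix.reactive.ICancelable;
import raix.reactive.IObservable;
import raix.reactive.Observable;

// Assumption: `stage` is in scope (e.g. inside a document class)
var mouseMove : IObservable = Observable.fromEvent(stage, MouseEvent.MOUSE_MOVE);

// subscribe returns the ICancelable that represents this subscription
var subscription : ICancelable = mouseMove.subscribe(
    function(me:MouseEvent):void
    {
        // onNext: called once per value
        trace("mouse at", me.stageX, me.stageY);
    },
    function():void
    {
        // onCompleted: called at most once, after which no more calls arrive
        trace("completed");
    },
    function(error:Error):void
    {
        // onError: called at most once, instead of onCompleted
        trace("error:", error.message);
    });

// Later, when the values are no longer needed
subscription.cancel();
```

An event-based sequence like this one has no natural end, so cancel() is the only thing that stops it here. The operators discussed next offer an alternative.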

Calling ICancelable.cancel() turns out to be quite uncommon, as it’s far simpler to build the termination into the sequence itself using other operators:

  • take will terminate after a certain number of values (take(1) is very common for completion events)
  • takeUntil takes a second sequence and will terminate subscriptions to both once the second sequence emits a value or completes
  • takeWhile will evaluate a function every time a value is received. If the function returns false, the sequence will terminate.
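As a rough illustration of the three operators above, the sketch below applies each one to mouse events. It assumes `stage` is in scope, that the completion and error callbacks of subscribe are optional, and that take, takeUntil and takeWhile accept the arguments described in the list; the trace messages are purely illustrative.

```actionscript
import flash.events.MouseEvent;
import raix.reactive.IObservable;
import raix.reactive.Observable;

// Assumption: `stage` is in scope (e.g. inside a document class)
var mouseMove : IObservable = Observable.fromEvent(stage, MouseEvent.MOUSE_MOVE);
var mouseUp : IObservable = Observable.fromEvent(stage, MouseEvent.MOUSE_UP);

// take(1): handle only the next mouse up, then terminate
mouseUp.take(1).subscribe(function(me:MouseEvent):void
    {
        trace("first mouse up");
    });

// takeUntil: receive movements until mouseUp emits a value
mouseMove.takeUntil(mouseUp).subscribe(function(me:MouseEvent):void
    {
        trace("moving", me.stageX, me.stageY);
    });

// takeWhile: receive movements until the predicate returns false
mouseMove.takeWhile(function(me:MouseEvent):Boolean
    {
        return me.stageX <= 50;
    })
    .subscribe(function(me:MouseEvent):void
    {
        trace("still on the left", me.stageX);
    });
```

In none of these cases is the returned ICancelable kept, because each sequence is expected to terminate, and clean up, on its own.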

There are many more operators, each with their own termination rules. See Reactive Operators for more information.

Composition

raix.reactive operators are, generally speaking, designed to delay execution until they are subscribed to. This, combined with the fluent interface (most operators return IObservable), means that the operators can and should be chained.

Take the following example:

var mouseDown : IObservable = Observable.fromEvent(stage, MouseEvent.MOUSE_DOWN);
var mouseUp : IObservable = Observable.fromEvent(stage, MouseEvent.MOUSE_UP);
var mouseMove : IObservable = Observable.fromEvent(stage, MouseEvent.MOUSE_MOVE);

var sequenceA : IObservable = mouseMove;

var sequenceB : IObservable = mouseMove.filter(function(me:MouseEvent):Boolean
    {
        return me.stageX > 50;
    });

var sequenceC : IObservable = mouseDown.mapMany(function(down:MouseEvent):IObservable
    {
        return mouseMove.takeUntil(mouseUp);
    });

The three sequence variables are all IObservable sequences that will emit MouseEvent values, but each represents a different source:

  • sequenceA contains all mouse movements
  • sequenceB contains mouse movements that occur to the right of the first 50 pixels of the screen
  • sequenceC contains mouse movements that occur between mouse down and mouse up events (i.e. a drag)
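Because execution is generally delayed until subscription, composing a sequence such as the drag above should do no work by itself. The sketch below repeats the drag definition so that it stands alone and then subscribes to it. It assumes `stage` is in scope and that the completion and error callbacks of subscribe are optional.

```actionscript
import flash.events.MouseEvent;
import raix.reactive.ICancelable;
import raix.reactive.IObservable;
import raix.reactive.Observable;

// Assumption: `stage` is in scope (e.g. inside a document class)
var mouseDown : IObservable = Observable.fromEvent(stage, MouseEvent.MOUSE_DOWN);
var mouseUp : IObservable = Observable.fromEvent(stage, MouseEvent.MOUSE_UP);
var mouseMove : IObservable = Observable.fromEvent(stage, MouseEvent.MOUSE_MOVE);

// Composition only: nothing has been subscribed to at this point
var drag : IObservable = mouseDown.mapMany(function(down:MouseEvent):IObservable
    {
        return mouseMove.takeUntil(mouseUp);
    });

// Subscribing is what starts delivery of values
var dragSubscription : ICancelable = drag.subscribe(function(me:MouseEvent):void
    {
        trace("dragging at", me.stageX, me.stageY);
    });

// The outer mouseDown sequence never completes, so cancel explicitly
// once drags no longer need to be tracked
dragSubscription.cancel();
```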

Glossary

Scheduler: An implementation of raix.reactive.scheduling.IScheduler that handles the distribution of workload
Sequence: An observable sequence (raix.reactive.IObservable), including one wrapped in another (e.g. where, take, select)
Source: The sequence on which an instance method was called
Subject: An object that is both an IObservable and an IObserver. Used as a custom IObservable source
Subscription: The raix.reactive.ICancelable returned by IObservable.subscribe or IObservable.subscribeWith
Operator: A method that applies to observable sequences (e.g. where, take, select)
Unit: raix.reactive.Unit is used to represent void or “no value”