Subject in RxJava
A Subject is a sort of bridge or proxy that is available in some implementations of ReactiveX that acts both as an observer and as an Observable. Because it is an observer, it can subscribe to one or more Observables, and because it is an Observable, it can pass through the items it observes by reemitting them, and it can also emit new items.
AsyncSubject
An AsyncSubject emits the last value (and only the last value) emitted by the source Observable, and only after that source Observable completes. (If the source Observable does not emit any values, the AsyncSubject also completes without emitting any values.)
BehaviorSubject
When an observer subscribes to a BehaviorSubject, it begins by emitting the item most recently emitted by the source Observable (or a seed/default value if none has yet been emitted) and then continues to emit any other items emitted later by the source Observable(s).
PublishSubject
PublishSubject emits to an observer only those items that are emitted by the source Observable(s) subsequent to the time of the subscription.
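The "only items after subscription" rule can be illustrated with a small JDK-only toy model. This is a sketch of the semantics, not RxJava's PublishSubject: the class names ToyPublishSubject and ToyPublishDemo are made up for illustration, and terminal events and thread safety are left out on purpose.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model of publish semantics (hypothetical class, NOT RxJava's PublishSubject).
final class ToyPublishSubject<T> {
    private final List<Consumer<T>> observers = new ArrayList<>();

    // Observable side: register an observer. Nothing is replayed to it.
    void subscribe(Consumer<T> observer) {
        observers.add(observer);
    }

    // Observer side: forward the item to everyone subscribed right now.
    void onNext(T item) {
        for (Consumer<T> o : new ArrayList<>(observers)) {
            o.accept(item);
        }
    }
}

final class ToyPublishDemo {
    // Returns what an early and a late observer have seen, in that order.
    static List<List<String>> run() {
        ToyPublishSubject<String> subject = new ToyPublishSubject<>();
        List<String> early = new ArrayList<>();
        List<String> late = new ArrayList<>();

        subject.subscribe(early::add);
        subject.onNext("a");
        subject.subscribe(late::add);   // subscribes too late to see "a"
        subject.onNext("b");

        List<List<String>> result = new ArrayList<>();
        result.add(early);
        result.add(late);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints [[a, b], [b]]
    }
}
```

The early observer sees both items, while the late one sees only what was emitted after it subscribed.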
Background:
Let’s assume you’d like to emit your own events, but you can’t be sure when those events will fire or how many of them there will be. Clearly, just() and from() can’t help here, and you don’t want to create() an Observable that spins on some state either.
The best fit would be an object that is both an Observable, so clients can chain onto it, and an Observer, so you can emit values and terminal events through it. The combination of the two is called a Subject (or a Processor in Reactive-Streams naming).
Subjects are generic. Sometimes you want a Subject to emit the same type it receives, and sometimes a completely different type.
Flavors
Sometimes you can’t simply dispatch your events as you see fit without caring about your subscriber audience.
For example, you are a TV network that puts out a large number of TV series and their episodes on a weekly basis. Your clients can’t keep up with the sheer amount, but they don’t want to skip any episodes either. Therefore, their smart TV or the cable provider itself offers the capability to cache these episodes, starting at some point, and lets the subscriber watch them in sequence, at their own pace, without leaving any out.
In the programming world, you’d want to emit events and allow clients to run at a different pace but still receive all of your events, perhaps even after you have stopped generating them: a late Subscriber comes in and you’d like to replay all events that have accumulated during the active period.
This is called a ReplaySubject.
By default, you can create an unbounded ReplaySubject, which caches everything it receives and replays it to Subscribers, including any terminal events.
However, some use cases require limiting the retention time or amount of the cached elements so later Subscribers don’t get everything from the very beginning. The RxJava API offers three additional replay modes:
- createWithSize(n) will retain at most n elements,
- createWithTime(t, u) will retain elements that are younger than t, and
- createWithTimeAndSize(n, t, u) will retain at most n elements which are also younger than t.
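To make the unbounded and size-bounded modes concrete, here is a minimal JDK-only sketch of the replay idea. It is a toy model under stated assumptions, not RxJava's ReplaySubject: ToyReplaySubject, its Listener interface and ToyReplayDemo are hypothetical names, the time-based modes and onError are omitted, and nothing here is thread-safe.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of replay semantics (hypothetical class, NOT RxJava's ReplaySubject).
final class ToyReplaySubject<T> {
    interface Listener<T> {
        void onNext(T item);
        void onCompleted();
    }

    private final int maxSize;                        // retention bound
    private final Deque<T> buffer = new ArrayDeque<>();
    private final List<Listener<T>> listeners = new ArrayList<>();
    private boolean completed;

    private ToyReplaySubject(int maxSize) {
        this.maxSize = maxSize;
    }

    // Unbounded: caches everything it receives.
    static <T> ToyReplaySubject<T> create() {
        return new ToyReplaySubject<>(Integer.MAX_VALUE);
    }

    // Bounded: retains at most n elements, dropping the oldest first.
    static <T> ToyReplaySubject<T> createWithSize(int n) {
        if (n <= 0) throw new IllegalArgumentException("n must be positive");
        return new ToyReplaySubject<>(n);
    }

    void onNext(T item) {
        if (completed) return;
        buffer.addLast(item);
        if (buffer.size() > maxSize) buffer.removeFirst();
        for (Listener<T> l : new ArrayList<>(listeners)) l.onNext(item);
    }

    void onCompleted() {
        if (completed) return;
        completed = true;
        for (Listener<T> l : new ArrayList<>(listeners)) l.onCompleted();
        listeners.clear();
    }

    // Replays the retained items first, then the terminal event if there is one.
    void subscribe(Listener<T> l) {
        for (T item : buffer) l.onNext(item);
        if (completed) l.onCompleted(); else listeners.add(l);
    }
}

final class ToyReplayDemo {
    // Feeds three items, completes, and only then subscribes.
    static List<String> lateSubscriber(ToyReplaySubject<Integer> subject) {
        subject.onNext(1);
        subject.onNext(2);
        subject.onNext(3);
        subject.onCompleted();

        List<String> seen = new ArrayList<>();
        subject.subscribe(new ToyReplaySubject.Listener<Integer>() {
            @Override public void onNext(Integer item) { seen.add("next:" + item); }
            @Override public void onCompleted() { seen.add("completed"); }
        });
        return seen;
    }

    public static void main(String[] args) {
        // prints [next:1, next:2, next:3, completed]
        System.out.println(lateSubscriber(ToyReplaySubject.<Integer>create()));
        // prints [next:2, next:3, completed]
        System.out.println(lateSubscriber(ToyReplaySubject.<Integer>createWithSize(2)));
    }
}
```

Even a Subscriber that arrives after completion gets the retained items followed by the completion event. The size bound only changes how far back the replay reaches.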
This could be enough, but there are further cases that warrant a dedicated Subject implementation.
For example, suppose one performs an asynchronous computation and wants to emit just a single value followed by a completion event. A ReplaySubject does work, but it is too verbose and its overhead might be unacceptable for such a simple task. Therefore, RxJava offers another Subject called AsyncSubject. It remembers the very last element it received, and once onCompleted is called, all current and future listeners will receive just that single value followed by the completion event. But unlike ReplaySubject, if one calls onError on an AsyncSubject, any previously received value is ignored and all Subscribers will just receive the Throwable from then on.
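The "last value on completion, error wins over value" rule is easy to get wrong, so here is a small JDK-only sketch of it. This is a toy model, not RxJava's AsyncSubject: ToyAsyncSubject, its Listener interface and ToyAsyncDemo are hypothetical names chosen for illustration, and thread safety is ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Toy model of async-subject semantics (hypothetical class, NOT RxJava's AsyncSubject).
final class ToyAsyncSubject<T> {
    interface Listener<T> {
        void onNext(T item);
        void onCompleted();
        void onError(Throwable error);
    }

    private final List<Listener<T>> listeners = new ArrayList<>();
    private T last;
    private boolean hasValue;
    private boolean done;
    private Throwable error;   // non-null once onError has been called

    // Only remembers the item; nothing is delivered yet.
    void onNext(T item) {
        if (done) return;
        last = item;
        hasValue = true;
    }

    void onCompleted() {
        if (done) return;
        done = true;
        drain();
    }

    void onError(Throwable e) {
        if (done) return;
        done = true;
        error = Objects.requireNonNull(e);
        drain();
    }

    // Late subscribers get the final outcome immediately.
    void subscribe(Listener<T> l) {
        if (done) deliver(l); else listeners.add(l);
    }

    private void drain() {
        for (Listener<T> l : new ArrayList<>(listeners)) deliver(l);
        listeners.clear();
    }

    private void deliver(Listener<T> l) {
        if (error != null) {       // an error discards any remembered value
            l.onError(error);
            return;
        }
        if (hasValue) l.onNext(last);
        l.onCompleted();
    }
}

final class ToyAsyncDemo {
    static ToyAsyncSubject.Listener<Integer> recorder(List<String> log) {
        return new ToyAsyncSubject.Listener<Integer>() {
            @Override public void onNext(Integer item) { log.add("next:" + item); }
            @Override public void onCompleted() { log.add("completed"); }
            @Override public void onError(Throwable e) { log.add("error:" + e.getMessage()); }
        };
    }

    // Normal completion: only the last value is delivered.
    static List<String> completes() {
        ToyAsyncSubject<Integer> s = new ToyAsyncSubject<>();
        List<String> log = new ArrayList<>();
        s.subscribe(recorder(log));
        s.onNext(1);
        s.onNext(2);
        s.onCompleted();
        return log;
    }

    // Failure: the value received earlier is dropped, even for a late subscriber.
    static List<String> fails() {
        ToyAsyncSubject<Integer> s = new ToyAsyncSubject<>();
        List<String> log = new ArrayList<>();
        s.onNext(1);
        s.onError(new RuntimeException("boom"));
        s.subscribe(recorder(log));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(completes());   // prints [next:2, completed]
        System.out.println(fails());       // prints [error:boom]
    }
}
```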
The final case that RxJava covers with a Subject variant is when one would like to have a single value stored. Subscribers should immediately receive this value, and any subsequent value whenever a new value is onNext’d on the Subject. It is called BehaviorSubject, and some also call it a reactive property. Again, a ReplaySubject of size 1 would also do, but unlike ReplaySubject, sending an onCompleted() will evict the single stored value, and subsequent Subscribers will receive only the onCompleted (or onError) event and no value. You can create a BehaviorSubject with or without an initial value.
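The difference from a size-1 replay shows up only after completion, which a short JDK-only sketch can demonstrate. As before, this is a toy model and not RxJava's BehaviorSubject: ToyBehaviorSubject, its Listener interface and ToyBehaviorDemo are hypothetical names, and onError and thread safety are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of behavior-subject semantics (hypothetical class, NOT RxJava's BehaviorSubject).
final class ToyBehaviorSubject<T> {
    interface Listener<T> {
        void onNext(T item);
        void onCompleted();
    }

    private final List<Listener<T>> listeners = new ArrayList<>();
    private T current;
    private boolean hasValue;
    private boolean completed;

    // Without an initial value.
    static <T> ToyBehaviorSubject<T> create() {
        return new ToyBehaviorSubject<>();
    }

    // With an initial value that the first subscribers will see.
    static <T> ToyBehaviorSubject<T> create(T initial) {
        ToyBehaviorSubject<T> s = new ToyBehaviorSubject<>();
        s.current = initial;
        s.hasValue = true;
        return s;
    }

    void onNext(T item) {
        if (completed) return;
        current = item;
        hasValue = true;
        for (Listener<T> l : new ArrayList<>(listeners)) l.onNext(item);
    }

    void onCompleted() {
        if (completed) return;
        completed = true;
        current = null;            // completion evicts the stored value
        hasValue = false;
        for (Listener<T> l : new ArrayList<>(listeners)) l.onCompleted();
        listeners.clear();
    }

    void subscribe(Listener<T> l) {
        if (completed) {           // late subscribers get only the terminal event
            l.onCompleted();
            return;
        }
        if (hasValue) l.onNext(current);
        listeners.add(l);
    }
}

final class ToyBehaviorDemo {
    static ToyBehaviorSubject.Listener<String> recorder(String name, List<String> log) {
        return new ToyBehaviorSubject.Listener<String>() {
            @Override public void onNext(String item) { log.add(name + ":" + item); }
            @Override public void onCompleted() { log.add(name + ":completed"); }
        };
    }

    static List<String> run() {
        ToyBehaviorSubject<String> s = ToyBehaviorSubject.create("seed");
        List<String> log = new ArrayList<>();
        s.subscribe(recorder("A", log));   // A immediately gets the seed
        s.onNext("x");
        s.subscribe(recorder("B", log));   // B immediately gets "x"
        s.onNext("y");
        s.onCompleted();
        s.subscribe(recorder("C", log));   // C gets only the completion
        return log;
    }

    public static void main(String[] args) {
        // prints [A:seed, A:x, B:x, A:y, B:y, A:completed, B:completed, C:completed]
        System.out.println(run());
    }
}
```

Subscriber C, which arrives after completion, never sees "y". A size-1 replay would still have handed it over.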