RxJava Heads-Up for Interviews

kapil sharma
6 min read · Jun 27, 2018


  1. Difference between Observable and Flowable: Flowable supports backpressure through the request() method of its Subscription, whereas Observable does not support backpressure. Flowable also implements the Reactive Streams Publisher interface.

Reactive Streams (a stream is a continuous flow of data that we receive)

Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking backpressure.
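To make the idea of non-blocking backpressure concrete, here is a minimal sketch that uses only the JDK. It assumes Java 9 or later, where java.util.concurrent.Flow mirrors the Reactive Streams interfaces; the class name FlowPullSketch is made up for illustration. The subscriber asks for one item at a time through Subscription.request(), which is the same mechanism Flowable builds on.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class FlowPullSketch {

    /** Publishes 1..5 and returns what a one-at-a-time subscriber received. */
    public static List<Integer> run() {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch finished = new CountDownLatch(1);

        // Leaving the try block calls close(), which signals onComplete.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                @Override public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1);            // ask for the first item only
                }
                @Override public void onNext(Integer item) {
                    received.add(item);
                    subscription.request(1); // ready for exactly one more
                }
                @Override public void onError(Throwable t) { finished.countDown(); }
                @Override public void onComplete() { finished.countDown(); }
            });
            for (int i = 1; i <= 5; i++) {
                publisher.submit(i);         // blocks if the subscriber's buffer is full
            }
        }
        try {
            finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The publisher never pushes more than the subscriber has requested, so the consumer sets the pace instead of being flooded.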

RxJava Source: Ways to create Source Observables

  1. Single: Either succeeds with an item or errors. It does not have backpressure.
  2. Completable: Either completes or errors. It is the same as a method with a void return type, like a reactive Runnable. It does not have backpressure.
  3. Maybe: It is like an Optional: it either succeeds with an item, completes without an item, or errors. It does not have backpressure.
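The three source types above differ only in which terminal signals they can deliver. The following is a small sketch, assuming RxJava 2.x (package io.reactivex) is on the classpath; the class name and log messages are invented for illustration.

```java
import io.reactivex.Completable;
import io.reactivex.Maybe;
import io.reactivex.Single;
import java.util.ArrayList;
import java.util.List;

public class SourceTypesSketch {

    public static List<String> run() {
        List<String> log = new ArrayList<>();

        // Single: exactly one item, or an error
        Single.just(42)
              .subscribe(v -> log.add("single: " + v),
                         e -> log.add("single error"));

        // Completable: no item at all, only completion or an error
        Completable.fromAction(() -> log.add("work done"))
                   .subscribe(() -> log.add("completable: complete"),
                              e -> log.add("completable error"));

        // Maybe: one item, or completion without an item, or an error
        Maybe.<Integer>empty()
             .subscribe(v -> log.add("maybe: " + v),
                        e -> log.add("maybe error"),
                        () -> log.add("maybe: empty"));
        Maybe.just(7)
             .subscribe(v -> log.add("maybe: " + v),
                        e -> log.add("maybe error"),
                        () -> log.add("maybe: empty"));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Note how the callback shapes follow the type: Single has no completion callback, Completable has no item callback, and Maybe has both.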

CompositeDisposable: a container that holds multiple Disposables so that they can all be disposed together.

  1. Backpressure: Sometimes a source produces events so quickly that a downstream consumer can’t keep up with them. When this happens, you’ll often see a MissingBackpressureException.

RxJava offers a few ways to manage backpressure but picking the right one depends on your situation.

Approach 1: Reducing Number of Items

If your application cares only about the latest items at a given point in time and doesn’t need every item emitted by an observable, then reducing the number of items is the right backpressure strategy. It can be implemented with RxJava operators such as sample, throttleFirst, debounce, and take.
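As one illustration of reducing items, here is a sketch of debounce on a burst of simulated keystrokes. It assumes RxJava 2.x on the classpath and uses a TestScheduler so that time is virtual and the result is repeatable; the class name and sample strings are invented.

```java
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class DebounceSketch {

    public static List<String> run() {
        TestScheduler scheduler = new TestScheduler();   // virtual clock
        PublishSubject<String> keystrokes = PublishSubject.create();
        List<String> received = new ArrayList<>();

        // Only pass an item on after 400 ms without a newer item.
        keystrokes.debounce(400, TimeUnit.MILLISECONDS, scheduler)
                  .subscribe(received::add);

        keystrokes.onNext("r");
        scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
        keystrokes.onNext("rx");                          // replaces "r"
        scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
        keystrokes.onNext("rxj");                         // replaces "rx"
        scheduler.advanceTimeBy(400, TimeUnit.MILLISECONDS); // quiet period elapses
        keystrokes.onNext("rxjava");
        scheduler.advanceTimeBy(400, TimeUnit.MILLISECONDS);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Of the four items emitted, only the two that were followed by a 400 ms pause reach the subscriber.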

Approach 2: Collecting Items

In this strategy, items are not reduced or dropped. Instead they are collected and emitted as a collection, using RxJava operators such as buffer and window.

This way the consumer receives collections of the items emitted by the source observable, and it is up to the consumer to decide which items in each collection to process.

Observable<Observable<Integer>> observable = Observable.range(1, 133).window(3);
observable.subscribe(s -> {
    LOGGER.info("next window");
    // each window is itself an Observable of up to 3 items
    s.subscribe(i -> LOGGER.info("items in window " + i));
});

Approach 3: Backpressure Mode

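BackpressureMode.NONE and BackpressureMode.ERROR
In both of these modes, emitted events aren’t backpressured. A MissingBackpressureException is thrown when the downstream can’t keep up, for example when observeOn’s internal buffer (16 elements on Android in RxJava 1) overflows. BackpressureMode is the RxJava 1 name; in RxJava 2 the corresponding enum is BackpressureStrategy, and NONE is called MISSING.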

BackpressureMode.BUFFER
In this mode, an unbounded buffer with an initial size of 128 is created. Items emitted too quickly are buffered without limit. If the buffer isn’t drained, items continue to accumulate until memory is exhausted, which results in an OutOfMemoryError.

BackpressureMode.DROP
This mode uses a fixed buffer of size 1. If the downstream consumer can’t keep up, the first item is buffered and subsequent emissions are dropped. When the consumer is ready to take the next value, it receives the first value emitted by the source Observable.

BackpressureMode.LATEST
This mode is similar to BackpressureMode.DROP because it also uses a fixed buffer of size 1. However, rather than buffering the first item and dropping subsequent items, BackpressureMode.LATEST replaces the item in the buffer with the latest emission. When the consumer is ready to take the next value, it receives the latest value emitted by the source Observable.
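The difference between these modes is easiest to see with a slow consumer. The sketch below assumes RxJava 2.x, where the modes are exposed as BackpressureStrategy and passed to Flowable.create. It emits ten items synchronously to a TestSubscriber that initially requests only three. The class and method names are hypothetical.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.exceptions.MissingBackpressureException;
import io.reactivex.subscribers.TestSubscriber;
import java.util.List;

public class BackpressureStrategySketch {

    // A source that emits 1..10 as fast as it can, ignoring demand.
    static Flowable<Integer> tenItems(BackpressureStrategy strategy) {
        return Flowable.create(emitter -> {
            for (int i = 1; i <= 10; i++) {
                emitter.onNext(i);
            }
            emitter.onComplete();
        }, strategy);
    }

    // DROP: items emitted while there is no demand are discarded.
    public static List<Integer> drop() {
        TestSubscriber<Integer> ts = tenItems(BackpressureStrategy.DROP).test(3);
        return ts.values();
    }

    // LATEST: the most recent undelivered item is kept for the next request.
    public static List<Integer> latest() {
        TestSubscriber<Integer> ts = tenItems(BackpressureStrategy.LATEST).test(3);
        ts.requestMore(1);
        return ts.values();
    }

    // ERROR: the first item emitted without demand fails the stream.
    public static List<Integer> error() {
        TestSubscriber<Integer> ts = tenItems(BackpressureStrategy.ERROR).test(3);
        ts.assertError(MissingBackpressureException.class);
        return ts.values();
    }

    public static void main(String[] args) {
        System.out.println("DROP   " + drop());
        System.out.println("LATEST " + latest());
        System.out.println("ERROR  " + error());
    }
}
```

With DROP and ERROR the subscriber sees only the three items it asked for. With LATEST, one further request delivers the last item the source produced.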

Approach 4: Reactive Pull

When the subscriber should control how many items it receives, the reactive pull strategy is the right choice. In reactive pull, the subscriber requests the number of items it needs from the source by calling request().

In RxJava 2, use Flowable for reactive pull backpressure, because Observable in RxJava 2 is not backpressured.

Flowable<Integer> observable = Flowable.range(1, 133);
observable.subscribe(new DefaultSubscriber<Integer>() {
    @Override public void onStart() {
        request(1); // pull the first item
    }
    @Override public void onNext(Integer t) {
        LOGGER.info("item " + t);
        request(1); // pull the next item once this one is handled
    }
    @Override public void onError(Throwable t) {
        LOGGER.info("" + t);
    }
    @Override public void onComplete() {
        LOGGER.info("complete");
    }
});

For the collecting approach (Approach 2), you can use the window operator on the source observable, which emits windows that each contain a specified number of items.

  1. Difference between map, flatMap and concatMap

Both map and flatMap apply a transformational function on each item emitted by an Observable.

Map: use it when you want to transform each emitted item into another form and pass a single item on to the subscriber.

FlatMap: use it when a single emitted item can be broken down into multiple items by some operation, but the subscriber should still receive the items one at a time.

For example, applying map with a split function to a stream of strings emits, for each string, one item containing an array of words. Use this when you want to transform one emitted item into another.

Sometimes the function we apply returns multiple items, and we want to add them to a single stream. In this case flatMap is a good candidate: applied to the same strings, flatMap “flattens” the arrays of words into a single sequence of words.

ConcatMap: with flatMap, the order of the emitted items is not guaranteed when the inner sources emit asynchronously. concatMap can be used to overcome this, because it subscribes to the inner sources one at a time and so preserves their order.
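The three operators can be compared side by side. This is a sketch assuming RxJava 2.x on the classpath. The sample strings, the delayed helper, and the class name are invented, and a TestScheduler provides virtual time so that the ordering difference is repeatable.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.TestScheduler;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class MapVariantsSketch {

    // map: one item in, one item out (each sentence becomes one String[])
    public static List<String[]> mapped() {
        return Observable.just("hello world", "rx java")
                .map(s -> s.split(" "))
                .toList()
                .blockingGet();
    }

    // flatMap: one item in, many items out, flattened into a single stream
    public static List<String> flattened() {
        return Observable.just("hello world", "rx java")
                .flatMap(s -> Observable.fromArray(s.split(" ")))
                .toList()
                .blockingGet();
    }

    // Hypothetical helper: emits the value after (value * 100) ms of virtual time.
    private static Observable<Integer> delayed(int value, TestScheduler scheduler) {
        return Observable.just(value)
                .delay(value * 100L, TimeUnit.MILLISECONDS, scheduler);
    }

    // flatMap subscribes to all inner sources at once: the fastest one wins.
    public static List<Integer> flatMapOrder() {
        TestScheduler scheduler = new TestScheduler();
        List<Integer> out = new ArrayList<>();
        Observable.just(3, 1, 2)
                .flatMap(v -> delayed(v, scheduler))
                .subscribe(out::add);
        scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
        return out;
    }

    // concatMap waits for each inner source to finish: source order is kept.
    public static List<Integer> concatMapOrder() {
        TestScheduler scheduler = new TestScheduler();
        List<Integer> out = new ArrayList<>();
        Observable.just(3, 1, 2)
                .concatMap(v -> delayed(v, scheduler))
                .subscribe(out::add);
        scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
        return out;
    }

    public static void main(String[] args) {
        System.out.println("map emits " + mapped().size() + " arrays");
        System.out.println("flatMap emits " + flattened());
        System.out.println("flatMap order " + flatMapOrder());
        System.out.println("concatMap order " + concatMapOrder());
    }
}
```

The price of concatMap's ordering guarantee is that the inner sources run one after another rather than concurrently.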

fromCallable: A static factory method that’s great for wrapping a simple synchronous API and transforming it into a reactive one. As an added bonus, fromCallable also handles checked exceptions by routing them to onError.
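A short sketch of this wrapping pattern, assuming RxJava 2.x on the classpath. The blocking readConfig method is a made-up stand-in for any synchronous API that throws a checked exception.

```java
import io.reactivex.Observable;
import java.io.IOException;

public class FromCallableSketch {

    // Hypothetical synchronous API that may throw a checked exception.
    static String readConfig(String path) throws IOException {
        if (path.isEmpty()) {
            throw new IOException("empty path");
        }
        return "config from " + path;
    }

    public static String load(String path) {
        return Observable
                // No try/catch needed: Callable.call() may throw, and the
                // exception is delivered to the stream as onError.
                .fromCallable(() -> readConfig(path))
                .onErrorReturn(e -> "fallback: " + e.getMessage())
                .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println(load("app.properties"));
        System.out.println(load(""));
    }
}
```

The callable runs only when something subscribes, so the wrapped call stays lazy like any other cold source.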

  1. Debounce: emits an item only after a quiet period has passed without another emission, for example debounce(400, TimeUnit.MILLISECONDS).
  2. Cold Observable: Consider an API which returns an RxJava Observable. It needs a subscription to start emitting items.
  3. Hot Observable: Like view click events. A hot Observable, on the other hand, does not need a subscription to start emitting items.
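The cold/hot distinction shows up in what a late subscriber sees. Here is a minimal sketch, assuming RxJava 2.x on the classpath, that uses Observable.range as a cold source and a PublishSubject as a stand-in for a hot source such as click events. The class name is invented.

```java
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ColdHotSketch {

    /** Returns the items seen by [cold first, cold second, hot early, hot late]. */
    public static List<List<Integer>> run() {
        List<Integer> coldFirst = new ArrayList<>();
        List<Integer> coldSecond = new ArrayList<>();
        List<Integer> hotEarly = new ArrayList<>();
        List<Integer> hotLate = new ArrayList<>();

        // Cold: every subscription restarts the sequence from the beginning.
        Observable<Integer> cold = Observable.range(1, 3);
        cold.subscribe(coldFirst::add);
        cold.subscribe(coldSecond::add);

        // Hot: items are emitted whether or not anyone is listening.
        PublishSubject<Integer> hot = PublishSubject.create();
        hot.onNext(1);                  // nobody subscribed yet, so this is lost
        hot.subscribe(hotEarly::add);
        hot.onNext(2);
        hot.subscribe(hotLate::add);    // joins late, misses 1 and 2
        hot.onNext(3);

        return Arrays.asList(coldFirst, coldSecond, hotEarly, hotLate);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Both cold subscribers receive the full sequence, while each hot subscriber receives only what was emitted after it subscribed.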

RxJava Schedulers

Threading in RxJava is done with the help of Schedulers. A Scheduler can be thought of as a thread pool managing one or more threads. Whenever a Scheduler needs to execute a task, it takes a thread from its pool and runs the task on that thread.

Let’s summarize available Scheduler types and their common uses:

  1. Schedulers.io() is backed by an unbounded thread pool. It is used for non CPU-intensive I/O type work including interaction with the file system, performing network calls, database interactions, etc. This thread pool is intended to be used for asynchronously performing blocking IO.
  2. Schedulers.computation() is backed by a bounded thread pool with size up to the number of available processors. It is used for computational or CPU-intensive work such as resizing images, processing large data sets, etc. Be careful: when you allocate more computation threads than available cores, performance will degrade due to context switching and thread creation overhead as threads vie for processors’ time.
  3. Schedulers.newThread() creates a new thread for each unit of work scheduled. This scheduler is expensive, as a new thread is spawned every time and no reuse happens.
  4. Schedulers.from(Executor executor) creates and returns a custom scheduler backed by the specified executor. To limit the number of simultaneous threads in the thread pool, use Schedulers.from(Executors.newFixedThreadPool(n)). This guarantees that if a task is scheduled when all threads are occupied, it will be queued. The threads in the pool will exist until it is explicitly shut down.
  5. Main thread or AndroidSchedulers.mainThread() is provided by the RxAndroid extension library to RxJava. The main thread (also known as the UI thread) is where user interaction happens. Care should be taken not to overload this thread, to prevent a janky, non-responsive UI or, worse, an “Application Not Responding” (ANR) dialog.
  6. Schedulers.single() is new in RxJava 2. This scheduler is backed by a single thread executing tasks sequentially in the order requested.
  7. Schedulers.trampoline() executes tasks in a FIFO (First In, First Out) manner by one of the participating worker threads. It’s often used when implementing recursion to avoid growing the call stack.

WARNING: Be careful writing multi-threaded code using unbounded thread Schedulers such as Schedulers.io() and Schedulers.newThread(). Depending on your data stream and the transformations you apply to it, it’s easier than you think to flood your system with threads.
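In practice, Schedulers are applied with subscribeOn (where the source runs) and observeOn (where downstream operators run). The following sketch, assuming RxJava 2.x on the classpath, records the thread used on each side of an observeOn. The class name is invented, and the exact thread names depend on the RxJava version.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class SchedulerSketch {

    /** Returns "<source thread> -> <downstream thread>". */
    public static String run() {
        return Observable
                // The callable runs on the thread chosen by subscribeOn.
                .fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.io())
                // Everything after observeOn runs on a computation thread.
                .observeOn(Schedulers.computation())
                .map(sourceThread ->
                        sourceThread + " -> " + Thread.currentThread().getName())
                .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println(run());
    }
}
```

Blocking work belongs on the io() side and CPU-bound work on the computation() side. blockingFirst is used here only so that the example can return a value to the caller.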

What is the difference between zip and combineLatest?
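One way to explore this question is to feed the same two sources into both operators. In this sketch, which assumes RxJava 2.x on the classpath, zip pairs items strictly by position, while combineLatest emits whenever either source emits once both have produced at least one item. The subjects and class name are invented.

```java
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ZipVsCombineLatestSketch {

    /** Returns [zip output, combineLatest output] for the same emissions. */
    public static List<List<String>> run() {
        PublishSubject<Integer> numbers = PublishSubject.create();
        PublishSubject<String> letters = PublishSubject.create();
        List<String> zipped = new ArrayList<>();
        List<String> combined = new ArrayList<>();

        Observable<String> zip =
                Observable.zip(numbers, letters, (n, l) -> n + l);
        Observable<String> latest =
                Observable.combineLatest(numbers, letters, (n, l) -> n + l);
        zip.subscribe(zipped::add);
        latest.subscribe(combined::add);

        numbers.onNext(1);
        letters.onNext("a");   // both operators can now emit "1a"
        numbers.onNext(2);     // zip waits for a second letter
        numbers.onNext(3);
        letters.onNext("b");   // zip pairs "b" with 2; combineLatest with 3

        return Arrays.asList(zipped, combined);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

zip suits cases where items belong together one-to-one, while combineLatest suits cases where the newest value of each source matters, such as validating a form as fields change.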

What is the difference between merge and concat?
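This question can be explored the same way. In this sketch, which assumes RxJava 2.x on the classpath, merge subscribes to both sources at once and interleaves items as they arrive, while concat subscribes to the second source only after the first completes. A TestScheduler keeps the timing repeatable. The source timings and class name are invented.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.TestScheduler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class MergeVsConcatSketch {

    /** Returns [merge output, concat output] for the same two timed sources. */
    public static List<List<String>> run() {
        TestScheduler scheduler = new TestScheduler();   // virtual clock

        // a emits every 10 ms, b every 15 ms, two items each.
        Observable<String> a = Observable
                .interval(10, TimeUnit.MILLISECONDS, scheduler)
                .take(2).map(i -> "a" + i);
        Observable<String> b = Observable
                .interval(15, TimeUnit.MILLISECONDS, scheduler)
                .take(2).map(i -> "b" + i);

        List<String> merged = new ArrayList<>();
        List<String> concatenated = new ArrayList<>();
        Observable.merge(a, b).subscribe(merged::add);
        Observable.concat(a, b).subscribe(concatenated::add);

        scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
        return Arrays.asList(merged, concatenated);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With merge the items arrive interleaved by time, and with concat all items of the first source come before any item of the second.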
