0. Introduction


RxJava is actually to provide a set of asynchronous programming API, this API is based on the observer pattern, and is chained calls, so the logic of the code written using RxJava will be very concise.

 RxJava has the following three basic elements:

  1.  Observable
  2.  Observer
  3. 订阅(subscribe)

 Here’s how all three of these work together:

 First add the dependencies in the gradle file:

implementation 'io.reactivex.rxjava2:rxjava:2.1.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
  1.  Create the watched:
Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
});
  1.  Creating an observer:
Observer observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "======================onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "======================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "======================onError");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "======================onComplete");
    }
};

 

observable.subscribe(observer);

  Chained calls can actually be used here:

Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "======================onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "======================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "======================onError");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "======================onComplete");
    }
});

  There are several types of events sent by the observer, summarized in the following table:

 Type of event
onNext()
When this event is sent, the observer calls back the onNext() method
onError()
When this event is sent, the observer will call back the onError() method, and after this event is sent, no other events will be sent.
onComplete()
When this event is sent, the observer will call back the onComplete() method, and after this event is sent, no other events will be sent.


In fact, you can compare RxJava to making juice. You have a lot of fruits at home (the raw data you want to send), and you want to make some fruit juice, so you have to think about what kind of fruit juice you want to drink. If you want to drink avocado, sorbet, and lemon juice, then you have to mix these three fruits together to make juice (using various operators to transform the data you want to send to the observer), and after you finish juicing, you can drink the juice you want (and send the processed data to the observer).

 Summarized below:

 The following is an explanation of the various common RxJava operators.

 1. Creating operators

 The following is an explanation of the various operators that are used to create a watched.

1.1 create()

 Method Preview:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source)

  What’s the use:

 Creating an Observed

 How it works:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("Hello Observer");
        e.onComplete();
    }
});


The code above is very simple, create ObservableOnSubscribe and override its subscribe method to send events to the observer via the ObservableEmitter emitter.


The following creates an observer to verify that this observed was created successfully.

Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(String s) {
        Log.d("chan","=============onNext " + s);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {
        Log.d("chan","=============onComplete ");
    }
};
        
observable.subscribe(observer);
        

  Print results:

05-20 16:16:50.654 22935-22935/com.example.louder.rxjavademo D/chan: =============onNext Hello Observer
=============onComplete

1.2 just()

 Method Preview:

public static <T> Observable<T> just(T item) 
......
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)

  What’s the point?


Create a watched and send events, no more than 10 events can be sent.

 How does it work?

Observable.just(1, 2, 3)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "=================onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "=================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "=================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "=================onComplete ");
    }
});


The above code directly using the chain call, the code is also very simple, not detailed here, look at the printout:

05-20 16:27:26.938 23281-23281/? D/chan: =================onSubscribe
=================onNext 1
=================onNext 2
=================onNext 3
=================onComplete 


1.3 The From operator

1.3.1 fromArray()

 Method Preview:

public static <T> Observable<T> fromArray(T... items)

  What’s the point?


This method is similar to just(), except that fromArray can be passed more than 10 variables and can be passed as an array.

 How does it work?

Integer array[] = {1, 2, 3, 4};
Observable.fromArray(array)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "=================onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "=================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "=================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "=================onComplete ");
    }
});


The code is basically the same as just(), and it’s straightforward to see the printout:

05-20 16:35:23.797 23574-23574/com.example.louder.rxjavademo D/chan: =================onSubscribe
=================onNext 1
=================onNext 2
=================onNext 3
=================onNext 4
=================onComplete 

1.3.2 fromCallable()

 Method Preview:

public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)

  What’s the point?


Callable here is java.util.concurrent Callable, Callable and Runnable usage is basically the same, except that it will return a result value, the result value is sent to the observer.

 How does it work?

Observable.fromCallable(new Callable < Integer > () {

    @Override
    public Integer call() throws Exception {
        return 1;
    }
})
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "================accept " + integer);
    }
});

  Print results:

05-26 13:01:43.009 6890-6890/? D/chan: ================accept 1

1.3.3 fromFuture()

 Method Preview:

public static <T> Observable<T> fromFuture(Future<? extends T> future)

  What’s the point?


The Future in the parameter is Future in java.util.concurrent. The role of Future is to add methods such as cancel() to manipulate the Callable, and it can get the value returned by the Callable through the get() method.

 How does it work?

FutureTask < String > futureTask = new FutureTask < > (new Callable < String > () {
    @Override
    public String call() throws Exception {
        Log.d(TAG, "CallableDemo is Running");
        return "11";
    }
});

Observable.fromFuture(futureTask)
    .doOnSubscribe(new Consumer < Disposable > () {
    @Override
    public void accept(Disposable disposable) throws Exception {
        futureTask.run();
    }
})
.subscribe(new Consumer < String > () {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "================accept " + s);
    }
});


The purpose of doOnSubscribe() is to send events only when you subscribe, which will be explained below.

 Print results:

05-26 13:54:00.470 14429-14429/com.example.rxjavademo D/chan: CallableDemo is Running
================accept 11

1.3.4 fromIterable()

 Method Preview:

public static <T> Observable<T> fromIterable(Iterable<? extends T> source)

  What’s the point?

 Send a List collection directly to the observer

 How does it work?

List<Integer> list = new ArrayList<>();
list.add(0);
list.add(1);
list.add(2);
list.add(3);
Observable.fromIterable(list)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "=================onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "=================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "=================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "=================onComplete ");
    }
});

  The printout is as follows:

05-20 16:43:28.874 23965-23965/? D/chan: =================onSubscribe
=================onNext 0
=================onNext 1
=================onNext 2
=================onNext 3
=================onComplete 

1.4 defer()

 Method Preview:

public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)

  What’s the point?


What this method does is it doesn’t create the watched until the watched is subscribed.

 How does it work?

// i 要定义为成员变量
Integer i = 100;
        
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
    @Override
    public ObservableSource<? extends Integer> call() throws Exception {
        return Observable.just(i);
    }
});

i = 200;

Observer observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
};

observable.subscribe(observer);

i = 300;

observable.subscribe(observer);

  The printout is as follows:

05-20 20:05:01.443 26622-26622/? D/chan: ================onNext 200
================onNext 300


Because defer() only creates a new observed when the observer subscribes, it prints once for each subscription, and it all prints the most recent value of i.

1.5 timer()

 Method Preview:

public static Observable<Long> timer(long delay, TimeUnit unit) 
......

  What’s the point?


A value of 0L is sent to the observer when the specified time is reached.

 How does it work?

Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Long aLong) {
        Log.d(TAG, "===============onNext " + aLong);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

  Print results:

05-20 20:27:48.004 27204-27259/com.example.louder.rxjavademo D/chan: ===============onNext 0

1.6 interval()

 Method Preview:

public static Observable<Long> interval(long period, TimeUnit unit)
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
......

  What’s the point?


Every so often an event is sent which is a number starting from 0 and incrementing by 1.

 How does it work?

Observable.interval(4, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==============onSubscribe ");
    }

    @Override
    public void onNext(Long aLong) {
        Log.d(TAG, "==============onNext " + aLong);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

  Print results:

05-20 20:48:10.321 28723-28723/com.example.louder.rxjavademo D/chan: ==============onSubscribe 
05-20 20:48:14.324 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 0
05-20 20:48:18.324 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 1
05-20 20:48:22.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 2
05-20 20:48:26.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 3
05-20 20:48:30.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 4
05-20 20:48:34.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 5


As you can see from the timing, the event is emitted every 4 seconds to increment the number by 1. Here’s the initialDelay parameter of the third method of interval(), which means the interval between the onSubscribe callback and the onNext callback.

1.7 intervalRange()

 Method Preview:

public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

  What’s the point?


You can specify the start value and the number of events to send, otherwise it is the same as interval().

 How does it work?

Observable.intervalRange(2, 5, 2, 1, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==============onSubscribe ");
    }

    @Override
    public void onNext(Long aLong) {
        Log.d(TAG, "==============onNext " + aLong);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

  Print results:

05-21 00:03:01.672 2504-2504/com.example.louder.rxjavademo D/chan: ==============onSubscribe 
05-21 00:03:03.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 2
05-21 00:03:04.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 3
05-21 00:03:05.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 4
05-21 00:03:06.673 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 5
05-21 00:03:07.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 6


It can be seen that the onNext event is received 5 times and starts from 2.

1.8 range()

 Method Preview:

public static Observable<Integer> range(final int start, final int count)

  What’s the point?

 A sequence of events of a certain range is sent at the same time.

 How does it work?

Observable.range(2, 5)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==============onSubscribe ");
    }

    @Override
    public void onNext(Integer aLong) {
        Log.d(TAG, "==============onNext " + aLong);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

  Print results:

05-21 00:09:17.202 2921-2921/? D/chan: ==============onSubscribe 
==============onNext 2
==============onNext 3
==============onNext 4
==============onNext 5
==============onNext 6

1.9 rangeLong()

 Method Preview:

public static Observable<Long> rangeLong(long start, long count)

  What’s the point?


This works like range(), except that the data type is Long.

 How does it work?


The usage is the same as range(), so I won’t repeat it here.


1.10 empty() & never() & error()

 Method Preview:

public static <T> Observable<T> empty()
public static <T> Observable<T> never()
public static <T> Observable<T> error(final Throwable exception)

  What’s the point?


  1. empty(): sends the onComplete() event directly.
  2.  never(): do not send any event
  3.  error(): sends onError() event

 How does it work?

Observable.empty()
.subscribe(new Observer < Object > () {

    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe");
    }

    @Override
    public void onNext(Object o) {
        Log.d(TAG, "==================onNext");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError " + e);
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete");
    }
});

  Print results:

05-26 14:06:11.881 15798-15798/com.example.rxjavademo D/chan: ==================onSubscribe
==================onComplete


The printout of never():

05-26 14:12:17.554 16805-16805/com.example.rxjavademo D/chan: ==================onSubscribe


to the printout of error():

05-26 14:12:58.483 17817-17817/com.example.rxjavademo D/chan: ==================onSubscribe
==================onError java.lang.NullPointerException

  2. Conversion operators

2.1 map()

 Method Preview:

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)

  What’s the point?


A map can convert the type of data sent by an observer to another type.

 How does it work?


The following code converts data of type Integer to String.

Observable.just(1, 2, 3)
.map(new Function < Integer, String > () {
    @Override
    public String apply(Integer integer) throws Exception {
        return "I'm " + integer;
    }
})
.subscribe(new Observer < String > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.e(TAG, "===================onSubscribe");
    }

    @Override
    public void onNext(String s) {
        Log.e(TAG, "===================onNext " + s);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

  Print results:

05-21 09:16:03.490 5700-5700/com.example.rxjavademo E/chan: ===================onSubscribe
===================onNext I'm 1
===================onNext I'm 2
===================onNext I'm 3

2.2 flatMap()

 Method Preview:

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
......

  What’s the point?


This method consolidates and processes the elements of the event sequence and returns a new observed.

 How does it work?


flatMap() is actually similar to map(), but flatMap() returns an Observerable. now let’s use an example to illustrate the usage of flatMap().


Suppose a class Person is defined as follows:

public class Person {

    private String name;
    private List<Plan> planList = new ArrayList<>();

    public Person(String name, List<Plan> planList) {
        this.name = name;
        this.planList = planList;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public List<Plan> getPlanList() {
        return planList;
    }

    public void setPlanList(List<Plan> planList) {
        this.planList = planList;
    }

}


The Person class has two variables, name and planList, which represent the person’s name and the plan list, respectively.

 The Plan class is defined as follows:

public class Plan {

    private String time;
    private String content;
    private List<String> actionList = new ArrayList<>();

    public Plan(String time, String content) {
        this.time = time;
        this.content = content;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public List<String> getActionList() {
        return actionList;
    }

    public void setActionList(List<String> actionList) {
        this.actionList = actionList;
    }
}


Now we have a requirement to print out the action of the Plan in each element of the Person collection. Let’s start with map() to fulfill this requirement:

Observable.fromIterable(personList)
.map(new Function < Person, List < Plan >> () {
    @Override
    public List < Plan > apply(Person person) throws Exception {
        return person.getPlanList();
    }
})
.subscribe(new Observer < List < Plan >> () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(List < Plan > plans) {
        for (Plan plan: plans) {
            List < String > planActionList = plan.getActionList();
            for (String action: planActionList) {
                Log.d(TAG, "==================action " + action);
            }
        }
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});


You can see that onNext() is implemented with a nested for loop, which may require multiple loops if the logic of the code gets complicated.

 Now look at the implementation using flatMap():

Observable.fromIterable(personList)
.flatMap(new Function < Person, ObservableSource < Plan >> () {
    @Override
    public ObservableSource < Plan > apply(Person person) {
        return Observable.fromIterable(person.getPlanList());
    }
})
.flatMap(new Function < Plan, ObservableSource < String >> () {
    @Override
    public ObservableSource < String > apply(Plan plan) throws Exception {
        return Observable.fromIterable(plan.getActionList());
    }
})
.subscribe(new Observer < String > () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, "==================action: " + s);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});


As you can see from the code, only two flatMap() are needed to fulfill the requirements, and the code logic is very clear.

2.3 concatMap()

 Method Preview:

public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int prefetch)

  What’s the point?


concatMap() and flatMap() are basically the same, except that concatMap() forwards ordered events, while flatMap() is unordered.

2.4 buffer()

 Method Preview:

public final Observable<List<T>> buffer(int count, int skip)
......

  What’s the point?


Get a certain number of events from the events that need to be sent, and put them into a buffer and send them out


As you can see from the results, each time an event is sent, the pointer moves back one element before fetching the value, until the pointer stops fetching when there are no more elements.

2.5 groupBy()

 Method Preview:

public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T, ? extends K> keySelector)

  What’s the point?


Groups the sent data and returns a watched for each group.

 How does it work?

Observable.just(5, 2, 3, 4, 1, 6, 8, 9, 7, 10)
.groupBy(new Function < Integer, Integer > () {
    @Override
    public Integer apply(Integer integer) throws Exception {
        return integer % 3;
    }
})
.subscribe(new Observer < GroupedObservable < Integer, Integer >> () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "====================onSubscribe ");
    }

    @Override
    public void onNext(GroupedObservable < Integer, Integer > integerIntegerGroupedObservable) {
        Log.d(TAG, "====================onNext ");
        integerIntegerGroupedObservable.subscribe(new Observer < Integer > () {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "====================GroupedObservable onSubscribe ");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "====================GroupedObservable onNext  groupName: " + integerIntegerGroupedObservable.getKey() + " value: " + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "====================GroupedObservable onError ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "====================GroupedObservable onComplete ");
            }
        });
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "====================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "====================onComplete ");
    }
});


In the groupBy() method returns the parameter is the name of the group, each return a value, that means that a group will be created, the above code is the 1 ~ 10 data into 3 groups, to see the print results:

05-26 14:38:02.062 21451-21451/com.example.rxjavademo D/chan: ====================onSubscribe 
05-26 14:38:02.063 21451-21451/com.example.rxjavademo D/chan: ====================onNext 
====================GroupedObservable onSubscribe     ====================GroupedObservable onNext  groupName: 2 value: 5
====================GroupedObservable onNext  groupName: 2 value: 2
====================onNext 
====================GroupedObservable onSubscribe 
====================GroupedObservable onNext  groupName: 0 value: 3
05-26 14:38:02.064 21451-21451/com.example.rxjavademo D/chan: ====================onNext 
====================GroupedObservable onSubscribe 
====================GroupedObservable onNext  groupName: 1 value: 4
====================GroupedObservable onNext  groupName: 1 value: 1
====================GroupedObservable onNext  groupName: 0 value: 6
====================GroupedObservable onNext  groupName: 2 value: 8
====================GroupedObservable onNext  groupName: 0 value: 9
====================GroupedObservable onNext  groupName: 1 value: 7
====================GroupedObservable onNext  groupName: 1 value: 10
05-26 14:38:02.065 21451-21451/com.example.rxjavademo D/chan: ====================GroupedObservable onComplete 
====================GroupedObservable onComplete 
====================GroupedObservable onComplete 
====================onComplete 

  You can see that there are 3 groups in the returned result.

2.6 scan()

 Method Preview:

public final Observable<T> scan(BiFunction<T, T, T> accumulator)

  What’s the point?

 Aggregate data in a certain logic.

 How does it work?

Observable.just(1, 2, 3, 4, 5)
.scan(new BiFunction < Integer, Integer, Integer > () {
    @Override
    public Integer apply(Integer integer, Integer integer2) throws Exception {
        Log.d(TAG, "====================apply ");
        Log.d(TAG, "====================integer " + integer);
        Log.d(TAG, "====================integer2 " + integer2);
        return integer + integer2;
    }
})
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "====================accept " + integer);
    }
});

  Print results:

05-26 14:45:27.784 22519-22519/com.example.rxjavademo D/chan: ====================accept 1
====================apply 
====================integer 1
====================integer2 2
====================accept 3
====================apply 
05-26 14:45:27.785 22519-22519/com.example.rxjavademo D/chan: ====================integer 3
====================integer2 3
====================accept 6
====================apply 
====================integer 6
====================integer2 4
====================accept 10
====================apply 
====================integer 10
====================integer2 5
====================accept 15

2.7 window()

 Method Preview:

public final Observable<Observable<T>> window(long count)
......

  What’s the point?


When sending a specified number of events, the events will be divided into a group. the count parameter in window represents the specified number of events, for example, if count is specified as 2, then every time you send 2 data, the 2 data will be divided into a group.

 How does it work?

Observable.just(1, 2, 3, 4, 5)
.window(2)
.subscribe(new Observer < Observable < Integer >> () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "=====================onSubscribe ");
    }

    @Override
    public void onNext(Observable < Integer > integerObservable) {
        integerObservable.subscribe(new Observer < Integer > () {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "=====================integerObservable onSubscribe ");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "=====================integerObservable onNext " + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "=====================integerObservable onError ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "=====================integerObservable onComplete ");
            }
        });
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "=====================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "=====================onComplete ");
    }
});

  Print results:

05-26 15:02:20.654 25838-25838/com.example.rxjavademo D/chan: =====================onSubscribe 
05-26 15:02:20.655 25838-25838/com.example.rxjavademo D/chan: =====================integerObservable onSubscribe 
05-26 15:02:20.656 25838-25838/com.example.rxjavademo D/chan: =====================integerObservable onNext 1
=====================integerObservable onNext 2
=====================integerObservable onComplete 
=====================integerObservable onSubscribe 
=====================integerObservable onNext 3
=====================integerObservable onNext 4
=====================integerObservable onComplete 
=====================integerObservable onSubscribe 
=====================integerObservable onNext 5
=====================integerObservable onComplete 
=====================onComplete 


As you can see from the result, window() divides the events from 1 to 5 into 3 groups.

 3. Combination operators

3.1 concat()

 Method Preview:

public static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2, ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)
......

  What’s the point?


Multiple observers can be grouped together and then events can be sent in the order in which they were previously sent. Note that concat() can only send up to 4 events.

 How does it work?

Observable.concat(Observable.just(1, 2),
Observable.just(3, 4),
Observable.just(5, 6),
Observable.just(7, 8))
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

  Print the following:

05-21 15:40:26.738 7477-7477/com.example.rxjavademo D/chan: ================onNext 1
================onNext 2
05-21 15:40:26.739 7477-7477/com.example.rxjavademo D/chan: ================onNext 3
================onNext 4
================onNext 5
================onNext 6
================onNext 7
================onNext 8

3.2 concatArray()

 Method Preview:

public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources)

  What’s the point?


Works like concat(), but concatArray() can send more than 4 watchers.

 How does it work?

Observable.concatArray(Observable.just(1, 2),
Observable.just(3, 4),
Observable.just(5, 6),
Observable.just(7, 8),
Observable.just(9, 10))
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

  Print results:

05-21 15:47:18.581 9129-9129/com.example.rxjavademo D/chan: ================onNext 1
================onNext 2
================onNext 3
================onNext 4
================onNext 5
================onNext 6
================onNext 7
================onNext 8
================onNext 9
================onNext 10

3.3 merge()

 Method Preview:

 public static <T> Observable<T> merge(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2, ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)
......

  What’s the point?


This method does basically the same thing as concat(), except that concat() sends events serially, whereas merge() sends events in parallel.

 How does it work?


Now let’s demonstrate the difference between concat() and merge().

Observable.merge(
Observable.interval(1, TimeUnit.SECONDS).map(new Function < Long, String > () {
    @Override
    public String apply(Long aLong) throws Exception {
        return "A" + aLong;
    }
}),
Observable.interval(1, TimeUnit.SECONDS).map(new Function < Long, String > () {
    @Override
    public String apply(Long aLong) throws Exception {
        return "B" + aLong;
    }
}))
    .subscribe(new Observer < String > () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, "=====================onNext " + s);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

  The printout is as follows:

05-21 16:10:31.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B0
05-21 16:10:31.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A0
05-21 16:10:32.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A1
05-21 16:10:32.126 12801-12850/com.example.rxjavademo D/chan: =====================onNext B1
05-21 16:10:33.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A2
05-21 16:10:33.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B2
05-21 16:10:34.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A3
05-21 16:10:34.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B3
05-21 16:10:35.124 12801-12849/com.example.rxjavademo D/chan: =====================onNext A4
05-21 16:10:35.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B4
05-21 16:10:36.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A5
05-21 16:10:36.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B5
......


As you can see from the results, the event sequences for both A and B can be emitted. Replace the above code with concat() to see the printed results:

05-21 16:17:52.352 14597-14621/com.example.rxjavademo D/chan: =====================onNext A0
05-21 16:17:53.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A1
05-21 16:17:54.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A2
05-21 16:17:55.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A3
05-21 16:17:56.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A4
05-21 16:17:57.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A5
......


From the results, we can tell that the second observer will send an event only after the first observer has sent an event.


mergeArray() does the same thing as merge(), except that it can send more than 4 watchers, so I won’t go into that here.


3.4 concatArrayDelayError() & mergeArrayDelayError()

 Method Preview:

public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources)
public static <T> Observable<T> mergeArrayDelayError(ObservableSource<? extends T>... sources)

  What’s the point?


In the concatArray() and mergeArray() methods, if one of the observers sends an Error event, then it stops sending the event, and if you want the onError() event to be delayed until all the observers have sent their events, you can use the concatArrayDelayError() and mergeArrayDelayError()

 How does it work?


First, use concatArray() to verify that sending the onError() event interrupts other watchers from sending events, as follows:

Observable.concatArray(
Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onError(new NumberFormatException());
    }
}), Observable.just(2, 3, 4))
    .subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "===================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "===================onError ");
    }

    @Override
    public void onComplete() {

    }
});

  Print results:

05-21 16:38:59.725 17985-17985/com.example.rxjavademo D/chan: ===================onNext 1
===================onError 


As you can tell from the result, it did break, now switch to concatArrayDelayError() with the following code:

Observable.concatArrayDelayError(
Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onError(new NumberFormatException());
    }
}), Observable.just(2, 3, 4))
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "===================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "===================onError ");
    }

    @Override
    public void onComplete() {

    }
});

  The printout is as follows:

05-21 16:40:59.329 18199-18199/com.example.rxjavademo D/chan: ===================onNext 1
===================onNext 2
===================onNext 3
===================onNext 4
===================onError 


As you can see from the results, the onError event is sent only after all the observed events have been sent. mergeArrayDelayError() does the same thing, so I won’t go into it again here.

3.5 zip()

 Method Preview:

public static <T1, T2, R> Observable<R> zip(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> zipper)
......

  What’s the point?


Multiple Observers will be merged and combined one by one according to the order in which each Observer sends events, and the final number of events sent will be the same as the minimum number of events in the source Observable.



We can see that the final number of events received is 5, so why the second Observable didn’t send the 6th event? Because the first Observable has already sent the onComplete event before this, so the second Observable will not send the event again.


3.6 combineLatest() & combineLatestDelayError()

 Method Preview:

public static <T1, T2, R> Observable<R> combineLatest(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> combiner)
....... 

  What’s the point?


combineLatest() works like zip(), but the sequence of events sent by combineLatest() is related to the timeline in which they were sent; when all Observables in combineLatest() have sent an event, as soon as one of them sends an event, that event will be sent in combination with the Whenever one of the Observables sends an event, the event will be combined with the most recent events sent by the other Observables, which may still be a bit abstract, so take a look at the following example code.

Analyzing the above results, we can see that when the A1 event is sent, no binding occurs at all because B does not send any event. When B sends the B1 event, it combines with the most recent event A2 sent by A to form A2B1, so that as soon as an observer sends an event after it, it combines with the most recent event sent by other observers.


Because combineLatestDelayError() is just more delayed sending onError() functionality, I won’t repeat it here.

3.7 reduce()

 Method Preview:

public final Maybe<T> reduce(BiFunction<T, T, T> reducer)

  What’s the point?


The difference between scan() and the scan() operator is that scan() sends an event to the observer every time the data is processed, whereas reduce() aggregates all the data together before sending an event to the observer.

 How does it work?

Observable.just(0, 1, 2, 3)
.reduce(new BiFunction < Integer, Integer, Integer > () {
    @Override
    public Integer apply(Integer integer, Integer integer2) throws Exception {
        int res = integer + integer2;
        Log.d(TAG, "====================integer " + integer);
        Log.d(TAG, "====================integer2 " + integer2);
        Log.d(TAG, "====================res " + res);
        return res;
    }
})
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "==================accept " + integer);
    }
});

  Print results:

05-22 14:21:46.042 17775-17775/? D/chan: ====================integer 0
====================integer2 1
====================res 1
====================integer 1
====================integer2 2
====================res 3
====================integer 3
====================integer2 3
====================res 6
==================accept 6


As you can see from the results, it’s actually the first 2 data that are aggregated and then aggregated with the last 1 until there is no more data.

3.8 collect()

 Method Preview:

public final <U> Single<U> collect(Callable<? extends U> initialValueSupplier, BiConsumer<? super U, ? super T> collector)

  What’s the point?

 Collect data into data structures.

 How does it work?

Observable.just(1, 2, 3, 4)
.collect(new Callable < ArrayList < Integer >> () {
    @Override
    public ArrayList < Integer > call() throws Exception {
        return new ArrayList < > ();
    }
},
new BiConsumer < ArrayList < Integer > , Integer > () {
    @Override
    public void accept(ArrayList < Integer > integers, Integer integer) throws Exception {
        integers.add(integer);
    }
})
.subscribe(new Consumer < ArrayList < Integer >> () {
    @Override
    public void accept(ArrayList < Integer > integers) throws Exception {
        Log.d(TAG, "===============accept " + integers);
    }
});

  Print results:

05-22 16:47:18.257 31361-31361/com.example.rxjavademo D/chan: ===============accept [1, 2, 3, 4]


3.9 startWith() & startWithArray()

 Method Preview:

public final Observable<T> startWith(T item)
public final Observable<T> startWithArray(T... items)

  What’s the point?


Append events before sending, startWith() appends one event, startWithArray() can append multiple events. The appended event will be sent first.

 How does it work?

Observable.just(5, 6, 7)
.startWithArray(2, 3, 4)
.startWith(1)
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "================accept " + integer);
    }
});

  Print results:

05-22 17:08:21.282 4505-4505/com.example.rxjavademo D/chan: ================accept 1
================accept 2
================accept 3
================accept 4
================accept 5
================accept 6
================accept 7

3.10 count()

 Method Preview:

public final Single<Long> count()

  What’s the point?

 Returns the number of events sent by the watched.

 How does it work?

Observable.just(1, 2, 3)
.count()
.subscribe(new Consumer < Long > () {
    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "=======================aLong " + aLong);
    }
});

  Print results:

05-22 20:41:25.025 14126-14126/? D/chan: =======================aLong 3

  4. Functional operators

4.1 delay()

 Method Preview:

public final Observable<T> delay(long delay, TimeUnit unit)

  What’s the point?

 Delay a period of events to send an event.

 How does it work?

Observable.just(1, 2, 3)
.delay(2, TimeUnit.SECONDS)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "=======================onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "=======================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {
        Log.d(TAG, "=======================onSubscribe");
    }
});

  Here’s a two-second delay before sending the event to see the printout:

05-22 20:53:43.618 16880-16880/com.example.rxjavademo D/chan: =======================onSubscribe
05-22 20:53:45.620 16880-16906/com.example.rxjavademo D/chan: =======================onNext 1
05-22 20:53:45.621 16880-16906/com.example.rxjavademo D/chan: =======================onNext 2
=======================onNext 3
=======================onSubscribe


As you can see from the printout, onSubscribe calls back 2 seconds before onNext calls back.

4.2 doOnEach()

 Method Preview:

public final Observable<T> doOnEach(final Consumer<? super Notification<T>> onNotification)

  What’s the point?


Observable calls this method before sending each event.

 How does it work?

Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        //      e.onError(new NumberFormatException());
        e.onComplete();
    }
})
.doOnEach(new Consumer < Notification < Integer >> () {
    @Override
    public void accept(Notification < Integer > integerNotification) throws Exception {
        Log.d(TAG, "==================doOnEach " + integerNotification.getValue());
    }
})
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});

  Print results:

05-23 09:07:05.547 19867-19867/? D/chan: ==================onSubscribe 
==================doOnEach 1
==================onNext 1
==================doOnEach 2
==================onNext 2
==================doOnEach 3
==================onNext 3
==================doOnEach null
==================onComplete 


As you can see from the results, the doOnEach method is called before each event is sent, and the value sent by onNext() can be retrieved.

4.3 doOnNext()

 Method Preview:

public final Observable<T> doOnNext(Consumer<? super T> onNext)

  What’s the point?


Observable calls this method before every onNext() is sent.

 How does it work?

Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
.doOnNext(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "==================doOnNext " + integer);
    }
})
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});

  Print results:

05-23 09:09:36.769 20020-20020/com.example.rxjavademo D/chan: ==================onSubscribe 
==================doOnNext 1
==================onNext 1
==================doOnNext 2
==================onNext 2
==================doOnNext 3
==================onNext 3
==================onComplete 

4.4 doAfterNext()

 Method Preview:

public final Observable<T> doAfterNext(Consumer<? super T> onAfterNext)

  What’s the point?


Observable calls this method after every onNext().

 How does it work?

Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
.doAfterNext(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "==================doAfterNext " + integer);
    }
})
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});

  Print results:

05-23 09:15:49.215 20432-20432/com.example.rxjavademo D/chan: ==================onSubscribe 
==================onNext 1
==================doAfterNext 1
==================onNext 2
==================doAfterNext 2
==================onNext 3
==================doAfterNext 3
==================onComplete 

4.5 doOnComplete()

 Method Preview:

public final Observable<T> doOnComplete(Action onComplete)

  What’s the point?


Observable calls this method before every onComplete() is sent.

 How does it work?

Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
.doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "==================doOnComplete ");
    }
})
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});

  Print results:

05-23 09:32:18.031 20751-20751/? D/chan: ==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
==================doOnComplete 
==================onComplete 

4.6 doOnError()

 Method Preview:

public final Observable<T> doOnError(Consumer<? super Throwable> onError)

  What’s the point?


Observable calls this method before every onError() is sent.

 How does it work?

Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onError(new NullPointerException());
    }
})
.doOnError(new Consumer < Throwable > () {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "==================doOnError " + throwable);
    }
})
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});

  Print results:

05-23 09:35:04.150 21051-21051/? D/chan: ==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
==================doOnError java.lang.NullPointerException
==================onError 

4.7 doOnSubscribe()

 Method Preview:

public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe)

  What’s the point?


Observable calls this method before every onSubscribe() is sent.

 How does it work?

Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
.doOnSubscribe(new Consumer < Disposable > () {
    @Override
    public void accept(Disposable disposable) throws Exception {
        Log.d(TAG, "==================doOnSubscribe ");
    }
})
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});

  Print results:

05-23 09:39:25.778 21245-21245/? D/chan: ==================doOnSubscribe 
==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
==================onComplete 

4.8 doOnDispose()

 Method Preview:

public final Observable<T> doOnDispose(Action onDispose)

  What’s the point?


This method is called back after Disposable’s dispose() is called.

 How does it work?

Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
.doOnDispose(new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "==================doOnDispose ");
    }
})
.subscribe(new Observer < Integer > () {
    private Disposable d;
    
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
        this.d = d;
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
        d.dispose();
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});

  Print results:

05-23 09:55:48.122 22023-22023/com.example.rxjavademo D/chan: ==================onSubscribe 
==================onNext 1
==================doOnDispose 

4.9 doOnLifecycle()

 Method Preview:

public final Observable<T> doOnLifecycle(final Consumer<? super Disposable> onSubscribe, final Action onDispose)

  What’s the point?


The callback method that calls back the first parameter of the method before calling back onSubscribe can be used to decide whether to unsubscribe.

 How does it work?


The callback method for the second parameter of doOnLifecycle() works the same as doOnDispose(), which is now explained with the following example:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
.doOnLifecycle(new Consumer<Disposable>() {
    @Override
    public void accept(Disposable disposable) throws Exception {
        Log.d(TAG, "==================doOnLifecycle accept");
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "==================doOnLifecycle Action");
    }
})
.doOnDispose(
    new Action() {
        @Override
        public void run() throws Exception {
            Log.d(TAG, "==================doOnDispose Action");
        }
})
.subscribe(new Observer<Integer>() {
    private Disposable d;
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
        this.d = d;
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
        d.dispose();
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
    
});

  Print results:

05-23 10:20:36.345 23922-23922/? D/chan: ==================doOnLifecycle accept
==================onSubscribe 
==================onNext 1
==================doOnDispose Action
==================doOnLifecycle Action


You can see that doOnDispose() and doOnLifecycle() are both called back when an unsubscribe operation is performed in the onNext() method.


If you use doOnLifecycle to unsubscribe, take a look at the printout:

05-23 10:32:20.014 24652-24652/com.example.rxjavademo D/chan: ==================doOnLifecycle accept
==================onSubscribe 


You can see that neither doOnDispose Action nor doOnLifecycle Action is called back.


4.10 doOnTerminate() & doAfterTerminate()

 Method Preview:

public final Observable<T> doOnTerminate(final Action onTerminate)
public final Observable<T> doAfterTerminate(Action onFinally)

  What’s the point?


doOnTerminate is called before onError or onComplete is sent, and doAfterTerminate is called after onError or onComplete is sent.

 How does it work?

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
//      e.onError(new NullPointerException());
        e.onComplete();
    }
})
.doOnTerminate(new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "==================doOnTerminate ");
    }
})
.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
    
});

  Print results:

05-23 10:00:39.503 22398-22398/com.example.rxjavademo D/chan: ==================onSubscribe 
==================onNext 1
==================onNext 2
05-23 10:00:39.504 22398-22398/com.example.rxjavademo D/chan: ==================onNext 3
==================doOnTerminate 
==================onComplete 


doAfterTerminate is similar, so I won’t repeat it here.

4.11 doFinally()

 Method Preview:

public final Observable<T> doFinally(Action onFinally)

  What’s the point?

 Callback this method after all events have been sent.

 How does it work?


The question you may have is what is the difference between doFinally() and doAfterTerminate()? The difference is that doAfterTerminate() will not be called if you unsubscribe, whereas doFinally() will be called no matter what and will be at the end of the event sequence.

 Now illustrate with the following example:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
.doFinally(new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "==================doFinally ");
    }
})
.doOnDispose(new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "==================doOnDispose ");
    }
})
.doAfterTerminate(new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "==================doAfterTerminate ");
    }
})
.subscribe(new Observer<Integer>() {
    private Disposable d;
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
        this.d = d;
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
        d.dispose();
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});

  Print results:

05-23 10:10:10.469 23196-23196/? D/chan: ==================onSubscribe 
05-23 10:10:10.470 23196-23196/? D/chan: ==================onNext 1
==================doOnDispose 
==================doFinally 


You can see that doAfterTerminate() will not be called back if the dispose() method is called.


Now try commenting out dispose() and see what prints:

05-23 10:13:34.537 23439-23439/com.example.rxjavademo D/chan: ==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
==================onComplete 
==================doAfterTerminate 
==================doFinally 


doAfterTerminate() has been called back successfully, doFinally() will still be at the end of the event sequence.

4.12 onErrorReturn()

 Method Preview:

public final Observable<T> onErrorReturn(Function<? super Throwable, ? extends T> valueSupplier)

  What’s the point?


When called back after receiving an onError() event, the returned value calls back the onNext() method and ends the event sequence normally.

 How does it work?

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onError(new NullPointerException());
    }
})
.onErrorReturn(new Function<Throwable, Integer>() {
    @Override
    public Integer apply(Throwable throwable) throws Exception {
        Log.d(TAG, "==================onErrorReturn " + throwable);
        return 404;
    }
})
.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});

  Print results:

05-23 18:35:18.175 19239-19239/? D/chan: ==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
==================onErrorReturn java.lang.NullPointerException
==================onNext 404
==================onComplete 

4.13 onErrorResumeNext()

 Method Preview:

public final Observable<T> onErrorResumeNext(Function<? super Throwable, ? extends ObservableSource<? extends T>> resumeFunction)

  What’s the point?


When the onError() event is received, a new Observable is returned and the event sequence is ended normally.

 How does it work?

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onError(new NullPointerException());
    }
})
.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
    @Override
    public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
        Log.d(TAG, "==================onErrorResumeNext " + throwable);
        return Observable.just(4, 5, 6);
    }
})
.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});

  Print results:

05-23 18:43:10.910 26469-26469/? D/chan: ==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
==================onErrorResumeNext java.lang.NullPointerException
==================onNext 4
==================onNext 5
==================onNext 6
==================onComplete 

4.14 onExceptionResumeNext()

 Method Preview:

public final Observable<T> onExceptionResumeNext(final ObservableSource<? extends T> next)

  What’s the point?


This is basically the same as onErrorResumeNext(), but this method only catches Exception.

 How does it work?


Let’s try onExceptionResumeNext() first to see if it catches Errors.

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onError(new Error("404"));
    }
})
.onExceptionResumeNext(new Observable<Integer>() {
    @Override
    protected void subscribeActual(Observer<? super Integer> observer) {
        observer.onNext(333);
        observer.onComplete();
    }
})
.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});

  Print results:

05-23 22:23:08.873 1062-1062/com.example.louder.rxjavademo D/chan: ==================onSubscribe 
05-23 22:23:08.874 1062-1062/com.example.louder.rxjavademo D/chan: ==================onNext 1
==================onNext 2
==================onNext 3
==================onError 


As you can see from the printout, the observer receives the onError() event, proving that onErrorResumeNext() cannot catch Error events.


Change e.onError(new Error(“404”)) to e.onError(new Exception(“404”)) for the observed, and now see if you can catch the Exception event:

05-23 22:32:14.563 10487-10487/com.example.louder.rxjavademo D/chan: ==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
==================onNext 333
==================onComplete 


As you can see from the printout, this method successfully catches the Exception event.

4.15 retry()

 Method Preview:

public final Observable<T> retry(long times)
......

  What’s the point?


If an error event occurs, all event sequences will be resent. times is the number of times it will be resent.

 How does it work?

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onError(new Exception("404"));
    }
})
.retry(2)
.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});

  Print results:

05-23 22:46:18.537 22239-22239/com.example.louder.rxjavademo D/chan: ==================onSubscribe 
05-23 22:46:18.538 22239-22239/com.example.louder.rxjavademo D/chan: ==================onNext 1
==================onNext 2
==================onNext 3
==================onNext 1
==================onNext 2
==================onNext 3
==================onNext 1
==================onNext 2
==================onNext 3
==================onError 

4.16 retryUntil()

 Method Preview:

public final Observable<T> retryUntil(final BooleanSupplier stop)

  What’s the point?


After an error event occurs, you can use this method to determine whether to continue sending the event.

 How does it work?

Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onError(new Exception("404"));
    }
})
.retryUntil(new BooleanSupplier() {
    @Override
    public boolean getAsBoolean() throws Exception {
        if (i == 6) {
            return true;
        }
        return false;
    }
})
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        i += integer;
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});

  Print results:

05-23 22:57:32.905 23063-23063/com.example.louder.rxjavademo D/chan: ==================onSubscribe 
05-23 22:57:32.906 23063-23063/com.example.louder.rxjavademo D/chan: ==================onNext 1
==================onNext 2
==================onNext 3
==================onError 

4.17 retryWhen()

 Method Preview:

public final void safeSubscribe(Observer<? super T> s)

  What’s the point?


This method is called when an exception or error event is received by an observer, and returns a new observer. If the returned observer sends an Error event, the previous observer will not continue to send events; if it sends a normal event, the previous observer will continue to retry sending events.

 How does it work?

Observable.create(new ObservableOnSubscribe < String > () {
    @Override
    public void subscribe(ObservableEmitter < String > e) throws Exception {
        e.onNext("chan");
        e.onNext("ze");
        e.onNext("de");
        e.onError(new Exception("404"));
        e.onNext("haha");
    }
})
.retryWhen(new Function < Observable < Throwable > , ObservableSource <? >> () {
    @Override
    public ObservableSource <? > apply(Observable < Throwable > throwableObservable) throws Exception {
        return throwableObservable.flatMap(new Function < Throwable, ObservableSource <? >> () {
            @Override
            public ObservableSource <? > apply(Throwable throwable) throws Exception {
                if(!throwable.toString().equals("java.lang.Exception: 404")) {
                    return Observable.just("可以忽略的异常");
                } else {
                    return Observable.error(new Throwable("终止啦"));
                }
            }
        });
    }
})
.subscribe(new Observer < String > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, "==================onNext " + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError " + e.toString());
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});

  Print results:

05-24 09:13:25.622 28372-28372/com.example.rxjavademo D/chan: ==================onSubscribe 
05-24 09:13:25.623 28372-28372/com.example.rxjavademo D/chan: ==================onNext chan
==================onNext ze
==================onNext de
05-24 09:13:25.624 28372-28372/com.example.rxjavademo D/chan: ==================onError java.lang.Throwable: 终止啦


Change onError(new Exception(“404”)) to onError(new Exception(“303”)) and see what prints:

==================onNext chan
05-24 09:54:08.653 29694-29694/? D/chan: ==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
......

  As you can see from the results, the message will be sent over and over again.

4.18 repeat()

 Method Preview:

public final Observable<T> repeat(long times)
......

  What’s the point?


Repeatedly sends events to the watched, times is the number of times it has been sent.

 How does it work?

Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
.repeat(2)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "===================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "===================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {
        Log.d(TAG, "===================onComplete ");
    }
});

  Print results:

05-24 11:33:29.565 8544-8544/com.example.rxjavademo D/chan: ===================onSubscribe 
===================onNext 1
===================onNext 2
===================onNext 3
===================onNext 1
===================onNext 2
===================onNext 3
05-24 11:33:29.565 8544-8544/com.example.rxjavademo D/chan: ===================onComplete 

  As you can see from the results, the event was sent twice.

4.19 repeatWhen()

 Method Preview:

public final Observable<T> repeatWhen(final Function<? super Observable<Object>, ? extends ObservableSource<?>> handler)

  What’s the point?


This method can return a new watched to set up some logic to decide whether to send the event repeatedly.

 How does it work?


There are three cases, if the new watched returns onComplete or onError event, the old watched will not continue to send events. If the watched returns any other event, the event will be sent again.


Now experiment with sending the onComplete event with the following code:

Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
.repeatWhen(new Function < Observable < Object > , ObservableSource <? >> () {
    @Override
    public ObservableSource <? > apply(Observable < Object > objectObservable) throws Exception {
        return Observable.empty();
    //  return Observable.error(new Exception("404"));
    //  return Observable.just(4); null;
    }
})
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "===================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "===================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "===================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "===================onComplete ");
    }
});

  Print results:

05-24 11:44:33.486 9379-9379/com.example.rxjavademo D/chan: ===================onSubscribe 
05-24 11:44:33.487 9379-9379/com.example.rxjavademo D/chan: ===================onComplete 


Here’s a direct look at the printout of the onError event and other events sent.

 Send onError to print the result:

05-24 11:46:29.507 9561-9561/com.example.rxjavademo D/chan: ===================onSubscribe 
05-24 11:46:29.508 9561-9561/com.example.rxjavademo D/chan: ===================onError 

  Sends printouts of other events:

05-24 11:48:35.844 9752-9752/com.example.rxjavademo D/chan: ===================onSubscribe 
===================onNext 1
===================onNext 2
===================onNext 3
===================onComplete 

4.20 subscribeOn()

 Method Preview:

public final Observable<T> subscribeOn(Scheduler scheduler)

  What’s the point?


Specifies the thread of the watched. Note that if this method is called multiple times, only the first time is valid.

 How does it work?

Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
//.subscribeOn(Schedulers.newThread())
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "======================onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "======================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "======================onError");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "======================onComplete");
    }
});


Now, without calling the subscribeOn() method, let’s see the printout:

05-26 10:40:42.246 21466-21466/? D/chan: ======================onSubscribe
05-26 10:40:42.247 21466-21466/? D/chan: =========================currentThread name: main
======================onNext 1
======================onNext 2
======================onNext 3
======================onComplete

  You can see that the name of the thread that prints the watched is the main thread.


Then call subscribeOn(Schedulers.newThread()) to see the printout:

05-26 10:43:26.964 22530-22530/com.example.rxjavademo D/chan: ======================onSubscribe
05-26 10:43:26.966 22530-22569/com.example.rxjavademo D/chan: =========================currentThread name: RxNewThreadScheduler-1
05-26 10:43:26.967 22530-22569/com.example.rxjavademo D/chan: ======================onNext 1
======================onNext 2
======================onNext 3
======================onComplete

  You can see that the printout is being watched in a new thread.

 Now let’s see if multiple calls will work, the code is as follows:

Observable.create(new ObservableOnSubscribe < Integer > () {

    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
.subscribeOn(Schedulers.computation())
.subscribeOn(Schedulers.newThread())
.subscribe(new Observer < Integer > () {@Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "======================onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "======================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "======================onError");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "======================onComplete");
    }
});

  Print results:

05-26 10:47:20.925 23590-23590/com.example.rxjavademo D/chan: ======================onSubscribe
05-26 10:47:20.930 23590-23629/com.example.rxjavademo D/chan: =========================currentThread name: RxComputationThreadPool-1
======================onNext 1
======================onNext 2
======================onNext 3
======================onComplete


You can see that the second mobilization of subscribeOn(Schedulers.newThread()) has no effect.

4.21 observeOn()

 Method Preview:

public final Observable<T> observeOn(Scheduler scheduler)

  What’s the point?

 Specifies the thread of the observer, which takes effect every time it is specified.

 How does it work?

Observable.just(1, 2, 3)
.observeOn(Schedulers.newThread())
.flatMap(new Function < Integer, ObservableSource < String >> () {
    @Override
    public ObservableSource < String > apply(Integer integer) throws Exception {
        Log.d(TAG, "======================flatMap Thread name " + Thread.currentThread().getName());
        return Observable.just("chan" + integer);
    }
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer < String > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "======================onSubscribe");
    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, "======================onNext Thread name " + Thread.currentThread().getName());
        Log.d(TAG, "======================onNext " + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "======================onError");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "======================onComplete");
    }
});

  Print results:

05-26 10:58:04.593 25717-25717/com.example.rxjavademo D/chan: ======================onSubscribe
05-26 10:58:04.594 25717-25753/com.example.rxjavademo D/chan: ======================flatMap Thread name RxNewThreadScheduler-1
05-26 10:58:04.595 25717-25753/com.example.rxjavademo D/chan: ======================flatMap Thread name RxNewThreadScheduler-1
======================flatMap Thread name RxNewThreadScheduler-1
05-26 10:58:04.617 25717-25717/com.example.rxjavademo D/chan: ======================onNext Thread name main
======================onNext chan1
======================onNext Thread name main
======================onNext chan2
======================onNext Thread name main
======================onNext chan3
05-26 10:58:04.618 25717-25717/com.example.rxjavademo D/chan: ======================onComplete


As you can see from the printout, observeOn successfully switched threads.

 The following table summarizes the schedulers in RxJava:

Schedulers.computation( ) For use with computational tasks such as event loops and callback processing
Schedulers.immediate( ) current thread
Schedulers.io( )
Used for IO-intensive tasks if asynchronous blocking IO operations.
Schedulers.newThread( ) Create a new thread
AndroidSchedulers.mainThread()
Android’s UI thread for manipulating the UI.

 5. Filtering operators

5.1 filter()

 Method Preview:

public final Observable<T> filter(Predicate<? super T> predicate)

  What’s the point?


Filters the events sent by the watched by some logic, if it returns true then the event will be sent, otherwise it will not be sent.

 How does it work?

 Observable.just(1, 2, 3)
    .filter(new Predicate < Integer > () {
        @Override
        public boolean test(Integer integer) throws Exception {
            return integer < 2;
        }
})
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        i += integer;
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});


The above code only sends events less than 2. Take a look at the printout:

05-24 22:57:32.562 12776-12776/com.example.louder.rxjavademo D/chan: ==================onSubscribe 
==================onNext 1
==================onComplete 

5.2 ofType()

 Method Preview:

public final <U> Observable<U> ofType(final Class<U> clazz)

  What’s the point?

 You can filter events that do not match this type

 How does it work?

Observable.just(1, 2, 3, "chan", "zhide")
.ofType(Integer.class)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        i += integer;
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});

  Print results:

05-24 23:04:24.752 13229-13229/? D/chan: ==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
05-24 23:04:24.753 13229-13229/? D/chan: ==================onComplete 

5.3 skip()

 Method Preview:

public final Observable<T> skip(long count)
.......

  What’s the point?


Skip some events in positive order, count represents the number of events skipped.

 How does it work?

Observable.just(1, 2, 3)
.skip(2)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        i += integer;
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});

  Print results:

05-24 23:13:50.448 13831-13831/? D/chan: ==================onSubscribe 
05-24 23:13:50.449 13831-13831/? D/chan: ==================onNext 3
==================onComplete 


skipLast() is also used to skip certain events, but it is used to skip later events in a positive order, so we won’t explain it here.

5.4 distinct()

 Method Preview:

public final Observable<T> distinct() 

  What’s the point?

 Filters duplicate events in the event sequence.

 How does it work?

Observable.just(1, 2, 3, 3, 2, 1)
.distinct()
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        i += integer;
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});

  Print Result.

05-24 23:19:44.334 14206-14206/com.example.louder.rxjavademo D/chan: ==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
==================onComplete 

5.5 distinctUntilChanged()

 Method Preview:

public final Observable<T> distinctUntilChanged()

  What’s the point?

 Filtering out successive repeating events

 How does it work?

Observable.just(1, 2, 3, 3, 2, 1)
.distinctUntilChanged()
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        i += integer;
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});

  Print results.

05-24 23:22:35.985 14424-14424/com.example.louder.rxjavademo D/chan: ==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
==================onNext 2
==================onNext 1
==================onComplete 


Because there are two consecutive occurrences of 3 in the event sequence, the second 3 is not issued.

5.6 take()

 Method Preview:

public final Observable<T> take(long count)
......

  What’s the point?

 Controls the number of events received by the observer.

 How does it work?

Observable.just(1, 2, 3, 4, 5)
.take(3)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        i += integer;
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});

  Print results:

05-24 23:28:32.899 14704-14704/? D/chan: ==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
==================onComplete 


The purpose of takeLast() is to control the observer to accept only the later things in the event sequence, so I won’t explain it here, you can try it yourself.

5.7 debounce()

 Method Preview:

public final Observable<T> debounce(long timeout, TimeUnit unit)
......

  What’s the point?


If the time interval between two events is less than the set time interval then the previous event is not sent to the observer.

 How does it work?

Observable.create(new ObservableOnSubscribe < Integer > () {

    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        Thread.sleep(900);
        e.onNext(2);
    }
})
.debounce(1, TimeUnit.SECONDS)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "===================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "===================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "===================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "===================onComplete ");
    }
});

  Print results:

05-25 20:39:10.512 17441-17441/com.example.rxjavademo D/chan: ===================onSubscribe 
05-25 20:39:12.413 17441-17478/com.example.rxjavademo D/chan: ===================onNext 2


You can see that event 1 was not sent out, now change the interval to 1000 and see the printout:

05-25 20:42:10.874 18196-18196/com.example.rxjavademo D/chan: ===================onSubscribe 
05-25 20:42:11.875 18196-18245/com.example.rxjavademo D/chan: ===================onNext 1
05-25 20:42:12.875 18196-18245/com.example.rxjavademo D/chan: ===================onNext 2


throttleWithTimeout() does the same thing as this method, so I won’t repeat it here.


5.8 firstElement() && lastElement()

 Method Preview:
public final Maybe<T> firstElement()
public final Maybe<T> lastElement()

  What’s the point?


firstElement() takes the first element of the event sequence and lastElement() takes the last element of the event sequence.

 How does it work?

Observable.just(1, 2, 3, 4)
.firstElement()
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "====================firstElement " + integer);
    }
});

Observable.just(1, 2, 3, 4)
.lastElement()
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "====================lastElement " + integer);
    }
});

  Print results:

05-25 20:47:22.189 19909-19909/? D/chan: ====================firstElement 1
====================lastElement 4


5.9 elementAt() & elementAtOrError()

 Method Preview:

public final Maybe<T> elementAt(long index)
public final Single<T> elementAtOrError(long index)

  What’s the point?


elementAt() can be specified to retrieve events from a sequence of events, but nothing happens if the index entered exceeds the total number of events in the sequence. In this case, use elementAtOrError() if you want to raise an exception.

 How does it work?

Observable.just(1, 2, 3, 4)
.elementAt(0)
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "====================accept " + integer);
    }
});

  Print results:

05-25 20:56:22.266 23346-23346/com.example.rxjavademo D/chan: ====================accept 1


Changing the value of elementAt() to 5 is not printing results because there is no element that satisfies the condition.


Replace elementAt() with elementAtOrError() with the following code:

Observable.just(1, 2, 3, 4)
.elementAtOrError(5)
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "====================accept " + integer);
    }
});

  Print results:

io.reactivex.exceptions.OnErrorNotImplementedException
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java: 704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java: 701)
at io.reactivex.internal.observers.ConsumerSingleObserver.onError(ConsumerSingleObserver.java: 47)
at io.reactivex.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java: 117)
at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java: 110)
at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java: 36)
at io.reactivex.Observable.subscribe(Observable.java: 10903)
at io.reactivex.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java: 37)
at io.reactivex.Single.subscribe(Single.java: 2707)
at io.reactivex.Single.subscribe(Single.java: 2693)
at io.reactivex.Single.subscribe(Single.java: 2664)
at com.example.rxjavademo.MainActivity.onCreate(MainActivity.java: 103)
at android.app.Activity.performCreate(Activity.java: 6942)
at android.app.Instrumentation.callActivityOnCreate(Instrumentation.java: 1126)
at android.app.ActivityThread.performLaunchActivity(ActivityThread.java: 2880)
at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java: 2988)
at android.app.ActivityThread. - wrap14(ActivityThread.java)
at android.app.ActivityThread$H.handleMessage(ActivityThread.java: 1631)
at android.os.Handler.dispatchMessage(Handler.java: 102)
at android.os.Looper.loop(Looper.java: 154)
at android.app.ActivityThread.main(ActivityThread.java: 6682)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java: 1520)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java: 1410)
Caused by: java.util.NoSuchElementException
at io.reactivex.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java: 117) 
at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java: 110) 
at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java: 36) 
at io.reactivex.Observable.subscribe(Observable.java: 10903) 
at io.reactivex.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java: 37) 
at io.reactivex.Single.subscribe(Single.java: 2707) 
at io.reactivex.Single.subscribe(Single.java: 2693) 
at io.reactivex.Single.subscribe(Single.java: 2664) 
at com.example.rxjavademo.MainActivity.onCreate(MainActivity.java: 103) 
at android.app.Activity.performCreate(Activity.java: 6942) 
at android.app.Instrumentation.callActivityOnCreate(Instrumentation.java: 1126) 
at android.app.ActivityThread.performLaunchActivity(ActivityThread.java: 2880) 
at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java: 2988) 
at android.app.ActivityThread. - wrap14(ActivityThread.java) 
at android.app.ActivityThread$H.handleMessage(ActivityThread.java: 1631) 
at android.os.Handler.dispatchMessage(Handler.java: 102) 
at android.os.Looper.loop(Looper.java: 154) 
at android.app.ActivityThread.main(ActivityThread.java: 6682) 
at java.lang.reflect.Method.invoke(Native Method) 
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java: 1520) 
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java: 1410) 


A NoSuchElementException is thrown.

 6. Conditional operators

6.1 all()

 Method Preview:

public final Observable<T> ambWith(ObservableSource<? extends T> other)

  What’s the point?


Determines whether the sequence of events all satisfy a certain event, if all satisfy then return true, otherwise return false.

 How does it work?

Observable.just(1, 2, 3, 4)
.all(new Predicate < Integer > () {
    @Override
    public boolean test(Integer integer) throws Exception {
        return integer < 5;
    }
})
.subscribe(new Consumer < Boolean > () {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
        Log.d(TAG, "==================aBoolean " + aBoolean);
    }
});

  Print results:

05-26 09:39:51.644 1482-1482/com.example.rxjavademo D/chan: ==================aBoolean true

6.2 takeWhile()

 Method Preview:

public final Observable<T> takeWhile(Predicate<? super T> predicate)

  What’s the point?


Conditions can be set so that a piece of data is sent when it meets the conditions, and vice versa.

 How does it work?

Observable.just(1, 2, 3, 4)
.takeWhile(new Predicate < Integer > () {
    @Override
    public boolean test(Integer integer) throws Exception {
        return integer < 3;
    }
})
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "========================integer " + integer);
    }
});

  Print results:

05-26 09:43:14.634 3648-3648/com.example.rxjavademo D/chan: ========================integer 1
========================integer 2

6.3 skipWhile()

 Method Preview:

public final Observable<T> skipWhile(Predicate<? super T> predicate)

  What’s the point?


Conditions can be set so that a certain data is not sent when the condition is met, and vice versa.

 How does it work?

Observable.just(1, 2, 3, 4)
.skipWhile(new Predicate < Integer > () {
    @Override
    public boolean test(Integer integer) throws Exception {
        return integer < 3;
    }
})
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "========================integer " + integer);
    }
});

  Print results:

05-26 09:47:32.653 4861-4861/com.example.rxjavademo D/chan: ========================integer 3
========================integer 4

6.4 takeUntil()

 Method Preview:

public final Observable<T> takeUntil(Predicate<? super T> stopPredicate

  What’s the point?


A condition can be set so that when the event meets this condition, the next event will not be sent.

 How does it work?

Observable.just(1, 2, 3, 4, 5, 6)
.takeUntil(new Predicate < Integer > () {
    @Override
    public boolean test(Integer integer) throws Exception {
        return integer > 3;
    }
})
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "========================integer " + integer);
    }
});

  Print results:

05-26 09:55:12.918 7933-7933/com.example.rxjavademo D/chan: ========================integer 1
========================integer 2
05-26 09:55:12.919 7933-7933/com.example.rxjavademo D/chan: ========================integer 3
========================integer 4

6.5 skipUntil()

 Method Preview:

public final <U> Observable<T> skipUntil(ObservableSource<U> other)

  What’s the point?


When the Observable in skipUntil() has sent an event, the original Observable will only send the event to the observer.

 How does it work?

Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
.skipUntil(Observable.intervalRange(6, 5, 3, 1, TimeUnit.SECONDS))
.subscribe(new Observer < Long > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "========================onSubscribe ");
    }

    @Override
    public void onNext(Long along) {
        Log.d(TAG, "========================onNext " + along);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "========================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "========================onComplete ");
    }
});

  Print results:

05-26 10:08:50.574 13023-13023/com.example.rxjavademo D/chan: ========================onSubscribe 
05-26 10:08:53.576 13023-13054/com.example.rxjavademo D/chan: ========================onNext 4
05-26 10:08:54.576 13023-13054/com.example.rxjavademo D/chan: ========================onNext 5
========================onComplete 


As you can see from the results, the Observable in skipUntil() does not send events to the observer.

6.6 sequenceEqual()

 Method Preview:

public static <T> Single<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2)
......

  What’s the point?


Determines if the events sent by both Observables are the same.

 How does it work?

Observable.sequenceEqual(Observable.just(1, 2, 3),
Observable.just(1, 2, 3))
.subscribe(new Consumer < Boolean > () {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
        Log.d(TAG, "========================onNext " + aBoolean);
    }
});

  Print results:

05-26 10:11:45.975 14157-14157/? D/chan: ========================onNext true

6.7 contains()

 Method Preview:

public final Single<Boolean> contains(final Object element)

  What’s the point?


Determines if the event sequence contains an element, returns true if it does and false if it doesn’t.

 How does it work?

Observable.just(1, 2, 3)
.contains(3)
.subscribe(new Consumer < Boolean > () {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
        Log.d(TAG, "========================onNext " + aBoolean);
    }
});

  Print results:

05-26 10:14:23.522 15085-15085/com.example.rxjavademo D/chan: ========================onNext true

6.8 isEmpty()

 Method Preview:

public final Single<Boolean> isEmpty()

  What’s the point?

 Determines if the event sequence is empty.

 How does it work?

Observable.create(new ObservableOnSubscribe < Integer > () {

    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onComplete();
    }
})
.isEmpty()
.subscribe(new Consumer < Boolean > () {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
        Log.d(TAG, "========================onNext " + aBoolean);
    }
});

  Print results:

05-26 10:17:16.725 16109-16109/com.example.rxjavademo D/chan: ========================onNext true

6.9 amb()

 Method Preview:

public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources)

  What’s the point?


amb() is passed a collection of Observables, but only events from the first Observable that sends an event will be sent, the rest of the Observables will be discarded.

 How does it work?

ArrayList < Observable < Long >> list = new ArrayList < > ();

list.add(Observable.intervalRange(1, 5, 2, 1, TimeUnit.SECONDS));
list.add(Observable.intervalRange(6, 5, 0, 1, TimeUnit.SECONDS));

Observable.amb(list)
.subscribe(new Consumer < Long > () {
    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "========================aLong " + aLong);
    }
});

  Print results:

05-26 10:21:29.580 17185-17219/com.example.rxjavademo D/chan: ========================aLong 6
05-26 10:21:30.580 17185-17219/com.example.rxjavademo D/chan: ========================aLong 7
05-26 10:21:31.579 17185-17219/com.example.rxjavademo D/chan: ========================aLong 8
05-26 10:21:32.579 17185-17219/com.example.rxjavademo D/chan: ========================aLong 9
05-26 10:21:33.579 17185-17219/com.example.rxjavademo D/chan: ========================aLong 10

6.10 defaultIfEmpty()

 Method Preview:

public final Observable<T> defaultIfEmpty(T defaultItem)

  What’s the point?


This method can be utilized to send a value if the observer is only sending an onComplete() event.

 How does it work?

Observable.create(new ObservableOnSubscribe < Integer > () {

    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onComplete();
    }
})
.defaultIfEmpty(666)
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "========================onNext " + integer);
    }
});

  Print results:

05-26 10:26:56.376 19249-19249/com.example.rxjavademo D/chan: ========================onNext 666


RxJava common use has been introduced almost, I believe that if you have mastered the use of these operators, then the use of RxJava will no longer be a problem.

By lzz

Leave a Reply

Your email address will not be published. Required fields are marked *