As WE use RxJava more and more, we discover an uncomfortable truth:

“Most of RxJava’s operators are not thread-safe.”

Misuse of RxJava in some multithreaded scenarios can occur unexpectedly.

The Rx operator is not thread-safe

Many people define RxJava as an asynchronous, responsive framework, since the framework is designed for asynchronous processing is not thread safe?

Yes, supporting asynchrony does not mean supporting concurrency

What happens when threads are restless? Most of the operators in RxJava are thread-unsafe, and when multiple threads simultaneously emit data like a stream, the results of the operators may not be as expected.

Take a common operator take(n) as an example:

@JvmStatic
fun main(args: Array<String>) {
   val numberOfThreads = 10
   repeat(1000) {
       println("Iteration = $it")

       val publishSubject = PublishSubject.create<Int> ()val actuallyReceived = AtomicInteger()

       publishSubject.take(3).subscribe { 
       		actuallyReceived.incrementAndGet() 
       }

       val latch = CountDownLatch(numberOfThreads)
       var threads = listOf<Thread>()

       (0..numberOfThreads).forEach {
            threads += thread(start = false) {
                publishSubject.onNext(it)
                latch.countDown()
            }
        }

        threads.forEach { it.start() }
        latch.await()

        check(actuallyReceived.get() = =3)}}Copy the code

Execute the above code and always exit unexpectedly because the result of the take is not as expected

Take a look at the take source code:

public final class ObservableTake<T> extends AbstractObservableWithUpstream<T, T> {
    final long limit;

    public ObservableTake(ObservableSource<T> source, long limit) {
        super(source);
        this.limit = limit;
    }
    protected void subscribeActual(Observer<? super T> observer) {
        this.source.subscribe(new ObservableTake.TakeObserver(observer, this.limit));
    }

    static final class TakeObserver<T> implements Observer<T>, Disposable {
        final Observer<? super T> downstream;
        boolean done;
        Disposable upstream;
        long remaining;

        TakeObserver(Observer<? super T> actual, long limit) {
            this.downstream = actual;
            this.remaining = limit;
        }

   
        public void onNext(T t) {
            if (!this.done && this.remaining-- > 0L) {
                boolean stop = this.remaining == 0L;
                this.downstream.onNext(t);
                if (stop) {
                    this.onComplete();
                }
            }

        }
    }
}
Copy the code

As expected, Remaining — does not have any locking operations to make it thread-safe.


The Observable Contract

Rx explicitly tells us this in the Observable definition:

Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.

Reactivex. IO/documentati…

Happens-before relationship requires that we ensure the order in which stream data is entered to avoid concurrent behavior. According to the official explanation, this avoids some of the performance degradation associated with locking operations and thus ensures thread-safety only when necessary.

Thread-safe operators

So which operators are thread-safe?

RxJava has a wide variety of operators, which are difficult to remember one by one. Basically, they can be distinguished according to this principle:

  • None of the operators that operate on individual Observables are thread-safe, such as the usual take(n), map(), distinctUntilChanged(), etc., except for those with scheduler arguments, such as window(… , the scheduler) and debounce (… The scheduler), etc
  • Operators that operate multiple Observables are thread-safe, such as merge(), combineLatest(), zip()

The code might look something like this:

fun operatorThreadSafety(a) = if (operator.worksWithOneObservable() &&  
    operator.supportsScheduling == false) {
    Operator.NOT_THREAD_SAFE_AND_THAT_IS_OK
} else {
    Operator.MOST_LIKELY_THREAD_SAFE
}
Copy the code


Subject thread safety

I think the use of Subject requires more attention than the thread-safety of the operator. None of the commonly used subjects are thread-safe (except SerializedSubject), and it is Subject that is most prone to concurrent operations. For example, we often use Subject as a repeater and asynchronous onNext sends data to Subject. The previous take example is such a scenario.

Even worse, we often work with observeOn for thread switching, and observeOn itself is not thread-safe. If you look at its source code, observeOn uses a thread-safe queue to cut threads

queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE);
Copy the code

Therefore, the following code is bound to have problems in a concurrent environment:

@JvmStatic
fun main(args: Array<String>) {
    val numberOfThreads = 10000

 	  val publishSubject = PublishSubject.create<Int> ()val actuallyReceived = AtomicInteger()

    publishSubject
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe {
            actuallyReceived.incrementAndGet()
        }

    val latch = CountDownLatch(numberOfThreads)
    var threads = listOf<Thread>()

    (0..numberOfThreads).forEach {
        threads += thread(start = false) {
            publishSubject.onNext(it)
            latch.countDown()
        }
    }

    threads.forEach { it.start() }
    latch.await()

    print("actuallyReceived: $actuallyReceived")}Copy the code

Because observeOn cuts the thread, it always ends up missing some data


Using SerializedSubject

In a concurrent environment, the conversion of toSerialized to SerializedSubject can avoid the above problems

The last

RxJava is considered in design and implementation. Many operators and subjects are not thread-safe. As a developer, it is necessary to comply with The Observable Contract and avoid using it in concurrent environment. Use Observable.serialize() or subject.toserialized () to keep threads safe.

The observables Contract: reactivex. IO/documentati…