Order of events received by subscribers of a Publish Subject

509 views Asked by At

I have a publish subject with multiple subscribers:

Here is the class:

class Real {

    private val publisher: PublishSubject<String> = PublishSubject.create()

    fun doPublish() {
        for (i in 1 until 20) {
            publisher.onNext("$i Hello")
        }
        publisher.onComplete()
    }

    fun doSubscribe() {
        publisher.subscribe {
            println("Subscriber1 $it")
        }

        publisher.subscribe {
            println("Subscriber2 $it")
        }

        publisher.subscribe {
            println("Subscriber3 $it")
        }

    }
}

I call doSubscribe() before I call doPublish() The output is as follows:

 Task :Main.main()
Subscriber1 1 Hello
Subscriber2 1 Hello
Subscriber3 1 Hello
Subscriber1 2 Hello
Subscriber2 2 Hello
Subscriber3 2 Hello
Subscriber1 3 Hello
Subscriber2 3 Hello
Subscriber3 3 Hello
Subscriber1 4 Hello
Subscriber2 4 Hello
Subscriber3 4 Hello
Subscriber1 5 Hello
Subscriber2 5 Hello
Subscriber3 5 Hello
Subscriber1 6 Hello
Subscriber2 6 Hello
Subscriber3 6 Hello
Subscriber1 7 Hello
Subscriber2 7 Hello
Subscriber3 7 Hello
Subscriber1 8 Hello
Subscriber2 8 Hello
Subscriber3 8 Hello
Subscriber1 9 Hello
Subscriber2 9 Hello
Subscriber3 9 Hello
Subscriber1 10 Hello
Subscriber2 10 Hello
Subscriber3 10 Hello
Subscriber1 11 Hello
Subscriber2 11 Hello
Subscriber3 11 Hello
Subscriber1 12 Hello
Subscriber2 12 Hello
Subscriber3 12 Hello
Subscriber1 13 Hello
Subscriber2 13 Hello
Subscriber3 13 Hello
Subscriber1 14 Hello
Subscriber2 14 Hello
Subscriber3 14 Hello
Subscriber1 15 Hello
Subscriber2 15 Hello
Subscriber3 15 Hello
Subscriber1 16 Hello
Subscriber2 16 Hello
Subscriber3 16 Hello
Subscriber1 17 Hello
Subscriber2 17 Hello
Subscriber3 17 Hello
Subscriber1 18 Hello
Subscriber2 18 Hello
Subscriber3 18 Hello
Subscriber1 19 Hello
Subscriber2 19 Hello
Subscriber3 19 Hello

According to above program the first subscriber receives the event first followed by second and third, this is exactly as per the order of subscription.

Is this order of execution guaranteed? As I am not able to find relevant documentation regarding this.

1

There are 1 answers

0
Sergej Isbrecht On

Please have a look at the PublishSubject implementation:

What happens on subscription?

A PublishDisposable is created and added to an array of "subscribers" via add-method (b[n] = ps;)

Now the PublishSubject has an Array of subscribers, which honours the insertion-order

@Override
protected void subscribeActual(Observer<? super T> t) {
    PublishDisposable<T> ps = new PublishDisposable<T>(t, this);
    t.onSubscribe(ps);
    if (add(ps)) {
        // if cancellation happened while a successful add, the remove() didn't work
        // so we need to do it again
        if (ps.isDisposed()) {
            remove(ps);
        }
    } else {
        ...
    }
}

boolean add(PublishDisposable<T> ps) {
    for (;;) {
        PublishDisposable<T>[] a = subscribers.get();
        if (a == TERMINATED) {
            return false;
        }

        int n = a.length;
        @SuppressWarnings("unchecked")
        PublishDisposable<T>[] b = new PublishDisposable[n + 1];
        System.arraycopy(a, 0, b, 0, n);
        b[n] = ps;

        if (subscribers.compareAndSet(a, b)) {
            return true;
        }
    }
}

The source now emits a new value via onNext. The method onNext shows the invocation of onNext-invocation of the subscribers. The array of subscribers is iterated over from 0...n. Therefore the subscribers are called in insertion-order, because by contract onNext must be called serially.

Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications. (http://reactivex.io/documentation/contract.html)

@Override
public void onNext(T t) {
    ObjectHelper.requireNonNull(t, "onNext called with null. Null values are generally not allowed in 2.x operators and sources.");
    for (PublishDisposable<T> pd : subscribers.get()) {
        pd.onNext(t);
    }
}