Fixed
Status Update
Comments
se...@google.com <se...@google.com> #2
Thanks for the thorough report - you're right about the problem, and the workaround looks good - it even handles the fast cases for initialization and setList(null) nicely.
Fix and tests submitted internally, should go out with next paging release.
Description
Version used: 1.0.0-beta2
Devices/Android versions reproduced on: N/A
The implementations of toPublisher() and fromPublisher() both violate the Reactive-Streams specification.
1) toPublisher()
The returned Publisher has shared state via the mXXX fields that would lead to concurrent use and/or clashing of multiple Subscribers. Here is one way to implement it without such issues:
/**
 * {@link Publisher} view of a {@link LiveData}.
 *
 * <p>The publisher itself only stores the two references it was constructed with. Each
 * call to {@link #subscribe} builds a separate {@link LiveDataSubscription}, so the
 * demand counter, the cached value and the observing flag are per-subscriber rather
 * than shared.
 *
 * @param <T> type of the values held by the wrapped {@code LiveData}
 */
static final class LiveDataPublisher<T> implements Publisher<T> {
    final LifecycleOwner lifecycle;
    final LiveData<T> liveData;

    /**
     * @param lifecycle owner passed to {@code LiveData.observe} when a subscriber first
     *                  requests items
     * @param liveData  source of the values forwarded to subscribers
     */
    LiveDataPublisher(final LifecycleOwner lifecycle, final LiveData<T> liveData) {
        this.liveData = liveData;
        this.lifecycle = lifecycle;
    }

    /**
     * Creates a dedicated subscription for {@code subscriber} and hands it over through
     * {@code onSubscribe}. Observation of the {@code LiveData} does not start here; it
     * starts on the first positive {@code request}.
     */
    @Override
    public void subscribe(Subscriber<? super T> subscriber) {
        final LiveDataSubscription<T> subscription =
                new LiveDataSubscription<T>(subscriber, lifecycle, liveData);
        subscriber.onSubscribe(subscription);
    }

    /**
     * Per-subscriber state: acts both as the {@link Subscription} given to the subscriber
     * and as the {@link Observer} registered on the {@code LiveData}.
     *
     * <p>{@code request} and {@code cancel} hand their work to
     * {@code ArchTaskExecutor.executeOnMainThread}; {@code mObserving},
     * {@code mRequested} and {@code mLatest} are only touched from those runnables and
     * from {@link #onChanged}.
     */
    static final class LiveDataSubscription<T> implements Subscription, Observer<T> {
        final Subscriber<? super T> subscriber;
        final LifecycleOwner lifecycle;
        final LiveData<T> liveData;

        /** Volatile: read and written by request()/cancel() outside the posted runnables. */
        volatile boolean mCanceled;
        /** True while this object is registered as an observer on {@link #liveData}. */
        boolean mObserving;
        /** Outstanding demand; {@code Long.MAX_VALUE} is treated as unbounded. */
        long mRequested;
        /** Most recent value that arrived while there was no demand, or null. */
        @Nullable
        T mLatest;

        LiveDataSubscription(final Subscriber<? super T> subscriber, final LifecycleOwner lifecycle, final LiveData<T> liveData) {
            this.liveData = liveData;
            this.lifecycle = lifecycle;
            this.subscriber = subscriber;
        }

        /**
         * Forwards {@code t} to the subscriber when demand is available; otherwise keeps
         * it as the latest pending value (replacing any earlier one).
         */
        @Override
        public void onChanged(T t) {
            if (mCanceled) {
                return;
            }
            if (mRequested <= 0) {
                // No demand right now: remember only the newest value.
                mLatest = t;
                return;
            }
            mLatest = null;
            subscriber.onNext(t);
            // Decrement after onNext; unbounded demand is never decremented.
            if (mRequested != Long.MAX_VALUE) {
                mRequested--;
            }
        }

        /**
         * Schedules the demand update via {@code executeOnMainThread}. Nothing is
         * scheduled when the subscription is already canceled.
         */
        @Override
        public void request(final long n) {
            if (mCanceled) {
                return;
            }
            ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
                @Override
                public void run() {
                    applyRequest(n);
                }
            });
        }

        /**
         * Marks the subscription canceled immediately, then schedules removal of the
         * observer via {@code executeOnMainThread}. Repeated calls are no-ops.
         */
        @Override
        public void cancel() {
            if (mCanceled) {
                return;
            }
            mCanceled = true;
            ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
                @Override
                public void run() {
                    detach();
                }
            });
        }

        /** Body of the runnable posted by {@link #request}. */
        private void applyRequest(final long n) {
            if (mCanceled) {
                return;
            }
            if (n <= 0L) {
                // Invalid demand: terminate this subscription with an error.
                mCanceled = true;
                detach();
                subscriber.onError(new IllegalArgumentException("Non-positive request"));
                return;
            }

            // Prevent overflowage: a wrapped sum is smaller than the old value.
            final long total = mRequested + n;
            mRequested = total < mRequested ? Long.MAX_VALUE : total;

            if (mObserving) {
                // Already observing: flush a value that was held back for lack of demand.
                if (mLatest != null) {
                    onChanged(mLatest);
                    mLatest = null;
                }
            } else {
                mObserving = true;
                liveData.observe(lifecycle, LiveDataSubscription.this);
            }
        }

        /** Unregisters from the LiveData if registered and drops any pending value. */
        private void detach() {
            if (mObserving) {
                liveData.removeObserver(LiveDataSubscription.this);
                mObserving = false;
            }
            mLatest = null;
        }
    }
}
2) fromPublisher()
This implementation can lose the Subscription if the Publisher signals onSubscribe asynchronously while there is an onActive -> onInactive -> onActive call happening on the main thread. This could be fixed via the following implementation:
/**
 * {@link LiveData} fed by a {@link Publisher}.
 *
 * <p>A new {@link SubscriberLiveData} is created and subscribed on every
 * {@link #onActive()} and canceled on {@link #onInactive()}. The currently active
 * subscriber is tracked in an {@link AtomicReference}.
 *
 * @param <T> type of the values emitted by the publisher
 */
static final class PublisherLiveData<T> extends LiveData<T> {
    final Publisher<T> mPublisher;
    /** The subscriber created by the latest onActive(), or null when there is none. */
    final AtomicReference<SubscriberLiveData> mSubscriber;

    PublisherLiveData(@NonNull final Publisher<T> publisher) {
        mSubscriber = new AtomicReference<SubscriberLiveData>();
        mPublisher = publisher;
    }

    /** Subscribes a brand-new subscriber to the publisher and records it as current. */
    @Override
    protected void onActive() {
        super.onActive();
        final SubscriberLiveData fresh = new SubscriberLiveData();
        // Publish the reference before subscribing so onInactive() can always find it.
        mSubscriber.set(fresh);
        mPublisher.subscribe(fresh);
    }

    /** Clears the current subscriber, if any, and cancels it. */
    @Override
    protected void onInactive() {
        super.onInactive();
        final SubscriberLiveData current = mSubscriber.getAndSet(null);
        if (current == null) {
            return;
        }
        current.cancel();
    }

    /**
     * Subscriber that posts received items into the enclosing LiveData.
     *
     * <p>The inherited {@code AtomicReference<Subscription>} holds the upstream
     * subscription. It is {@code null} until {@code onSubscribe} arrives, and is set to
     * {@code this} once the subscriber has been canceled or has terminated.
     */
    final class SubscriberLiveData
            extends AtomicReference<Subscription>
            implements Subscriber<T>, Subscription {

        /**
         * Stores the upstream subscription and requests unbounded demand. If the slot
         * is no longer empty, the incoming subscription is canceled instead.
         */
        @Override
        public void onSubscribe(Subscription s) {
            if (!compareAndSet(null, s)) {
                s.cancel();
                return;
            }
            s.request(Long.MAX_VALUE);
        }

        /** Delivers the item to the LiveData through {@code postValue}. */
        @Override
        public void onNext(T item) {
            postValue(item);
        }

        /** Marks this subscriber as finished, then prints the stack trace of {@code ex}. */
        @Override
        public void onError(Throwable ex) {
            markTerminated();
            ex.printStackTrace();
        }

        /** Marks this subscriber as finished. */
        @Override
        public void onComplete() {
            markTerminated();
        }

        /**
         * Swaps in the {@code this} sentinel and cancels the upstream subscription that
         * was stored, if there was one.
         */
        @Override
        public void cancel() {
            final Subscription upstream = getAndSet(this);
            if (upstream == null || upstream == this) {
                return;
            }
            upstream.cancel();
        }

        @Override
        public void request(long n) {
            // never called
        }

        /**
         * Replaces the stored subscription with the {@code this} sentinel and, if this
         * is still the LiveData's current subscriber, clears that reference.
         */
        private void markTerminated() {
            lazySet(this);
            mSubscriber.compareAndSet(this, null);
        }
    }
}