Fixed
Status Update
Comments
se...@google.com <se...@google.com> #2
Thanks for filing the issue.
This is a particularly hard device to come by - do you happen to have access to the device? If so could you provide us with the output of: adb shell dumpsys media.camera > info.txt
Thanks!
This is a particularly hard device to come by - do you happen to have access to the device? If so could you provide us with the output of: adb shell dumpsys media.camera > info.txt
Thanks!
Description
Version used: 1.0.0-beta2
Devices/Android versions reproduced on: N/A
The implementation of toPublisher() and fromPublisher() both violate the Reactive-Streams specification.
1) toPublisher()
The returned Publisher has shared state via the mXXX fields, which would lead to concurrent use and/or clashing when there are multiple Subscribers. Here is one way to implement it without such issues:
/**
 * {@link Publisher} view over a {@link LiveData}.
 *
 * <p>The publisher itself only holds the lifecycle owner and the LiveData; every call to
 * {@link #subscribe} creates a fresh {@link LiveDataSubscription} that carries all
 * per-subscriber state, so subscribers do not share mutable fields.
 */
static final class LiveDataPublisher<T> implements Publisher<T> {
    final LifecycleOwner lifecycle;
    final LiveData<T> liveData;

    LiveDataPublisher(final LifecycleOwner lifecycle, final LiveData<T> liveData) {
        this.lifecycle = lifecycle;
        this.liveData = liveData;
    }

    /**
     * Creates a dedicated subscription for {@code subscriber} and hands it over via
     * {@code onSubscribe}.
     */
    @Override
    public void subscribe(Subscriber<? super T> subscriber) {
        final LiveDataSubscription<T> subscription =
                new LiveDataSubscription<T>(subscriber, lifecycle, liveData);
        subscriber.onSubscribe(subscription);
    }

    /**
     * Per-subscriber state: acts both as the {@link Subscription} given to the subscriber
     * and as the {@link Observer} registered on the LiveData.
     */
    static final class LiveDataSubscription<T> implements Subscription, Observer<T> {
        final Subscriber<? super T> subscriber;
        final LifecycleOwner lifecycle;
        final LiveData<T> liveData;

        /** Set once the subscription is cancelled or has failed; checked before any work. */
        volatile boolean mCanceled;
        /** True while this object is registered as an observer on {@link #liveData}. */
        boolean mObserving;
        /** Outstanding demand; {@code Long.MAX_VALUE} is never decremented. */
        long mRequested;
        /** Most recent value that arrived while there was no outstanding demand. */
        @Nullable
        T mLatest;

        LiveDataSubscription(final Subscriber<? super T> subscriber,
                final LifecycleOwner lifecycle, final LiveData<T> liveData) {
            this.subscriber = subscriber;
            this.lifecycle = lifecycle;
            this.liveData = liveData;
        }

        /**
         * Forwards {@code t} downstream when demand is available, otherwise remembers it
         * as the latest pending value.
         */
        @Override
        public void onChanged(T t) {
            if (mCanceled) {
                return;
            }
            if (mRequested <= 0) {
                // No demand yet: keep only the newest value for a later request().
                mLatest = t;
                return;
            }
            mLatest = null;
            subscriber.onNext(t);
            if (mRequested != Long.MAX_VALUE) {
                mRequested--;
            }
        }

        /**
         * Adds {@code n} to the outstanding demand. The bookkeeping is handed to
         * {@code ArchTaskExecutor.executeOnMainThread}. A non-positive {@code n} cancels
         * the subscription and signals {@code onError} with an
         * {@link IllegalArgumentException}.
         */
        @Override
        public void request(final long n) {
            if (mCanceled) {
                return;
            }
            ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
                @Override
                public void run() {
                    if (mCanceled) {
                        return;
                    }
                    if (n <= 0L) {
                        mCanceled = true;
                        stopObserving();
                        subscriber.onError(new IllegalArgumentException("Non-positive request"));
                        return;
                    }
                    // Saturate at Long.MAX_VALUE if the addition wrapped around.
                    final long sum = mRequested + n;
                    mRequested = sum >= mRequested ? sum : Long.MAX_VALUE;

                    if (mObserving) {
                        if (mLatest != null) {
                            // A value was held back while demand was zero; deliver it now.
                            onChanged(mLatest);
                            mLatest = null;
                        }
                    } else {
                        // First positive request: start observing the LiveData.
                        mObserving = true;
                        liveData.observe(lifecycle, LiveDataSubscription.this);
                    }
                }
            });
        }

        /**
         * Marks the subscription cancelled immediately, then hands observer removal to
         * {@code ArchTaskExecutor.executeOnMainThread}.
         */
        @Override
        public void cancel() {
            if (mCanceled) {
                return;
            }
            mCanceled = true;
            ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
                @Override
                public void run() {
                    stopObserving();
                }
            });
        }

        /** Unregisters from the LiveData if currently observing and drops any cached value. */
        private void stopObserving() {
            if (mObserving) {
                liveData.removeObserver(this);
                mObserving = false;
            }
            mLatest = null;
        }
    }
}
2) fromPublisher()
This implementation can lose the Subscription if the Publisher signals onSubscribe asynchronously while an onActive -> onInactive -> onActive sequence is happening on the main thread. This could be fixed via the following implementation:
/**
 * {@link LiveData} backed by a {@link Publisher}.
 *
 * <p>Each transition to active creates a new {@link SubscriberLiveData} and subscribes it
 * to the publisher; each transition to inactive cancels whichever subscriber is current.
 * The upstream {@link Subscription} is stored inside the subscriber itself, so a late
 * {@code onSubscribe} is matched against the subscriber it belongs to.
 */
static final class PublisherLiveData<T> extends LiveData<T> {
    final Publisher<T> mPublisher;
    /** The subscriber created by the most recent {@link #onActive()}, or {@code null}. */
    final AtomicReference<SubscriberLiveData> mSubscriber =
            new AtomicReference<SubscriberLiveData>();

    PublisherLiveData(@NonNull final Publisher<T> publisher) {
        mPublisher = publisher;
    }

    /** Publishes a fresh subscriber as current, then subscribes it to the publisher. */
    @Override
    protected void onActive() {
        super.onActive();
        final SubscriberLiveData fresh = new SubscriberLiveData();
        mSubscriber.set(fresh);
        mPublisher.subscribe(fresh);
    }

    /** Detaches the current subscriber, if any, and cancels it. */
    @Override
    protected void onInactive() {
        super.onInactive();
        final SubscriberLiveData current = mSubscriber.getAndSet(null);
        if (current == null) {
            return;
        }
        current.cancel();
    }

    /**
     * Subscriber that keeps its upstream {@link Subscription} in its own atomic reference.
     * The reference holds {@code null} before {@code onSubscribe}, the upstream
     * subscription while running, and {@code this} once cancelled or terminated.
     */
    final class SubscriberLiveData
            extends AtomicReference<Subscription>
            implements Subscriber<T>, Subscription {

        /**
         * Stores {@code s} and requests {@code Long.MAX_VALUE} items if no value has been
         * set yet; otherwise cancels {@code s} right away.
         */
        @Override
        public void onSubscribe(Subscription s) {
            if (!compareAndSet(null, s)) {
                // The reference is already occupied (e.g. cancel() ran first).
                s.cancel();
                return;
            }
            s.request(Long.MAX_VALUE);
        }

        /** Forwards the item to the enclosing LiveData via {@code postValue}. */
        @Override
        public void onNext(T item) {
            postValue(item);
        }

        /** Records termination and prints the stack trace of {@code ex}. */
        @Override
        public void onError(Throwable ex) {
            markTerminated();
            ex.printStackTrace();
        }

        /** Records termination. */
        @Override
        public void onComplete() {
            markTerminated();
        }

        /**
         * Swaps in the {@code this} marker and cancels the previous upstream subscription
         * if there was a real one.
         */
        @Override
        public void cancel() {
            final Subscription upstream = getAndSet(this);
            if (upstream == null || upstream == this) {
                return;
            }
            upstream.cancel();
        }

        @Override
        public void request(long n) {
            // never called
        }

        /**
         * Sets the {@code this} marker and clears the enclosing LiveData's current
         * subscriber slot if it still points at this instance.
         */
        private void markTerminated() {
            lazySet(this);
            mSubscriber.compareAndSet(this, null);
        }
    }
}