I'm trying to understand how to bridge receiver-based events to the Reactive world. I asked a question on SO and was pointed to a different approach. The author of that answer (David Karnok) is extremely knowledgeable about RxJava, so I trust that his approach is a good one.
My concern is about the order things are done in the last few lines of subscribeActual. I think I've got it right. Here's my reasoning:
If the Disposable is disposed (i.e., the Subscription is canceled) during the call to onSubscribe, it's okay to call onNext with the last known location and potentially receive another location update when the listener is briefly subscribed because of Reactive Streams spec rule 2.8:
A
SubscriberMUST be prepared to receive one or moreonNextsignals after having calledSubscription.cancel(). . .
If the Disposable is disposed during the call to onSubscribe, the LocationListener will be unregistered in disposeActual before it is registered, then unregistered again in subscribeActual. Many Android APIs senselessly throw exceptions instead of simply doing nothing if you try to unregister a listener that isn't registered, but LocationManager isn't one of those. If it were, I could put a try/catch in disposeActual.
Have I omitted anything?
public class GpsLocationObservable extends Observable<Location> {
private final LocationManager mLocationManager;
private final long mMinTime;
private final float mMinDistance;
@RequiresPermission(Manifest.permission.ACCESS_FINE_LOCATION)
public GpsLocationObservable(final LocationManager locationManager, final long minTime, final float minDistance) {
Params.requireNonNull(locationManager);
Params.requireNonNegative(minTime);
Params.requireNonNegative(minDistance);
mLocationManager = locationManager;
mMinTime = minTime;
mMinDistance = minDistance;
}
@Override
protected void subscribeActual(final Observer<? super Location> observer) {
final LocationListener listener = new LocationAdapter() {
@Override
public void onLocationChanged(final Location location) {
observer.onNext(location);
}
};
final Disposable disposable = new AbstractDisposable() {
@Override
void disposeActual() {
mLocationManager.removeUpdates(listener);
}
};
observer.onSubscribe(disposable);
final Location last = mLocationManager.getLastKnownLocation(LocationManager.GPS_PROVIDER);
if(last != null) {
observer.onNext(last);
}
mLocationManager.requestLocationUpdates(LocationManager.GPS_PROVIDER, mMinTime, mMinDistance, listener);
if(disposable.isDisposed()) {
mLocationManager.removeUpdates(listener);
}
}
}
And my AbstractDisposable:
abstract public class AbstractDisposable implements Disposable {
private final AtomicBoolean mDisposed = new AtomicBoolean();
@Override
public void dispose() {
if(mDisposed.compareAndSet(false, true)) {
disposeActual();
}
}
@Override
public boolean isDisposed() {
return mDisposed.get();
}
abstract void disposeActual();
}