Skip to content

Commit 0a2f070

Browse files
committed
Merge pull request #14 from abersnaze/FixConversions
Fixed scheduler leak and lack of backpressure support
2 parents 7cc5b11 + ad5ad00 commit 0a2f070

File tree

1 file changed

+11
-3
lines changed

1 file changed

+11
-3
lines changed

src/main/java/rx/observable/ListenableFutureObservable.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import rx.Observable;
2222
import rx.Observable.OnSubscribe;
2323
import rx.functions.Action0;
24+
import rx.internal.operators.SingleDelayedProducer;
2425
import rx.Observer;
2526
import rx.Scheduler;
2627
import rx.Scheduler.Worker;
@@ -45,7 +46,11 @@ public void execute(final Runnable command) {
4546
worker.schedule(new Action0() {
4647
@Override
4748
public void call() {
48-
command.run();
49+
try {
50+
command.run();
51+
} finally {
52+
worker.unsubscribe();
53+
}
4954
}
5055
});
5156
}
@@ -63,18 +68,21 @@ public static <T> Observable<T> from(final ListenableFuture<T> future, final Exe
6368
return Observable.create(new OnSubscribe<T>() {
6469
@Override
6570
public void call(final Subscriber<? super T> subscriber) {
71+
final SingleDelayedProducer<T> sdp = new SingleDelayedProducer<T>(subscriber);
72+
subscriber.setProducer(sdp);
73+
6674
future.addListener(new Runnable() {
6775
@Override
6876
public void run() {
6977
try {
7078
T t = future.get();
71-
subscriber.onNext(t);
72-
subscriber.onCompleted();
79+
sdp.set(t);
7380
} catch (Exception e) {
7481
subscriber.onError(e);
7582
}
7683
}
7784
}, executor);
85+
7886
}
7987
});
8088
}

0 commit comments

Comments
 (0)