Skip to content

Commit c451491

Browse files
committed
Merge pull request ReactiveX#3154 from artem-zinnatullin/defer-for-humans
Add Observable.fromCallable() as a companion for Observable.defer()
2 parents d60f0c0 + 0c8b250 commit c451491

File tree

3 files changed

+201
-0
lines changed

3 files changed

+201
-0
lines changed

src/main/java/rx/Observable.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1250,6 +1250,29 @@ public final static <T> Observable<T> from(T[] array) {
12501250
return from(Arrays.asList(array));
12511251
}
12521252

1253+
/**
1254+
* Returns an Observable that invokes passed function and emits its result for each new Observer that subscribes.
1255+
* <p>
1256+
* Allows you to defer execution of passed function until Observer subscribes to the Observable.
1257+
* It makes passed function "lazy".
1258+
* Result of the function invocation will be emitted by the Observable.
1259+
* <dl>
1260+
* <dt><b>Scheduler:</b></dt>
1261+
* <dd>{@code fromCallable} does not operate by default on a particular {@link Scheduler}.</dd>
1262+
* </dl>
1263+
*
1264+
* @param func
1265+
* function which execution should be deferred, it will be invoked when Observer will subscribe to the Observable
1266+
* @param <T>
1267+
* the type of the item emitted by the Observable
1268+
* @return an Observable whose {@link Observer}s' subscriptions trigger an invocation of the given function
1269+
* @see #defer(Func0)
1270+
*/
1271+
@Experimental
1272+
public static <T> Observable<T> fromCallable(Callable<? extends T> func) {
1273+
return create(new OnSubscribeFromCallable<T>(func));
1274+
}
1275+
12531276
/**
12541277
* Returns an Observable that emits a sequential number every specified interval of time.
12551278
* <p>
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package rx.internal.operators;
2+
3+
import rx.Observable;
4+
import rx.Subscriber;
5+
import rx.exceptions.Exceptions;
6+
import rx.internal.producers.SingleDelayedProducer;
7+
8+
import java.util.concurrent.Callable;
9+
10+
/**
11+
* Do not invoke the function until an Observer subscribes; Invokes function on each
12+
* subscription.
13+
* <p>
14+
* Pass {@code fromCallable} a function, and {@code fromCallable} will call this function to emit result of invocation
15+
* afresh each time a new Observer subscribes.
16+
*/
17+
public final class OnSubscribeFromCallable<T> implements Observable.OnSubscribe<T> {
18+
19+
private final Callable<? extends T> resultFactory;
20+
21+
public OnSubscribeFromCallable(Callable<? extends T> resultFactory) {
22+
this.resultFactory = resultFactory;
23+
}
24+
25+
@Override
26+
public void call(Subscriber<? super T> subscriber) {
27+
final SingleDelayedProducer<T> singleDelayedProducer = new SingleDelayedProducer<T>(subscriber);
28+
29+
subscriber.setProducer(singleDelayedProducer);
30+
31+
try {
32+
singleDelayedProducer.setValue(resultFactory.call());
33+
} catch (Throwable t) {
34+
Exceptions.throwIfFatal(t);
35+
subscriber.onError(t);
36+
}
37+
}
38+
}
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package rx.internal.operators;
2+
3+
import org.junit.Test;
4+
import org.mockito.invocation.InvocationOnMock;
5+
import org.mockito.stubbing.Answer;
6+
import rx.Observable;
7+
import rx.Observer;
8+
import rx.Subscription;
9+
10+
import java.util.concurrent.Callable;
11+
import java.util.concurrent.CountDownLatch;
12+
13+
import static org.mockito.Mockito.*;
14+
import static rx.schedulers.Schedulers.computation;
15+
16+
public class OnSubscribeFromCallableTest {
17+
18+
@SuppressWarnings("unchecked")
19+
@Test
20+
public void shouldNotInvokeFuncUntilSubscription() throws Exception {
21+
Callable<Object> func = mock(Callable.class);
22+
23+
when(func.call()).thenReturn(new Object());
24+
25+
Observable<Object> fromCallableObservable = Observable.fromCallable(func);
26+
27+
verifyZeroInteractions(func);
28+
29+
fromCallableObservable.subscribe();
30+
31+
verify(func).call();
32+
}
33+
34+
@SuppressWarnings("unchecked")
35+
@Test
36+
public void shouldCallOnNextAndOnCompleted() throws Exception {
37+
Callable<String> func = mock(Callable.class);
38+
39+
when(func.call()).thenReturn("test_value");
40+
41+
Observable<String> fromCallableObservable = Observable.fromCallable(func);
42+
43+
Observer<String> observer = mock(Observer.class);
44+
45+
fromCallableObservable.subscribe(observer);
46+
47+
verify(observer).onNext("test_value");
48+
verify(observer).onCompleted();
49+
verify(observer, never()).onError(any(Throwable.class));
50+
}
51+
52+
@SuppressWarnings("unchecked")
53+
@Test
54+
public void shouldCallOnError() throws Exception {
55+
Callable<Object> func = mock(Callable.class);
56+
57+
Throwable throwable = new IllegalStateException("Test exception");
58+
when(func.call()).thenThrow(throwable);
59+
60+
Observable<Object> fromCallableObservable = Observable.fromCallable(func);
61+
62+
Observer<Object> observer = mock(Observer.class);
63+
64+
fromCallableObservable.subscribe(observer);
65+
66+
verify(observer, never()).onNext(anyObject());
67+
verify(observer, never()).onCompleted();
68+
verify(observer).onError(throwable);
69+
}
70+
71+
@SuppressWarnings("unchecked")
72+
@Test
73+
public void shouldNotDeliverResultIfSubscriberUnsubscribedBeforeEmission() throws Exception {
74+
Callable<String> func = mock(Callable.class);
75+
76+
final CountDownLatch funcLatch = new CountDownLatch(1);
77+
final CountDownLatch observerLatch = new CountDownLatch(1);
78+
79+
when(func.call()).thenAnswer(new Answer<String>() {
80+
@Override
81+
public String answer(InvocationOnMock invocation) throws Throwable {
82+
observerLatch.countDown();
83+
84+
try {
85+
funcLatch.await();
86+
} catch (InterruptedException e) {
87+
// It's okay, unsubscription causes Thread interruption
88+
89+
// Restoring interruption status of the Thread
90+
Thread.currentThread().interrupt();
91+
}
92+
93+
return "should_not_be_delivered";
94+
}
95+
});
96+
97+
Observable<String> fromCallableObservable = Observable.fromCallable(func);
98+
99+
Observer<String> observer = mock(Observer.class);
100+
101+
Subscription subscription = fromCallableObservable
102+
.subscribeOn(computation())
103+
.subscribe(observer);
104+
105+
// Wait until func will be invoked
106+
observerLatch.await();
107+
108+
// Unsubscribing before emission
109+
subscription.unsubscribe();
110+
111+
// Emitting result
112+
funcLatch.countDown();
113+
114+
// func must be invoked
115+
verify(func).call();
116+
117+
// Observer must not be notified at all
118+
verifyZeroInteractions(observer);
119+
}
120+
121+
@SuppressWarnings("unchecked")
122+
@Test
123+
public void shouldAllowToThrowCheckedException() {
124+
final Exception checkedException = new Exception("test exception");
125+
126+
Observable<Object> fromCallableObservable = Observable.fromCallable(new Callable<Object>() {
127+
@Override
128+
public Object call() throws Exception {
129+
throw checkedException;
130+
}
131+
});
132+
133+
Observer<Object> observer = mock(Observer.class);
134+
135+
fromCallableObservable.subscribe(observer);
136+
137+
verify(observer).onError(checkedException);
138+
verifyNoMoreInteractions(observer);
139+
}
140+
}

0 commit comments

Comments
 (0)