Skip to content

Commit 0bb6666

Browse files
Operator Class
- forgot to add earlier
1 parent c601c1a commit 0bb6666

File tree

1 file changed

+70
-0
lines changed

1 file changed

+70
-0
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package rx;
2+
3+
import rx.subscriptions.CompositeSubscription;
4+
5+
public abstract class Operator<T> implements Observer<T>, Subscription {
6+
7+
private final CompositeSubscription cs;
8+
9+
// TODO I'm questioning this API, it could be confusing and misused
10+
protected Operator(Operator<?> op) {
11+
this.cs = op.cs;
12+
}
13+
14+
protected Operator(CompositeSubscription cs) {
15+
this.cs = cs;
16+
}
17+
18+
public static <T> Operator<T> create(final Observer<? super T> o, CompositeSubscription cs) {
19+
if (o == null) {
20+
throw new IllegalArgumentException("Observer can not be null");
21+
}
22+
if (cs == null) {
23+
throw new IllegalArgumentException("CompositeSubscription can not be null");
24+
}
25+
return new Operator<T>(cs) {
26+
27+
@Override
28+
public void onCompleted() {
29+
o.onCompleted();
30+
}
31+
32+
@Override
33+
public void onError(Throwable e) {
34+
o.onError(e);
35+
}
36+
37+
@Override
38+
public void onNext(T v) {
39+
o.onNext(v);
40+
}
41+
42+
};
43+
}
44+
45+
public static <T> Operator<T> create(final Observer<? super T> o, Subscription s) {
46+
if (s == null) {
47+
throw new IllegalArgumentException("Subscription can not be null");
48+
}
49+
CompositeSubscription cs = new CompositeSubscription();
50+
cs.add(s);
51+
52+
return create(o, cs);
53+
}
54+
55+
/**
56+
* Used to register an unsubscribe callback.
57+
*/
58+
public final void add(Subscription s) {
59+
cs.add(s);
60+
}
61+
62+
@Override
63+
public final void unsubscribe() {
64+
cs.unsubscribe();
65+
}
66+
67+
public final boolean isUnsubscribed() {
68+
return cs.isUnsubscribed();
69+
}
70+
}

0 commit comments

Comments
 (0)