Skip to content

Commit 00f4488

Browse files
Get OperationReplay working via OnSubscribeFunc to Action1 bridge
It had not been successfully migrated before … this now passes unit tests. This is the bridge until we port it to the new “bind” model.
1 parent bcf9807 commit 00f4488

File tree

1 file changed

+36
-19
lines changed

1 file changed

+36
-19
lines changed

rxjava-core/src/main/java/rx/operators/OperationReplay.java

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,16 @@
2626
import java.util.concurrent.locks.ReentrantLock;
2727

2828
import rx.Observable;
29+
import rx.Observable.OnSubscribeFunc;
2930
import rx.Observer;
3031
import rx.Operator;
3132
import rx.Scheduler;
3233
import rx.Subscription;
3334
import rx.subjects.Subject;
35+
import rx.subscriptions.Subscriptions;
3436
import rx.util.Timestamped;
3537
import rx.util.functions.Action0;
3638
import rx.util.functions.Action1;
37-
import rx.util.functions.Action2;
3839
import rx.util.functions.Func1;
3940
import rx.util.functions.Functions;
4041

@@ -62,8 +63,16 @@ public static <T> Subject<T, T> replayBuffered(int bufferSize) {
6263
* propagated through the given wrapped subject.
6364
*/
6465
public static <T> Subject<T, T> createScheduledSubject(Subject<T, T> subject, Scheduler scheduler) {
65-
Observable<T> observedOn = subject.observeOn(scheduler);
66-
SubjectWrapper<T> s = new SubjectWrapper<T>(subscriberOf(observedOn), subject);
66+
final Observable<T> observedOn = subject.observeOn(scheduler);
67+
SubjectWrapper<T> s = new SubjectWrapper<T>(new Action1<Operator<? super T>>() {
68+
69+
@Override
70+
public void call(Operator<? super T> o) {
71+
// TODO HACK between OnSubscribeFunc and Action1
72+
subscriberOf(observedOn).onSubscribe(o);
73+
}
74+
75+
}, subject);
6776
return s;
6877
}
6978

@@ -129,19 +138,20 @@ public void call() {
129138
state.onSubscription = state.onValueAdded;
130139

131140
final CustomReplaySubject<T, Timestamped<T>, T> brs = new CustomReplaySubject<T, Timestamped<T>, T>(
132-
new CustomReplaySubjectSubscribeFunc<Timestamped<T>, T>(state), state, timestamp);
141+
new CustomReplaySubjectSubscribeFunc<Timestamped<T>, T>(state), state, timestamp
142+
);
133143

134144
return brs;
135145
}
136146

137147
/**
138148
* Return an OnSubscribeFunc which delegates the subscription to the given observable.
139149
*/
140-
public static <T> Action1<Operator<? super T>> subscriberOf(final Observable<T> target) {
141-
return new Action1<Operator<? super T>>() {
150+
public static <T> OnSubscribeFunc<T> subscriberOf(final Observable<T> target) {
151+
return new OnSubscribeFunc<T>() {
142152
@Override
143-
public void call(Operator<? super T> t1) {
144-
target.subscribe(t1);
153+
public Subscription onSubscribe(Observer<? super T> t1) {
154+
return target.subscribe(t1);
145155
}
146156
};
147157
}
@@ -708,10 +718,17 @@ public static <T> CustomReplaySubject<T, T, T> create(int maxSize) {
708718
protected final Func1<? super TInput, ? extends TIntermediate> intermediateSelector;
709719

710720
private CustomReplaySubject(
711-
Action1<Operator<? super TResult>> onSubscribe,
721+
final Observable.OnSubscribeFunc<TResult> onSubscribe,
712722
ReplayState<TIntermediate, TResult> state,
713723
Func1<? super TInput, ? extends TIntermediate> intermediateSelector) {
714-
super(onSubscribe);
724+
super(new Action1<Operator<? super TResult>>() {
725+
726+
@Override
727+
public void call(Operator<? super TResult> o) {
728+
//TODO hack from OnSubscribeFunc to Action0
729+
onSubscribe.onSubscribe(o);
730+
}
731+
});
715732
this.state = state;
716733
this.intermediateSelector = intermediateSelector;
717734
}
@@ -782,7 +799,7 @@ protected void replayValues() {
782799
* the value type of the observers subscribing to this subject
783800
*/
784801
protected static final class CustomReplaySubjectSubscribeFunc<TIntermediate, TResult>
785-
implements Action1<Operator<? super TResult>> {
802+
implements Observable.OnSubscribeFunc<TResult> {
786803

787804
private final ReplayState<TIntermediate, TResult> state;
788805

@@ -791,14 +808,14 @@ protected CustomReplaySubjectSubscribeFunc(ReplayState<TIntermediate, TResult> s
791808
}
792809

793810
@Override
794-
public void call(Operator<? super TResult> op) {
811+
public Subscription onSubscribe(Observer<? super TResult> t1) {
795812
VirtualList<TIntermediate> values;
796813
Throwable error;
797814
state.lock();
798815
try {
799816
if (!state.done) {
800817
state.onSubscription.call();
801-
op.add(state.addReplayer(op));
818+
return state.addReplayer(t1);
802819
}
803820
values = state.values;
804821
error = state.error;
@@ -808,18 +825,18 @@ public void call(Operator<? super TResult> op) {
808825
// fully replay the subject
809826
for (int i = values.start(); i < values.end(); i++) {
810827
try {
811-
op.onNext(state.resultSelector.call(values.get(i)));
828+
t1.onNext(state.resultSelector.call(values.get(i)));
812829
} catch (Throwable t) {
813-
op.onError(t);
814-
return;
830+
t1.onError(t);
831+
return Subscriptions.empty();
815832
}
816833
}
817834
if (error != null) {
818-
op.onError(error);
835+
t1.onError(error);
819836
} else {
820-
op.onCompleted();
837+
t1.onCompleted();
821838
}
822-
return;
839+
return Subscriptions.empty();
823840
}
824841
}
825842
}

0 commit comments

Comments
 (0)