Skip to content

Commit 978825e

Browse files
authored
1.x: optimize collect, reduce and takeLast(1) (ReactiveX#4176)
* 1.x: optimize collect, reduce and takeLast(1) * Fix regression in last and reduce * Forgot that reduce() has to signal error if empty * Update header year.
1 parent 20ef857 commit 978825e

10 files changed

+907
-208
lines changed

src/main/java/rx/Observable.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4737,15 +4737,13 @@ public final <R> Observable<R> cast(final Class<R> klass) {
47374737
* @see <a href="http://reactivex.io/documentation/operators/reduce.html">ReactiveX operators documentation: Reduce</a>
47384738
*/
47394739
public final <R> Observable<R> collect(Func0<R> stateFactory, final Action2<R, ? super T> collector) {
4740-
Func2<R, T, R> accumulator = InternalObservableUtils.createCollectorCaller(collector);
4741-
47424740
/*
47434741
* Discussion and confirmation of implementation at
47444742
* https://github.com/ReactiveX/RxJava/issues/423#issuecomment-27642532
47454743
*
47464744
* It should use last() not takeLast(1) since it needs to emit an error if the sequence is empty.
47474745
*/
4748-
return lift(new OperatorScan<R, T>(stateFactory, accumulator)).last();
4746+
return create(new OnSubscribeCollect<T, R>(this, stateFactory, collector));
47494747
}
47504748

47514749
/**
@@ -7899,7 +7897,7 @@ public final Observable<T> reduce(Func2<T, T, T> accumulator) {
78997897
*
79007898
* It should use last() not takeLast(1) since it needs to emit an error if the sequence is empty.
79017899
*/
7902-
return scan(accumulator).last();
7900+
return create(new OnSubscribeReduce<T>(this, accumulator));
79037901
}
79047902

79057903
/**
@@ -7947,7 +7945,7 @@ public final Observable<T> reduce(Func2<T, T, T> accumulator) {
79477945
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
79487946
*/
79497947
public final <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> accumulator) {
7950-
return scan(initialValue, accumulator).takeLast(1);
7948+
return create(new OnSubscribeReduceSeed<T, R>(this, initialValue, accumulator));
79517949
}
79527950

79537951
/**
@@ -10154,7 +10152,7 @@ public final Observable<T> takeLast(final int count) {
1015410152
if (count == 0) {
1015510153
return ignoreElements();
1015610154
} else if (count == 1) {
10157-
return lift(OperatorTakeLastOne.<T>instance());
10155+
return create(new OnSubscribeTakeLastOne<T>(this));
1015810156
} else {
1015910157
return lift(new OperatorTakeLast<T>(count));
1016010158
}
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import java.util.concurrent.atomic.AtomicInteger;
20+
21+
import rx.*;
22+
23+
/**
24+
* Base class for Subscribers that consume the entire upstream and signal
25+
* zero or one element (or an error) in a backpressure honoring fashion.
26+
* <p>
27+
* Store any temporary value in {@link #value} and indicate there is
28+
* a value available when completing by setting {@link #hasValue}.
29+
* <p.
30+
* Use {@link #subscribeTo(Observable)} to properly setup the link between this and the downstream
31+
* subscriber.
32+
*
33+
* @param <T> the source value type
34+
* @param <R> the result value type
35+
*/
36+
public abstract class DeferredScalarSubscriber<T, R> extends Subscriber<T> {
37+
38+
/** The downstream subscriber. */
39+
protected final Subscriber<? super R> actual;
40+
41+
/** Indicates there is a value available in value. */
42+
protected boolean hasValue;
43+
44+
/** The holder of the single value. */
45+
protected R value;
46+
47+
/** The state, see the constants below. */
48+
final AtomicInteger state;
49+
50+
/** Initial state. */
51+
static final int NO_REQUEST_NO_VALUE = 0;
52+
/** Request came first. */
53+
static final int HAS_REQUEST_NO_VALUE = 1;
54+
/** Value came first. */
55+
static final int NO_REQUEST_HAS_VALUE = 2;
56+
/** Value will be emitted. */
57+
static final int HAS_REQUEST_HAS_VALUE = 3;
58+
59+
public DeferredScalarSubscriber(Subscriber<? super R> actual) {
60+
this.actual = actual;
61+
this.state = new AtomicInteger();
62+
}
63+
64+
@Override
65+
public void onError(Throwable ex) {
66+
value = null;
67+
actual.onError(ex);
68+
}
69+
70+
@Override
71+
public void onCompleted() {
72+
if (hasValue) {
73+
complete(value);
74+
} else {
75+
complete();
76+
}
77+
}
78+
79+
/**
80+
* Signals onCompleted() to the downstream subscriber.
81+
*/
82+
protected final void complete() {
83+
actual.onCompleted();
84+
}
85+
86+
/**
87+
* Atomically switches to the terminal state and emits the value if
88+
* there is a request for it or stores it for retrieval by {@link #downstreamRequest(long)}.
89+
* @param value the value to complete with
90+
*/
91+
protected final void complete(R value) {
92+
Subscriber<? super R> a = actual;
93+
for (;;) {
94+
int s = state.get();
95+
96+
if (s == NO_REQUEST_HAS_VALUE || s == HAS_REQUEST_HAS_VALUE || a.isUnsubscribed()) {
97+
return;
98+
}
99+
if (s == HAS_REQUEST_NO_VALUE) {
100+
a.onNext(value);
101+
if (!a.isUnsubscribed()) {
102+
a.onCompleted();
103+
}
104+
state.lazySet(HAS_REQUEST_HAS_VALUE);
105+
return;
106+
}
107+
this.value = value;
108+
if (state.compareAndSet(NO_REQUEST_NO_VALUE, NO_REQUEST_HAS_VALUE)) {
109+
return;
110+
}
111+
}
112+
}
113+
114+
final void downstreamRequest(long n) {
115+
if (n < 0L) {
116+
throw new IllegalArgumentException("n >= 0 required but it was " + n);
117+
}
118+
if (n != 0L) {
119+
Subscriber<? super R> a = actual;
120+
for (;;) {
121+
int s = state.get();
122+
if (s == HAS_REQUEST_NO_VALUE || s == HAS_REQUEST_HAS_VALUE || a.isUnsubscribed()) {
123+
return;
124+
}
125+
if (s == NO_REQUEST_HAS_VALUE) {
126+
if (state.compareAndSet(NO_REQUEST_HAS_VALUE, HAS_REQUEST_HAS_VALUE)) {
127+
a.onNext(value);
128+
if (!a.isUnsubscribed()) {
129+
a.onCompleted();
130+
}
131+
}
132+
return;
133+
}
134+
if (state.compareAndSet(NO_REQUEST_NO_VALUE, HAS_REQUEST_NO_VALUE)) {
135+
return;
136+
}
137+
}
138+
}
139+
}
140+
141+
@Override
142+
public final void setProducer(Producer p) {
143+
p.request(Long.MAX_VALUE);
144+
}
145+
146+
/**
147+
* Links up with the downstream Subscriber (cancellation, backpressure) and
148+
* subscribes to the source Observable.
149+
* @param source the source Observable
150+
*/
151+
public final void subscribeTo(Observable<? extends T> source) {
152+
setupDownstream();
153+
source.unsafeSubscribe(this);
154+
}
155+
156+
/* test */ final void setupDownstream() {
157+
Subscriber<? super R> a = actual;
158+
a.add(this);
159+
a.setProducer(new InnerProducer(this));
160+
}
161+
162+
/**
163+
* Redirects the downstream request amount bach to the DeferredScalarSubscriber.
164+
*/
165+
static final class InnerProducer implements Producer {
166+
final DeferredScalarSubscriber<?, ?> parent;
167+
168+
public InnerProducer(DeferredScalarSubscriber<?, ?> parent) {
169+
this.parent = parent;
170+
}
171+
172+
@Override
173+
public void request(long n) {
174+
parent.downstreamRequest(n);
175+
}
176+
}
177+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import rx.*;
20+
import rx.Observable.OnSubscribe;
21+
import rx.exceptions.Exceptions;
22+
import rx.functions.*;
23+
24+
public final class OnSubscribeCollect<T, R> implements OnSubscribe<R> {
25+
26+
final Observable<T> source;
27+
28+
final Func0<R> collectionFactory;
29+
30+
final Action2<R, ? super T> collector;
31+
32+
public OnSubscribeCollect(Observable<T> source, Func0<R> collectionFactory, Action2<R, ? super T> collector) {
33+
this.source = source;
34+
this.collectionFactory = collectionFactory;
35+
this.collector = collector;
36+
}
37+
38+
@Override
39+
public void call(Subscriber<? super R> t) {
40+
R initialValue;
41+
42+
try {
43+
initialValue = collectionFactory.call();
44+
} catch (Throwable ex) {
45+
Exceptions.throwIfFatal(ex);
46+
t.onError(ex);
47+
return;
48+
}
49+
50+
new CollectSubscriber<T, R>(t, initialValue, collector).subscribeTo(source);
51+
}
52+
53+
static final class CollectSubscriber<T, R> extends DeferredScalarSubscriber<T, R> {
54+
55+
final Action2<R, ? super T> collector;
56+
57+
public CollectSubscriber(Subscriber<? super R> actual, R initialValue, Action2<R, ? super T> collector) {
58+
super(actual);
59+
this.value = initialValue;
60+
this.hasValue = true;
61+
this.collector = collector;
62+
}
63+
64+
@Override
65+
public void onNext(T t) {
66+
try {
67+
collector.call(value, t);
68+
} catch (Throwable ex) {
69+
Exceptions.throwIfFatal(ex);
70+
unsubscribe();
71+
actual.onError(ex);
72+
}
73+
}
74+
75+
}
76+
}

0 commit comments

Comments
 (0)