Skip to content

Commit d855a29

Browse files
davidmotenakarnokd
authored andcommitted
add groupBy overload with evictingMapFactory (ReactiveX#3931)
1 parent c110f69 commit d855a29

File tree

4 files changed

+344
-10
lines changed

4 files changed

+344
-10
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ dependencies {
2525

2626
testCompile 'junit:junit:4.12'
2727
testCompile 'org.mockito:mockito-core:1.10.19'
28+
testCompile 'com.google.guava:guava:19.0'
2829

2930
perfCompile 'org.openjdk.jmh:jmh-core:1.11.3'
3031
perfCompile 'org.openjdk.jmh:jmh-generator-annprocess:1.11.3'

src/main/java/rx/Observable.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6356,6 +6356,65 @@ public final <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? su
63566356
return lift(new OperatorGroupBy<T, K, R>(keySelector, elementSelector));
63576357
}
63586358

6359+
/**
6360+
* Groups the items emitted by an {@code Observable} according to a specified criterion, and emits these
6361+
* grouped items as {@link GroupedObservable}s. The emitted {@code GroupedObservable} allows only a single
6362+
* {@link Subscriber} during its lifetime and if this {@code Subscriber} unsubscribes before the
6363+
* source terminates, the next emission by the source having the same key will trigger a new
6364+
* {@code GroupedObservable} emission.
6365+
* <p>
6366+
* <img width="640" height="360" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/groupBy.png" alt="">
6367+
* <p>
6368+
* <em>Note:</em> A {@link GroupedObservable} will cache the items it is to emit until such time as it
6369+
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
6370+
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
6371+
* discard their buffers by applying an operator like {@link #ignoreElements} to them.
6372+
* <dl>
6373+
* <dt><b>Scheduler:</b></dt>
6374+
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
6375+
* </dl>
6376+
*
6377+
* @param keySelector
6378+
* a function that extracts the key for each item
6379+
* @param elementSelector
6380+
* a function that extracts the return element for each item
6381+
* @param evictingMapFactory
6382+
* a function that given an eviction action returns a {@link Map} instance that will be used to assign
6383+
* items to the appropriate {@code GroupedObservable}s. The {@code Map} instance must be thread-safe
6384+
* and any eviction must trigger a call to the supplied action (synchronously or asynchronously).
6385+
* This can be used to limit the size of the map by evicting keys by maximum size or access time for
6386+
* instance. Here's an example using Guava's {@code CacheBuilder} from v19.0:
6387+
* <pre>
6388+
* {@code
6389+
* Func1<Action1<K>, Map<K, Object>> mapFactory
6390+
* = action -> CacheBuilder.newBuilder()
6391+
* .maximumSize(1000)
6392+
* .expireAfterAccess(12, TimeUnit.HOURS)
6393+
* .removalListener(notification -> action.call(notification.getKey()))
6394+
* .<K, Object> build().asMap();
6395+
* }
6396+
* </pre>
6397+
*
6398+
* @param <K>
6399+
* the key type
6400+
* @param <R>
6401+
* the element type
6402+
* @return an {@code Observable} that emits {@link GroupedObservable}s, each of which corresponds to a
6403+
* unique key value and each of which emits those items from the source Observable that share that
6404+
* key value
6405+
* @throws NullPointerException
6406+
* if {@code evictingMapFactory} is null
6407+
* @see <a href="http://reactivex.io/documentation/operators/groupby.html">ReactiveX operators documentation: GroupBy</a>
6408+
*/
6409+
@Experimental
6410+
public final <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? super T, ? extends K> keySelector,
6411+
final Func1<? super T, ? extends R> elementSelector, final Func1<Action1<K>, Map<K, Object>> evictingMapFactory) {
6412+
if (evictingMapFactory == null) {
6413+
throw new NullPointerException("evictingMapFactory cannot be null");
6414+
}
6415+
return lift(new OperatorGroupBy<T, K, R>(keySelector, elementSelector, evictingMapFactory));
6416+
}
6417+
63596418
/**
63606419
* Groups the items emitted by an {@code Observable} according to a specified criterion, and emits these
63616420
* grouped items as {@link GroupedObservable}s. The emitted {@code GroupedObservable} allows only a single

src/main/java/rx/internal/operators/OperatorGroupBy.java

Lines changed: 70 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@
2121

2222
import rx.*;
2323
import rx.Observable.*;
24+
import rx.exceptions.Exceptions;
2425
import rx.functions.*;
2526
import rx.internal.producers.ProducerArbiter;
2627
import rx.internal.util.*;
2728
import rx.observables.GroupedObservable;
2829
import rx.plugins.RxJavaHooks;
30+
import rx.observers.Subscribers;
2931
import rx.subscriptions.Subscriptions;
3032

3133
/**
@@ -46,35 +48,50 @@ public final class OperatorGroupBy<T, K, V> implements Operator<GroupedObservabl
4648
final Func1<? super T, ? extends V> valueSelector;
4749
final int bufferSize;
4850
final boolean delayError;
51+
final Func1<Action1<K>, Map<K, Object>> mapFactory; //nullable
4952

5053
@SuppressWarnings({ "unchecked", "rawtypes" })
5154
public OperatorGroupBy(Func1<? super T, ? extends K> keySelector) {
52-
this(keySelector, (Func1)UtilityFunctions.<T>identity(), RxRingBuffer.SIZE, false);
55+
this(keySelector, (Func1)UtilityFunctions.<T>identity(), RxRingBuffer.SIZE, false, null);
5356
}
5457

5558
public OperatorGroupBy(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends V> valueSelector) {
56-
this(keySelector, valueSelector, RxRingBuffer.SIZE, false);
59+
this(keySelector, valueSelector, RxRingBuffer.SIZE, false, null);
60+
}
61+
62+
public OperatorGroupBy(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends V> valueSelector, Func1<Action1<K>, Map<K, Object>> mapFactory) {
63+
this(keySelector, valueSelector, RxRingBuffer.SIZE, false, mapFactory);
5764
}
5865

59-
public OperatorGroupBy(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends V> valueSelector, int bufferSize, boolean delayError) {
66+
public OperatorGroupBy(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends V> valueSelector, int bufferSize, boolean delayError, Func1<Action1<K>, Map<K, Object>> mapFactory) {
6067
this.keySelector = keySelector;
6168
this.valueSelector = valueSelector;
6269
this.bufferSize = bufferSize;
6370
this.delayError = delayError;
71+
this.mapFactory = mapFactory;
6472
}
6573

6674
@Override
67-
public Subscriber<? super T> call(Subscriber<? super GroupedObservable<K, V>> t) {
68-
final GroupBySubscriber<T, K, V> parent = new GroupBySubscriber<T, K, V>(t, keySelector, valueSelector, bufferSize, delayError);
75+
public Subscriber<? super T> call(Subscriber<? super GroupedObservable<K, V>> child) {
76+
final GroupBySubscriber<T, K, V> parent;
77+
try {
78+
parent = new GroupBySubscriber<T, K, V>(child, keySelector, valueSelector, bufferSize, delayError, mapFactory);
79+
} catch (Throwable ex) {
80+
//Can reach here because mapFactory.call() may throw in constructor of GroupBySubscriber
81+
Exceptions.throwOrReport(ex, child);
82+
Subscriber<? super T> parent2 = Subscribers.empty();
83+
parent2.unsubscribe();
84+
return parent2;
85+
}
6986

70-
t.add(Subscriptions.create(new Action0() {
87+
child.add(Subscriptions.create(new Action0() {
7188
@Override
7289
public void call() {
7390
parent.cancel();
7491
}
7592
}));
7693

77-
t.setProducer(parent.producer);
94+
child.setProducer(parent.producer);
7895

7996
return parent;
8097
}
@@ -101,6 +118,7 @@ public static final class GroupBySubscriber<T, K, V>
101118
final Map<Object, GroupedUnicast<K, V>> groups;
102119
final Queue<GroupedObservable<K, V>> queue;
103120
final GroupByProducer producer;
121+
final Queue<K> evictedKeys;
104122

105123
static final Object NULL_KEY = new Object();
106124

@@ -117,13 +135,14 @@ public static final class GroupBySubscriber<T, K, V>
117135

118136
final AtomicInteger wip;
119137

120-
public GroupBySubscriber(Subscriber<? super GroupedObservable<K, V>> actual, Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends V> valueSelector, int bufferSize, boolean delayError) {
138+
public GroupBySubscriber(Subscriber<? super GroupedObservable<K, V>> actual, Func1<? super T, ? extends K> keySelector,
139+
Func1<? super T, ? extends V> valueSelector, int bufferSize, boolean delayError,
140+
Func1<Action1<K>, Map<K, Object>> mapFactory) {
121141
this.actual = actual;
122142
this.keySelector = keySelector;
123143
this.valueSelector = valueSelector;
124144
this.bufferSize = bufferSize;
125145
this.delayError = delayError;
126-
this.groups = new ConcurrentHashMap<Object, GroupedUnicast<K, V>>();
127146
this.queue = new ConcurrentLinkedQueue<GroupedObservable<K, V>>();
128147
this.s = new ProducerArbiter();
129148
this.s.request(bufferSize);
@@ -132,6 +151,32 @@ public GroupBySubscriber(Subscriber<? super GroupedObservable<K, V>> actual, Fun
132151
this.requested = new AtomicLong();
133152
this.groupCount = new AtomicInteger(1);
134153
this.wip = new AtomicInteger();
154+
if (mapFactory == null) {
155+
this.groups = new ConcurrentHashMap<Object, GroupedUnicast<K, V>>();
156+
this.evictedKeys = null;
157+
} else {
158+
this.evictedKeys = new ConcurrentLinkedQueue<K>();
159+
this.groups = createMap(mapFactory, new EvictionAction<K>(evictedKeys));
160+
}
161+
}
162+
163+
static class EvictionAction<K> implements Action1<K> {
164+
165+
final Queue<K> evictedKeys;
166+
167+
EvictionAction(Queue<K> evictedKeys) {
168+
this.evictedKeys = evictedKeys;
169+
}
170+
171+
@Override
172+
public void call(K key) {
173+
evictedKeys.offer(key);
174+
}
175+
}
176+
177+
@SuppressWarnings("unchecked")
178+
private Map<Object, GroupedUnicast<K, V>> createMap(Func1<Action1<K>, Map<K, Object>> mapFactory, Action1<K> evictionAction) {
179+
return (Map<Object, GroupedUnicast<K,V>>)(Map<Object, ?>) mapFactory.call(evictionAction);
135180
}
136181

137182
@Override
@@ -187,6 +232,16 @@ public void onNext(T t) {
187232
}
188233

189234
group.onNext(v);
235+
236+
if (evictedKeys != null) {
237+
K evictedKey;
238+
while ((evictedKey = evictedKeys.poll()) != null) {
239+
GroupedUnicast<K, V> g = groups.get(evictedKey);
240+
if (g != null) {
241+
g.onComplete();
242+
}
243+
}
244+
}
190245

191246
if (notNew) {
192247
s.request(1);
@@ -215,6 +270,9 @@ public void onCompleted() {
215270
e.onComplete();
216271
}
217272
groups.clear();
273+
if (evictedKeys != null) {
274+
evictedKeys.clear();
275+
}
218276

219277
done = true;
220278
groupCount.decrementAndGet();
@@ -306,6 +364,9 @@ void errorAll(Subscriber<? super GroupedObservable<K, V>> a, Queue<?> q, Throwab
306364
q.clear();
307365
List<GroupedUnicast<K, V>> list = new ArrayList<GroupedUnicast<K, V>>(groups.values());
308366
groups.clear();
367+
if (evictedKeys != null) {
368+
evictedKeys.clear();
369+
}
309370

310371
for (GroupedUnicast<K, V> e : list) {
311372
e.onError(ex);

0 commit comments

Comments
 (0)