Skip to content

Commit 99e1ede

Browse files
jstewart148jyemin
authored andcommitted
Filter null values for Observables
JAVA-3382
1 parent 59b94e0 commit 99e1ede

File tree

2 files changed

+7
-32
lines changed

2 files changed

+7
-32
lines changed

driver-async/src/main/com/mongodb/async/client/AbstractSubscription.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626

2727
abstract class AbstractSubscription<TResult> implements Subscription {
2828
private static final Logger LOGGER = Loggers.getLogger("client");
29-
private static final Object NULL_PLACEHOLDER = new Object();
3029
private final Observer<? super TResult> observer;
3130

3231
/* protected by `this` */
@@ -37,7 +36,7 @@ abstract class AbstractSubscription<TResult> implements Subscription {
3736
private boolean isTerminated = false;
3837
/* protected by `this` */
3938

40-
private final ConcurrentLinkedQueue<Object> resultsQueue = new ConcurrentLinkedQueue<Object>();
39+
private final ConcurrentLinkedQueue<TResult> resultsQueue = new ConcurrentLinkedQueue<TResult>();
4140

4241
AbstractSubscription(final Observer<? super TResult> observer) {
4342
this.observer = observer;
@@ -110,9 +109,7 @@ synchronized long getRequested() {
110109
}
111110

112111
void addToQueue(@Nullable final TResult result) {
113-
if (result == null) {
114-
resultsQueue.add(NULL_PLACEHOLDER);
115-
} else {
112+
if (result != null) {
116113
resultsQueue.add(result);
117114
}
118115
}
@@ -139,11 +136,8 @@ void onError(final Throwable t) {
139136
}
140137
}
141138

142-
private void onNext(@Nullable final TResult next) {
139+
private void onNext(final TResult next) {
143140
if (!isTerminated()) {
144-
if (next == null) {
145-
throw new NullPointerException();
146-
}
147141
try {
148142
observer.onNext(next);
149143
} catch (Throwable t) {
@@ -216,11 +210,11 @@ private void processResultsQueue() {
216210
processedCount = 0;
217211

218212
while (localWanted > 0) {
219-
Object item = resultsQueue.poll();
213+
TResult item = resultsQueue.poll();
220214
if (item == null) {
221215
break;
222216
} else {
223-
onNext(item == NULL_PLACEHOLDER ? null : (TResult) item);
217+
onNext(item);
224218
localWanted -= 1;
225219
processedCount += 1;
226220
}

driver-async/src/test/unit/com/mongodb/async/client/SingleResultCallbackSubscriptionSpecification.groovy

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ class SingleResultCallbackSubscriptionSpecification extends Specification {
257257
observe(new Block<SingleResultCallback<Void>>(){
258258
@Override
259259
void apply(final SingleResultCallback<Void> callback) {
260-
callback.onResult(1, null)
260+
callback.onResult(null, null)
261261
}
262262
}).subscribe(observer)
263263

@@ -266,7 +266,7 @@ class SingleResultCallbackSubscriptionSpecification extends Specification {
266266

267267
then:
268268
observer.assertNoErrors()
269-
observer.assertReceivedOnNext([1])
269+
observer.assertReceivedOnNext([])
270270
observer.assertTerminalEvent()
271271
}
272272

@@ -289,25 +289,6 @@ class SingleResultCallbackSubscriptionSpecification extends Specification {
289289
observer.assertErrored()
290290
}
291291

292-
def 'should call onError on a null result'() {
293-
given:
294-
def observer = new TestObserver()
295-
observe(new Block<SingleResultCallback<Void>>() {
296-
@Override
297-
void apply(final SingleResultCallback<Void> callback) {
298-
callback.onResult(null, null);
299-
}
300-
}).subscribe(observer)
301-
302-
when:
303-
observer.requestMore(1)
304-
305-
then:
306-
observer.assertTerminalEvent()
307-
observer.assertErrored()
308-
observer.getOnErrorEvents().first() instanceof NullPointerException
309-
}
310-
311292
def 'should throw the exception if calling onComplete raises one'() {
312293
given:
313294
def observer = new TestObserver(new Observer(){

0 commit comments

Comments
 (0)