@@ -269,6 +269,7 @@ private static final class AmbSubscriber<T> extends Subscriber<T> {
269269
270270 private final Subscriber <? super T > subscriber ;
271271 private final Selection <T > selection ;
272+ private boolean chosen ;
272273
273274 private AmbSubscriber (long requested , Subscriber <? super T > subscriber , Selection <T > selection ) {
274275 this .subscriber = subscriber ;
@@ -282,11 +283,11 @@ private final void requestMore(long n) {
282283 }
283284
284285 @ Override
285- public void onNext (T args ) {
286+ public void onNext (T t ) {
286287 if (!isSelected ()) {
287288 return ;
288289 }
289- subscriber .onNext (args );
290+ subscriber .onNext (t );
290291 }
291292
292293 @ Override
@@ -306,12 +307,17 @@ public void onError(Throwable e) {
306307 }
307308
308309 private boolean isSelected () {
310+ if (chosen ) {
311+ return true ;
312+ }
309313 if (selection .choice .get () == this ) {
310314 // fast-path
315+ chosen = true ;
311316 return true ;
312317 } else {
313318 if (selection .choice .compareAndSet (null , this )) {
314319 selection .unsubscribeOthers (this );
320+ chosen = true ;
315321 return true ;
316322 } else {
317323 // we lost so unsubscribe ... and force cleanup again due to possible race conditions
@@ -343,59 +349,97 @@ public void unsubscribeOthers(AmbSubscriber<T> notThis) {
343349 }
344350
345351 }
346-
347- private final Iterable <? extends Observable <? extends T >> sources ;
348- private final Selection <T > selection = new Selection <T >();
349-
352+
353+ //give default access instead of private as a micro-optimization
354+ //for access from anonymous classes below
355+ final Iterable <? extends Observable <? extends T >> sources ;
356+ final Selection <T > selection = new Selection <T >();
357+ final AtomicReference <AmbSubscriber <T >> choice = selection .choice ;
358+
350359 private OnSubscribeAmb (Iterable <? extends Observable <? extends T >> sources ) {
351360 this .sources = sources ;
352361 }
353362
354363 @ Override
355364 public void call (final Subscriber <? super T > subscriber ) {
365+
366+ //setup unsubscription of all the subscribers to the sources
356367 subscriber .add (Subscriptions .create (new Action0 () {
357368
358369 @ Override
359370 public void call () {
360- if (selection .choice .get () != null ) {
371+ AmbSubscriber <T > c ;
372+ if ((c = choice .get ()) != null ) {
361373 // there is a single winner so we unsubscribe it
362- selection . choice . get () .unsubscribe ();
374+ c .unsubscribe ();
363375 }
364376 // if we are racing with others still existing, we'll also unsubscribe them
365- if (!selection .ambSubscribers .isEmpty ()) {
366- for (AmbSubscriber <T > other : selection .ambSubscribers ) {
367- other .unsubscribe ();
368- }
369- selection .ambSubscribers .clear ();
370- }
377+ // if subscriptions are occurring as this is happening then this call may not
378+ // unsubscribe everything. We protect ourselves though by doing another unsubscribe check
379+ // after the subscription loop below
380+ unsubscribeAmbSubscribers (selection .ambSubscribers );
371381 }
372-
382+
373383 }));
384+
385+ //need to subscribe to all the sources
386+ for (Observable <? extends T > source : sources ) {
387+ if (subscriber .isUnsubscribed ()) {
388+ break ;
389+ }
390+ AmbSubscriber <T > ambSubscriber = new AmbSubscriber <T >(0 , subscriber , selection );
391+ selection .ambSubscribers .add (ambSubscriber );
392+ // check again if choice has been made so can stop subscribing
393+ // if all sources were backpressure aware then this check
394+ // would be pointless given that 0 was requested above from each ambSubscriber
395+ AmbSubscriber <T > c ;
396+ if ((c = choice .get ()) != null ) {
397+ // Already chose one, the rest can be skipped and we can clean up
398+ selection .unsubscribeOthers (c );
399+ return ;
400+ }
401+ source .unsafeSubscribe (ambSubscriber );
402+ }
403+ // while subscribing unsubscription may have occurred so we clean up after
404+ if (subscriber .isUnsubscribed ()) {
405+ unsubscribeAmbSubscribers (selection .ambSubscribers );
406+ }
407+
374408 subscriber .setProducer (new Producer () {
375409
376410 @ Override
377411 public void request (long n ) {
378- if (selection .choice .get () != null ) {
412+ final AmbSubscriber <T > c ;
413+ if ((c = choice .get ()) != null ) {
379414 // propagate the request to that single Subscriber that won
380- selection . choice . get () .requestMore (n );
415+ c .requestMore (n );
381416 } else {
382- for (Observable <? extends T > source : sources ) {
383- if (subscriber .isUnsubscribed ()) {
384- break ;
385- }
386- AmbSubscriber <T > ambSubscriber = new AmbSubscriber <T >(n , subscriber , selection );
387- selection .ambSubscribers .add (ambSubscriber );
388- // possible race condition in previous lines ... a choice may have been made so double check (instead of synchronizing)
389- if (selection .choice .get () != null ) {
390- // Already chose one, the rest can be skipped and we can clean up
391- selection .unsubscribeOthers (selection .choice .get ());
392- break ;
417+ //propagate the request to all the amb subscribers
418+ for (AmbSubscriber <T > ambSubscriber : selection .ambSubscribers ) {
419+ if (!ambSubscriber .isUnsubscribed ()) {
420+ // make a best endeavours check to not waste requests
421+ // if first emission has already occurred
422+ if (choice .get () == ambSubscriber ) {
423+ ambSubscriber .requestMore (n );
424+ // don't need to request from other subscribers because choice has been made
425+ // and request has gone to choice
426+ return ;
427+ } else {
428+ ambSubscriber .requestMore (n );
429+ }
393430 }
394- source .unsafeSubscribe (ambSubscriber );
395431 }
396432 }
397433 }
398434 });
399435 }
400436
437+ private static <T > void unsubscribeAmbSubscribers (Collection <AmbSubscriber <T >> ambSubscribers ) {
438+ if (!ambSubscribers .isEmpty ()) {
439+ for (AmbSubscriber <T > other : ambSubscribers ) {
440+ other .unsubscribe ();
441+ }
442+ ambSubscribers .clear ();
443+ }
444+ }
401445}
0 commit comments