@@ -1693,7 +1693,11 @@ public final static <T> Observable<T> merge(Iterable<? extends Observable<? exte
16931693 * {@code source} Observable
16941694 * @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
16951695 */
1696+ @ SuppressWarnings ({"unchecked" , "rawtypes" })
16961697 public final static <T > Observable <T > merge (Observable <? extends Observable <? extends T >> source ) {
1698+ if (source .getClass () == ScalarSynchronousObservable .class ) {
1699+ return ((ScalarSynchronousObservable <T >)source ).scalarFlatMap ((Func1 )UtilityFunctions .identity ());
1700+ }
16971701 return source .lift (OperatorMerge .<T >instance (false ));
16981702 }
16991703
@@ -1721,8 +1725,13 @@ public final static <T> Observable<T> merge(Observable<? extends Observable<? ex
17211725 * if {@code maxConcurrent} is less than or equal to 0
17221726 * @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
17231727 */
1728+ @ Experimental
1729+ @ SuppressWarnings ({"unchecked" , "rawtypes" })
17241730 public final static <T > Observable <T > merge (Observable <? extends Observable <? extends T >> source , int maxConcurrent ) {
1725- return source .lift (new OperatorMergeMaxConcurrent <T >(maxConcurrent ));
1731+ if (source .getClass () == ScalarSynchronousObservable .class ) {
1732+ return ((ScalarSynchronousObservable <T >)source ).scalarFlatMap ((Func1 )UtilityFunctions .identity ());
1733+ }
1734+ return source .lift (OperatorMerge .<T >instance (false , maxConcurrent ));
17261735 }
17271736
17281737 /**
@@ -1993,7 +2002,31 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
19932002 public final static <T > Observable <T > merge (Observable <? extends T >[] sequences ) {
19942003 return merge (from (sequences ));
19952004 }
1996-
2005+
2006+ /**
2007+ * Flattens an Array of Observables into one Observable, without any transformation, while limiting the
2008+ * number of concurrent subscriptions to these Observables.
2009+ * <p>
2010+ * <img width="640" height="370" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.io.png" alt="">
2011+ * <p>
2012+ * You can combine items emitted by multiple Observables so that they appear as a single Observable, by
2013+ * using the {@code merge} method.
2014+ * <dl>
2015+ * <dt><b>Scheduler:</b></dt>
2016+ * <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
2017+ * </dl>
2018+ *
2019+ * @param sequences
2020+ * the Array of Observables
2021+ * @param maxConcurrent
2022+ * the maximum number of Observables that may be subscribed to concurrently
2023+ * @return an Observable that emits all of the items emitted by the Observables in the Array
2024+ * @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
2025+ */
2026+ @ Experimental
2027+ public final static <T > Observable <T > merge (Observable <? extends T >[] sequences , int maxConcurrent ) {
2028+ return merge (from (sequences ), maxConcurrent );
2029+ }
19972030 /**
19982031 * Flattens an Observable that emits Observables into one Observable, in a way that allows an Observer to
19992032 * receive all successfully emitted items from all of the source Observables without being interrupted by
@@ -2021,6 +2054,37 @@ public final static <T> Observable<T> merge(Observable<? extends T>[] sequences)
20212054 public final static <T > Observable <T > mergeDelayError (Observable <? extends Observable <? extends T >> source ) {
20222055 return source .lift (OperatorMerge .<T >instance (true ));
20232056 }
2057+ /**
2058+ * Flattens an Observable that emits Observables into one Observable, in a way that allows an Observer to
2059+ * receive all successfully emitted items from all of the source Observables without being interrupted by
2060+ * an error notification from one of them, while limiting the
2061+ * number of concurrent subscriptions to these Observables.
2062+ * <p>
2063+ * This behaves like {@link #merge(Observable)} except that if any of the merged Observables notify of an
2064+ * error via {@link Observer#onError onError}, {@code mergeDelayError} will refrain from propagating that
2065+ * error notification until all of the merged Observables have finished emitting items.
2066+ * <p>
2067+ * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeDelayError.png" alt="">
2068+ * <p>
2069+ * Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only
2070+ * invoke the {@code onError} method of its Observers once.
2071+ * <dl>
2072+ * <dt><b>Scheduler:</b></dt>
2073+ * <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
2074+ * </dl>
2075+ *
2076+ * @param source
2077+ * an Observable that emits Observables
2078+ * @param maxConcurrent
2079+ * the maximum number of Observables that may be subscribed to concurrently
2080+ * @return an Observable that emits all of the items emitted by the Observables emitted by the
2081+ * {@code source} Observable
2082+ * @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
2083+ */
2084+ @ Experimental
2085+ public final static <T > Observable <T > mergeDelayError (Observable <? extends Observable <? extends T >> source , int maxConcurrent ) {
2086+ return source .lift (OperatorMerge .<T >instance (true , maxConcurrent ));
2087+ }
20242088
20252089 /**
20262090 * Flattens two Observables into one Observable, in a way that allows an Observer to receive all
@@ -4618,6 +4682,9 @@ public final Observable<T> firstOrDefault(T defaultValue, Func1<? super T, Boole
46184682 * @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
46194683 */
46204684 public final <R > Observable <R > flatMap (Func1 <? super T , ? extends Observable <? extends R >> func ) {
4685+ if (getClass () == ScalarSynchronousObservable .class ) {
4686+ return ((ScalarSynchronousObservable <T >)this ).scalarFlatMap (func );
4687+ }
46214688 return merge (map (func ));
46224689 }
46234690
@@ -4646,6 +4713,9 @@ public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? e
46464713 */
46474714 @ Beta
46484715 public final <R > Observable <R > flatMap (Func1 <? super T , ? extends Observable <? extends R >> func , int maxConcurrent ) {
4716+ if (getClass () == ScalarSynchronousObservable .class ) {
4717+ return ((ScalarSynchronousObservable <T >)this ).scalarFlatMap (func );
4718+ }
46494719 return merge (map (func ), maxConcurrent );
46504720 }
46514721
0 commit comments