Skip to content

Covariant Support with super/extends and OnSubscribeFunc #343

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Sep 4, 2013
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
34942ad
making Func0 covariant in its return type, cleaning up a few warnings…
Aug 31, 2013
52342a2
added variance to Func1 (hopefully) everywhere...
Aug 31, 2013
f8cba6a
added variance to Func2, too
Aug 31, 2013
21bc549
added variance to all other Func*; this breaks Scala for good, it see…
Aug 31, 2013
69c6e80
added variance to Action*, too
Aug 31, 2013
0438505
lots of Observer<? super X>
Aug 31, 2013
6b9867c
updated to Scala 2.10.2 again, repaired Scala tests, generalized two …
Aug 31, 2013
6bd2033
UnitTest confirming compilation failure without super/extends and suc…
benjchristensen Aug 31, 2013
98cd711
Need to stay pinned on Scala 2.10.1 still …
benjchristensen Aug 31, 2013
ac6a0a1
Zip: Order of Generics and Artities 5-9
benjchristensen Aug 31, 2013
ebaa616
Merge pull request #1 from benjchristensen/super-extends-additions
Sep 1, 2013
a127b06
adapted RxImplicits tests againt zip to new generics order, renamed t…
Sep 1, 2013
eba4857
generalized everything in Observable that deals with covariance of ob…
Sep 1, 2013
29289d1
Timestamped, Notification and Future are now also treated as covariant
Sep 1, 2013
78a0a1b
added an unnecessary explicit cast because the Jenkins java compiler …
Sep 1, 2013
1ca5900
Generalized all the operators, too
Sep 1, 2013
04f35cd
generalized BlockingObservable and the execution hook further
Sep 1, 2013
e962d00
added a few 'compiler' tests
Sep 1, 2013
68d181b
removed some <? super Throwable>s because that's rather unnecessary
Sep 4, 2013
51dd848
Merged in master so that the gradle pull request build has a chance t…
Sep 4, 2013
94f5fbe
Merge branch 'super-extends' of git://github.com/jmhofer/RxJava into …
benjchristensen Sep 4, 2013
b7de349
Add OnSubscribeFunction and refactor to support it
benjchristensen Sep 4, 2013
a1ad9c4
Adding implicit for OnSubscribeFunc
mattrjacobs Sep 4, 2013
3585570
Change OnSubscribeFunc.call to OnSubscribeFunc.onSubscribe
benjchristensen Sep 4, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion language-adaptors/rxjava-scala/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ sourceSets {
}

dependencies {
compile 'org.scala-lang:scala-library:2.10.2'
// pinning to 2.10.1 as having issues with 2.10.2
compile 'org.scala-lang:scala-library:2.10.1'

compile project(':rxjava-core')

Expand Down
257 changes: 225 additions & 32 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@
import rx.util.functions.Func2;
import rx.util.functions.Func3;
import rx.util.functions.Func4;
import rx.util.functions.Func5;
import rx.util.functions.Func6;
import rx.util.functions.Func7;
import rx.util.functions.Func8;
import rx.util.functions.Func9;
import rx.util.functions.FuncN;
import rx.util.functions.Function;

Expand Down Expand Up @@ -908,18 +913,17 @@ public static <T> Observable<T> from(Future<T> future, long timeout, TimeUnit un
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations
* of the source Observable that emits the fewest items.
*
* @param w0
* @param o1
* one source Observable
* @param w1
* @param o2
* another source Observable
* @param reduceFunction
* a function that, when applied to a pair of items, each emitted by one of the two
* source Observables, results in an item that will be emitted by the resulting
* Observable
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <R, T0, T1> Observable<R> zip(Observable<? extends T0> w0, Observable<? extends T1> w1, Func2<? super T0, ? super T1, ? extends R> reduceFunction) {
return create(OperationZip.zip(w0, w1, reduceFunction));
public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Func2<? super T1, ? super T2, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, zipFunction));
}

/**
Expand Down Expand Up @@ -981,19 +985,19 @@ public static <T> Observable<Boolean> sequenceEqual(Observable<T> first, Observa
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations
* of the source Observable that emits the fewest items.
*
* @param w0
* @param o1
* one source Observable
* @param w1
* another source Observable
* @param w2
* @param o2
* a second source Observable
* @param o3
* a third source Observable
* @param function
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <R, T0, T1, T2> Observable<R> zip(Observable<? extends T0> w0, Observable<? extends T1> w1, Observable<? extends T2> w2, Func3<? super T0, ? super T1, ? super T2, ? extends R> function) {
return create(OperationZip.zip(w0, w1, w2, function));
public static <T1, T2, T3, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Func3<? super T1, ? super T2, ? super T3, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, zipFunction));
}

/**
Expand All @@ -1010,21 +1014,210 @@ public static <R, T0, T1, T2> Observable<R> zip(Observable<? extends T0> w0, Obs
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations
* of the source Observable that emits the fewest items.
*
* @param w0
* @param o1
* one source Observable
* @param w1
* another source Observable
* @param w2
* @param o2
* a second source Observable
* @param o3
* a third source Observable
* @param w3
* @param o4
* a fourth source Observable
* @param reduceFunction
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <T1, T2, T3, T4, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Func4<? super T1, ? super T2, ? super T3, ? super T4, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, zipFunction));
}

/**
* Returns an Observable that emits the results of a function of your choosing applied to
* combinations of four items emitted, in sequence, by four other Observables.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
* <p> {@code zip} applies this function in strict sequence, so the first item emitted by the
* new Observable will be the result of the function applied to the first item emitted by {@code w0}, the first item emitted by {@code w1}, the first item emitted by {@code w2}, and the first item
* emitted by {@code w3}; the second item emitted by
* the new Observable will be the result of the function applied to the second item emitted by
* each of those Observables; and so forth.
* <p>
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations
* of the source Observable that emits the fewest items.
*
* @param o1
* one source Observable
* @param o2
* a second source Observable
* @param o3
* a third source Observable
* @param o4
* a fourth source Observable
* @param o5
* a fifth source Observable
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <T1, T2, T3, T4, T5, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Func5<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, o5, zipFunction));
}

/**
* Returns an Observable that emits the results of a function of your choosing applied to
* combinations of four items emitted, in sequence, by four other Observables.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
* <p> {@code zip} applies this function in strict sequence, so the first item emitted by the
* new Observable will be the result of the function applied to the first item emitted by {@code w0}, the first item emitted by {@code w1}, the first item emitted by {@code w2}, and the first item
* emitted by {@code w3}; the second item emitted by
* the new Observable will be the result of the function applied to the second item emitted by
* each of those Observables; and so forth.
* <p>
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations
* of the source Observable that emits the fewest items.
*
* @param o1
* one source Observable
* @param o2
* a second source Observable
* @param o3
* a third source Observable
* @param o4
* a fourth source Observable
* @param o5
* a fifth source Observable
* @param o6
* a sixth source Observable
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <T1, T2, T3, T4, T5, T6, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6,
Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, zipFunction));
}

/**
* Returns an Observable that emits the results of a function of your choosing applied to
* combinations of four items emitted, in sequence, by four other Observables.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
* <p> {@code zip} applies this function in strict sequence, so the first item emitted by the
* new Observable will be the result of the function applied to the first item emitted by {@code w0}, the first item emitted by {@code w1}, the first item emitted by {@code w2}, and the first item
* emitted by {@code w3}; the second item emitted by
* the new Observable will be the result of the function applied to the second item emitted by
* each of those Observables; and so forth.
* <p>
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations
* of the source Observable that emits the fewest items.
*
* @param o1
* one source Observable
* @param o2
* a second source Observable
* @param o3
* a third source Observable
* @param o4
* a fourth source Observable
* @param o5
* a fifth source Observable
* @param o6
* a sixth source Observable
* @param o7
* a seventh source Observable
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <R, T0, T1, T2, T3> Observable<R> zip(Observable<? extends T0> w0, Observable<? extends T1> w1, Observable<? extends T2> w2, Observable<? extends T3> w3, Func4<? super T0, ? super T1, ? super T2, ? super T3, ? extends R> reduceFunction) {
return create(OperationZip.zip(w0, w1, w2, w3, reduceFunction));
public static <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7,
Func7<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, zipFunction));
}

/**
* Returns an Observable that emits the results of a function of your choosing applied to
* combinations of four items emitted, in sequence, by four other Observables.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
* <p> {@code zip} applies this function in strict sequence, so the first item emitted by the
* new Observable will be the result of the function applied to the first item emitted by {@code w0}, the first item emitted by {@code w1}, the first item emitted by {@code w2}, and the first item
* emitted by {@code w3}; the second item emitted by
* the new Observable will be the result of the function applied to the second item emitted by
* each of those Observables; and so forth.
* <p>
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations
* of the source Observable that emits the fewest items.
*
* @param o1
* one source Observable
* @param o2
* a second source Observable
* @param o3
* a third source Observable
* @param o4
* a fourth source Observable
* @param o5
* a fifth source Observable
* @param o6
* a sixth source Observable
* @param o7
* a seventh source Observable
* @param o8
* an eighth source Observable
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
Func8<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, o8, zipFunction));
}

/**
* Returns an Observable that emits the results of a function of your choosing applied to
* combinations of four items emitted, in sequence, by four other Observables.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
* <p> {@code zip} applies this function in strict sequence, so the first item emitted by the
* new Observable will be the result of the function applied to the first item emitted by {@code w0}, the first item emitted by {@code w1}, the first item emitted by {@code w2}, and the first item
* emitted by {@code w3}; the second item emitted by
* the new Observable will be the result of the function applied to the second item emitted by
* each of those Observables; and so forth.
* <p>
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations
* of the source Observable that emits the fewest items.
*
* @param o1
* one source Observable
* @param o2
* a second source Observable
* @param o3
* a third source Observable
* @param o4
* a fourth source Observable
* @param o5
* a fifth source Observable
* @param o6
* a sixth source Observable
* @param o7
* a seventh source Observable
* @param o8
* an eighth source Observable
* @param o9
* a ninth source Observable
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
Observable<? extends T9> o9, Func9<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? super T9, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, o8, o9, zipFunction));
}

/**
Expand All @@ -1033,30 +1226,30 @@ public static <R, T0, T1, T2, T3> Observable<R> zip(Observable<? extends T0> w0,
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/combineLatest.png">
*
* @param w0
* @param o1
* The first source observable.
* @param w1
* @param o2
* The second source observable.
* @param combineFunction
* The aggregation function used to combine the source observable values.
* @return An Observable that combines the source Observables with the given combine function
*/
public static <R, T0, T1> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Func2<? super T0, ? super T1, ? extends R> combineFunction) {
return create(OperationCombineLatest.combineLatest(w0, w1, combineFunction));
public static <T1, T2, R> Observable<R> combineLatest(Observable<T1> o1, Observable<T2> o2, Func2<? super T1, ? super T2, ? extends R> combineFunction) {
return create(OperationCombineLatest.combineLatest(o1, o2, combineFunction));
}

/**
* @see #combineLatest(Observable, Observable, Func2)
*/
public static <R, T0, T1, T2> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Func3<? super T0, ? super T1, ? super T2, ? extends R> combineFunction) {
return create(OperationCombineLatest.combineLatest(w0, w1, w2, combineFunction));
public static <T1, T2, T3, R> Observable<R> combineLatest(Observable<T1> o1, Observable<T2> o2, Observable<T3> o3, Func3<? super T1, ? super T2, ? super T3, ? extends R> combineFunction) {
return create(OperationCombineLatest.combineLatest(o1, o2, o3, combineFunction));
}

/**
* @see #combineLatest(Observable, Observable, Func2)
*/
public static <R, T0, T1, T2, T3> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Observable<T3> w3, Func4<? super T0, ? super T1, ? super T2, ? super T3, ? extends R> combineFunction) {
return create(OperationCombineLatest.combineLatest(w0, w1, w2, w3, combineFunction));
public static <T1, T2, T3, T4, R> Observable<R> combineLatest(Observable<T1> o1, Observable<T2> o2, Observable<T3> o3, Observable<T4> o4, Func4<? super T1, ? super T2, ? super T3, ? super T4, ? extends R> combineFunction) {
return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, combineFunction));
}

/**
Expand Down Expand Up @@ -1307,7 +1500,7 @@ public Observable<R> call(List<Observable<?>> wsList) {
*
* @param ws
* A collection of source Observables
* @param reduceFunction
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
Expand Down
Loading