Skip to content

Commit 5e00f50

Browse files
committed
Merge remote-tracking branch 'origin/2.x' into 2.x
2 parents 9e0d321 + 108247b commit 5e00f50

File tree

16 files changed

+272
-61
lines changed

16 files changed

+272
-61
lines changed

build.gradle

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
buildscript {
2-
ext.kotlin_version = '1.1.2-2'
2+
ext.kotlin_version = '1.1.3'
33
repositories { jcenter() }
44
dependencies {
55
classpath 'com.netflix.nebula:gradle-rxjava-project-plugin:4.+',
@@ -15,12 +15,14 @@ apply plugin: 'kotlin'
1515
apply plugin: 'java'
1616

1717
dependencies {
18-
compile 'io.reactivex.rxjava2:rxjava:2.0.7'
18+
compile 'io.reactivex.rxjava2:rxjava:2.1.0'
1919
compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
2020
testCompile 'org.funktionale:funktionale-partials:1.0.0-final'
2121
testCompile 'junit:junit:4.12'
2222
testCompile 'org.mockito:mockito-core:1.10.19'
23-
examplesCompile 'com.squareup.retrofit:retrofit:1.9.+'
23+
examplesCompile 'com.squareup.retrofit2:retrofit:2.3.0'
24+
examplesCompile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
25+
examplesCompile 'com.squareup.retrofit2:converter-moshi:2.3.0'
2426
}
2527

2628
task wrapper(type: Wrapper) {

src/examples/kotlin/io/reactivex/rxkotlin/examples/examples.kt

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,8 @@ package io.reactivex.rxkotlin.examples
33
import io.reactivex.Observable
44
import io.reactivex.disposables.CompositeDisposable
55
import io.reactivex.rxkotlin.*
6-
import io.reactivex.rxkotlin.combineLatest
7-
import io.reactivex.rxkotlin.subscribeBy
8-
import io.reactivex.rxkotlin.toObservable
9-
import io.reactivex.rxkotlin.zip
106
import java.net.URL
11-
import java.util.Scanner
7+
import java.util.*
128
import java.util.concurrent.TimeUnit
139
import kotlin.concurrent.thread
1410

src/examples/kotlin/io/reactivex/rxkotlin/examples/retrofit/retrofit.kt

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package io.reactivex.rxkotlin.examples.retrofit
22

33
import io.reactivex.Observable
4-
import retrofit.RestAdapter
5-
import retrofit.http.GET
6-
import retrofit.http.Query
4+
import retrofit2.Retrofit
5+
import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory
6+
import retrofit2.converter.moshi.MoshiConverterFactory
7+
import retrofit2.http.GET
8+
import retrofit2.http.Query
79

810
data class SearchResultEntry(val id : String, val latestVersion : String)
911
data class SearchResults(val docs : List<SearchResultEntry>)
@@ -15,14 +17,15 @@ interface MavenSearchService {
1517
}
1618

1719
fun main(args: Array<String>) {
18-
val service = RestAdapter.Builder().
19-
setEndpoint("http://search.maven.org").
20+
val service = Retrofit.Builder().
21+
baseUrl("http://search.maven.org").
22+
addCallAdapterFactory(RxJava2CallAdapterFactory.create()).
23+
addConverterFactory(MoshiConverterFactory.create()).
2024
build().
2125
create(MavenSearchService::class.java)
2226

2327
service.search("rxkotlin").
2428
flatMapIterable { it.response.docs }.
25-
doAfterTerminate { System.exit(0) }. // we need this otherwise Rx's executor service will shutdown a minute after request completion
2629
subscribe { artifact ->
2730
println("${artifact.id} (${artifact.latestVersion})")
2831
}

src/main/kotlin/io/reactivex/rxkotlin/Flowables.kt

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,26 @@ object Flowables {
1111
Flowable.combineLatest(source1, source2,
1212
BiFunction<T1, T2, R> { t1, t2 -> combineFunction(t1,t2) })!!
1313

14+
/**
15+
* Emits `Pair<T1,T2>`
16+
*/
17+
fun <T1,T2> combineLatest(source1: Flowable<T1>, source2: Flowable<T2>) =
18+
Flowable.combineLatest(source1, source2,
19+
BiFunction<T1, T2, Pair<T1,T2>> { t1, t2 -> t1 to t2 })!!
20+
21+
1422
inline fun <T1,T2,T3,R> combineLatest(source1: Flowable<T1>, source2: Flowable<T2>, source3: Flowable<T3>, crossinline combineFunction: (T1,T2, T3) -> R) =
1523
Flowable.combineLatest(source1, source2,source3,
1624
Function3{ t1: T1, t2: T2, t3: T3 -> combineFunction(t1,t2, t3) })!!
1725

26+
/**
27+
* Emits `Triple<T1,T2,T3>`
28+
*/
29+
fun <T1,T2,T3> combineLatest(source1: Flowable<T1>, source2: Flowable<T2>, source3: Flowable<T3>) =
30+
Flowable.combineLatest(source1, source2, source3,
31+
Function3<T1, T2, T3, Triple<T1,T2,T3>> { t1, t2, t3 -> Triple(t1,t2,t3) })!!
32+
33+
1834
inline fun <T1,T2,T3,T4,R> combineLatest(source1: Flowable<T1>, source2: Flowable<T2>, source3: Flowable<T3>,
1935
source4: Flowable<T4>, crossinline combineFunction: (T1,T2, T3, T4) -> R) =
2036
Flowable.combineLatest(source1, source2,source3, source4,
@@ -65,10 +81,25 @@ object Flowables {
6581
Flowable.zip(source1, source2,
6682
BiFunction<T1, T2, R> { t1, t2 -> combineFunction(t1,t2) })!!
6783

84+
/**
85+
* Emits `Pair<T1,T2>`
86+
*/
87+
fun <T1,T2> zip(source1: Flowable<T1>, source2: Flowable<T2>) =
88+
Flowable.zip(source1, source2,
89+
BiFunction<T1, T2, Pair<T1,T2>> { t1, t2 -> t1 to t2 })!!
90+
91+
6892
inline fun <T1,T2,T3,R> zip(source1: Flowable<T1>, source2: Flowable<T2>, source3: Flowable<T3>, crossinline combineFunction: (T1,T2, T3) -> R) =
6993
Flowable.zip(source1, source2,source3,
7094
Function3 { t1: T1, t2: T2, t3: T3 -> combineFunction(t1,t2, t3) })!!
7195

96+
/**
97+
* Emits `Triple<T1,T2,T3>`
98+
*/
99+
fun <T1,T2,T3> zip(source1: Flowable<T1>, source2: Flowable<T2>, source3: Flowable<T3>) =
100+
Flowable.zip(source1, source2, source3,
101+
Function3<T1, T2, T3, Triple<T1,T2,T3>> { t1, t2, t3 -> Triple(t1,t2,t3) })!!
102+
72103
inline fun <T1,T2,T3,T4,R> zip(source1: Flowable<T1>, source2: Flowable<T2>, source3: Flowable<T3>, source4: Flowable<T4>, crossinline combineFunction: (T1,T2, T3, T4) -> R) =
73104
Flowable.zip(source1, source2,source3, source4,
74105
Function4 { t1: T1, t2: T2, t3: T3, t4: T4 -> combineFunction(t1,t2, t3, t4) })!!
@@ -119,12 +150,19 @@ object Flowables {
119150
inline fun <T, U, R> Flowable<T>.withLatestFrom(other: Publisher<U>, crossinline combiner: (T, U) -> R): Flowable<R>
120151
= withLatestFrom(other, BiFunction<T, U, R> { t, u -> combiner.invoke(t, u) })
121152

153+
inline fun <T, U> Flowable<T>.withLatestFrom(other: Publisher<U>): Flowable<Pair<T, U>>
154+
= withLatestFrom(other, BiFunction{ t, u -> Pair(t,u) })
155+
156+
122157
/**
123158
* An alias to [Flowable.withLatestFrom], but allowing for cleaner lambda syntax.
124159
*/
125160
inline fun <T, T1, T2, R> Flowable<T>.withLatestFrom(o1: Publisher<T1>, o2: Publisher<T2>, crossinline combiner: (T, T1, T2) -> R): Flowable<R>
126161
= withLatestFrom(o1, o2, Function3 { t, t1, t2 -> combiner.invoke(t, t1, t2) })
127162

163+
inline fun <T, T1, T2> Flowable<T>.withLatestFrom(o1: Publisher<T1>, o2: Publisher<T2>): Publisher<Triple<T,T1,T2>>
164+
= withLatestFrom(o1, o2, Function3 { t, t1, t2 -> Triple(t, t1, t2) })
165+
128166
/**
129167
* An alias to [Flowable.withLatestFrom], but allowing for cleaner lambda syntax.
130168
*/
@@ -142,3 +180,9 @@ inline fun <T, T1, T2, T3, T4, R> Flowable<T>.withLatestFrom(o1: Publisher<T1>,
142180
*/
143181
inline fun <T, U, R> Flowable<T>.zipWith(other: Publisher<U>, crossinline zipper: (T, U) -> R): Flowable<R>
144182
= zipWith(other, BiFunction { t, u -> zipper.invoke(t, u) })
183+
184+
/**
185+
* Emits a zipped `Pair`
186+
*/
187+
inline fun <T, U> Flowable<T>.zipWith(other: Publisher<U>): Flowable<Pair<T, U>>
188+
= zipWith(other, BiFunction { t, u -> Pair(t,u) })

src/main/kotlin/io/reactivex/rxkotlin/Observables.kt

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,24 @@ object Observables {
1313
Observable.combineLatest(source1, source2,
1414
BiFunction<T1, T2, R> { t1, t2 -> combineFunction(t1,t2) })!!
1515

16+
/**
17+
* Emits `Pair<T1,T2>`
18+
*/
19+
fun <T1,T2> combineLatest(source1: Observable<T1>, source2: Observable<T2>) =
20+
Observable.combineLatest(source1, source2,
21+
BiFunction<T1, T2, Pair<T1,T2>> { t1, t2 -> t1 to t2 })!!
22+
1623
inline fun <T1,T2,T3,R> combineLatest(source1: Observable<T1>, source2: Observable<T2>, source3: Observable<T3>, crossinline combineFunction: (T1,T2, T3) -> R) =
1724
Observable.combineLatest(source1, source2,source3,
1825
Function3 { t1: T1, t2: T2, t3: T3 -> combineFunction(t1,t2, t3) })!!
1926

27+
/**
28+
* Emits `Triple<T1,T2,T3>`
29+
*/
30+
fun <T1,T2,T3> combineLatest(source1: Observable<T1>, source2: Observable<T2>, source3: Observable<T3>) =
31+
Observable.combineLatest(source1, source2, source3,
32+
Function3<T1, T2, T3, Triple<T1,T2,T3>> { t1, t2, t3 -> Triple(t1,t2,t3) })!!
33+
2034
inline fun <T1,T2,T3,T4,R> combineLatest(source1: Observable<T1>, source2: Observable<T2>, source3: Observable<T3>,
2135
source4: Observable<T4>, crossinline combineFunction: (T1,T2, T3, T4) -> R) =
2236
Observable.combineLatest(source1, source2,source3, source4,
@@ -67,10 +81,25 @@ object Observables {
6781
Observable.zip(source1, source2,
6882
BiFunction<T1, T2, R> { t1, t2 -> combineFunction(t1,t2) })!!
6983

84+
85+
/**
86+
* Emits `Pair<T1,T2>`
87+
*/
88+
fun <T1,T2> zip(source1: Observable<T1>, source2: Observable<T2>) =
89+
Observable.zip(source1, source2,
90+
BiFunction<T1, T2, Pair<T1,T2>> { t1, t2 -> t1 to t2 })!!
91+
7092
inline fun <T1,T2,T3,R> zip(source1: Observable<T1>, source2: Observable<T2>, source3: Observable<T3>, crossinline combineFunction: (T1,T2, T3) -> R) =
7193
Observable.zip(source1, source2,source3,
7294
Function3 { t1: T1, t2: T2, t3: T3 -> combineFunction(t1,t2, t3) })!!
7395

96+
/**
97+
* Emits `Triple<T1,T2,T3>`
98+
*/
99+
fun <T1,T2,T3> zip(source1: Observable<T1>, source2: Observable<T2>, source3: Observable<T3>) =
100+
Observable.zip(source1, source2, source3,
101+
Function3<T1, T2, T3, Triple<T1,T2,T3>> { t1, t2, t3 -> Triple(t1,t2,t3) })!!
102+
74103
inline fun <T1,T2,T3,T4,R> zip(source1: Observable<T1>, source2: Observable<T2>, source3: Observable<T3>, source4: Observable<T4>, crossinline combineFunction: (T1,T2, T3, T4) -> R) =
75104
Observable.zip(source1, source2,source3, source4,
76105
Function4 { t1: T1, t2: T2, t3: T3, t4: T4 -> combineFunction(t1,t2, t3, t4) })!!
@@ -122,12 +151,21 @@ object Observables {
122151
inline fun <T, U, R> Observable<T>.withLatestFrom(other: ObservableSource<U>, crossinline combiner: (T, U) -> R): Observable<R>
123152
= withLatestFrom(other, BiFunction<T, U, R> { t, u -> combiner.invoke(t, u) })
124153

154+
/**
155+
* Emits a `Pair`
156+
*/
157+
inline fun <T, U, R> Observable<T>.withLatestFrom(other: ObservableSource<U>): Observable<Pair<T,U>>
158+
= withLatestFrom(other, BiFunction{ t, u -> Pair(t,u) })
159+
125160
/**
126161
* An alias to [Observable.withLatestFrom], but allowing for cleaner lambda syntax.
127162
*/
128163
inline fun <T, T1, T2, R> Observable<T>.withLatestFrom(o1: ObservableSource<T1>, o2: ObservableSource<T2>, crossinline combiner: (T, T1, T2) -> R): Observable<R>
129164
= withLatestFrom(o1, o2, Function3<T, T1, T2, R> { t, t1, t2 -> combiner.invoke(t, t1, t2) })
130165

166+
inline fun <T, T1, T2> Observable<T>.withLatestFrom(o1: ObservableSource<T1>, o2: ObservableSource<T2>): Observable<Triple<T,T1,T2>>
167+
= withLatestFrom(o1, o2, Function3 { t, t1, t2 -> Triple(t, t1, t2) })
168+
131169
/**
132170
* An alias to [Observable.withLatestFrom], but allowing for cleaner lambda syntax.
133171
*/
@@ -146,3 +184,8 @@ inline fun <T, T1, T2, T3, T4, R> Observable<T>.withLatestFrom(o1: ObservableSou
146184
inline fun <T, U, R> Observable<T>.zipWith(other: ObservableSource<U>, crossinline zipper: (T, U) -> R): Observable<R>
147185
= zipWith(other, BiFunction { t, u -> zipper.invoke(t, u) })
148186

187+
/**
188+
* Emits a zipped `Pair`
189+
*/
190+
inline fun <T, U> Observable<T>.zipWith(other: ObservableSource<U>): Observable<Pair<T,U>>
191+
= zipWith(other, BiFunction { t, u -> Pair(t,u) })

src/main/kotlin/io/reactivex/rxkotlin/Singles.kt

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,7 @@ package io.reactivex.rxkotlin
22

33
import io.reactivex.Single
44
import io.reactivex.SingleSource
5-
import io.reactivex.functions.BiFunction
6-
import io.reactivex.functions.Function3
7-
import io.reactivex.functions.Function4
8-
import io.reactivex.functions.Function5
9-
import io.reactivex.functions.Function6
10-
import io.reactivex.functions.Function7
11-
import io.reactivex.functions.Function8
12-
import io.reactivex.functions.Function9
5+
import io.reactivex.functions.*
136

147

158
object Singles {

src/main/kotlin/io/reactivex/rxkotlin/flowable.kt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,3 +98,14 @@ fun <T : Any> Flowable<Flowable<T>>.switchLatest() = switchMap { it }
9898

9999

100100
fun <T : Any> Flowable<Flowable<T>>.switchOnNext(): Flowable<T> = Flowable.switchOnNext(this)
101+
102+
103+
/**
104+
* Collects `Pair` emission into a `Map`
105+
*/
106+
fun <A: Any, B: Any> Flowable<Pair<A, B>>.toMap() = toMap({it.first},{it.second})
107+
108+
/**
109+
* Collects `Pair` emission into a multimap
110+
*/
111+
fun <A: Any, B: Any> Flowable<Pair<A, B>>.toMultimap() = toMultimap({it.first},{it.second})

src/main/kotlin/io/reactivex/rxkotlin/maybe.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import io.reactivex.Observable
66
import java.util.concurrent.Callable
77
import java.util.concurrent.Future
88

9+
@Deprecated("This will be removed in a future release due to API confusion")
910
fun <T : Any> T?.toMaybe(): Maybe<T> = Maybe.create { s -> if (this != null) s.onSuccess(this); s.onComplete() }
1011
fun <T : Any> Future<T>.toMaybe(): Maybe<T> = Maybe.fromFuture(this)
1112
fun <T : Any> Callable<T>.toMaybe(): Maybe<T> = Maybe.fromCallable(this)

src/main/kotlin/io/reactivex/rxkotlin/observable.kt

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,15 +65,12 @@ private fun <T : Any> Iterator<T>.toIterable() = object : Iterable<T> {
6565
override fun iterator(): Iterator<T> = this@toIterable
6666
}
6767

68-
/**
69-
* Combine latest operator that produces [Pair]
70-
*/
68+
@Deprecated("Use `Observables.combineLatest() factory")
7169
fun <T : Any, R : Any> Observable<T>.combineLatest(observable: Observable<R>): Observable<Pair<T, R>>
7270
= Observable.combineLatest(this, observable, BiFunction(::Pair))
7371

74-
/**
75-
* Combine latest operator that produces [Triple]
76-
*/
72+
73+
@Deprecated("Use `Observables.combineLatest() factory")
7774
fun <T : Any, R : Any, U : Any> Observable<T>.combineLatest(observable1: Observable<R>, observable2: Observable<U>): Observable<Triple<T, R, U>>
7875
= Observable.combineLatest(this, observable1, observable2, Function3(::Triple))
7976

@@ -96,3 +93,12 @@ fun <T : Any> Observable<Observable<T>>.switchLatest() = switchMap { it }
9693

9794
fun <T : Any> Observable<Observable<T>>.switchOnNext(): Observable<T> = Observable.switchOnNext(this)
9895

96+
/**
97+
* Collects `Pair` emission into a `Map`
98+
*/
99+
fun <A: Any, B: Any> Observable<Pair<A,B>>.toMap() = toMap({it.first},{it.second})
100+
101+
/**
102+
* Collects `Pair` emission into a multimap
103+
*/
104+
fun <A: Any, B: Any> Observable<Pair<A,B>>.toMultimap() = toMultimap({it.first},{it.second})

src/main/kotlin/io/reactivex/rxkotlin/single.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import io.reactivex.Single
66
import java.util.concurrent.Callable
77
import java.util.concurrent.Future
88

9+
@Deprecated("This will be removed in a future release due to API confusion, use Single.just()")
910
fun <T : Any> T.toSingle(): Single<T> = Single.just(this)
1011
fun <T : Any> Future<T>.toSingle(): Single<T> = Single.fromFuture(this)
1112
fun <T : Any> Callable<T>.toSingle(): Single<T> = Single.fromCallable(this)

src/main/kotlin/io/reactivex/rxkotlin/subscribers.kt

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,36 +12,36 @@ private val onCompleteStub: () -> Unit = {}
1212
* Overloaded subscribe function that allows passing named parameters
1313
*/
1414
fun <T : Any> Observable<T>.subscribeBy(
15-
onNext: (T) -> Unit = onNextStub,
1615
onError: (Throwable) -> Unit = onErrorStub,
17-
onComplete: () -> Unit = onCompleteStub
18-
): Disposable = subscribe(onNext, onError, onComplete)
16+
onComplete: () -> Unit = onCompleteStub,
17+
onNext: (T) -> Unit = onNextStub
18+
): Disposable = subscribe(onNext, onError, onComplete)
1919

2020
/**
2121
* Overloaded subscribe function that allows passing named parameters
2222
*/
2323
fun <T : Any> Flowable<T>.subscribeBy(
24-
onNext: (T) -> Unit = onNextStub,
2524
onError: (Throwable) -> Unit = onErrorStub,
26-
onComplete: () -> Unit = onCompleteStub
27-
): Disposable = subscribe(onNext, onError, onComplete)
25+
onComplete: () -> Unit = onCompleteStub,
26+
onNext: (T) -> Unit = onNextStub
27+
): Disposable = subscribe(onNext, onError, onComplete)
2828

2929
/**
3030
* Overloaded subscribe function that allows passing named parameters
3131
*/
3232
fun <T : Any> Single<T>.subscribeBy(
33-
onSuccess: (T) -> Unit = onNextStub,
34-
onError: (Throwable) -> Unit = onErrorStub
35-
): Disposable = subscribe(onSuccess, onError)
33+
onError: (Throwable) -> Unit = onErrorStub,
34+
onSuccess: (T) -> Unit = onNextStub
35+
): Disposable = subscribe(onSuccess, onError)
3636

3737
/**
3838
* Overloaded subscribe function that allows passing named parameters
3939
*/
4040
fun <T : Any> Maybe<T>.subscribeBy(
41-
onSuccess: (T) -> Unit = onNextStub,
4241
onError: (Throwable) -> Unit = onErrorStub,
43-
onComplete: () -> Unit = onCompleteStub
44-
): Disposable = subscribe(onSuccess, onError, onComplete)
42+
onComplete: () -> Unit = onCompleteStub,
43+
onSuccess: (T) -> Unit = onNextStub
44+
): Disposable = subscribe(onSuccess, onError, onComplete)
4545

4646
/**
4747
* Overloaded subscribe function that allows passing named parameters
@@ -55,18 +55,18 @@ fun Completable.subscribeBy(
5555
* Overloaded blockingSubscribe function that allows passing named parameters
5656
*/
5757
fun <T : Any> Observable<T>.blockingSubscribeBy(
58-
onNext: (T) -> Unit = onNextStub,
5958
onError: (Throwable) -> Unit = onErrorStub,
60-
onComplete: () -> Unit = onCompleteStub
61-
) = blockingSubscribe(onNext, onError, onComplete)
59+
onComplete: () -> Unit = onCompleteStub,
60+
onNext: (T) -> Unit = onNextStub
61+
) = blockingSubscribe(onNext, onError, onComplete)
6262

6363
/**
6464
* Overloaded blockingSubscribe function that allows passing named parameters
6565
*/
6666
fun <T : Any> Flowable<T>.blockingSubscribeBy(
67-
onNext: (T) -> Unit = onNextStub,
6867
onError: (Throwable) -> Unit = onErrorStub,
69-
onComplete: () -> Unit = onCompleteStub
70-
) = blockingSubscribe(onNext, onError, onComplete)
68+
onComplete: () -> Unit = onCompleteStub,
69+
onNext: (T) -> Unit = onNextStub
70+
) = blockingSubscribe(onNext, onError, onComplete)
7171

7272
class OnErrorNotImplementedException(e: Throwable) : RuntimeException(e)

0 commit comments

Comments
 (0)