Skip to content

Commit 6d52b65

Browse files
committed
implement zip() and combineLatest() emitting Pair/Triple, deprecate toSingle() and toMaybe(), deprecate combineLatest() operaotrs
1 parent e0bff13 commit 6d52b65

File tree

11 files changed

+127
-41
lines changed

11 files changed

+127
-41
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
buildscript {
2-
ext.kotlin_version = '1.1.2-2'
2+
ext.kotlin_version = '1.1.3'
33
repositories { jcenter() }
44
dependencies {
55
classpath 'com.netflix.nebula:gradle-rxjava-project-plugin:4.+',

src/examples/kotlin/io/reactivex/rxkotlin/examples/examples.kt

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,8 @@ package io.reactivex.rxkotlin.examples
33
import io.reactivex.Observable
44
import io.reactivex.disposables.CompositeDisposable
55
import io.reactivex.rxkotlin.*
6-
import io.reactivex.rxkotlin.combineLatest
7-
import io.reactivex.rxkotlin.subscribeBy
8-
import io.reactivex.rxkotlin.toObservable
9-
import io.reactivex.rxkotlin.zip
106
import java.net.URL
11-
import java.util.Scanner
7+
import java.util.*
128
import java.util.concurrent.TimeUnit
139
import kotlin.concurrent.thread
1410

src/main/kotlin/io/reactivex/rxkotlin/Flowables.kt

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,26 @@ object Flowables {
1111
Flowable.combineLatest(source1, source2,
1212
BiFunction<T1, T2, R> { t1, t2 -> combineFunction(t1,t2) })!!
1313

14+
/**
15+
* Emits `Pair<T1,T2>`
16+
*/
17+
fun <T1,T2> combineLatest(source1: Flowable<T1>, source2: Flowable<T2>) =
18+
Flowable.combineLatest(source1, source2,
19+
BiFunction<T1, T2, Pair<T1,T2>> { t1, t2 -> t1 to t2 })!!
20+
21+
1422
inline fun <T1,T2,T3,R> combineLatest(source1: Flowable<T1>, source2: Flowable<T2>, source3: Flowable<T3>, crossinline combineFunction: (T1,T2, T3) -> R) =
1523
Flowable.combineLatest(source1, source2,source3,
1624
Function3{ t1: T1, t2: T2, t3: T3 -> combineFunction(t1,t2, t3) })!!
1725

26+
/**
27+
* Emits `Triple<T1,T2,T3>`
28+
*/
29+
fun <T1,T2,T3> combineLatest(source1: Flowable<T1>, source2: Flowable<T2>, source3: Flowable<T3>) =
30+
Flowable.combineLatest(source1, source2, source3,
31+
Function3<T1, T2, T3, Triple<T1,T2,T3>> { t1, t2, t3 -> Triple(t1,t2,t3) })!!
32+
33+
1834
inline fun <T1,T2,T3,T4,R> combineLatest(source1: Flowable<T1>, source2: Flowable<T2>, source3: Flowable<T3>,
1935
source4: Flowable<T4>, crossinline combineFunction: (T1,T2, T3, T4) -> R) =
2036
Flowable.combineLatest(source1, source2,source3, source4,
@@ -65,10 +81,25 @@ object Flowables {
6581
Flowable.zip(source1, source2,
6682
BiFunction<T1, T2, R> { t1, t2 -> combineFunction(t1,t2) })!!
6783

84+
/**
85+
* Emits `Pair<T1,T2>`
86+
*/
87+
fun <T1,T2> zip(source1: Flowable<T1>, source2: Flowable<T2>) =
88+
Flowable.zip(source1, source2,
89+
BiFunction<T1, T2, Pair<T1,T2>> { t1, t2 -> t1 to t2 })!!
90+
91+
6892
inline fun <T1,T2,T3,R> zip(source1: Flowable<T1>, source2: Flowable<T2>, source3: Flowable<T3>, crossinline combineFunction: (T1,T2, T3) -> R) =
6993
Flowable.zip(source1, source2,source3,
7094
Function3 { t1: T1, t2: T2, t3: T3 -> combineFunction(t1,t2, t3) })!!
7195

96+
/**
97+
* Emits `Triple<T1,T2,T3>`
98+
*/
99+
fun <T1,T2,T3> zip(source1: Flowable<T1>, source2: Flowable<T2>, source3: Flowable<T3>) =
100+
Flowable.zip(source1, source2, source3,
101+
Function3<T1, T2, T3, Triple<T1,T2,T3>> { t1, t2, t3 -> Triple(t1,t2,t3) })!!
102+
72103
inline fun <T1,T2,T3,T4,R> zip(source1: Flowable<T1>, source2: Flowable<T2>, source3: Flowable<T3>, source4: Flowable<T4>, crossinline combineFunction: (T1,T2, T3, T4) -> R) =
73104
Flowable.zip(source1, source2,source3, source4,
74105
Function4 { t1: T1, t2: T2, t3: T3, t4: T4 -> combineFunction(t1,t2, t3, t4) })!!

src/main/kotlin/io/reactivex/rxkotlin/Observables.kt

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,24 @@ object Observables {
1313
Observable.combineLatest(source1, source2,
1414
BiFunction<T1, T2, R> { t1, t2 -> combineFunction(t1,t2) })!!
1515

16+
/**
17+
* Emits `Pair<T1,T2>`
18+
*/
19+
fun <T1,T2> combineLatest(source1: Observable<T1>, source2: Observable<T2>) =
20+
Observable.combineLatest(source1, source2,
21+
BiFunction<T1, T2, Pair<T1,T2>> { t1, t2 -> t1 to t2 })!!
22+
1623
inline fun <T1,T2,T3,R> combineLatest(source1: Observable<T1>, source2: Observable<T2>, source3: Observable<T3>, crossinline combineFunction: (T1,T2, T3) -> R) =
1724
Observable.combineLatest(source1, source2,source3,
1825
Function3 { t1: T1, t2: T2, t3: T3 -> combineFunction(t1,t2, t3) })!!
1926

27+
/**
28+
* Emits `Triple<T1,T2,T3>`
29+
*/
30+
fun <T1,T2,T3> combineLatest(source1: Observable<T1>, source2: Observable<T2>, source3: Observable<T3>) =
31+
Observable.combineLatest(source1, source2, source3,
32+
Function3<T1, T2, T3, Triple<T1,T2,T3>> { t1, t2, t3 -> Triple(t1,t2,t3) })!!
33+
2034
inline fun <T1,T2,T3,T4,R> combineLatest(source1: Observable<T1>, source2: Observable<T2>, source3: Observable<T3>,
2135
source4: Observable<T4>, crossinline combineFunction: (T1,T2, T3, T4) -> R) =
2236
Observable.combineLatest(source1, source2,source3, source4,
@@ -67,10 +81,25 @@ object Observables {
6781
Observable.zip(source1, source2,
6882
BiFunction<T1, T2, R> { t1, t2 -> combineFunction(t1,t2) })!!
6983

84+
85+
/**
86+
* Emits `Pair<T1,T2>`
87+
*/
88+
fun <T1,T2> zip(source1: Observable<T1>, source2: Observable<T2>) =
89+
Observable.zip(source1, source2,
90+
BiFunction<T1, T2, Pair<T1,T2>> { t1, t2 -> t1 to t2 })!!
91+
7092
inline fun <T1,T2,T3,R> zip(source1: Observable<T1>, source2: Observable<T2>, source3: Observable<T3>, crossinline combineFunction: (T1,T2, T3) -> R) =
7193
Observable.zip(source1, source2,source3,
7294
Function3 { t1: T1, t2: T2, t3: T3 -> combineFunction(t1,t2, t3) })!!
7395

96+
/**
97+
* Emits `Triple<T1,T2,T3>`
98+
*/
99+
fun <T1,T2,T3> zip(source1: Observable<T1>, source2: Observable<T2>, source3: Observable<T3>) =
100+
Observable.zip(source1, source2, source3,
101+
Function3<T1, T2, T3, Triple<T1,T2,T3>> { t1, t2, t3 -> Triple(t1,t2,t3) })!!
102+
74103
inline fun <T1,T2,T3,T4,R> zip(source1: Observable<T1>, source2: Observable<T2>, source3: Observable<T3>, source4: Observable<T4>, crossinline combineFunction: (T1,T2, T3, T4) -> R) =
75104
Observable.zip(source1, source2,source3, source4,
76105
Function4 { t1: T1, t2: T2, t3: T3, t4: T4 -> combineFunction(t1,t2, t3, t4) })!!

src/main/kotlin/io/reactivex/rxkotlin/Singles.kt

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,7 @@ package io.reactivex.rxkotlin
22

33
import io.reactivex.Single
44
import io.reactivex.SingleSource
5-
import io.reactivex.functions.BiFunction
6-
import io.reactivex.functions.Function3
7-
import io.reactivex.functions.Function4
8-
import io.reactivex.functions.Function5
9-
import io.reactivex.functions.Function6
10-
import io.reactivex.functions.Function7
11-
import io.reactivex.functions.Function8
12-
import io.reactivex.functions.Function9
5+
import io.reactivex.functions.*
136

147

158
object Singles {

src/main/kotlin/io/reactivex/rxkotlin/maybe.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import io.reactivex.Observable
66
import java.util.concurrent.Callable
77
import java.util.concurrent.Future
88

9+
@Deprecated("This will be removed in a future release due to API confusion")
910
fun <T : Any> T?.toMaybe(): Maybe<T> = Maybe.create { s -> if (this != null) s.onSuccess(this); s.onComplete() }
1011
fun <T : Any> Future<T>.toMaybe(): Maybe<T> = Maybe.fromFuture(this)
1112
fun <T : Any> Callable<T>.toMaybe(): Maybe<T> = Maybe.fromCallable(this)

src/main/kotlin/io/reactivex/rxkotlin/observable.kt

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,15 +65,12 @@ private fun <T : Any> Iterator<T>.toIterable() = object : Iterable<T> {
6565
override fun iterator(): Iterator<T> = this@toIterable
6666
}
6767

68-
/**
69-
* Combine latest operator that produces [Pair]
70-
*/
68+
@Deprecated("Use `Observables.combineLatest() factory")
7169
fun <T : Any, R : Any> Observable<T>.combineLatest(observable: Observable<R>): Observable<Pair<T, R>>
7270
= Observable.combineLatest(this, observable, BiFunction(::Pair))
7371

74-
/**
75-
* Combine latest operator that produces [Triple]
76-
*/
72+
73+
@Deprecated("Use `Observables.combineLatest() factory")
7774
fun <T : Any, R : Any, U : Any> Observable<T>.combineLatest(observable1: Observable<R>, observable2: Observable<U>): Observable<Triple<T, R, U>>
7875
= Observable.combineLatest(this, observable1, observable2, Function3(::Triple))
7976

src/main/kotlin/io/reactivex/rxkotlin/single.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import io.reactivex.Single
66
import java.util.concurrent.Callable
77
import java.util.concurrent.Future
88

9+
@Deprecated("This will be removed in a future release due to API confusion, use Single.just()")
910
fun <T : Any> T.toSingle(): Single<T> = Single.just(this)
1011
fun <T : Any> Future<T>.toSingle(): Single<T> = Single.fromFuture(this)
1112
fun <T : Any> Callable<T>.toSingle(): Single<T> = Single.fromCallable(this)

src/test/kotlin/io/reactivex/rxkotlin/ExtensionTests.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package io.reactivex.rxkotlin
1919
import io.reactivex.Notification
2020
import io.reactivex.Observable
2121
import io.reactivex.ObservableEmitter
22-
import io.reactivex.functions.BiFunction
2322
import io.reactivex.functions.Function3
2423
import io.reactivex.schedulers.TestScheduler
2524
import org.funktionale.partials.invoke

src/test/kotlin/io/reactivex/rxkotlin/FlowableTest.kt

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package io.reactivex.rxkotlin
22

3-
import io.reactivex.*
3+
import io.reactivex.BackpressureStrategy
4+
import io.reactivex.Flowable
45
import io.reactivex.Flowable.create
6+
import io.reactivex.FlowableEmitter
7+
import io.reactivex.subscribers.TestSubscriber
58
import org.junit.Assert
69
import org.junit.Assert.assertEquals
710
import org.junit.Assert.assertNotNull
811
import org.junit.Ignore
912
import org.junit.Test
10-
import org.mockito.Mockito
1113
import java.util.concurrent.atomic.AtomicInteger
1214
import java.util.concurrent.atomic.AtomicReference
1315

@@ -171,4 +173,30 @@ class FlowableTest {
171173
}
172174
Assert.assertTrue(first.get() == "Alpha")
173175
}
176+
@Test
177+
fun testPairZip() {
178+
179+
val testSubscriber = TestSubscriber<Pair<String,Int>>()
180+
181+
Flowables.zip(
182+
Flowable.just("Alpha", "Beta", "Gamma"),
183+
Flowable.range(1,4)
184+
).subscribe(testSubscriber)
185+
186+
testSubscriber.assertValues(Pair("Alpha",1), Pair("Beta",2), Pair("Gamma",3))
187+
}
188+
189+
@Test
190+
fun testTripleZip() {
191+
192+
val testSubscriber = TestSubscriber<Triple<String,Int,Int>>()
193+
194+
Flowables.zip(
195+
Flowable.just("Alpha", "Beta", "Gamma"),
196+
Flowable.range(1,4),
197+
Flowable.just(100,200,300)
198+
).subscribe(testSubscriber)
199+
200+
testSubscriber.assertValues(Triple("Alpha",1, 100), Triple("Beta",2, 200), Triple("Gamma",3, 300))
201+
}
174202
}

src/test/kotlin/io/reactivex/rxkotlin/ObservableTest.kt

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
package io.reactivex.rxkotlin
22

33
import io.reactivex.Observable
4-
import io.reactivex.Single
4+
import io.reactivex.observers.TestObserver
55
import org.junit.Assert.*
66
import org.junit.Ignore
77
import org.junit.Test
8-
import org.mockito.Mockito
98
import java.math.BigDecimal
109
import java.util.concurrent.atomic.AtomicInteger
1110
import java.util.concurrent.atomic.AtomicReference
@@ -175,22 +174,6 @@ class ObservableTest {
175174
.assertComplete()
176175
}
177176

178-
@Test fun combineLatestPair() {
179-
Observable.just(3)
180-
.combineLatest(Observable.just(10))
181-
.map { (x, y) -> x * y }
182-
.test()
183-
.assertValues(30)
184-
}
185-
186-
@Test fun combineLatestTriple() {
187-
Observable.just(3)
188-
.combineLatest(Observable.just(10), Observable.just(20))
189-
.map { (x, y, z) -> x * y * z }
190-
.test()
191-
.assertValues(600)
192-
}
193-
194177
@Test
195178
fun testSubscribeBy() {
196179
val first = AtomicReference<String>()
@@ -212,4 +195,32 @@ class ObservableTest {
212195
}
213196
assertTrue(first.get() == "Alpha")
214197
}
198+
199+
@Test
200+
fun testPairZip() {
201+
202+
val testObserver = TestObserver<Pair<String,Int>>()
203+
204+
Observables.zip(
205+
Observable.just("Alpha", "Beta", "Gamma"),
206+
Observable.range(1,4)
207+
).subscribe(testObserver)
208+
209+
testObserver.assertValues(Pair("Alpha",1), Pair("Beta",2), Pair("Gamma",3))
210+
}
211+
212+
@Test
213+
fun testTripleZip() {
214+
215+
val testObserver = TestObserver<Triple<String,Int,Int>>()
216+
217+
Observables.zip(
218+
Observable.just("Alpha", "Beta", "Gamma"),
219+
Observable.range(1,4),
220+
Observable.just(100,200,300)
221+
).subscribe(testObserver)
222+
223+
testObserver.assertValues(Triple("Alpha",1, 100), Triple("Beta",2, 200), Triple("Gamma",3, 300))
224+
}
225+
215226
}

0 commit comments

Comments
 (0)