Skip to content

Commit 94ffb4f

Browse files
authored
More operators for Observable<Single>, Flowable<Single>, etc. (ReactiveX#101)
* add several more operators for Observable<Single>, Flowable<Single>, etc... * remove import io.reactivex.* * rid plural maybes in operator name * rid some operators targeting `Observable<Single>` and `Observable<Maybe>` and Flowable counterparts, out of scope * update version to 2.0.0
1 parent 127fe0e commit 94ffb4f

File tree

7 files changed

+93
-48
lines changed

7 files changed

+93
-48
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
version=2.0.0-RC2
1+
version=2.0.0
22
org.gradle.jvmargs=-Xms256m -Xmx1024m -XX:MaxPermSize=256m

src/main/kotlin/io/reactivex/rxkotlin/completable.kt

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.reactivex.rxkotlin
22

33
import io.reactivex.Completable
4+
import io.reactivex.Flowable
5+
import io.reactivex.Observable
46
import io.reactivex.functions.Action
57
import java.util.concurrent.Callable
68
import java.util.concurrent.Future
@@ -9,3 +11,16 @@ fun Action.toCompletable(): Completable = Completable.fromAction(this)
911
fun Callable<out Any>.toCompletable(): Completable = Completable.fromCallable(this)
1012
fun Future<out Any>.toCompletable(): Completable = Completable.fromFuture(this)
1113
fun (() -> Any).toCompletable(): Completable = Completable.fromCallable(this)
14+
15+
16+
// EXTENSION FUNCTION OPERATORS
17+
18+
/**
19+
* Merges the emissions of a Observable<Completable>. Same as calling `flatMapSingle { it }`.
20+
*/
21+
fun Observable<Completable>.mergeAllCompletables() = flatMapCompletable { it }
22+
23+
/**
24+
* Merges the emissions of a Flowable<Completable>. Same as calling `flatMap { it }`.
25+
*/
26+
fun Flowable<Completable>.mergeAllCompletables() = flatMapCompletable { it }

src/main/kotlin/io/reactivex/rxkotlin/flowable.kt

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,26 @@ inline fun <reified R : Any> Flowable<*>.ofType(): Flowable<R> = ofType(R::class
6161

6262
private fun <T : Any> Iterator<T>.toIterable() = object : Iterable<T> {
6363
override fun iterator(): Iterator<T> = this@toIterable
64-
}
64+
}
65+
66+
//EXTENSION FUNCTION OPERATORS
67+
68+
/**
69+
* Merges the emissions of a Flowable<Flowable<T>>. Same as calling `flatMap { it }`.
70+
*/
71+
fun <T : Any> Flowable<Flowable<T>>.mergeAll() = flatMap { it }
72+
73+
74+
/**
75+
* Concatenates the emissions of an Flowable<Flowable<T>>. Same as calling `concatMap { it }`.
76+
*/
77+
fun <T : Any> Flowable<Flowable<T>>.concatAll() = concatMap { it }
78+
79+
80+
/**
81+
* Emits the latest `Flowable<T>` emitted through an `Flowable<Flowable<T>>`. Same as calling `switchMap { it }`.
82+
*/
83+
fun <T : Any> Flowable<Flowable<T>>.switchLatest() = switchMap { it }
84+
85+
86+
fun <T : Any> Flowable<Flowable<T>>.switchOnNext(): Flowable<T> = Flowable.switchOnNext(this)

src/main/kotlin/io/reactivex/rxkotlin/maybe.kt

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.reactivex.rxkotlin
22

3+
import io.reactivex.Flowable
34
import io.reactivex.Maybe
5+
import io.reactivex.Observable
46
import java.util.concurrent.Callable
57
import java.util.concurrent.Future
68

@@ -10,4 +12,18 @@ fun <T : Any> Callable<T>.toMaybe(): Maybe<T> = Maybe.fromCallable(this)
1012
fun <T : Any> (() -> T).toMaybe(): Maybe<T> = Maybe.fromCallable(this)
1113

1214
inline fun <reified R : Any> Maybe<Any>.cast(): Maybe<R> = cast(R::class.java)
13-
inline fun <reified R : Any> Maybe<Any>.ofType(): Maybe<R> = ofType(R::class.java)
15+
inline fun <reified R : Any> Maybe<Any>.ofType(): Maybe<R> = ofType(R::class.java)
16+
17+
18+
19+
// EXTENSION FUNCTION OPERATORS
20+
21+
/**
22+
* Merges the emissions of a Observable<Maybe<T>>. Same as calling `flatMapMaybe { it }`.
23+
*/
24+
fun <T : Any> Observable<Maybe<T>>.mergeAllMaybes() = flatMapMaybe { it }
25+
26+
/**
27+
* Merges the emissions of a Flowable<Maybe<T>>. Same as calling `flatMap { it }`.
28+
*/
29+
fun <T : Any> Flowable<Maybe<T>>.mergeAllMaybes() = flatMapMaybe { it }

src/main/kotlin/io/reactivex/rxkotlin/observable.kt

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,25 @@ inline fun <reified R : Any> Observable<*>.ofType(): Observable<R> = ofType(R::c
6161

6262
private fun <T : Any> Iterator<T>.toIterable() = object : Iterable<T> {
6363
override fun iterator(): Iterator<T> = this@toIterable
64-
}
64+
}
65+
66+
67+
// EXTENSION FUNCTION OPERATORS
68+
69+
/**
70+
* Merges the emissions of an Observable<Observable<T>>. Same as calling `flatMap { it }`.
71+
*/
72+
fun <T : Any> Observable<Observable<T>>.mergeAll() = flatMap { it }
73+
74+
/**
75+
* Concatenates the emissions of an Observable<Observable<T>>. Same as calling `concatMap { it }`.
76+
*/
77+
fun <T : Any> Observable<Observable<T>>.concatAll() = concatMap { it }
78+
79+
/**
80+
* Emits the latest `Observable<T>` emitted through an `Observable<Observable<T>>`. Same as calling `switchMap { it }`.
81+
*/
82+
fun <T : Any> Observable<Observable<T>>.switchLatest() = switchMap { it }
83+
84+
fun <T : Any> Observable<Observable<T>>.switchOnNext(): Observable<T> = Observable.switchOnNext(this)
85+

src/main/kotlin/io/reactivex/rxkotlin/operators.kt

Lines changed: 0 additions & 44 deletions
This file was deleted.

src/main/kotlin/io/reactivex/rxkotlin/single.kt

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.reactivex.rxkotlin
22

3+
import io.reactivex.Flowable
4+
import io.reactivex.Observable
35
import io.reactivex.Single
46
import java.util.concurrent.Callable
57
import java.util.concurrent.Future
@@ -10,3 +12,16 @@ fun <T : Any> Callable<T>.toSingle(): Single<T> = Single.fromCallable(this)
1012
fun <T : Any> (() -> T).toSingle(): Single<T> = Single.fromCallable(this)
1113

1214
inline fun <reified R : Any> Single<Any>.cast(): Single<R> = cast(R::class.java)
15+
16+
17+
// EXTENSION FUNCTION OPERATORS
18+
19+
/**
20+
* Merges the emissions of a Observable<Single<T>>. Same as calling `flatMapSingle { it }`.
21+
*/
22+
fun <T : Any> Observable<Single<T>>.mergeAllSingles() = flatMapSingle { it }
23+
24+
/**
25+
* Merges the emissions of a Flowable<Single<T>>. Same as calling `flatMap { it }`.
26+
*/
27+
fun <T : Any> Flowable<Single<T>>.mergeAllSingles() = flatMapSingle { it }

0 commit comments

Comments
 (0)