22
33import java .util .concurrent .TimeUnit ;
44
5- import rx .Observable ;
6- import rx .Subscriber ;
7- import rx .schedulers .Schedulers ;
5+ import io .reactivex .Observable ;
6+ import io .reactivex .ObservableEmitter ;
7+ import io .reactivex .schedulers .Schedulers ;
8+
9+
810
911public class FlowControlWindowExample {
1012
1113 public static void main (String args []) {
1214 // buffer every 500ms (using 999999999 to mark start of output)
13- hotStream ().window (500 , TimeUnit .MILLISECONDS ).take (10 ).flatMap (w -> w .startWith (999999999 )).toBlocking (). forEach (System .out ::println );
15+ hotStream ().window (500 , TimeUnit .MILLISECONDS ).take (10 ).flatMap (w -> w .startWith (999999999 )).blockingForEach (System .out ::println );
1416
1517 // buffer 10 items at a time (using 999999999 to mark start of output)
16- hotStream ().window (10 ).take (2 ).flatMap (w -> w .startWith (999999999 )).toBlocking (). forEach (System .out ::println );
18+ hotStream ().window (10 ).take (2 ).flatMap (w -> w .startWith (999999999 )).blockingForEach (System .out ::println );
1719
1820 System .out .println ("Done" );
1921 }
@@ -22,8 +24,8 @@ public static void main(String args[]) {
2224 * This is an artificial source to demonstrate an infinite stream that bursts intermittently
2325 */
2426 public static Observable <Integer > hotStream () {
25- return Observable .create ((Subscriber <? super Integer > s ) -> {
26- while (!s .isUnsubscribed ()) {
27+ return Observable .create ((ObservableEmitter < Integer > s ) -> {
28+ while (!s .isDisposed ()) {
2729 // burst some number of items
2830 for (int i = 0 ; i < Math .random () * 20 ; i ++) {
2931 s .onNext (i );
0 commit comments