Skip to content

Commit 02699f9

Browse files
committed
Rename to sliding and tumbling
1 parent 58c26d5 commit 02699f9

File tree

3 files changed

+64
-26
lines changed

3 files changed

+64
-26
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -182,18 +182,23 @@ class RxScalaDemo extends JUnitSuite {
182182
).toObservable.flatten(2).toBlocking.foreach(println(_))
183183
}
184184

185-
@Test def rangeAndBufferExample() {
185+
@Test def tumblingBufferExample() {
186186
val o = Observable.from(1 to 18)
187-
o.buffer(5).subscribe((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
187+
o.tumblingBuffer(5).subscribe((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
188188
}
189189

190-
@Test def tumblingBufferExample() {
190+
@Test def tumblingBufferExample2() {
191191
val o = Observable.from(1 to 18).zip(Observable.interval(100 millis)).map(_._1)
192192
val boundary = Observable.interval(500 millis)
193193
o.tumblingBuffer(boundary).toBlocking.foreach((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
194194
}
195195

196196
@Test def slidingBufferExample() {
197+
val o = Observable.from(1 to 18).slidingBuffer(4, 2)
198+
o.subscribe(println(_))
199+
}
200+
201+
@Test def slidingBufferExample2() {
197202
val open = Observable.interval(300 millis)
198203
val closing = Observable.interval(600 millis)
199204
val o = Observable.interval(100 millis).take(20).slidingBuffer(open)(_ => closing)
@@ -202,13 +207,18 @@ class RxScalaDemo extends JUnitSuite {
202207
}
203208
}
204209

205-
@Test def windowExample() {
206-
(for ((o, i) <- Observable.from(1 to 18).window(5).zipWithIndex; n <- o)
210+
@Test def slidingBufferExample3() {
211+
val o = Observable.from(1 to 18).zip(Observable.interval(100 millis)).map(_._1)
212+
o.slidingBuffer(500 millis, 200 millis).toBlocking.foreach((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
213+
}
214+
215+
@Test def tumblingExample() {
216+
(for ((o, i) <- Observable.from(1 to 18).tumbling(5).zipWithIndex; n <- o)
207217
yield s"Observable#$i emits $n"
208218
).subscribe(output(_))
209219
}
210220

211-
@Test def tumblingExample() {
221+
@Test def tumblingExample2() {
212222
val windowObservable = Observable.interval(500 millis)
213223
val o = Observable.from(1 to 20).zip(Observable.interval(100 millis)).map(_._1)
214224
(for ((o, i) <- o.tumbling(windowObservable).zipWithIndex; n <- o)
@@ -217,6 +227,22 @@ class RxScalaDemo extends JUnitSuite {
217227
}
218228

219229
@Test def slidingExample() {
230+
val o = Observable.from(1 to 18).sliding(4, 2)
231+
(for ((o, i) <- o.zipWithIndex;
232+
n <- o)
233+
yield s"Observable#$i emits $n"
234+
).toBlocking.foreach(println)
235+
}
236+
237+
@Test def slidingExample2() {
238+
val o = Observable.interval(100 millis).take(20).sliding(500 millis, 200 millis)
239+
(for ((o, i) <- o.zipWithIndex;
240+
n <- o)
241+
yield s"Observable#$i emits $n"
242+
).toBlocking.foreach(println)
243+
}
244+
245+
@Test def slidingExample3() {
220246
val open = Observable.interval(300 millis)
221247
val closing = Observable.interval(600 millis)
222248
val o = Observable.interval(100 millis).take(20).sliding(open)(_ => closing)

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,7 @@ trait Observable[+T]
470470
* An [[rx.lang.scala.Observable]] which produces connected non-overlapping buffers containing at most
471471
* `count` produced values.
472472
*/
473-
def buffer(count: Int): Observable[Seq[T]] = {
473+
def tumblingBuffer(count: Int): Observable[Seq[T]] = {
474474
val oJava: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(count)
475475
Observable.jObsOfListToScObsOfSeq(oJava.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
476476
}
@@ -491,7 +491,7 @@ trait Observable[+T]
491491
* An [[rx.lang.scala.Observable]] which produces buffers every `skip` values containing at most
492492
* `count` produced values.
493493
*/
494-
def buffer(count: Int, skip: Int): Observable[Seq[T]] = {
494+
def slidingBuffer(count: Int, skip: Int): Observable[Seq[T]] = {
495495
val oJava: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(count, skip)
496496
Observable.jObsOfListToScObsOfSeq(oJava.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
497497
}
@@ -509,7 +509,7 @@ trait Observable[+T]
509509
* @return
510510
* An [[rx.lang.scala.Observable]] which produces connected non-overlapping buffers with a fixed duration.
511511
*/
512-
def buffer(timespan: Duration): Observable[Seq[T]] = {
512+
def tumblingBuffer(timespan: Duration): Observable[Seq[T]] = {
513513
val oJava: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(timespan.length, timespan.unit)
514514
Observable.jObsOfListToScObsOfSeq(oJava.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
515515
}
@@ -529,7 +529,7 @@ trait Observable[+T]
529529
* @return
530530
* An [[rx.lang.scala.Observable]] which produces connected non-overlapping buffers with a fixed duration.
531531
*/
532-
def buffer(timespan: Duration, scheduler: Scheduler): Observable[Seq[T]] = {
532+
def tumblingBuffer(timespan: Duration, scheduler: Scheduler): Observable[Seq[T]] = {
533533
val oJava: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(timespan.length, timespan.unit, scheduler)
534534
Observable.jObsOfListToScObsOfSeq(oJava.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
535535
}
@@ -549,7 +549,7 @@ trait Observable[+T]
549549
* An [[rx.lang.scala.Observable]] which produces connected non-overlapping buffers which are emitted after
550550
* a fixed duration or when the buffer has reached maximum capacity (which ever occurs first).
551551
*/
552-
def buffer(timespan: Duration, count: Int): Observable[Seq[T]] = {
552+
def tumblingBuffer(timespan: Duration, count: Int): Observable[Seq[T]] = {
553553
val oJava: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(timespan.length, timespan.unit, count)
554554
Observable.jObsOfListToScObsOfSeq(oJava.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
555555
}
@@ -571,7 +571,7 @@ trait Observable[+T]
571571
* An [[rx.lang.scala.Observable]] which produces connected non-overlapping buffers which are emitted after
572572
* a fixed duration or when the buffer has reached maximum capacity (which ever occurs first).
573573
*/
574-
def buffer(timespan: Duration, count: Int, scheduler: Scheduler): Observable[Seq[T]] = {
574+
def tumblingBuffer(timespan: Duration, count: Int, scheduler: Scheduler): Observable[Seq[T]] = {
575575
val oJava: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(timespan.length, timespan.unit, count, scheduler)
576576
Observable.jObsOfListToScObsOfSeq(oJava.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
577577
}
@@ -590,7 +590,7 @@ trait Observable[+T]
590590
* An [[rx.lang.scala.Observable]] which produces new buffers periodically, and these are emitted after
591591
* a fixed timespan has elapsed.
592592
*/
593-
def buffer(timespan: Duration, timeshift: Duration): Observable[Seq[T]] = {
593+
def slidingBuffer(timespan: Duration, timeshift: Duration): Observable[Seq[T]] = {
594594
val span: Long = timespan.length
595595
val shift: Long = timespan.unit.convert(timeshift.length, timeshift.unit)
596596
val unit: TimeUnit = timespan.unit
@@ -614,7 +614,7 @@ trait Observable[+T]
614614
* An [[rx.lang.scala.Observable]] which produces new buffers periodically, and these are emitted after
615615
* a fixed timespan has elapsed.
616616
*/
617-
def buffer(timespan: Duration, timeshift: Duration, scheduler: Scheduler): Observable[Seq[T]] = {
617+
def slidingBuffer(timespan: Duration, timeshift: Duration, scheduler: Scheduler): Observable[Seq[T]] = {
618618
val span: Long = timespan.length
619619
val shift: Long = timespan.unit.convert(timeshift.length, timeshift.unit)
620620
val unit: TimeUnit = timespan.unit
@@ -714,7 +714,7 @@ trait Observable[+T]
714714
* An [[rx.lang.scala.Observable]] which produces connected non-overlapping windows containing at most
715715
* `count` produced values.
716716
*/
717-
def window(count: Int): Observable[Observable[T]] = {
717+
def tumbling(count: Int): Observable[Observable[T]] = {
718718
// this unnecessary ascription is needed because of this bug (without, compiler crashes):
719719
// https://issues.scala-lang.org/browse/SI-7818
720720
Observable.jObsOfJObsToScObsOfScObs(asJavaObservable.window(count)) : Observable[Observable[T]]
@@ -734,7 +734,7 @@ trait Observable[+T]
734734
* An [[rx.lang.scala.Observable]] which produces windows every `skip` values containing at most
735735
* `count` produced values.
736736
*/
737-
def window(count: Int, skip: Int): Observable[Observable[T]] = {
737+
def sliding(count: Int, skip: Int): Observable[Observable[T]] = {
738738
Observable.jObsOfJObsToScObsOfScObs(asJavaObservable.window(count, skip))
739739
: Observable[Observable[T]] // SI-7818
740740
}
@@ -750,7 +750,7 @@ trait Observable[+T]
750750
* @return
751751
* An [[rx.lang.scala.Observable]] which produces connected non-overlapping windows with a fixed duration.
752752
*/
753-
def window(timespan: Duration): Observable[Observable[T]] = {
753+
def tumbling(timespan: Duration): Observable[Observable[T]] = {
754754
Observable.jObsOfJObsToScObsOfScObs(asJavaObservable.window(timespan.length, timespan.unit))
755755
: Observable[Observable[T]] // SI-7818
756756
}
@@ -768,7 +768,7 @@ trait Observable[+T]
768768
* @return
769769
* An [[rx.lang.scala.Observable]] which produces connected non-overlapping windows with a fixed duration.
770770
*/
771-
def window(timespan: Duration, scheduler: Scheduler): Observable[Observable[T]] = {
771+
def tumbling(timespan: Duration, scheduler: Scheduler): Observable[Observable[T]] = {
772772
Observable.jObsOfJObsToScObsOfScObs(asJavaObservable.window(timespan.length, timespan.unit, scheduler))
773773
: Observable[Observable[T]] // SI-7818
774774
}
@@ -788,7 +788,7 @@ trait Observable[+T]
788788
* An [[rx.lang.scala.Observable]] which produces connected non-overlapping windows which are emitted after
789789
* a fixed duration or when the window has reached maximum capacity (which ever occurs first).
790790
*/
791-
def window(timespan: Duration, count: Int): Observable[Observable[T]] = {
791+
def tumbling(timespan: Duration, count: Int): Observable[Observable[T]] = {
792792
Observable.jObsOfJObsToScObsOfScObs(asJavaObservable.window(timespan.length, timespan.unit, count))
793793
: Observable[Observable[T]] // SI-7818
794794
}
@@ -810,7 +810,7 @@ trait Observable[+T]
810810
* An [[rx.lang.scala.Observable]] which produces connected non-overlapping windows which are emitted after
811811
* a fixed duration or when the window has reached maximum capacity (which ever occurs first).
812812
*/
813-
def window(timespan: Duration, count: Int, scheduler: Scheduler): Observable[Observable[T]] = {
813+
def tumbling(timespan: Duration, count: Int, scheduler: Scheduler): Observable[Observable[T]] = {
814814
Observable.jObsOfJObsToScObsOfScObs(asJavaObservable.window(timespan.length, timespan.unit, count, scheduler))
815815
: Observable[Observable[T]] // SI-7818
816816
}
@@ -829,7 +829,7 @@ trait Observable[+T]
829829
* An [[rx.lang.scala.Observable]] which produces new windows periodically, and these are emitted after
830830
* a fixed timespan has elapsed.
831831
*/
832-
def window(timespan: Duration, timeshift: Duration): Observable[Observable[T]] = {
832+
def sliding(timespan: Duration, timeshift: Duration): Observable[Observable[T]] = {
833833
val span: Long = timespan.length
834834
val shift: Long = timespan.unit.convert(timeshift.length, timeshift.unit)
835835
val unit: TimeUnit = timespan.unit
@@ -853,7 +853,7 @@ trait Observable[+T]
853853
* An [[rx.lang.scala.Observable]] which produces new windows periodically, and these are emitted after
854854
* a fixed timespan has elapsed.
855855
*/
856-
def window(timespan: Duration, timeshift: Duration, scheduler: Scheduler): Observable[Observable[T]] = {
856+
def sliding(timespan: Duration, timeshift: Duration, scheduler: Scheduler): Observable[Observable[T]] = {
857857
val span: Long = timespan.length
858858
val shift: Long = timespan.unit.convert(timeshift.length, timeshift.unit)
859859
val unit: TimeUnit = timespan.unit

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,14 @@ class CompletenessTest extends JUnitSuite {
7171
"aggregate(R, Func2[R, _ >: T, R])" -> "foldLeft(R)((R, T) => R)",
7272
"all(Func1[_ >: T, Boolean])" -> "forall(T => Boolean)",
7373
"asObservable()" -> unnecessary,
74-
"buffer(Long, Long, TimeUnit)" -> "buffer(Duration, Duration)",
75-
"buffer(Long, Long, TimeUnit, Scheduler)" -> "buffer(Duration, Duration, Scheduler)",
74+
"buffer(Int)" -> "tumblingBuffer(Int)",
75+
"buffer(Int, Int)" -> "slidingBuffer(Int, Int)",
76+
"buffer(Long, TimeUnit)" -> "tumblingBuffer(Duration)",
77+
"buffer(Long, TimeUnit, Int)" -> "tumblingBuffer(Duration, Int)",
78+
"buffer(Long, TimeUnit, Int, Scheduler)" -> "tumblingBuffer(Duration, Int, Scheduler)",
79+
"buffer(Long, TimeUnit, Scheduler)" -> "tumblingBuffer(Duration, Scheduler)",
80+
"buffer(Long, Long, TimeUnit)" -> "slidingBuffer(Duration, Duration)",
81+
"buffer(Long, Long, TimeUnit, Scheduler)" -> "slidingBuffer(Duration, Duration, Scheduler)",
7682
"buffer(Func0[_ <: Observable[_ <: TClosing]])" -> "tumblingBuffer(=> Observable[Any])",
7783
"buffer(Observable[B])" -> "tumblingBuffer(=> Observable[Any])",
7884
"buffer(Observable[B], Int)" -> "tumblingBuffer(Observable[Any], Int)",
@@ -181,11 +187,17 @@ class CompletenessTest extends JUnitSuite {
181187
"toMultimap(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: V], Func0[_ <: Map[K, Collection[V]]], Func1[_ >: K, _ <: Collection[V]])" -> "toMultimap(T => K, T => V, () => M, K => B)",
182188
"toSortedList()" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]",
183189
"toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]",
190+
"window(Int)" -> "tumbling(Int)",
191+
"window(Int, Int)" -> "sliding(Int, Int)",
192+
"window(Long, TimeUnit)" -> "tumbling(Duration)",
193+
"window(Long, TimeUnit, Int)" -> "tumbling(Duration, Int)",
194+
"window(Long, TimeUnit, Int, Scheduler)" -> "tumbling(Duration, Int, Scheduler)",
195+
"window(Long, TimeUnit, Scheduler)" -> "tumbling(Duration, Scheduler)",
184196
"window(Observable[U])" -> "tumbling(=> Observable[Any])",
185197
"window(Func0[_ <: Observable[_ <: TClosing]])" -> "tumbling(=> Observable[Any])",
186198
"window(Observable[_ <: TOpening], Func1[_ >: TOpening, _ <: Observable[_ <: TClosing]])" -> "sliding(Observable[Opening])(Opening => Observable[Any])",
187-
"window(Long, Long, TimeUnit)" -> "window(Duration, Duration)",
188-
"window(Long, Long, TimeUnit, Scheduler)" -> "window(Duration, Duration, Scheduler)",
199+
"window(Long, Long, TimeUnit)" -> "sliding(Duration, Duration)",
200+
"window(Long, Long, TimeUnit, Scheduler)" -> "sliding(Duration, Duration, Scheduler)",
189201

190202
// manually added entries for Java static methods
191203
"amb(Iterable[_ <: Observable[_ <: T]])" -> "amb(Observable[T]*)",

0 commit comments

Comments
 (0)