Skip to content

Covariant Support with super/extends and OnSubscribeFunc #343

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Sep 4, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
34942ad
making Func0 covariant in its return type, cleaning up a few warnings…
Aug 31, 2013
52342a2
added variance to Func1 (hopefully) everywhere...
Aug 31, 2013
f8cba6a
added variance to Func2, too
Aug 31, 2013
21bc549
added variance to all other Func*; this breaks Scala for good, it see…
Aug 31, 2013
69c6e80
added variance to Action*, too
Aug 31, 2013
0438505
lots of Observer<? super X>
Aug 31, 2013
6b9867c
updated to Scala 2.10.2 again, repaired Scala tests, generalized two …
Aug 31, 2013
6bd2033
UnitTest confirming compilation failure without super/extends and suc…
benjchristensen Aug 31, 2013
98cd711
Need to stay pinned on Scala 2.10.1 still …
benjchristensen Aug 31, 2013
ac6a0a1
Zip: Order of Generics and Artities 5-9
benjchristensen Aug 31, 2013
ebaa616
Merge pull request #1 from benjchristensen/super-extends-additions
Sep 1, 2013
a127b06
adapted RxImplicits tests againt zip to new generics order, renamed t…
Sep 1, 2013
eba4857
generalized everything in Observable that deals with covariance of ob…
Sep 1, 2013
29289d1
Timestamped, Notification and Future are now also treated as covariant
Sep 1, 2013
78a0a1b
added an unnecessary explicit cast because the Jenkins java compiler …
Sep 1, 2013
1ca5900
Generalized all the operators, too
Sep 1, 2013
04f35cd
generalized BlockingObservable and the execution hook further
Sep 1, 2013
e962d00
added a few 'compiler' tests
Sep 1, 2013
68d181b
removed some <? super Throwable>s because that's rather unnecessary
Sep 4, 2013
51dd848
Merged in master so that the gradle pull request build has a chance t…
Sep 4, 2013
94f5fbe
Merge branch 'super-extends' of git://github.com/jmhofer/RxJava into …
benjchristensen Sep 4, 2013
b7de349
Add OnSubscribeFunction and refactor to support it
benjchristensen Sep 4, 2013
a1ad9c4
Adding implicit for OnSubscribeFunc
mattrjacobs Sep 4, 2013
3585570
Change OnSubscribeFunc.call to OnSubscribeFunc.onSubscribe
benjchristensen Sep 4, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.lang.groovy;

import groovy.lang.Closure;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;

/**
* Concrete wrapper that accepts a {@link Closure} and produces a {@link OnSubscribeFunc}.
*
* @param <T>
*/
public class GroovyOnSubscribeFuncWrapper<T> implements OnSubscribeFunc<T> {

private final Closure<Subscription> closure;

public GroovyOnSubscribeFuncWrapper(Closure<Subscription> closure) {
this.closure = closure;
}

@Override
public Subscription onSubscribe(Observer<? super T> observer) {
return closure.call(observer);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.codehaus.groovy.runtime.metaclass.MetaClassRegistryImpl;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.observables.BlockingObservable;
import rx.util.functions.Action;
import rx.util.functions.Function;
Expand All @@ -52,33 +53,6 @@ public RxGroovyExtensionModule() {
super("RxGroovyExtensionModule", "1.0");
}

/**
* Keeping this code around a little while as it was hard to figure out ... and I'm still messing with it while debugging.
*
* Once the rest of this ExtensionModule stuff is working I'll delete this method.
*
* This is used for manually initializing rather than going via the org.codehaus.groovy.runtime.ExtensionModule properties file.
*/
public static void initializeManuallyForTesting() {
System.out.println("initialize");
MetaClassRegistryImpl mcRegistry = ((MetaClassRegistryImpl) GroovySystem.getMetaClassRegistry());
// RxGroovyExtensionModule em = new RxGroovyExtensionModule();

Properties p = new Properties();
p.setProperty("moduleFactory", "rx.lang.groovy.RxGroovyPropertiesModuleFactory");
Map<CachedClass, List<MetaMethod>> metaMethods = new HashMap<CachedClass, List<MetaMethod>>();
mcRegistry.registerExtensionModuleFromProperties(p, RxGroovyExtensionModule.class.getClassLoader(), metaMethods);

for (ExtensionModule m : mcRegistry.getModuleRegistry().getModules()) {
System.out.println("Module: " + m.getName());
}

for (CachedClass cc : metaMethods.keySet()) {
System.out.println("Adding MetaMethods to CachedClass: " + cc);
cc.addNewMopMethods(metaMethods.get(cc));
}
}

@SuppressWarnings("rawtypes")
@Override
public List<MetaMethod> getMetaMethods() {
Expand Down Expand Up @@ -135,6 +109,8 @@ public Object invoke(Object object, Object[] arguments) {
if (o instanceof Closure) {
if (Action.class.isAssignableFrom(m.getParameterTypes()[i])) {
newArgs[i] = new GroovyActionWrapper((Closure) o);
} else if(OnSubscribeFunc.class.isAssignableFrom(m.getParameterTypes()[i])) {
newArgs[i] = new GroovyOnSubscribeFuncWrapper((Closure) o);
} else {
newArgs[i] = new GroovyFunctionWrapper((Closure) o);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.mockito.MockitoAnnotations;

import rx.Notification;
import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.observables.GroupedObservable;
Expand Down Expand Up @@ -296,9 +297,9 @@ def class ObservableTests {
}


def class AsyncObservable implements Func1<Observer<Integer>, Subscription> {
def class AsyncObservable implements OnSubscribeFunc {

public Subscription call(final Observer<Integer> observer) {
public Subscription onSubscribe(final Observer<Integer> observer) {
new Thread(new Runnable() {
public void run() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ object RxImplicits {
import java.{ lang => jlang }
import language.implicitConversions

import rx.Observable
import rx.{ Observable, Observer, Subscription }
import rx.Observable.OnSubscribeFunc
import rx.observables.BlockingObservable
import rx.util.functions._

Expand Down Expand Up @@ -56,7 +57,7 @@ object RxImplicits {
}

/**
* Converts a function shaped ilke compareTo into the equivalent Rx Func2
* Converts a function shaped like compareTo into the equivalent Rx Func2
*/
implicit def convertComparisonFuncToRxFunc2[A](f: (A, A) => Int): Func2[A, A, jlang.Integer] =
new Func2[A, A, jlang.Integer] {
Expand Down Expand Up @@ -100,13 +101,18 @@ object RxImplicits {
def call(a: A, b: B, c: C, d: D) = f(a, b, c, d)
}

implicit def onSubscribeFunc[A](f: (Observer[_ >: A]) => Subscription): OnSubscribeFunc[A] =
new OnSubscribeFunc[A] {
override def onSubscribe(a: Observer[_ >: A]) = f(a)
}

/**
* This implicit class implements all of the methods necessary for including Observables in a
* for-comprehension. Note that return type is always Observable, so that the ScalaObservable
* type never escapes the for-comprehension
*/
implicit class ScalaObservable[A](wrapped: Observable[A]) {
def map[B](f: A => B): Observable[B] = wrapped.map(f)
def map[B](f: A => B): Observable[B] = wrapped.map[B](f)
def flatMap[B](f: A => Observable[B]): Observable[B] = wrapped.mapMany(f)
def foreach(f: A => Unit): Unit = wrapped.toBlockingObservable.forEach(f)
def withFilter(p: A => Boolean): WithFilter = new WithFilter(p)
Expand All @@ -131,7 +137,9 @@ class UnitTestSuite extends JUnitSuite {
import org.mockito.Mockito._
import org.mockito.{ MockitoAnnotations, Mock }
import rx.{ Notification, Observer, Observable, Subscription }
import rx.Observable.OnSubscribeFunc
import rx.observables.GroupedObservable
import rx.subscriptions.Subscriptions
import collection.mutable.ArrayBuffer
import collection.JavaConverters._

Expand All @@ -147,7 +155,7 @@ class UnitTestSuite extends JUnitSuite {
class ObservableWithException(s: Subscription, values: String*) extends Observable[String] {
var t: Thread = null

override def subscribe(observer: Observer[String]): Subscription = {
override def subscribe(observer: Observer[_ >: String]): Subscription = {
println("ObservableWithException subscribed to ...")
t = new Thread(new Runnable() {
override def run() {
Expand Down Expand Up @@ -175,7 +183,6 @@ class UnitTestSuite extends JUnitSuite {
}

// tests of static methods

@Test def testSingle {
assertEquals(1, Observable.from(1).toBlockingObservable.single)
}
Expand Down Expand Up @@ -208,6 +215,11 @@ class UnitTestSuite extends JUnitSuite {
case ex: Throwable => fail("Caught unexpected exception " + ex.getCause + ", expected IllegalStateException")
}
}

@Test def testCreateFromOnSubscribeFunc {
val created = Observable.create((o: Observer[_ >: Integer]) => Subscriptions.empty)
//no assertions on subscription, just testing the implicit
}

@Test def testFromJavaInterop {
val observable = Observable.from(List(1, 2, 3).asJava)
Expand Down Expand Up @@ -248,7 +260,7 @@ class UnitTestSuite extends JUnitSuite {

@Test def testFlattenMerge {
val observable = Observable.from(Observable.from(1, 2, 3))
val merged = Observable.merge(observable)
val merged = Observable.merge[Int](observable)
assertSubscribeReceives(merged)(1, 2, 3)
}

Expand All @@ -272,6 +284,18 @@ class UnitTestSuite extends JUnitSuite {
assertSubscribeReceives(synchronized)(1, 2, 3)
}

@Test def testZip2() {
val colors: Observable[String] = Observable.from("red", "green", "blue")
val names: Observable[String] = Observable.from("lion-o", "cheetara", "panthro")

case class Character(color: String, name: String)

val cheetara = Character("green", "cheetara")
val panthro = Character("blue", "panthro")
val characters = Observable.zip[String, String, Character](colors, names, Character.apply _)
assertSubscribeReceives(characters)(cheetara, panthro)
}

@Test def testZip3() {
val numbers = Observable.from(1, 2, 3)
val colors = Observable.from("red", "green", "blue")
Expand All @@ -283,7 +307,7 @@ class UnitTestSuite extends JUnitSuite {
val cheetara = Character(2, "green", "cheetara")
val panthro = Character(3, "blue", "panthro")

val characters = Observable.zip(numbers, colors, names, Character.apply _)
val characters = Observable.zip[Int, String, String, Character](numbers, colors, names, Character.apply _)
assertSubscribeReceives(characters)(liono, cheetara, panthro)
}

Expand All @@ -299,7 +323,7 @@ class UnitTestSuite extends JUnitSuite {
val cheetara = Character(2, "green", "cheetara", false)
val panthro = Character(3, "blue", "panthro", false)

val characters = Observable.zip(numbers, colors, names, isLeader, Character.apply _)
val characters = Observable.zip[Int, String, String, Boolean, Character](numbers, colors, names, isLeader, Character.apply _)
assertSubscribeReceives(characters)(liono, cheetara, panthro)
}

Expand Down Expand Up @@ -338,7 +362,8 @@ class UnitTestSuite extends JUnitSuite {
@Test def testMap {
val numbers = Observable.from(1, 2, 3, 4, 5, 6, 7, 8, 9)
val mappedNumbers = ArrayBuffer.empty[Int]
numbers.map((x: Int) => x * x).subscribe((squareVal: Int) => {
val mapped: Observable[Int] = numbers map ((x: Int) => x * x)
mapped.subscribe((squareVal: Int) => {
mappedNumbers.append(squareVal)
})
assertEquals(List(1, 4, 9, 16, 25, 36, 49, 64, 81), mappedNumbers.toList)
Expand Down Expand Up @@ -458,18 +483,9 @@ class UnitTestSuite extends JUnitSuite {
assertSubscribeReceives(skipped)(3, 4)
}

/**
* Both testTake and testTakeWhileWithIndex exposed a bug with unsubscribes not properly propagating.
* observable.take(2) produces onNext(first), onNext(second), and 4 onCompleteds
* it should produce onNext(first), onNext(second), and 1 onCompleted
*
* Switching to Observable.create(OperationTake.take(observable, 2)) works as expected
*/
@Test def testTake {
import rx.operators._

val observable = Observable.from(1, 2, 3, 4, 5)
val took = Observable.create(OperationTake.take(observable, 2))
val took = observable.take(2)
assertSubscribeReceives(took)(1, 2)
}

Expand All @@ -479,11 +495,11 @@ class UnitTestSuite extends JUnitSuite {
assertSubscribeReceives(took)(1, 3, 5)
}

/*@Test def testTakeWhileWithIndex {
val observable = Observable.from(1, 3, 5, 6, 7, 9, 11, 12, 13, 15, 17)
val took = observable.takeWhileWithIndex((i: Int, idx: Int) => isOdd(i) && idx > 4)
assertSubscribeReceives(took)(9, 11)
}*/
@Test def testTakeWhileWithIndex {
val observable = Observable.from(1, 3, 5, 7, 9, 11, 12, 13, 15, 17)
val took = observable.takeWhileWithIndex((i: Int, idx: Int) => isOdd(i) && idx < 8)
assertSubscribeReceives(took)(1, 3, 5, 7, 9, 11)
}

@Test def testTakeLast {
val observable = Observable.from(1, 2, 3, 4, 5, 6, 7, 8, 9)
Expand Down Expand Up @@ -559,7 +575,7 @@ class UnitTestSuite extends JUnitSuite {

@Test def testFilterInForComprehension {
val doubler = (i: Int) => Observable.from(i, i)
val filteredObservable = for {
val filteredObservable: Observable[Int] = for {
i: Int <- Observable.from(1, 2, 3, 4)
j: Int <- doubler(i) if isOdd(i)
} yield j
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package rx.android.concurrency;

import android.os.Handler;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.robolectric.RobolectricTestRunner;
import org.robolectric.annotation.Config;

import rx.Scheduler;
import rx.Subscription;
import rx.operators.SafeObservableSubscription;
Expand Down Expand Up @@ -39,7 +41,7 @@ public HandlerThreadScheduler(Handler handler) {
* See {@link #schedule(Object, rx.util.functions.Func2, long, java.util.concurrent.TimeUnit)}
*/
@Override
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action) {
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
return schedule(state, action, 0L, TimeUnit.MILLISECONDS);
}

Expand All @@ -56,7 +58,7 @@ public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscr
* @return A Subscription from which one can unsubscribe from.
*/
@Override
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit) {
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
final SafeObservableSubscription subscription = new SafeObservableSubscription();
final Scheduler _scheduler = this;
handler.postDelayed(new Runnable() {
Expand All @@ -76,6 +78,7 @@ public static final class UnitTest {
public void shouldScheduleImmediateActionOnHandlerThread() {
final Handler handler = mock(Handler.class);
final Object state = new Object();
@SuppressWarnings("unchecked")
final Func2<Scheduler, Object, Subscription> action = mock(Func2.class);

Scheduler scheduler = new HandlerThreadScheduler(handler);
Expand All @@ -94,6 +97,7 @@ public void shouldScheduleImmediateActionOnHandlerThread() {
public void shouldScheduleDelayedActionOnHandlerThread() {
final Handler handler = mock(Handler.class);
final Object state = new Object();
@SuppressWarnings("unchecked")
final Func2<Scheduler, Object, Subscription> action = mock(Func2.class);

Scheduler scheduler = new HandlerThreadScheduler(handler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ private SwingScheduler() {
}

@Override
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action) {
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
final AtomicReference<Subscription> sub = new AtomicReference<Subscription>();
EventQueue.invokeLater(new Runnable() {
@Override
Expand All @@ -75,7 +75,7 @@ public void call() {
}

@Override
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit) {
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long dueTime, TimeUnit unit) {
final AtomicReference<Subscription> sub = new AtomicReference<Subscription>();
long delay = unit.toMillis(dueTime);
assertThatTheDelayIsValidForTheSwingTimer(delay);
Expand Down Expand Up @@ -113,7 +113,7 @@ public void call() {
}

@Override
public <T> Subscription schedulePeriodically(T state, final Func2<Scheduler, T, Subscription> action, long initialDelay, long period, TimeUnit unit) {
public <T> Subscription schedulePeriodically(T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long initialDelay, long period, TimeUnit unit) {
final AtomicReference<Timer> timer = new AtomicReference<Timer>();

final long delay = unit.toMillis(period);
Expand Down
Loading