Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: qwang-sj/RxJava
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 1.x
Choose a base ref
...
head repository: qwang-sj/RxJava
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 2.x
Choose a head ref

Commits on Aug 21, 2015

  1. 2.x Initialization

    - clearing out rx.* packages to start building 2.x in io.reactivex
    - starting fresh since the fundamental design will change to adopt ReactiveStreams interfaces and require touching virtually every file
    benjchristensen committed Aug 21, 2015
    Copy the full SHA
    639cbc5 View commit details
  2. Travis 2.x Branch

    benjchristensen committed Aug 21, 2015
    Copy the full SHA
    31995f4 View commit details
  3. Clear for 2.x

    benjchristensen committed Aug 21, 2015
    Copy the full SHA
    9da2523 View commit details
  4. Java 8

    benjchristensen committed Aug 21, 2015
    Copy the full SHA
    a004e04 View commit details
  5. Java 8

    benjchristensen committed Aug 21, 2015
    Copy the full SHA
    a2d3ab8 View commit details
  6. 2.0.0-DP0-SNAPSHOT

    Trying to make the build pick up this version for snapshots on the 2.x branch.
    benjchristensen committed Aug 21, 2015
    Copy the full SHA
    bf47c42 View commit details
  7. 2.0.0-DP0

    Override inferred version.
    benjchristensen committed Aug 21, 2015
    Copy the full SHA
    6879a9d View commit details
  8. Copy the full SHA
    e3afde5 View commit details
  9. Copy the full SHA
    6928fcb View commit details
  10. Javac workaround.

    akarnokd committed Aug 21, 2015
    Copy the full SHA
    50f9171 View commit details
  11. Copy the full SHA
    fa62fe7 View commit details
  12. Merge pull request ReactiveX#3175 from akarnokd/InternalInfrastructure

    2.x: Some safe queue implementations
    akarnokd committed Aug 21, 2015
    Copy the full SHA
    c7d655b View commit details

Commits on Aug 25, 2015

  1. Merge pull request ReactiveX#3172 from akarnokd/RsDependency

    RS dependency, some basic Observable/Observer methods.
    akarnokd committed Aug 25, 2015
    Copy the full SHA
    0aa4ccb View commit details
  2. Copy the full SHA
    9891ed5 View commit details
  3. Merge pull request ReactiveX#3185 from akarnokd/PublishSubject2x

    PublishSubject, AsyncSubject and a few helper classes
    akarnokd committed Aug 25, 2015
    Copy the full SHA
    936a7ef View commit details
  4. Copy the full SHA
    f946a34 View commit details
  5. Merge pull request ReactiveX#3186 from akarnokd/InternalDisposables

    Disposable interface and a bunch of general resource containers.
    akarnokd committed Aug 25, 2015
    Copy the full SHA
    8c3ccfc View commit details
  6. Schedulers (a few) + API, slight modifications to other classes,

    incomplete.
    akarnokd committed Aug 25, 2015
    Copy the full SHA
    e616cd3 View commit details

Commits on Aug 26, 2015

  1. Merge pull request ReactiveX#3188 from akarnokd/SchedulersBaseAPI

    Schedulers (a few) + API, slight modifications to other classes,
    akarnokd committed Aug 26, 2015
    Copy the full SHA
    a6e90cf View commit details
  2. Added a bunch of basic event sources and helper Subscription

    implementations.
    akarnokd committed Aug 26, 2015
    Copy the full SHA
    98b4601 View commit details
  3. Merge pull request ReactiveX#3189 from akarnokd/BasicPublisherSources

    Added a bunch of basic event sources and helper Subscription
    akarnokd committed Aug 26, 2015
    Copy the full SHA
    90a1b90 View commit details
  4. Copy the full SHA
    3c47643 View commit details
  5. Merge pull request ReactiveX#3190 from akarnokd/BasicSubscribers

    Basic Subscribers and plugin class
    akarnokd committed Aug 26, 2015
    Copy the full SHA
    7d87790 View commit details
  6. Operators map and flatMap

    akarnokd committed Aug 26, 2015
    Copy the full SHA
    9d9c190 View commit details
  7. Merge pull request ReactiveX#3191 from akarnokd/OperatorFlatMap2x

    Operators map and flatMap
    akarnokd committed Aug 26, 2015
    Copy the full SHA
    6bbbb17 View commit details
  8. Copy the full SHA
    1a362f0 View commit details
  9. Merge pull request ReactiveX#3193 from akarnokd/ConnectableObservable2x

    ConnectableObservable and autoConnect
    akarnokd committed Aug 26, 2015
    Copy the full SHA
    a2f3c29 View commit details
  10. Copy the full SHA
    46412b6 View commit details
  11. Merge pull request ReactiveX#3194 from akarnokd/OperatorTake2x

    Operators of take (untimed), plugin error reporting fix for other
    akarnokd committed Aug 26, 2015
    Copy the full SHA
    e6c3d6f View commit details
  12. Copy the full SHA
    339dad2 View commit details
  13. Merge pull request ReactiveX#3195 from akarnokd/OperatorSkip2x

    Operators of skip (untimed), filter; fix to takeUntil
    akarnokd committed Aug 26, 2015
    Copy the full SHA
    cc3f291 View commit details
  14. Copy the full SHA
    a6bee14 View commit details
  15. Merge pull request ReactiveX#3196 from akarnokd/OperatorSkip2xMethods

    The skips and filter operators added to Observable
    akarnokd committed Aug 26, 2015
    Copy the full SHA
    0358eac View commit details
  16. Copy the full SHA
    7bb67d3 View commit details
  17. Merge pull request ReactiveX#3197 from akarnokd/OperatorToList2x

    Operator toList and toSortedList
    akarnokd committed Aug 26, 2015
    Copy the full SHA
    f4629d9 View commit details
  18. Copy the full SHA
    c4c3140 View commit details
  19. Copy the full SHA
    b22f24f View commit details
  20. Merge pull request ReactiveX#3198 from akarnokd/OperatorsPublishReplay

    Operators publish, replay and cache.
    akarnokd committed Aug 26, 2015
    Copy the full SHA
    235d900 View commit details
  21. Operators all, any, count, elementAt, single

    akarnokd committed Aug 26, 2015
    Copy the full SHA
    7730e2f View commit details
  22. Merge pull request ReactiveX#3199 from akarnokd/OperatorsAnyAllCount2x

    Operators all, any, count, elementAt, single
    akarnokd committed Aug 26, 2015
    Copy the full SHA
    ea88c24 View commit details
  23. All standard schedulers ported and cleaned up.

    akarnokd committed Aug 26, 2015
    Copy the full SHA
    98074a2 View commit details
  24. Merge pull request ReactiveX#3200 from akarnokd/SchedulersRest2x

    All standard schedulers ported and cleaned up.
    akarnokd committed Aug 26, 2015
    Copy the full SHA
    df22f13 View commit details
  25. Operators observeOn, subscribeOn and unsubscribeOn

    akarnokd committed Aug 26, 2015
    Copy the full SHA
    087542c View commit details
  26. Merge pull request ReactiveX#3202 from akarnokd/ScheduleOn2x

    Operators observeOn, subscribeOn and unsubscribeOn
    akarnokd committed Aug 26, 2015
    Copy the full SHA
    3130b9a View commit details

Commits on Aug 27, 2015

  1. Copy the full SHA
    72a7035 View commit details
  2. Merge pull request ReactiveX#3204 from akarnokd/OperatorConcatMap2x

    Operator concatMap and related convenience methods
    akarnokd committed Aug 27, 2015
    Copy the full SHA
    a67607c View commit details
  3. Copy the full SHA
    9303307 View commit details
  4. Merge pull request ReactiveX#3205 from akarnokd/LambdaSubscribe2x

    Lambda-based subscribe and lifecycle tracking methods
    akarnokd committed Aug 27, 2015
    Copy the full SHA
    7d9e394 View commit details
  5. TestSubscriber implemented

    akarnokd committed Aug 27, 2015
    Copy the full SHA
    f392aaf View commit details
  6. Merge pull request ReactiveX#3206 from akarnokd/TestSubscriber2x

    TestSubscriber implemented
    akarnokd committed Aug 27, 2015
    Copy the full SHA
    6370ef7 View commit details
Showing 1,085 changed files with 155,885 additions and 99,523 deletions.
9 changes: 8 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
language: java
jdk:
- oraclejdk7
- oraclejdk8

# force upgrade Java8 as per https://github.com/travis-ci/travis-ci/issues/4042 (fixes compilation issue)
addons:
apt:
packages:
- oracle-java8-installer

sudo: false
# as per http://blog.travis-ci.com/2014-12-17-faster-builds-with-container-based-infrastructure/

3,568 changes: 1 addition & 3,567 deletions CHANGES.md

Large diffs are not rendered by default.

149 changes: 149 additions & 0 deletions DESIGN.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
## RxJava v2 Design

Terminology, principles, contracts, and other aspects of the design of RxJava v2.

### Terminology & Definitions

##### Hot

When used to refer to a data source (such as an `Observable`), it means it does not have side-effects when subscribed to.

For example, an `Observable` of mouse events. Subscribing to that `Observable` does not cause the mouse events, but starts receiving them.

(Note: Yes, there are *some* side-effects of adding a listener, but they are inconsequential as far as the 'hot' usage is concerned).

##### Cold

When used to refer to a data source (such as an `Observable`), it means it has side-effects when subscribed to.

For example, an `Observable` of data from a remote API (such as an RPC call). Each time that `Observable` is subscribed to causes a new network call to occur.

##### Reactive

Producer is in charge. Consumer has to do whatever it needs to keep up.

##### Interactive

Consumer is in charge. Producer has to do whatever it needs to keep up.

##### Push

Producer emits when it wishes to. Related to "reactive". Callbacks are an instance of push.

##### Pull

Consumer requests data when it wishes to. Related to "interactive". An `Iterable` is an instance of pull.

##### Async Pull

Consumer requests data when it wishes, and the data is then pushed when the producer wishes to. The Reactive Streams `Publisher` is an instance of "async pull", as is the 'AsyncEnumerable' in .Net.

### RxJava & Related Types

##### Observable

Stream that supports async and synchronous push. It does not support interactive flow control (`request(n)`).

Usable for:

- hot and cold sources
- sync or async
- push
- 0, 1, many or infinite items

Flow control support:

- buffering, sampling, throttling, windowing, dropping, etc
- temporal and count-based strategies

##### Flowable

Stream that supports async and synchronous push and pull. It supports interactive flow control (`request(n)`).

Usable for:

- hot and cold sources
- sync or async
- push
- pull
- 0, 1, many or infinite items

Flow control support:

- buffering, sampling, throttling, windowing, dropping, etc
- temporal and count-based strategies
- `request(n)` consumer demand signal
- for pull-based sources, this allows batched "async pull"
- for push-based sources, this allows backpressure signals to conditionally apply strategies (i.e. drop, buffer, sample, fail, etc)

##### Observer

Consumer of events without flow control.

##### Publisher

[Reactive Streams producer](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.0/README.md#1-publisher-code) of data

##### Subscriber

[Reactive Streams consumer](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.0/README.md#2-subscriber-code) of data.

##### Subscription

[Reactive Streams state](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.0/README.md#3-subscription-code) of subscription supporting flow control and cancellation.

##### Processor

[Reactive Streams operator](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.0/README.md#4processor-code) for defining behavior between `Publisher` and `Subscriber`. It must obey the contracts of `Publisher` and `Subscriber`, meaning it is sequential, serialized, and must obey `request(n)` flow control.

##### Subject

A "hot", push-based data source that allows a producer to emit events to it and consumers to subscribe to events in a multicast manner. It is "hot" because consumers subscribing to it does not cause side-effects, or affect the data flow in any way. It is push-based and reactive because the producer is fully in charge.

Relation to Reactive Streams

- It can not implement Reactive Streams `Publisher` unless it is created with a default flow control strategy.
- It can not implement `Processor` since a `Processor` must compose `request(n)` which can not be done with multicasting or pure push.

Flow control support:

- buffering, sampling, throttling, windowing, dropping, etc
- temporal and count-based strategies
- It does not support pull-based consumer-driven flow control.

##### Disposable

A type representing work that can be cancelled or disposed.

### Behavior

##### Creation

Creation of a stream falls into the following use cases, all of which should be catered to in API design.

- async, hot, push (ie. system or user events)
- async, cold, push (ie. events resulting from remote system via network connection)
- sync, cold, pull (ie. iterable, file, range)
- async, cold, pull (ie. RPC/REST network call, cross-thread queue draining)

Unknown:

- hot, pull (what is an example of this?)

Flow control support:

- If `request(n)` behavior is supported in the stream implementation, then:
- pull-based creation must support `request(n)` semantics
- push-based creation must provide a default *onBackpressure* strategy
- If `request(n)` behavior is not supported in the stream implementation, then:
- push-based creation can push without consideration of a backpressure strategy
- pull-based creation should be discouraged

##### Destruction

A producer can terminate a stream by emitting `onComplete` or `onError`. A consumer can terminate a stream by calling `cancel`.

Any resource cleanup of the source or operators must account for any of these three termination events. In other words, if an operator needs cleanup, then it should register the cleanup callback with `cancel`, `onError` and `onComplete`.

The final `subscribe` will *not* invoke `cancel` after receiving an `onComplete` or `onError`.

25 changes: 21 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -4,6 +4,18 @@ RxJava is a Java VM implementation of [Reactive Extensions](http://reactivex.io)

It extends the [observer pattern](http://en.wikipedia.org/wiki/Observer_pattern) to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

#### Version 2.x

Version 2.x and 1.x will live side-by-side for several years. They will have different namespaces (io.reactivex vs rx).

The purpose for 2.x is:

- leverage Java 8+ features
- [Reactive Streams](http://www.reactive-streams.org) compatibility
- performance gains through design changes learned through the 1.x cycle

#### Version 1.x

- Zero Dependencies
- < 800KB Jar
- Java 6+ & [Android](https://github.com/ReactiveX/RxAndroid) 2.3+
@@ -17,7 +29,7 @@ Learn more about RxJava on the <a href="https://github.com/ReactiveX/RxJava/wiki

## Master Build Status

<a href='https://travis-ci.org/ReactiveX/RxJava/builds'><img src='https://travis-ci.org/ReactiveX/RxJava.svg?branch=1.x'></a>
<a href='https://travis-ci.org/ReactiveX/RxJava/builds'><img src='https://travis-ci.org/ReactiveX/RxJava.svg?branch=2.x'></a>

## Communication

@@ -27,19 +39,21 @@ Learn more about RxJava on the <a href="https://github.com/ReactiveX/RxJava/wiki

## Versioning

Version 2.x has started development.

Version 1.x is now a stable API and will be supported for several years.

Minor 1.x increments (such as 1.1, 1.2, etc) will occur when non-trivial new functionality is added or significant enhancements or bug fixes occur that may have behavioral changes that may affect some edge cases (such as dependence on behavior resulting from a bug). An example of an enhancement that would classify as this is adding reactive pull backpressure support to an operator that previously did not support it. This should be backwards compatible but does behave differently.

Patch 1.x.y increments (such as 1.0.0 -> 1.0.1, 1.3.1 -> 1.3.2, etc) will occur for bug fixes and trivial functionality (like adding a method overload). New functionality marked with an `@Beta` or `@Experimental` annotation can also be added in patch releases to allow rapid exploration and iteration of unstable new functionality.
Patch 1.x.y increments (such as 1.0.0 -> 1.0.1, 1.3.1 -> 1.3.2, etc) will occur for bug fixes and trivial functionality (like adding a method overload). New functionality marked with an [`@Beta`][beta source link] or [`@Experimental`][experimental source link] annotation can also be added in patch releases to allow rapid exploration and iteration of unstable new functionality.

#### @Beta

APIs marked with the `@Beta` annotation at the class or method level are subject to change. They can be modified in any way, or even removed, at any time. If your code is a library itself (i.e. it is used on the CLASSPATH of users outside your own control), you should not use beta APIs, unless you repackage them (e.g. using ProGuard, shading, etc).
APIs marked with the [`@Beta`][beta source link] annotation at the class or method level are subject to change. They can be modified in any way, or even removed, at any time. If your code is a library itself (i.e. it is used on the CLASSPATH of users outside your own control), you should not use beta APIs, unless you repackage them (e.g. using ProGuard, shading, etc).

#### @Experimental

APIs marked with the `@Experimental` annotation at the class or method level will almost certainly change. They can be modified in any way, or even removed, at any time. You should not use or rely on them in any production code. They are purely to allow broad testing and feedback.
APIs marked with the [`@Experimental`][experimental source link] annotation at the class or method level will almost certainly change. They can be modified in any way, or even removed, at any time. You should not use or rely on them in any production code. They are purely to allow broad testing and feedback.

#### @Deprecated

@@ -123,3 +137,6 @@ distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

[beta source link]: https://github.com/ReactiveX/RxJava/blob/master/src/main/java/rx/annotations/Beta.java
[experimental source link]: https://github.com/ReactiveX/RxJava/blob/master/src/main/java/rx/annotations/Experimental.java
18 changes: 13 additions & 5 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
buildscript {
repositories { jcenter() }
dependencies { classpath 'com.netflix.nebula:gradle-rxjava-project-plugin:2.2.3' }
dependencies { classpath 'com.netflix.nebula:gradle-rxjava-project-plugin:4.0.0' }
}

description = 'RxJava: Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.'

apply plugin: 'rxjava-project'
apply plugin: 'java'
apply plugin: 'nebula.rxjava-project'

sourceCompatibility = JavaVersion.VERSION_1_6
targetCompatibility = JavaVersion.VERSION_1_6

dependencies {
testCompile 'junit:junit-dep:4.10'
testCompile 'org.mockito:mockito-core:1.8.5'
compile 'org.reactivestreams:reactive-streams:1.0.0'
testCompile 'junit:junit:4.12'
testCompile 'org.mockito:mockito-core:1.10.19'

perfCompile 'org.openjdk.jmh:jmh-core:1.10.5'
perfCompile 'org.openjdk.jmh:jmh-generator-annprocess:1.10.5'
// perfCompile 'org.reactivex:rxjava:1.0.14'
}

javadoc {
exclude "**/rx/internal/**"
exclude "**/io/reactivex/internal/**"
}

// support for snapshot/final releases with the various branches RxJava uses
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
release.scope=patch
release.version=2.0.0-DP0-SNAPSHOT
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#Sat Dec 13 00:15:28 PST 2014
#Wed Feb 10 13:01:27 PST 2016
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-2.2.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-2.11-bin.zip
10 changes: 3 additions & 7 deletions gradlew
Original file line number Diff line number Diff line change
@@ -42,11 +42,6 @@ case "`uname`" in
;;
esac

# For Cygwin, ensure paths are in UNIX format before anything is touched.
if $cygwin ; then
[ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
fi

# Attempt to set APP_HOME
# Resolve links: $0 may be a link
PRG="$0"
@@ -61,9 +56,9 @@ while [ -h "$PRG" ] ; do
fi
done
SAVED="`pwd`"
cd "`dirname \"$PRG\"`/" >&-
cd "`dirname \"$PRG\"`/" >/dev/null
APP_HOME="`pwd -P`"
cd "$SAVED" >&-
cd "$SAVED" >/dev/null

CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar

@@ -114,6 +109,7 @@ fi
if $cygwin ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
JAVACMD=`cygpath --unix "$JAVACMD"`

# We build the pattern for arguments to be converted via cygpath
ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
2 changes: 1 addition & 1 deletion gradlew.bat
Original file line number Diff line number Diff line change
@@ -46,7 +46,7 @@ echo location of your Java installation.
goto fail

:init
@rem Get command-line arguments, handling Windowz variants
@rem Get command-line arguments, handling Windows variants

if not "%OS%" == "Windows_NT" goto win9xME_args
if "%@eval[2+2]" == "4" goto 4NT_args
20 changes: 20 additions & 0 deletions src/main/java/io/reactivex/BackpressureStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex;

public enum BackpressureStrategy {
BUFFER,
DROP,
LATEST
}
2,361 changes: 2,361 additions & 0 deletions src/main/java/io/reactivex/Completable.java

Large diffs are not rendered by default.

3,409 changes: 3,409 additions & 0 deletions src/main/java/io/reactivex/NbpObservable.java

Large diffs are not rendered by default.

41 changes: 41 additions & 0 deletions src/main/java/io/reactivex/NbpObserver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex;

import io.reactivex.NbpObservable.NbpSubscriber;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.subscriptions.SubscriptionHelper;

public abstract class NbpObserver<T> implements NbpSubscriber<T> {
private Disposable s;
@Override
public final void onSubscribe(Disposable s) {
if (SubscriptionHelper.validateDisposable(this.s, s)) {
return;
}
this.s = s;
onStart();
}

protected final void cancel() {
s.dispose();
}
/**
* Called once the subscription has been set on this observer; override this
* to perform initialization.
*/
protected void onStart() {
}

}
67 changes: 67 additions & 0 deletions src/main/java/io/reactivex/Notification.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex;

import io.reactivex.internal.functions.Objects;

/**
* Utility class to help construct notification objects.
*/
public final class Notification {
private Notification() {
throw new IllegalStateException();
}

static final Try<Optional<Object>> COMPLETE = Try.ofValue(Optional.<Object>empty());

@SuppressWarnings({ "rawtypes", "unchecked" })
public static <T> Try<Optional<T>> complete() {
return (Try)COMPLETE;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
public static <T> Try<Optional<T>> error(Throwable e) {
return (Try)Try.ofError(e);
}

public static <T> Try<Optional<T>> next(T value) {
Objects.requireNonNull(value, "value is null"); // TODO this coud instead return an error of NPE
return Try.ofValue(Optional.of(value));
}

public static <T> boolean isNext(Try<Optional<T>> notification) {
if (notification.hasValue()) {
return notification.value().isPresent();
}
return false;
}

public static <T> boolean isComplete(Try<Optional<T>> notification) {
if (notification.hasValue()) {
return !notification.value().isPresent();
}
return false;
}

public static <T> boolean isError(Try<Optional<T>> notification) {
return notification.hasError();
}

public static <T> T getValue(Try<Optional<T>> notification) {
if (notification.hasValue()) {
return notification.value.get();
}
return null;
}
}
3,750 changes: 3,750 additions & 0 deletions src/main/java/io/reactivex/Observable.java

Large diffs are not rendered by default.

52 changes: 52 additions & 0 deletions src/main/java/io/reactivex/Observer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex;

import org.reactivestreams.*;

import io.reactivex.internal.subscriptions.SubscriptionHelper;

public abstract class Observer<T> implements Subscriber<T> {
private Subscription s;
@Override
public final void onSubscribe(Subscription s) {
if (SubscriptionHelper.validateSubscription(this.s, s)) {
return;
}
this.s = s;
onStart();
}

protected final Subscription subscription() {
return s;
}

protected final void request(long n) {
subscription().request(n);
}

protected final void cancel() {
subscription().cancel();
}
/**
* Called once the subscription has been set on this observer; override this
* to perform initialization or issue an initial request.
* <p>
* The default implementation requests {@link Long#MAX_VALUE}.
*/
protected void onStart() {
request(Long.MAX_VALUE);
}

}
80 changes: 80 additions & 0 deletions src/main/java/io/reactivex/Optional.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex;

import io.reactivex.internal.functions.Objects;

/**
* Simplified backport of Java 8's Optional type.
*
* @param <T> the value type
*/
public final class Optional<T> {
final T value;
protected Optional(T value) {
this.value = value;
}

static final Optional<Object> EMPTY = new Optional<Object>(null);

@SuppressWarnings("unchecked")
public static <T> Optional<T> empty() {
return (Optional<T>)EMPTY;
}

public static <T> Optional<T> of(T value) {
Objects.requireNonNull(value, "value is null");
return new Optional<T>(value);
}

public boolean isPresent() {
return value != null;
}

public T get() {
return value;
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((value == null) ? 0 : value.hashCode());
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Optional<?> other = (Optional<?>) obj;
if (value == null) {
if (other.value != null) {
return false;
}
} else if (!value.equals(other.value)) {
return false;
}
return true;
}


}
177 changes: 177 additions & 0 deletions src/main/java/io/reactivex/Scheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex;

import java.util.concurrent.TimeUnit;

import io.reactivex.disposables.*;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.util.Exceptions;
import io.reactivex.plugins.RxJavaPlugins;

public abstract class Scheduler {

public abstract Worker createWorker();

/**
* Returns the 'current time' of the Scheduler in the specified time unit.
* @param unit the time unit
* @return the 'current time'
*/
public long now(TimeUnit unit) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

/*
* TODO Should the lifecycle methods be part of the public API?
*/
public void start() {

}

public void shutdown() {

}

/*
* TODO This helps reducing the memory usage for
* certain one-shot scheduling required operators (such as interval,
* scalarjust + subscribeOn, etc.) but complicates the API
* surface.
*
* So either have default implementation in Scheduler or
* have the operars check for xxxDirect() support and chose paths accordingly.
*/
public Disposable scheduleDirect(Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

/**
* Schedules the given runnable with the given delay directly on a worker of this scheduler.
* <p>Override this method to provide an efficient implementation that,
* for example, doesn't have extra tracking structures for such one-shot
* executions.
* @param run the runnable to schedule
* @param delay the delay time
* @param unit the delay unit
* @return the disposable instance that can cancel the task
*/
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
final Worker w = createWorker();

final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

w.schedule(new Runnable() {
@Override
public void run() {
try {
decoratedRun.run();
} finally {
w.dispose();
}
}
}, delay, unit);

return w;
}

public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
final ArrayCompositeResource<Disposable> acr = new ArrayCompositeResource<Disposable>(2, Disposables.consumeAndDispose());
final Worker w = createWorker();
acr.lazySet(0, w);

final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

acr.setResource(1, w.schedulePeriodically(new Runnable() {
@Override
public void run() {
try {
decoratedRun.run();
} catch (Throwable e) {
// make sure the worker is released if the run crashes
acr.dispose();
throw Exceptions.propagate(e);
}
}
}, initialDelay, period, unit));

return acr;
}

public static abstract class Worker implements Disposable {

public abstract Disposable schedule(Runnable run, long delay, TimeUnit unit);

public Disposable schedule(Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}

public Disposable schedulePeriodically(Runnable run, final long initialDelay, final long period, final TimeUnit unit) {
final MultipleAssignmentResource<Disposable> first = new MultipleAssignmentResource<Disposable>(Disposables.consumeAndDispose());

final MultipleAssignmentResource<Disposable> mar = new MultipleAssignmentResource<Disposable>(Disposables.consumeAndDispose(), first);

final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

first.setResource(schedule(new Runnable() {
long lastNow = now(unit);
long startTime = lastNow + initialDelay;
long count;
@Override
public void run() {
decoratedRun.run();

long t = now(unit);
long c = ++count;

long targetTime = startTime + c * period;

long delay;
// if the current time is less than last time
// avoid scheduling the next run too far in the future
if (t < lastNow) {
delay = period;
// TODO not sure about this correction
startTime -= lastNow - c * period;
}
// if the current time is ahead of the target time,
// avoid scheduling a bunch of 0 delayed tasks
else if (t > targetTime) {
delay = period;
// TODO not sure about this correction
startTime += t - c * period;
} else {
delay = targetTime - t;
}

lastNow = t;

mar.setResource(schedule(this, delay, unit));
}
}, initialDelay, unit));

return mar;
}

/**
* Returns the 'current time' of the Worker in the specified time unit.
* @param unit the time unit
* @return the 'current time'
*/
public long now(TimeUnit unit) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

}
}
1,849 changes: 1,849 additions & 0 deletions src/main/java/io/reactivex/Single.java

Large diffs are not rendered by default.

117 changes: 117 additions & 0 deletions src/main/java/io/reactivex/Try.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex;

import io.reactivex.internal.functions.Objects;

/**
* Container for either a value of type T or a Throwable.
*
* @param <T> the value type
*/
public final class Try<T> {
/** The value. */
final T value;
/** The error or null if this holds a value. */
final Throwable error;

private Try(T value, Throwable error) {
this.value = value;
this.error = error;
}

/**
* Constructs a Try instance by wrapping the given value.
*
* @param <T> the value type
* @param value the value to wrap
* @return the created Try instance
*/
public static <T> Try<T> ofValue(T value) {
// TODO ? Objects.requireNonNull(value);
return new Try<T>(value, null);
}

/**
* Constructs a Try instance by wrapping the given Throwable.
*
* <p>Null Throwables are replaced by NullPointerException instance in this Try.
*
* @param <T> the value type
* @param e the exception to wrap
* @return the new Try instance holding the exception
*/
public static <T> Try<T> ofError(Throwable e) {
return new Try<T>(null, e != null ? e : new NullPointerException());
}

/**
* Returns the value or null if the value is actually null or if this Try holds an error instead.
* @return the value contained
* @see #hasValue()
*/
public T value() {
return value;
}

/**
* Returns the error or null if this Try holds a value instead.
*
* @return the Throwable contained or null
*
*/
public Throwable error() {
return error;
}

/**
* Returns true if this Try holds an error.
* @return true if this Try holds an error
*/
public boolean hasError() {
return error != null;
}

/**
* Returns true if this Try holds a value.
* @return true if this Try holds a value
*/
public boolean hasValue() {
return error == null;
}

@Override
public boolean equals(Object other) {
if (other instanceof Try) {

Try<?> o = (Try<?>) other;
return Objects.equals(value, o.value)
&& Objects.equals(error, o.error);
}
return false;
}

@Override
public int hashCode() {
return Objects.hashCode(value) + Objects.hashCode(error);
}

@Override
public String toString() {
if (error != null) {
return "Try[ " + error + " ]";
}
return "Try[" + value + "]";
}
}
47 changes: 47 additions & 0 deletions src/main/java/io/reactivex/annotations/BackpressureKind.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.annotations;

/**
* Enumeration for various kinds of backpressure support.
*/
public enum BackpressureKind {
/**
* The backpressure-related requests pass through this operator without change
*/
PASS_THROUGH,
/**
* The operator fully supports backpressure and may coordinate downstream requests
* with upstream requests through batching, arbitration or by other means.
*/
FULL,
/**
* The operator performs special backpressure management; see the associated javadoc.
*/
SPECIAL,
/**
* The operator requests Long.MAX_VALUE from upstream but respects the backpressure
* of the downstream.
*/
UNBOUNDED_IN,
/**
* The operator will emit a MissingBackpressureException if the downstream didn't request
* enough or in time.
*/
ERROR,
/**
* The operator ignores all kinds of backpressure and may overflow the downstream.
*/
NONE
}
30 changes: 30 additions & 0 deletions src/main/java/io/reactivex/annotations/BackpressureSupport.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.annotations;

import java.lang.annotation.*;

/**
* Indicates the backpressure support kind of the associated operator or class.
*/
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface BackpressureSupport {
/**
* The backpressure supported by this method or class.
* @return backpressure supported by this method or class.
*/
BackpressureKind value();
}
22 changes: 22 additions & 0 deletions src/main/java/io/reactivex/annotations/Beta.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.annotations;

/**
* Indicates the feature is in beta state: it will be most likely stay but
* the signature may change between versions without warning.
*/
public @interface Beta {

}
22 changes: 22 additions & 0 deletions src/main/java/io/reactivex/annotations/Experimental.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.annotations;

/**
* Indicates the feature is in experimental state: its existence, signature or behavior
* might change without warning from one release to the next.
*/
public @interface Experimental {

}
48 changes: 48 additions & 0 deletions src/main/java/io/reactivex/annotations/SchedulerKind.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.annotations;

/**
* Indicates what scheduler the method or class uses by default
*/
public enum SchedulerKind {
/**
* The operator/class doesn't use schedulers.
*/
NONE,
/**
* The operator/class runs on the computation scheduler or takes timing information from it.
*/
COMPUTATION,
/**
* The operator/class runs on the io scheduler or takes timing information from it.
*/
IO,
/**
* The operator/class runs on the new thread scheduler or takes timing information from it.
*/
NEW_THREAD,
/**
* The operator/class runs on the trampoline scheduler or takes timing information from it.
*/
TRAMPOLINE,
/**
* The operator/class runs on the single scheduler or takes timing information from it.
*/
SINGLE,
/**
* The operator/class requires a scheduler to be manually specified.
*/
CUSTOM
}
30 changes: 30 additions & 0 deletions src/main/java/io/reactivex/annotations/SchedulerSupport.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.annotations;

import java.lang.annotation.*;

/**
* Indicates what kind of scheduler the class or method uses.
*/
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface SchedulerSupport {
/**
* The kind of scheduler the class or method uses.
* @return the kind of scheduler the class or method uses
*/
SchedulerKind value();
}
51 changes: 51 additions & 0 deletions src/main/java/io/reactivex/disposables/BooleanDisposable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.disposables;

import java.util.concurrent.atomic.AtomicReference;

public final class BooleanDisposable implements Disposable {
final AtomicReference<Runnable> run = new AtomicReference<Runnable>();

static final Runnable DISPOSED = new Runnable() {
@Override
public void run() { }
};

public BooleanDisposable() {
this(new Runnable() {
@Override
public void run() { }
});
}

public BooleanDisposable(Runnable run) {
this.run.lazySet(run);
}

@Override
public void dispose() {
Runnable r = run.get();
if (r != DISPOSED) {
r = run.getAndSet(DISPOSED);
if (r != DISPOSED) {
r.run();
}
}
}

public boolean isDisposed() {
return run.get() == DISPOSED;
}
}
59 changes: 59 additions & 0 deletions src/main/java/io/reactivex/disposables/CompositeDisposable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.disposables;

import io.reactivex.internal.disposables.SetCompositeResource;
import io.reactivex.internal.functions.Objects;

/**
* A disposable container that can hold onto multiple other disposables.
*/
public final class CompositeDisposable implements Disposable {

final SetCompositeResource<Disposable> resources;

public CompositeDisposable() {
resources = new SetCompositeResource<Disposable>(Disposables.consumeAndDispose());
}

public CompositeDisposable(Disposable... resources) {
Objects.requireNonNull(resources, "resources is null");
this.resources = new SetCompositeResource<Disposable>(Disposables.consumeAndDispose(), resources);
}

public CompositeDisposable(Iterable<? extends Disposable> resources) {
Objects.requireNonNull(resources, "resources is null");
this.resources = new SetCompositeResource<Disposable>(Disposables.consumeAndDispose(), resources);
}

@Override
public void dispose() {
resources.dispose();
}

public boolean isDisposed() {
return resources.isDisposed();
}

public void add(Disposable d) {
resources.add(d);
}

public void remove(Disposable d) {
resources.remove(d);
}

public void clear() {
resources.clear();
}
}
26 changes: 26 additions & 0 deletions src/main/java/io/reactivex/disposables/Disposable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.disposables;

/**
* Represents a disposable resource.
*/
public interface Disposable {
/**
* Dispose the resource, the operation should be idempotent.
*/
void dispose();

// TODO let's see if we really need this
// boolean isDisposed();
}
182 changes: 182 additions & 0 deletions src/main/java/io/reactivex/disposables/Disposables.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.disposables;

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import org.reactivestreams.Subscription;

import io.reactivex.functions.Consumer;
import io.reactivex.internal.functions.Objects;

/**
* Utility class to help create disposables by wrapping
* other types.
*/
public final class Disposables {
/** Utility class. */
private Disposables() {
throw new IllegalStateException("No instances!");
}

public static Disposable from(Runnable run) {
Objects.requireNonNull(run, "run is null");
return new RunnableDisposable(run);
}


public static Disposable from(Future<?> future) {
return from(future, true);
}

public static Disposable from(final Subscription subscription) {
Objects.requireNonNull(subscription, "subscription is null");
return new Disposable() {
@Override
public void dispose() {
subscription.cancel();
}
};
}

public static Disposable from(Future<?> future, boolean allowInterrupt) {
Objects.requireNonNull(future, "future is null");
return new FutureDisposable(future, allowInterrupt);
}

static final Disposable EMPTY = new Disposable() {
@Override
public void dispose() { }
};

public static Disposable empty() {
return EMPTY;
}

// TODO there is no way to distinguish a disposed and non-disposed resource
static final Disposable DISPOSED = new Disposable() {
@Override
public void dispose() { }
};

public static Disposable disposed() {
return DISPOSED;
}

public static CompositeDisposable from(Disposable... resources) {
return new CompositeDisposable(resources);
}

/** Wraps a Runnable instance. */
static final class RunnableDisposable
extends AtomicReference<Runnable>
implements Disposable {

/** */
private static final long serialVersionUID = 4892876354773733738L;

static final Runnable DISPOSED = new Runnable() {
@Override
public void run() { }
};

public RunnableDisposable(Runnable run) {
super(run);
}

@Override
public void dispose() {
Runnable r = get();
if (r != DISPOSED) {
r = getAndSet(DISPOSED);
if (r != DISPOSED) {
r.run();
}
}
}
}

/** Wraps a Future instance. */
static final class FutureDisposable
extends AtomicReference<Future<?>>
implements Disposable {
/** */
private static final long serialVersionUID = -569898983717900525L;

final boolean allowInterrupt;

public FutureDisposable(Future<?> run, boolean allowInterrupt) {
super(run);
this.allowInterrupt = allowInterrupt;
}

@Override
public void dispose() {
Future<?> r = get();
if (r != DisposedFuture.INSTANCE) {
r = getAndSet(DisposedFuture.INSTANCE);
if (r != DisposedFuture.INSTANCE) {
r.cancel(allowInterrupt);
}
}
}
}

/** A singleton instance of the disposed future. */
enum DisposedFuture implements Future<Object> {
INSTANCE;

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}

@Override
public boolean isCancelled() {
return true;
}

@Override
public boolean isDone() {
return false;
}

@Override
public Object get() throws InterruptedException, ExecutionException {
return null;
}

@Override
public Object get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return null;
}
}

static final Consumer<Disposable> DISPOSER = new Consumer<Disposable>() {
@Override
public void accept(Disposable d) {
d.dispose();
}
};

/**
* Returns a consumer that calls dispose on the received Disposable.
* @return the consumer that calls dispose on the received Disposable.
*/
public static Consumer<Disposable> consumeAndDispose() {
return DISPOSER;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.disposables;

import io.reactivex.internal.disposables.*;

public final class MultipleAssignmentDisposable implements Disposable {
final MultipleAssignmentResource<Disposable> resource;

public MultipleAssignmentDisposable() {
this.resource = new MultipleAssignmentResource<Disposable>(Disposables.consumeAndDispose());
}

public MultipleAssignmentDisposable(Disposable initialDisposable) {
this.resource = new MultipleAssignmentResource<Disposable>(Disposables.consumeAndDispose(), initialDisposable);
}

public void set(Disposable d) {
this.resource.setResource(d);
}

public Disposable get() {
Object o = resource.getResource();
if (o == null) {
if (resource.isDisposed()) {
return EmptyDisposable.INSTANCE;
}
}
return (Disposable)o;
}

@Override
public void dispose() {
resource.dispose();
}

public boolean isDisposed() {
return resource.isDisposed();
}
}
90 changes: 90 additions & 0 deletions src/main/java/io/reactivex/disposables/RefCountDisposable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.disposables;

import java.util.concurrent.atomic.*;

import io.reactivex.internal.functions.Objects;

public final class RefCountDisposable implements Disposable {

final AtomicReference<Disposable> resource = new AtomicReference<Disposable>();

static final Disposable DISPOSED = new Disposable() {
@Override
public void dispose() { }
};

final AtomicInteger count = new AtomicInteger();

final AtomicBoolean once = new AtomicBoolean();

public RefCountDisposable(Disposable resource) {
Objects.requireNonNull(resource, "resource is null");
this.resource.lazySet(resource);
count.lazySet(1);
}

@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
if (count.decrementAndGet() == 0) {
disposeActual();
}
}
}

void disposeActual() {
Disposable d = resource.get();
if (d != DISPOSED) {
d = resource.getAndSet(DISPOSED);
if (d != DISPOSED && d != null) {
d.dispose();
}
}
}

public Disposable get() {
count.getAndIncrement();
return new InnerDisposable(this);
}

void release() {
if (count.decrementAndGet() == 0) {
disposeActual();
}
}

public boolean isDisposed() {
return resource == DISPOSED;
}

static final class InnerDisposable extends AtomicBoolean implements Disposable {
/** */
private static final long serialVersionUID = -7435605952646106082L;

final RefCountDisposable parent;

public InnerDisposable(RefCountDisposable parent) {
this.parent = parent;
}

@Override
public void dispose() {
if (!get() && compareAndSet(false, true)) {
parent.release();
}
}
}
}
52 changes: 52 additions & 0 deletions src/main/java/io/reactivex/disposables/SerialDisposable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.disposables;

import io.reactivex.internal.disposables.*;

public final class SerialDisposable implements Disposable {
final SerialResource<Disposable> resource;

public SerialDisposable() {
this.resource = new SerialResource<Disposable>(Disposables.consumeAndDispose());
}

public SerialDisposable(Disposable initialDisposable) {
this.resource = new SerialResource<Disposable>(Disposables.consumeAndDispose(), initialDisposable);
}


public void set(Disposable d) {
this.resource.setResource(d);
}

public Disposable get() {
Object o = resource.getResource();
if (o == null) {
if (resource.isDisposed()) {
return EmptyDisposable.INSTANCE;
}
}
return (Disposable)o;
}

@Override
public void dispose() {
resource.dispose();
}

public boolean isDisposed() {
return resource.isDisposed();
}
}
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.exceptions;
package io.reactivex.exceptions;

import java.io.PrintStream;
import java.io.PrintWriter;
@@ -28,7 +28,7 @@
/**
* Represents an exception that is a composite of one or more other exceptions. A {@code CompositeException}
* does not modify the structure of any exception it wraps, but at print-time it iterates through the list of
* Throwables contained in the composit in order to print them all.
* Throwables contained in the composite in order to print them all.
*
* Its invariant is to contain an immutable, ordered (by insertion order), unique list of non-composite
* exceptions. You can retrieve individual exceptions in this list with {@link #getExceptions()}.
@@ -46,6 +46,24 @@ public final class CompositeException extends RuntimeException {
private final List<Throwable> exceptions;
private final String message;

public CompositeException() {
this.exceptions = new ArrayList<Throwable>();
this.message = null;
}

public CompositeException(Throwable... exceptions) {
this.exceptions = new ArrayList<Throwable>();
this.message = null;
if (exceptions == null) {
this.exceptions.add(new NullPointerException("exceptions is null"));
} else {
for (Throwable t : exceptions) {
this.exceptions.add(t != null ? t : new NullPointerException("One of the exceptions is null"));
}
}
}


public CompositeException(String messagePrefix, Collection<? extends Throwable> errors) {
Set<Throwable> deDupedExceptions = new LinkedHashSet<Throwable>();
List<Throwable> _exceptions = new ArrayList<Throwable>();
@@ -87,6 +105,17 @@ public String getMessage() {
return message;
}

/**
* Adds a suppressed exception to this composite.
* <p>The method is named this way to avoid conflicts with Java 7 environments
* and its addSuppressed() method.
* @param e the exception to suppress, nulls are converted to NullPointerExceptions
*/
public void suppress(Throwable e) {
exceptions.add(e != null ? e : new NullPointerException("null exception"));
}


private Throwable cause = null;

@Override
@@ -247,7 +276,7 @@ public String getMessage() {
}
}

private final List<Throwable> getListOfCauses(Throwable ex) {
private List<Throwable> getListOfCauses(Throwable ex) {
List<Throwable> list = new ArrayList<Throwable>();
Throwable root = ex.getCause();
if (root == null) {
@@ -263,4 +292,17 @@ private final List<Throwable> getListOfCauses(Throwable ex) {
}
}
}
}
public int size() {
return exceptions.size();
}

/**
* Returns true if this CompositeException doesn't have a cause or
* any suppressed exceptions.
* @return true if this CompositeException doesn't have a cause or
* any suppressed exceptions.
*/
public boolean isEmpty() {
return exceptions.isEmpty() && getCause() == null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.exceptions;

/**
* Indicates that an operator attempted to emit a value but the downstream wasn't ready for it.
*/
public class MissingBackpressureException extends RuntimeException {
/** */
private static final long serialVersionUID = 8517344746016032542L;

public MissingBackpressureException() {
super();
}

public MissingBackpressureException(String message) {
super(message);
}

public MissingBackpressureException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.exceptions;

public final class OnCompleteFailedException extends RuntimeException {
/** */
private static final long serialVersionUID = -6179993283427447098L;

public OnCompleteFailedException(Throwable cause) {
super(cause);
}

}
23 changes: 23 additions & 0 deletions src/main/java/io/reactivex/exceptions/OnErrorFailedException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.exceptions;

public final class OnErrorFailedException extends RuntimeException {
/** */
private static final long serialVersionUID = 2656125445290831911L;

public OnErrorFailedException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.exceptions;

public final class OnErrorNotImplementedException extends RuntimeException {
/** */
private static final long serialVersionUID = -3698670655303683299L;

public OnErrorNotImplementedException() {
super();
}

public OnErrorNotImplementedException(String message, Throwable cause) {
super(message, cause);
}

public OnErrorNotImplementedException(String message) {
super(message);
}

public OnErrorNotImplementedException(Throwable cause) {
super(cause);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.exceptions;

public final class UnsubscribeFailedException extends RuntimeException {
/** */
private static final long serialVersionUID = 8947024194181365640L;

public UnsubscribeFailedException(Throwable cause) {
super(cause);
}

}
19 changes: 19 additions & 0 deletions src/main/java/io/reactivex/functions/BiConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.functions;

public interface BiConsumer<T1, T2> {

void accept(T1 t1, T2 t2);
}
19 changes: 19 additions & 0 deletions src/main/java/io/reactivex/functions/BiFunction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.functions;

public interface BiFunction<T1, T2, R> {

R apply(T1 t1, T2 t2);
}
19 changes: 19 additions & 0 deletions src/main/java/io/reactivex/functions/BiPredicate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.functions;

public interface BiPredicate<T1, T2> {

boolean test(T1 t1, T2 t2);
}
18 changes: 18 additions & 0 deletions src/main/java/io/reactivex/functions/BooleanSupplier.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.functions;

public interface BooleanSupplier {
boolean getAsBoolean();
}
18 changes: 18 additions & 0 deletions src/main/java/io/reactivex/functions/Consumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.functions;

public interface Consumer<T> {
void accept(T t);
}
18 changes: 18 additions & 0 deletions src/main/java/io/reactivex/functions/Function.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.functions;

public interface Function<T, R> {
R apply(T t);
}
18 changes: 18 additions & 0 deletions src/main/java/io/reactivex/functions/Function3.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.functions;

public interface Function3<T1, T2, T3, R> {
R apply(T1 t1, T2 t2, T3 t3);
}
18 changes: 18 additions & 0 deletions src/main/java/io/reactivex/functions/Function4.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.functions;

public interface Function4<T1, T2, T3, T4, R> {
R apply(T1 t1, T2 t2, T3 t3, T4 t4);
}
18 changes: 18 additions & 0 deletions src/main/java/io/reactivex/functions/Function5.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.functions;

public interface Function5<T1, T2, T3, T4, T5, R> {
R apply(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5);
}
18 changes: 18 additions & 0 deletions src/main/java/io/reactivex/functions/Function6.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.functions;

public interface Function6<T1, T2, T3, T4, T5, T6, R> {
R apply(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6);
}
18 changes: 18 additions & 0 deletions src/main/java/io/reactivex/functions/Function7.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.functions;

public interface Function7<T1, T2, T3, T4, T5, T6, T7, R> {
R apply(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7);
}
18 changes: 18 additions & 0 deletions src/main/java/io/reactivex/functions/Function8.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.functions;

public interface Function8<T1, T2, T3, T4, T5, T6, T7, T8, R> {
R apply(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8);
}
18 changes: 18 additions & 0 deletions src/main/java/io/reactivex/functions/Function9.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.functions;

public interface Function9<T1, T2, T3, T4, T5, T6, T7, T8, T9, R> {
R apply(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9);
}
17 changes: 17 additions & 0 deletions src/main/java/io/reactivex/functions/IntFunction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.functions;

public interface IntFunction<T> {
T apply(int i);
}
17 changes: 17 additions & 0 deletions src/main/java/io/reactivex/functions/LongConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.functions;

public interface LongConsumer {
void accept(long t);
}
18 changes: 18 additions & 0 deletions src/main/java/io/reactivex/functions/Predicate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.functions;

public interface Predicate<T> {
boolean test(T t);
}
19 changes: 19 additions & 0 deletions src/main/java/io/reactivex/functions/Supplier.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.functions;

public interface Supplier<T> {

T get();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.disposables;

import java.util.concurrent.atomic.AtomicReferenceArray;

import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;

/**
* A composite resource with a fixed number of slots.
*
* <p>Note that since the implementation leaks the methods of AtomicReferenceArray, one must be
* careful to only call setResource, replaceResource and dispose on it. All other methods may lead to undefined behavior
* and should be used by internal means only.
*
* @param <T> the resource tpye
*/
public final class ArrayCompositeResource<T> extends AtomicReferenceArray<Object> implements Disposable {
/** */
private static final long serialVersionUID = 2746389416410565408L;

final Consumer<? super T> disposer;

static final Object DISPOSED = new Object();

public ArrayCompositeResource(int capacity, Consumer<? super T> disposer) {
super(capacity);
this.disposer = disposer;
}

/**
* Sets the resource at the specified index and disposes the old resource.
* @param index
* @param resource
* @return true if the resource has ben set, false if the composite has been disposed
*/
@SuppressWarnings("unchecked")
public boolean setResource(int index, T resource) {
for (;;) {
Object o = get(index);
if (o == DISPOSED) {
disposer.accept(resource);
return false;
}
if (compareAndSet(index, o, resource)) {
if (o != null) {
disposer.accept((T)o);
}
return true;
}
}
}

/**
* Replaces the resource at the specified index and returns the old resource.
* @param index
* @param resource
* @return the old resource, can be null
*/
@SuppressWarnings("unchecked")
public T replaceResource(int index, T resource) {
for (;;) {
Object o = get(index);
if (o == DISPOSED) {
disposer.accept(resource);
return null;
}
if (compareAndSet(index, o, resource)) {
return (T)o;
}
}
}

@Override
@SuppressWarnings("unchecked")
public void dispose() {
if (get(0) != DISPOSED) {
int s = length();
for (int i = 0; i < s; i++) {
Object o = get(i);
if (o != DISPOSED) {
o = getAndSet(i, DISPOSED);
if (o != DISPOSED && o != null) {
disposer.accept((T)o);
}
}
}
}
}

public boolean isDisposed() {
return get(0) == DISPOSED;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.disposables;

public interface CompositeResource<T> {

boolean add(T resource);

boolean remove(T resource);

boolean delete(T resource);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.disposables;

import io.reactivex.NbpObservable.NbpSubscriber;
import io.reactivex.disposables.Disposable;

public enum EmptyDisposable implements Disposable {
INSTANCE
;

@Override
public void dispose() {
// no-op
}

public static void complete(NbpSubscriber<?> s) {
s.onSubscribe(INSTANCE);
s.onComplete();
}

public static void error(Throwable e, NbpSubscriber<?> s) {
s.onSubscribe(INSTANCE);
s.onError(e);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.disposables;

import java.util.LinkedList;

import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;

/**
* A linked-list-based composite resource with custom disposer callback.
*
* @param <T> the resource type
*/
public final class ListCompositeResource<T> implements CompositeResource<T>, Disposable {
final Consumer<? super T> disposer;

/** Indicates this resource has been disposed. */
volatile boolean disposed;

/** The set of resources, accessed while holding a lock on this. */
LinkedList<T> list;

public ListCompositeResource(Consumer<? super T> disposer) {
this.disposer = disposer;
}

public ListCompositeResource(Consumer<? super T> disposer, T... initialResources) {
this(disposer);
int n = initialResources.length;
if (n != 0) {
list = new LinkedList<T>();
for (T r : initialResources) {
list.add(r);
}
}
}

public ListCompositeResource(Consumer<? super T> disposer, Iterable<? extends T> initialResources) {
this(disposer);
list = new LinkedList<T>();
for (T r : initialResources) {
list.add(r);
}
}

/**
* Adds a new resource to this composite or disposes it if the composite has been disposed.
* @param newResource the new resource to add, not-null (not checked)
* @return false if the container is disposed
*/
@Override
public boolean add(T newResource) {
if (!disposed) {
synchronized (this) {
if (!disposed) {
LinkedList<T> a = list;
if (a == null) {
a = new LinkedList<T>();
list = a;
}
a.add(newResource);
return true;
}
}
}
disposer.accept(newResource);
return false;
}

/**
* Removes the given resource from this composite and calls the disposer if the resource
* was indeed in the composite.
* @param resource the resource to remove, not-null (not verified)
* @return false if the resource was not in this container
*/
@Override
public boolean remove(T resource) {
if (delete(resource)) {
disposer.accept(resource);
return true;
}
return false;
}

/**
* Removes the given resource if contained within this composite but doesn't call the disposer for it.
* @param resource the resource to delete, not-null (not verified)
* @return false if the resource was not in this container
*/
@Override
public boolean delete(T resource) {
if (disposed) {
return false;
}
synchronized (this) {
if (disposed) {
return false;
}
LinkedList<T> a = list;
if (a == null || a.isEmpty()) {
return false;
}

return a.remove(resource);
}
}

@Override
public void dispose() {
if (!disposed) {
LinkedList<T> s;
synchronized (this) {
if (disposed) {
return;
}
disposed = true;
s = list;
list = null;
}
if (s != null) {
for (T t : s) {
disposer.accept(t);
}
}
}
}

public boolean isDisposed() {
return disposed;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.disposables;

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.util.TerminalAtomicsHelper;

/**
* Holds onto resources with a custom disposer callback and replacing a resource doesn't
* call the disposer but only when the MultipleAssignmentResource is disposed.
*
* <p>This resource container disposable helps in avoiding the wrapping of other resources
* into Disposables.
*
* <p>Note that since the implementation leaks the methods of AtomicReference, one must be
* careful to only call setResource and dispose on it. All other methods may lead to undefined behavior
* and should be used by internal means only.
*
* @param <T> the resource type
*/
public final class MultipleAssignmentResource<T> extends AtomicReference<Object> implements Disposable {
/** */
private static final long serialVersionUID = 5247635821051810205L;
/** The callback to dispose the resource. */
final Consumer<? super T> disposer;
/** The indicator object that this container has been disposed. */
static final Object DISPOSED = new Object();

/**
* Constructor with a custom disposer callback.
* @param disposer
*/
public MultipleAssignmentResource(Consumer<? super T> disposer) {
this.disposer = disposer;
}

/**
* Constructor with a custom disposer callback and the initial resource
* @param disposer
* @param initialResource
*/
public MultipleAssignmentResource(Consumer<? super T> disposer, T initialResource) {
this(disposer);
lazySet(initialResource);
}

/**
* Atomically replaces the current resource with the new resource but doesn't call the disposer
* for it.
* @param newResource the new resource to replace the old one
*/
@SuppressWarnings("unchecked")
public void setResource(T newResource) {
TerminalAtomicsHelper.update(this, newResource, DISPOSED, (Consumer<Object>)disposer);
}

/**
* Returns the current held resource or null if no resource
* is set or the container has been disposed.
* @return the currently held resource
*/
@SuppressWarnings("unchecked")
public T getResource() {
Object d = get();
if (d == DISPOSED) {
return null;
}
return (T)d;
}

@Override
@SuppressWarnings("unchecked")
public void dispose() {
TerminalAtomicsHelper.terminate(this, DISPOSED, (Consumer<Object>)disposer);
}

/**
* Returns true if this resource has been disposed.
* @return true if this resource has been disposed
*/
public boolean isDisposed() {
return get() == DISPOSED;
}
}
182 changes: 182 additions & 0 deletions src/main/java/io/reactivex/internal/disposables/NbpFullArbiter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.disposables;

import java.util.concurrent.atomic.AtomicInteger;

import io.reactivex.NbpObservable.NbpSubscriber;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
import io.reactivex.internal.util.NotificationLite;
import io.reactivex.plugins.RxJavaPlugins;

/**
* Performs full arbitration of Subscriber events with strict drain (i.e., old emissions of another
* subscriber are dropped).
*
* @param <T> the value type
*/
public final class NbpFullArbiter<T> extends FullArbiterPad1 implements Disposable {
final NbpSubscriber<? super T> actual;
final SpscLinkedArrayQueue<Object> queue;

volatile Disposable s;
static final Disposable INITIAL = new Disposable() {
@Override
public void dispose() { }
};


Disposable resource;

volatile boolean cancelled;

public NbpFullArbiter(NbpSubscriber<? super T> actual, Disposable resource, int capacity) {
this.actual = actual;
this.resource = resource;
this.queue = new SpscLinkedArrayQueue<Object>(capacity);
this.s = INITIAL;
}

@Override
public void dispose() {
if (!cancelled) {
cancelled = true;
disposeResource();
}
}

void disposeResource() {
Disposable d = resource;
resource = null;
if (d != null) {
d.dispose();
}
}

public boolean setSubscription(Disposable s) {
if (cancelled) {
return false;
}

queue.offer(this.s, NotificationLite.disposable(s));
drain();
return true;
}

public boolean onNext(T value, Disposable s) {
if (cancelled) {
return false;
}

queue.offer(s, NotificationLite.next(value));
drain();
return true;
}

public void onError(Throwable value, Disposable s) {
if (cancelled) {
RxJavaPlugins.onError(value);
return;
}
queue.offer(s, NotificationLite.error(value));
drain();
}

public void onComplete(Disposable s) {
queue.offer(s, NotificationLite.complete());
drain();
}

void drain() {
if (wip.getAndIncrement() != 0) {
return;
}

int missed = 1;

final SpscLinkedArrayQueue<Object> q = queue;
final NbpSubscriber<? super T> a = actual;

for (;;) {

for (;;) {
Object o = q.peek();

if (o == null) {
break;
}

q.poll();
Object v = q.poll();

if (o != s) {
continue;
} else
if (NotificationLite.isDisposable(v)) {
Disposable next = NotificationLite.getDisposable(v);
if (s != null) {
s.dispose();
}
s = next;
} else
if (NotificationLite.isError(v)) {
q.clear();
disposeResource();

Throwable ex = NotificationLite.getError(v);
if (!cancelled) {
cancelled = true;
a.onError(ex);
} else {
RxJavaPlugins.onError(ex);
}
} else
if (NotificationLite.isComplete(v)) {
q.clear();
disposeResource();

if (!cancelled) {
cancelled = true;
a.onComplete();
}
} else {
a.onNext(NotificationLite.<T>getValue(v));
}
}

missed = wip.addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
}

/** Pads the object header away. */
class FullArbiterPad0 {
volatile long p1a, p2a, p3a, p4a, p5a, p6a, p7a;
volatile long p8a, p9a, p10a, p11a, p12a, p13a, p14a, p15a;
}

/** The work-in-progress counter. */
class FullArbiterWip extends FullArbiterPad0 {
final AtomicInteger wip = new AtomicInteger();
}

/** Pads the wip counter away. */
class FullArbiterPad1 extends FullArbiterWip {
volatile long p1b, p2b, p3b, p4b, p5b, p6b, p7b;
volatile long p8b, p9b, p10b, p11b, p12b, p13b, p14b, p15b;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.disposables;

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.util.TerminalAtomicsHelper;

/**
* Holds onto resources with a custom disposer callback and replacing a resource calls
* the disposer with the old value.
*
* <p>This resource container disposable helps in avoiding the wrapping of other resources
* into Disposables.
*
* <p>Note that since the implementation leaks the methods of AtomicReference, one must be
* careful to only call setResource and dispose on it. All other methods may lead to undefined behavior
* and should be used by internal means only.
*
* @param <T> the resource type
*/
public final class SerialResource<T> extends AtomicReference<Object> implements Disposable {
/** */
private static final long serialVersionUID = 5247635821051810205L;
/** The callback to dispose the resource. */
final Consumer<? super T> disposer;
/** The indicator object that this container has been disposed. */
static final Object DISPOSED = new Object();

/**
* Constructor with a custom disposer callback.
* @param disposer
*/
public SerialResource(Consumer<? super T> disposer) {
this.disposer = disposer;
}

/**
* Constructor with a custom disposer callback and the initial resource
* @param disposer
* @param initialResource
*/
public SerialResource(Consumer<? super T> disposer, T initialResource) {
this(disposer);
lazySet(initialResource);
}

/**
* Atomically replaces the current resource with the new resource but doesn't call the disposer
* for it.
* @param newResource the new resource to replace the old one
* @return true if the set succeeded, false if the container is disposed
*/
@SuppressWarnings("unchecked")
public boolean setResource(T newResource) {
return TerminalAtomicsHelper.set(this, newResource, DISPOSED, (Consumer<Object>)disposer);
}

@Override
@SuppressWarnings("unchecked")
public void dispose() {
TerminalAtomicsHelper.terminate(this, DISPOSED, (Consumer<Object>)disposer);
}

/**
* Returns the current held resource or null if no resource
* is set or the container has been disposed.
* @return the currently held resource
*/
@SuppressWarnings("unchecked")
public T getResource() {
Object d = get();
if (d == DISPOSED) {
return null;
}
return (T)d;
}

/**
* Returns true if this resource has been disposed.
* @return true if this resource has been disposed
*/
public boolean isDisposed() {
return get() == DISPOSED;
}
}
Loading