Skip to content

Commit 07a9b69

Browse files
committed
Merge branch 'master' into default_if_empty
2 parents 97cabf5 + fa72a27 commit 07a9b69

File tree

6 files changed

+168
-38
lines changed

6 files changed

+168
-38
lines changed

CHANGES.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,23 @@
11
# RxJava Releases #
22

3+
### Version 0.14.1 ([Maven Central](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxjava%22%20AND%20v%3A%220.14.1%22)) ###
4+
5+
* [Pull 402](https://github.com/Netflix/RxJava/pull/402) rxjava-apache-http improvements
6+
7+
### Version 0.14.0 ([Maven Central](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxjava%22%20AND%20v%3A%220.14.0%22)) ###
8+
9+
Further progress to the Scala adaptor and a handful of new operators.
10+
11+
Bump to 0.14.0 due to small breaking change to `distinct` operator removing overloaded methods with `Comparator`. These methods were added in 0.13.2 and determined to be incorrect.
12+
13+
This release also includes a new contrib module, [rxjava-apache-http](https://github.com/Netflix/RxJava/tree/master/rxjava-contrib/rxjava-apache-http) that provides an Observable API to the Apache HttpAsyncClient.
14+
15+
* [Pull 396](https://github.com/Netflix/RxJava/pull/396) Add missing methods to Scala Adaptor
16+
* [Pull 390](https://github.com/Netflix/RxJava/pull/390) Operators: ElementAt and ElementAtOrDefault
17+
* [Pull 398](https://github.com/Netflix/RxJava/pull/398) Operators: IsEmpty and Exists (instead of Any)
18+
* [Pull 397](https://github.com/Netflix/RxJava/pull/397) Observable API for Apache HttpAsyncClient 4.0
19+
* [Pull 400](https://github.com/Netflix/RxJava/pull/400) Removing `comparator` overloads of `distinct`
20+
321
### Version 0.13.5
422

523
* Upload to Sonatype failed so version skipped

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.14.1-SNAPSHOT
1+
version=0.14.2-SNAPSHOT
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
# rxjava-apache-http
2+
3+
Observable API for Apache [HttpAsyncClient](http://hc.apache.org/httpcomponents-asyncclient-dev/)
4+
5+
It is aware of Content-Type `text/event-stream` and will stream each event via `Observer.onNext`.
6+
7+
Other Content-Types will be returned as a single call to `Observer.onNext`.
8+
9+
Main Classes:
10+
11+
- [ObservableHttp](https://github.com/Netflix/RxJava/blob/master/rxjava-contrib/rxjava-apache-http/src/main/java/rx/apache/http/ObservableHttp.java)
12+
- [ObservableHttpResponse](https://github.com/Netflix/RxJava/blob/master/rxjava-contrib/rxjava-apache-http/src/main/java/rx/apache/http/ObservableHttpResponse.java)
13+
14+
15+
# Binaries
16+
17+
Binaries and dependency information for Maven, Ivy, Gradle and others can be found at [http://search.maven.org](http://search.maven.org/#search%7Cga%7C1%7Ccom.netflix.rxjava).
18+
19+
Example for [Maven](http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22rxjava-apache-http%22):
20+
21+
```xml
22+
<dependency>
23+
<groupId>com.netflix.rxjava</groupId>
24+
<artifactId>rxjava-apache-http</artifactId>
25+
<version>x.y.z</version>
26+
</dependency>
27+
```
28+
29+
and for Ivy:
30+
31+
```xml
32+
<dependency org="com.netflix.rxjava" name="rxjava-apache-http" rev="x.y.z" />
33+
```
34+
35+
# Sample Usage
36+
37+
### Create a Request
38+
39+
```java
40+
ObservableHttp.createGet("http://www.wikipedia.com", httpClient).toObservable();
41+
ObservableHttp.createRequest(HttpAsyncMethods.createGet("http://www.wikipedia.com"), httpClient).toObservable();
42+
```
43+
44+
### Http Client
45+
46+
A basic default client:
47+
48+
```java
49+
CloseableHttpAsyncClient httpClient = HttpAsyncClients.createDefault();
50+
```
51+
52+
or a custom client with configuration options:
53+
54+
```java
55+
final RequestConfig requestConfig = RequestConfig.custom()
56+
.setSocketTimeout(3000)
57+
.setConnectTimeout(500).build();
58+
final CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom()
59+
.setDefaultRequestConfig(requestConfig)
60+
.setMaxConnPerRoute(20)
61+
.setMaxConnTotal(50)
62+
.build();
63+
```
64+
65+
### Normal Http GET
66+
67+
Execute a request and transform the `byte[]` reponse to a `String`:
68+
69+
```groovy
70+
ObservableHttp.createRequest(HttpAsyncMethods.createGet("http://www.wikipedia.com"), client)
71+
.toObservable()
72+
.flatMap({ ObservableHttpResponse response ->
73+
return response.getContent().map({ byte[] bb ->
74+
return new String(bb);
75+
});
76+
})
77+
.toBlockingObservable()
78+
.forEach({ String resp ->
79+
// this will be invoked once with the response
80+
println(resp);
81+
});
82+
```
83+
84+
### Streaming Http GET with [Server-Sent Events (text/event-stream)](http://www.w3.org/TR/eventsource/) Response
85+
86+
Execute a request and transform the `byte[]` response of each event to a `String`:
87+
88+
```groovy
89+
ObservableHttp.createRequest(HttpAsyncMethods.createGet("http://hostname/event.stream"), client)
90+
.toObservable()
91+
.flatMap({ ObservableHttpResponse response ->
92+
return response.getContent().map({ byte[] bb ->
93+
return new String(bb);
94+
});
95+
})
96+
.toBlockingObservable()
97+
.forEach({ String resp ->
98+
// this will be invoked for each event
99+
println(resp);
100+
});
101+
```
102+
103+
An example event-stream is from [Hystrix](https://github.com/Netflix/Hystrix/tree/master/hystrix-contrib/hystrix-metrics-event-stream) used for streaming metrics. An [example webapp](https://github.com/Netflix/Hystrix/tree/master/hystrix-examples-webapp) can be used to test.
104+
105+
Output looks like:
106+
107+
```
108+
data: {"type":"HystrixCommand","name":"CreditCardCommand","group":"CreditCard","currentTime":1379823924934,"isCircuitBreakerOpen":false,"errorPercentage":0,"errorCount":0,"requestCount":0,"rollingCountCollapsedRequests":0,"rollingCountExceptionsThrown":0,"rollingCountFailure":0,"rollingCountFallbackFailure":0,"rollingCountFallbackRejection":0,"rollingCountFallbackSuccess":0,"rollingCountResponsesFromCache":0,"rollingCountSemaphoreRejected":0,"rollingCountShortCircuited":0,"rollingCountSuccess":0,"rollingCountThreadPoolRejected":0,"rollingCountTimeout":0,"currentConcurrentExecutionCount":0,"latencyExecute_mean":0,"latencyExecute":{"0":0,"25":0,"50":0,"75":0,"90":0,"95":0,"99":0,"99.5":0,"100":0},"latencyTotal_mean":0,"latencyTotal":{"0":0,"25":0,"50":0,"75":0,"90":0,"95":0,"99":0,"99.5":0,"100":0},"propertyValue_circuitBreakerRequestVolumeThreshold":20,"propertyValue_circuitBreakerSleepWindowInMilliseconds":5000,"propertyValue_circuitBreakerErrorThresholdPercentage":50,"propertyValue_circuitBreakerForceOpen":false,"propertyValue_circuitBreakerForceClosed":false,"propertyValue_circuitBreakerEnabled":true,"propertyValue_executionIsolationStrategy":"THREAD","propertyValue_executionIsolationThreadTimeoutInMilliseconds":3000,"propertyValue_executionIsolationThreadInterruptOnTimeout":true,"propertyValue_executionIsolationThreadPoolKeyOverride":null,"propertyValue_executionIsolationSemaphoreMaxConcurrentRequests":10,"propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests":10,"propertyValue_metricsRollingStatisticalWindowInMilliseconds":10000,"propertyValue_requestCacheEnabled":true,"propertyValue_requestLogEnabled":true,"reportingHosts":1}
109+
data: {"type":"HystrixCommand","name":"GetPaymentInformationCommand","group":"PaymentInformation","currentTime":1379823924934,"isCircuitBreakerOpen":false,"errorPercentage":0,"errorCount":0,"requestCount":0,"rollingCountCollapsedRequests":0,"rollingCountExceptionsThrown":0,"rollingCountFailure":0,"rollingCountFallbackFailure":0,"rollingCountFallbackRejection":0,"rollingCountFallbackSuccess":0,"rollingCountResponsesFromCache":0,"rollingCountSemaphoreRejected":0,"rollingCountShortCircuited":0,"rollingCountSuccess":0,"rollingCountThreadPoolRejected":0,"rollingCountTimeout":0,"currentConcurrentExecutionCount":0,"latencyExecute_mean":0,"latencyExecute":{"0":0,"25":0,"50":0,"75":0,"90":0,"95":0,"99":0,"99.5":0,"100":0},"latencyTotal_mean":0,"latencyTotal":{"0":0,"25":0,"50":0,"75":0,"90":0,"95":0,"99":0,"99.5":0,"100":0},"propertyValue_circuitBreakerRequestVolumeThreshold":20,"propertyValue_circuitBreakerSleepWindowInMilliseconds":5000,"propertyValue_circuitBreakerErrorThresholdPercentage":50,"propertyValue_circuitBreakerForceOpen":false,"propertyValue_circuitBreakerForceClosed":false,"propertyValue_circuitBreakerEnabled":true,"propertyValue_executionIsolationStrategy":"THREAD","propertyValue_executionIsolationThreadTimeoutInMilliseconds":1000,"propertyValue_executionIsolationThreadInterruptOnTimeout":true,"propertyValue_executionIsolationThreadPoolKeyOverride":null,"propertyValue_executionIsolationSemaphoreMaxConcurrentRequests":10,"propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests":10,"propertyValue_metricsRollingStatisticalWindowInMilliseconds":10000,"propertyValue_requestCacheEnabled":true,"propertyValue_requestLogEnabled":true,"reportingHosts":1}
110+
```
111+

rxjava-contrib/rxjava-apache-http/src/main/java/rx/apache/http/ObservableHttp.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public Subscription onSubscribe(final Observer<? super ObservableHttpResponse> o
143143
final CompositeSubscription parentSubscription = new CompositeSubscription();
144144

145145
// return a Subscription that wraps the Future so it can be cancelled
146-
parentSubscription.add(Subscriptions.create(client.execute(requestProducer, new ResponseConsumerDelegate(observer, parentSubscription),
146+
parentSubscription.add(Subscriptions.from(client.execute(requestProducer, new ResponseConsumerDelegate(observer, parentSubscription),
147147
new FutureCallback<HttpResponse>() {
148148

149149
@Override

rxjava-contrib/rxjava-apache-http/src/main/java/rx/apache/http/consumers/ResponseConsumerDelegate.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ public ResponseConsumerDelegate(final Observer<? super ObservableHttpResponse> o
5252
@Override
5353
protected void onResponseReceived(HttpResponse response) throws HttpException, IOException {
5454
// when we receive the response with headers we evaluate what type of consumer we want
55-
if (response.getFirstHeader("Content-Type").getValue().equals("text/event-stream")) {
55+
if (response.getFirstHeader("Content-Type").getValue().contains("text/event-stream")) {
56+
// use 'contains' instead of equals since Content-Type can contain additional information
57+
// such as charset ... see here: http://www.w3.org/International/O-HTTP-charset
5658
consumer = new ResponseConsumerEventStream(observer, subscription);
5759
} else {
5860
consumer = new ResponseConsumerBasic(observer, subscription);

rxjava-contrib/rxjava-apache-http/src/test/java/rx/apache/http/examples/ExampleObservableHttp.java

Lines changed: 34 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -77,41 +77,40 @@ public void call(String resp) {
7777

7878
protected static void executeStreamingViaObservableHttpWithForEach(final HttpAsyncClient client) throws URISyntaxException, IOException, InterruptedException {
7979
System.out.println("---- executeStreamingViaObservableHttpWithForEach");
80-
for (int i = 0; i < 5; i++) {
81-
final int c = i + 1;
82-
ObservableHttp.createRequest(HttpAsyncMethods.createGet("http://ec2-54-211-91-164.compute-1.amazonaws.com:8077/eventbus.stream?topic=hystrix-metrics"), client)
83-
.toObservable()
84-
.flatMap(new Func1<ObservableHttpResponse, Observable<String>>() {
85-
86-
@Override
87-
public Observable<String> call(ObservableHttpResponse response) {
88-
return response.getContent().map(new Func1<byte[], String>() {
89-
90-
@Override
91-
public String call(byte[] bb) {
92-
return new String(bb);
93-
}
94-
95-
});
96-
}
97-
})
98-
.filter(new Func1<String, Boolean>() {
99-
100-
@Override
101-
public Boolean call(String t1) {
102-
return !t1.startsWith(": ping");
103-
}
104-
})
105-
.take(3)
106-
.toBlockingObservable()
107-
.forEach(new Action1<String>() {
108-
109-
@Override
110-
public void call(String resp) {
111-
System.out.println("Response [" + c + "]: " + resp + " (" + resp.length() + ")");
112-
}
113-
});
114-
}
80+
// URL against https://github.com/Netflix/Hystrix/tree/master/hystrix-examples-webapp
81+
// More information at https://github.com/Netflix/Hystrix/tree/master/hystrix-contrib/hystrix-metrics-event-stream
82+
ObservableHttp.createRequest(HttpAsyncMethods.createGet("http://localhost:8989/hystrix-examples-webapp/hystrix.stream"), client)
83+
.toObservable()
84+
.flatMap(new Func1<ObservableHttpResponse, Observable<String>>() {
85+
86+
@Override
87+
public Observable<String> call(ObservableHttpResponse response) {
88+
return response.getContent().map(new Func1<byte[], String>() {
89+
90+
@Override
91+
public String call(byte[] bb) {
92+
return new String(bb);
93+
}
94+
95+
});
96+
}
97+
})
98+
.filter(new Func1<String, Boolean>() {
99+
100+
@Override
101+
public Boolean call(String t1) {
102+
return !t1.startsWith(": ping");
103+
}
104+
})
105+
.take(3)
106+
.toBlockingObservable()
107+
.forEach(new Action1<String>() {
108+
109+
@Override
110+
public void call(String resp) {
111+
System.out.println(resp);
112+
}
113+
});
115114
}
116115

117116
}

0 commit comments

Comments
 (0)