Skip to content

Commit d92b151

Browse files
Fixes to rxjava-apache-http
- made Content-Type inspection more reliable - other small improvments
1 parent c033ecc commit d92b151

File tree

3 files changed

+38
-37
lines changed

3 files changed

+38
-37
lines changed

rxjava-contrib/rxjava-apache-http/src/main/java/rx/apache/http/ObservableHttp.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public Subscription onSubscribe(final Observer<? super ObservableHttpResponse> o
143143
final CompositeSubscription parentSubscription = new CompositeSubscription();
144144

145145
// return a Subscription that wraps the Future so it can be cancelled
146-
parentSubscription.add(Subscriptions.create(client.execute(requestProducer, new ResponseConsumerDelegate(observer, parentSubscription),
146+
parentSubscription.add(Subscriptions.from(client.execute(requestProducer, new ResponseConsumerDelegate(observer, parentSubscription),
147147
new FutureCallback<HttpResponse>() {
148148

149149
@Override

rxjava-contrib/rxjava-apache-http/src/main/java/rx/apache/http/consumers/ResponseConsumerDelegate.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ public ResponseConsumerDelegate(final Observer<? super ObservableHttpResponse> o
5252
@Override
5353
protected void onResponseReceived(HttpResponse response) throws HttpException, IOException {
5454
// when we receive the response with headers we evaluate what type of consumer we want
55-
if (response.getFirstHeader("Content-Type").getValue().equals("text/event-stream")) {
55+
if (response.getFirstHeader("Content-Type").getValue().contains("text/event-stream")) {
56+
// use 'contains' instead of equals since Content-Type can contain additional information
57+
// such as charset ... see here: http://www.w3.org/International/O-HTTP-charset
5658
consumer = new ResponseConsumerEventStream(observer, subscription);
5759
} else {
5860
consumer = new ResponseConsumerBasic(observer, subscription);

rxjava-contrib/rxjava-apache-http/src/test/java/rx/apache/http/examples/ExampleObservableHttp.java

Lines changed: 34 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -77,41 +77,40 @@ public void call(String resp) {
7777

7878
protected static void executeStreamingViaObservableHttpWithForEach(final HttpAsyncClient client) throws URISyntaxException, IOException, InterruptedException {
7979
System.out.println("---- executeStreamingViaObservableHttpWithForEach");
80-
for (int i = 0; i < 5; i++) {
81-
final int c = i + 1;
82-
ObservableHttp.createRequest(HttpAsyncMethods.createGet("http://ec2-54-211-91-164.compute-1.amazonaws.com:8077/eventbus.stream?topic=hystrix-metrics"), client)
83-
.toObservable()
84-
.flatMap(new Func1<ObservableHttpResponse, Observable<String>>() {
85-
86-
@Override
87-
public Observable<String> call(ObservableHttpResponse response) {
88-
return response.getContent().map(new Func1<byte[], String>() {
89-
90-
@Override
91-
public String call(byte[] bb) {
92-
return new String(bb);
93-
}
94-
95-
});
96-
}
97-
})
98-
.filter(new Func1<String, Boolean>() {
99-
100-
@Override
101-
public Boolean call(String t1) {
102-
return !t1.startsWith(": ping");
103-
}
104-
})
105-
.take(3)
106-
.toBlockingObservable()
107-
.forEach(new Action1<String>() {
108-
109-
@Override
110-
public void call(String resp) {
111-
System.out.println("Response [" + c + "]: " + resp + " (" + resp.length() + ")");
112-
}
113-
});
114-
}
80+
// URL against https://github.com/Netflix/Hystrix/tree/master/hystrix-examples-webapp
81+
// More information at https://github.com/Netflix/Hystrix/tree/master/hystrix-contrib/hystrix-metrics-event-stream
82+
ObservableHttp.createRequest(HttpAsyncMethods.createGet("http://localhost:8989/hystrix-examples-webapp/hystrix.stream"), client)
83+
.toObservable()
84+
.flatMap(new Func1<ObservableHttpResponse, Observable<String>>() {
85+
86+
@Override
87+
public Observable<String> call(ObservableHttpResponse response) {
88+
return response.getContent().map(new Func1<byte[], String>() {
89+
90+
@Override
91+
public String call(byte[] bb) {
92+
return new String(bb);
93+
}
94+
95+
});
96+
}
97+
})
98+
.filter(new Func1<String, Boolean>() {
99+
100+
@Override
101+
public Boolean call(String t1) {
102+
return !t1.startsWith(": ping");
103+
}
104+
})
105+
.take(3)
106+
.toBlockingObservable()
107+
.forEach(new Action1<String>() {
108+
109+
@Override
110+
public void call(String resp) {
111+
System.out.println(resp);
112+
}
113+
});
115114
}
116115

117116
}

0 commit comments

Comments
 (0)