Skip to content

Commit 032d3af

Browse files
Merge pull request ReactiveX#1282 from abersnaze/closeable
utility function for creating observables for closeable resources
2 parents 0586b8a + fff2817 commit 032d3af

File tree

2 files changed

+232
-45
lines changed

2 files changed

+232
-45
lines changed

rxjava-contrib/rxjava-string/src/main/java/rx/observables/StringObservable.java

Lines changed: 102 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,16 @@
1515
*/
1616
package rx.observables;
1717

18+
import rx.Observable;
19+
import rx.Observable.OnSubscribe;
20+
import rx.Observable.Operator;
21+
import rx.Subscriber;
22+
import rx.Subscription;
23+
import rx.functions.Func0;
24+
import rx.functions.Func1;
25+
import rx.functions.Func2;
26+
27+
import java.io.Closeable;
1828
import java.io.IOException;
1929
import java.io.InputStream;
2030
import java.io.Reader;
@@ -27,15 +37,10 @@
2737
import java.nio.charset.CoderResult;
2838
import java.nio.charset.CodingErrorAction;
2939
import java.util.Arrays;
40+
import java.util.concurrent.Callable;
41+
import java.util.concurrent.atomic.AtomicBoolean;
3042
import java.util.regex.Pattern;
3143

32-
import rx.Observable;
33-
import rx.Observable.OnSubscribe;
34-
import rx.Observable.Operator;
35-
import rx.Subscriber;
36-
import rx.functions.Func1;
37-
import rx.functions.Func2;
38-
3944
public class StringObservable {
4045
/**
4146
* Reads from the bytes from a source {@link InputStream} and outputs {@link Observable} of
@@ -50,6 +55,73 @@ public class StringObservable {
5055
public static Observable<byte[]> from(final InputStream i) {
5156
return from(i, 8 * 1024);
5257
}
58+
59+
private static class CloseableResource<S extends Closeable> implements Subscription {
60+
private final AtomicBoolean unsubscribed = new AtomicBoolean();
61+
private S closable;
62+
63+
public CloseableResource(S closeable) {
64+
this.closable = closeable;
65+
}
66+
67+
@Override
68+
public void unsubscribe() {
69+
if (unsubscribed.compareAndSet(false, true)) {
70+
try {
71+
closable.close();
72+
} catch (Exception e) {
73+
throw new RuntimeException(e);
74+
}
75+
}
76+
}
77+
78+
@Override
79+
public boolean isUnsubscribed() {
80+
return unsubscribed.get();
81+
}
82+
}
83+
84+
/**
85+
* Func0 that allows throwing an {@link IOException}s commonly thrown during IO operations.
86+
* @see StringObservable#from(UnsafeFunc0, UnsafeFunc1)
87+
*
88+
* @param <R>
89+
*/
90+
public static interface UnsafeFunc0<R> extends Callable<R> {
91+
public R call() throws Exception;
92+
}
93+
94+
/**
95+
* Helps in creating an Observable that automatically calls {@link Closeable#close()} on completion, error or unsubscribe.
96+
*
97+
* <pre>
98+
* StringObservable.using(() -> new FileReader(file), (reader) -> StringObservable.from(reader))
99+
* </pre>
100+
*
101+
* @param resourceFactory
102+
* Generates a new {@link Closeable} resource for each new subscription to the returned Observable
103+
* @param observableFactory
104+
* Converts the {@link Closeable} resource into a {@link Observable} with {@link #from(InputStream)} or {@link #from(Reader)}
105+
* @return
106+
*/
107+
public static <R, S extends Closeable> Observable<R> using(final UnsafeFunc0<S> resourceFactory,
108+
final Func1<S, Observable<R>> observableFactory) {
109+
return Observable.using(new Func0<CloseableResource<S>>() {
110+
@Override
111+
public CloseableResource<S> call() {
112+
try {
113+
return new CloseableResource<S>(resourceFactory.call());
114+
} catch (Throwable e) {
115+
throw new RuntimeException(e);
116+
}
117+
}
118+
}, new Func1<CloseableResource<S>, Observable<R>>() {
119+
@Override
120+
public Observable<R> call(CloseableResource<S> t1) {
121+
return observableFactory.call(t1.closable);
122+
}
123+
});
124+
}
53125

54126
/**
55127
* Reads from the bytes from a source {@link InputStream} and outputs {@link Observable} of
@@ -320,10 +392,24 @@ public byte[] call(String str) {
320392
* @return the Observable returing all strings concatenated as a single string
321393
*/
322394
public static Observable<String> stringConcat(Observable<String> src) {
323-
return src.reduce(new Func2<String, String, String>() {
395+
return toString(src.reduce(new StringBuilder(), new Func2<StringBuilder, String, StringBuilder>() {
324396
@Override
325-
public String call(String a, String b) {
326-
return a + b;
397+
public StringBuilder call(StringBuilder a, String b) {
398+
return a.append(b);
399+
}
400+
}));
401+
}
402+
403+
/**
404+
* Maps {@link Observable}&lt;{@link Object}&gt; to {@link Observable}&lt;{@link String}&gt; by using {@link String#valueOf(Object)}
405+
* @param src
406+
* @return
407+
*/
408+
public static Observable<String> toString(Observable<?> src) {
409+
return src.map(new Func1<Object, String>() {
410+
@Override
411+
public String call(Object obj) {
412+
return String.valueOf(obj);
327413
}
328414
});
329415
}
@@ -429,11 +515,11 @@ private void output(String part) {
429515
* @return an Observable which emits a single String value having the concatenated
430516
* values of the source observable with the separator between elements
431517
*/
432-
public static <T> Observable<String> join(final Observable<T> source, final CharSequence separator) {
433-
return source.lift(new Operator<String, T>() {
518+
public static Observable<String> join(final Observable<String> source, final CharSequence separator) {
519+
return source.lift(new Operator<String, String>() {
434520
@Override
435-
public Subscriber<T> call(final Subscriber<? super String> o) {
436-
return new Subscriber<T>(o) {
521+
public Subscriber<String> call(final Subscriber<? super String> o) {
522+
return new Subscriber<String>(o) {
437523
boolean mayAddSeparator;
438524
StringBuilder b = new StringBuilder();
439525

@@ -455,12 +541,12 @@ public void onError(Throwable e) {
455541
}
456542

457543
@Override
458-
public void onNext(Object t) {
544+
public void onNext(String t) {
459545
if (mayAddSeparator) {
460546
b.append(separator);
461547
}
462548
mayAddSeparator = true;
463-
b.append(String.valueOf(t));
549+
b.append(t);
464550
}
465551
};
466552
}

0 commit comments

Comments
 (0)