AsyncOnSubscribe offers no convenience for non-Observable sources #4177
Comments
You can wrap a callback-based API with

    Observable<T> obs = Observable.create(s -> {
        Callback<T> cb = new Callback<T>() {
            @Override
            public void onSuccess(T data) {
                s.onNext(data);
            }
            @Override
            public void onFailure(Exception ex) {
                s.onError(ex);
            }
        };
        AutoCloseable close = api.method(cb);
        // Release the underlying resource when the subscriber unsubscribes.
        s.add(Subscriptions.create(() -> {
            try {
                close.close();
            } catch (Exception ex) {
                RxJavaHooks.onError(ex);
            }
        }));
    }).onBackpressureDrop();
I don't argue your point as
Just a general question here, why would you use create? In theory using However This feature might be used for pagination over data from a Reactive Streams enabled RPC such as Reactive Sockets or APIs that expose a ListenableFuture (using RxJava-Guava project to adapt it to an Observable). @akarnokd has struggled to see value here. I don't think that I will have the time to continue arguing for this feature. |
@stealthcode Fair enough. The convenience of request handling lured me towards it! So at this point I know how to write a valid
This won't handle things like For an example, let's say I have this API:

    interface Prompter {
        void prompt(String message, Callback cb);
        interface Callback {
            void ok(String response);
            void cancel();
        }
    }

How would you implement a method with the following signature

    Observable<String> prompt(String message);

such that subscribing and This seems ridiculously complicated to do and is why so many turn to I really like
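One possible shape for such an adapter, as a minimal sketch only: it is written against the JDK's java.util.concurrent.Flow interfaces (Java 9+) so that it runs without dependencies, and it is not RxJava's API. PromptPublisher, PromptSubscription and RecordingSubscriber are hypothetical names. The sketch assumes the callback fires at most once, and because Prompter exposes no cancellation handle, cancelling only suppresses delivery.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

// The callback API from the question above.
interface Prompter {
    void prompt(String message, Callback cb);
    interface Callback {
        void ok(String response);
        void cancel();
    }
}

// Hypothetical adapter (not an RxJava type): one prompt per subscriber.
final class PromptPublisher implements Flow.Publisher<String> {
    private final Prompter prompter;
    private final String message;

    PromptPublisher(Prompter prompter, String message) {
        this.prompter = prompter;
        this.message = message;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super String> downstream) {
        PromptSubscription subscription = new PromptSubscription(downstream);
        downstream.onSubscribe(subscription);
        prompter.prompt(message, new Prompter.Callback() {
            @Override public void ok(String response) { subscription.complete(response); }
            @Override public void cancel() { subscription.fail(new CancellationException()); }
        });
    }
}

// Holds the single value until the subscriber has requested it.
final class PromptSubscription implements Flow.Subscription {
    private static final int EMPTY = 0, REQUESTED = 1, HAS_VALUE = 2, DONE = 3;
    private final AtomicInteger state = new AtomicInteger(EMPTY);
    private final Flow.Subscriber<? super String> downstream;
    private String value; // written before the CAS to HAS_VALUE, read after observing it

    PromptSubscription(Flow.Subscriber<? super String> downstream) {
        this.downstream = downstream;
    }

    @Override
    public void request(long n) {
        if (n <= 0) return; // sketch: non-positive requests are simply ignored
        if (state.compareAndSet(EMPTY, REQUESTED)) return;     // value has not arrived yet
        if (state.compareAndSet(HAS_VALUE, DONE)) emit(value); // value was waiting for demand
    }

    @Override
    public void cancel() {
        state.set(DONE); // later callbacks are dropped
    }

    void complete(String response) {
        value = response;
        if (state.compareAndSet(EMPTY, HAS_VALUE)) return;        // hold until requested
        if (state.compareAndSet(REQUESTED, DONE)) emit(response); // demand already present
    }

    void fail(Throwable error) {
        if (state.getAndSet(DONE) != DONE) downstream.onError(error);
    }

    private void emit(String v) {
        downstream.onNext(v);
        downstream.onComplete();
    }
}

// Test helper that records signals and never requests on its own.
final class RecordingSubscriber implements Flow.Subscriber<String> {
    final List<String> events = new ArrayList<>();
    Flow.Subscription subscription;
    @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
    @Override public void onNext(String item) { events.add("next:" + item); }
    @Override public void onError(Throwable t) { events.add("error:" + t.getClass().getSimpleName()); }
    @Override public void onComplete() { events.add("complete"); }
}
```

The small state machine is what makes the adapter respect demand: a response that arrives before any request is parked, and a request that arrives before the response is remembered, so whichever side comes second performs the emission.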
Sounds like you need what we'd done in Reactor Core: FluxCreate and its use (plus this and this).
Yes. That looks awesome! Is that something we could adapt for RxJava 1's types? Or at worst for 2?
I can port it to RxJava 1.x. In the meantime, could you review #4178?
Fair trade!
@JakeWharton have you considered using the
Can you show an example? I tried, but failed...

    final class Prompter {
        interface Callback {
            void ok(String message);
            void cancel();
        }
        void prompt(String message, Callback cb) {
            new Thread(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ignored) {
                }
                if (new Random().nextBoolean()) {
                    cb.ok(message + " => Hello!");
                } else {
                    cb.cancel();
                }
            }).start();
        }
    }

    public final class Test {
        private final Prompter prompter;
        Test(Prompter prompter) {
            this.prompter = prompter;
        }
        Observable<String> prompt(String message) {
            return Observable.create(AsyncOnSubscribe.createStateless(
                new Action2<Long, Observer<Observable<? extends String>>>() {
                    private final AtomicBoolean run = new AtomicBoolean();
                    @Override public void call(Long requested,
                            Observer<Observable<? extends String>> observableObserver) {
                        if (!run.compareAndSet(false, true)) return;
                        observableObserver.onNext(Observable.create(
                            SyncOnSubscribe.createStateless(new Action1<Observer<? super String>>() {
                                @Override public void call(Observer<? super String> observer) {
                                    prompter.prompt(message, new Prompter.Callback() {
                                        @Override public void ok(String message) {
                                            observer.onNext(message);
                                            observer.onCompleted();
                                        }
                                        @Override public void cancel() {
                                            observer.onError(new CancellationException());
                                        }
                                    });
                                }
                            })));
                        observableObserver.onCompleted();
                    }
                }));
        }
        public static void main(String... args) {
            Test test = new Test(new Prompter());
            test.prompt("Hello?") //
                .toBlocking() //
                .subscribe(System.out::println);
        }
    }

Note: I'm using Java 6 syntax for the actual wrapping code because not everyone can use Java 8 and I think it's important to emphasize the ease/simplicity of wrapping.
See #4179. AsyncOnSubscribe is a source that responds to requests: its generator method can only deal with pull-like sources (for example, pulling an element from a queue and creating an Observable for it), not push sources.
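To make the pull-like shape concrete, here is a minimal sketch that uses only the JDK's java.util.concurrent.Flow interfaces (Java 9+) rather than RxJava types. QueuePublisher and PullDemo are hypothetical names, and the code assumes single-threaded use. The point is that the source does nothing until request(n) arrives and then pulls at most n elements from the queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Flow;

// Hypothetical pull-style source: emits only what has been requested.
// Not thread-safe; a sketch for single-threaded use.
final class QueuePublisher<T> implements Flow.Publisher<T> {
    private final Queue<T> queue;

    QueuePublisher(Queue<T> queue) {
        this.queue = queue;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private long requested;
            private boolean draining;
            private boolean done;

            @Override
            public void request(long n) {
                if (done || n <= 0) return;
                requested += n;
                if (requested < 0) requested = Long.MAX_VALUE; // cap on overflow
                if (draining) return; // re-entrant request from onNext: outer loop continues
                draining = true;
                try {
                    while (requested > 0 && !done) {
                        T item = queue.poll(); // pull one element per unit of demand
                        if (item == null) {
                            done = true;
                            subscriber.onComplete();
                            return;
                        }
                        requested--;
                        subscriber.onNext(item);
                    }
                } finally {
                    draining = false;
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}

final class PullDemo {
    // Requests 2 items, then 5 more, recording every signal in order.
    static List<String> run() {
        Queue<Integer> queue = new ArrayDeque<>(List.of(1, 2, 3));
        List<String> events = new ArrayList<>();
        Flow.Subscription[] holder = new Flow.Subscription[1];
        new QueuePublisher<>(queue).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { holder[0] = s; }
            @Override public void onNext(Integer item) { events.add("next:" + item); }
            @Override public void onError(Throwable t) { events.add("error"); }
            @Override public void onComplete() { events.add("complete"); }
        });
        holder[0].request(2);
        events.add("--");
        holder[0].request(5);
        return events;
    }
}
```

A callback API inverts this relationship: the callback decides when data arrives, so there is nothing to poll when a request comes in, which is why a push source needs a separate bridging mechanism.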
This isn't exactly the example that I wanted to show but here is an example that uses RxJava-Guava for integrating a

    public static void main(String[] args) {
        Observable<ByteBuffer> bytes = Observable.create(AsyncOnSubscribe.createStateful(
            () -> 0L,
            (waterMark, requestAmount, observableObserver) -> {
                // The watermark is the offset reached so far; each request covers
                // the range [waterMark, waterMark + requestAmount).
                long start = waterMark;
                long end = waterMark + requestAmount;
                ListenableFutureTask<ByteBuffer> future = ListenableFutureTask.create(() -> getData(start, end));
                Observable<ByteBuffer> requestProducer = ListenableFutureObservable.from(future, Schedulers.computation());
                observableObserver.onNext(requestProducer);
                return end;
            }));
    }

    private static ByteBuffer getData(long start, long end) {
        // this is where you would produce the data (here we just allocate an arbitrary bytebuffer)
        return ByteBuffer.allocate((int)(end - start));
    }
Closing via #4179.
@JakeWharton I'm interested in your feedback on the above approach.
That doesn't appear to be truly asynchronous, but instead just blocks on a different thread (computation in this case) waiting for a notification.
Where does it block? The example I gave uses a
I assumed
AsyncOnSubscribe currently assumes I can trivially supply it with an Observable that emits up to n items. If you're abstracting over a callback based API there isn't an easy way to hook this in.

The current best way is to use a Subject, but this seems like a very heavyweight tool when the Observable is already going to be routed to a UnicastSubject in the AsyncOnSubscribe internals.

Are we meant to be creating backpressure-ignorant Observables using create(OnSubscribe) inside of the AsyncOnSubscribe callback?

Is there a way that we can provide a Subscriber<T> in the action in another overload to simplify the case where we want to abstract over something that's not already modeled as an Observable<T>?