AsyncOnSubscribe offers no convenience for non-Observable sources #4177

Closed
JakeWharton opened this issue Jul 8, 2016 · 16 comments

@JakeWharton
Contributor

AsyncOnSubscribe currently assumes I can trivially supply it with an Observable that emits up to n items. If you're abstracting over a callback-based API, there isn't an easy way to hook this in.

The current best way is to use a Subject, but this seems like a very heavyweight tool when the Observable is already going to be routed to a UnicastSubject in the AsyncOnSubscribe internals.

Are we meant to be creating backpressure-ignorant Observables using create(OnSubscribe) inside of the AsyncOnSubscribe callback?

Is there a way that we can provide a Subscriber<T> in the action in another overload to simplify the case where we want to abstract over something that's not already modeled as an Observable<T>?

@akarnokd
Member

akarnokd commented Jul 8, 2016

You can wrap a callback-based API with create + onBackpressureX:

Observable<T> obs = Observable.create(s -> {
    Callback<T> cb = new Callback<T>() {
        @Override
        public void onSuccess(T data) {
            s.onNext(data);
        }
        @Override
        public void onFailure(Exception ex) {
            s.onError(ex);
        }
    };

    AutoCloseable close = api.method(cb);

    // Subscriptions.create(Action0) wraps the cleanup so it runs on unsubscribe.
    s.add(Subscriptions.create(() -> {
        try {
            close.close();
        } catch (Exception ex) {
            RxJavaHooks.onError(ex);
        }
    }));
}).onBackpressureDrop();
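
To make the trade-off of the drop strategy concrete, here is a minimal, library-free sketch of what "forward only while there is demand, otherwise drop" means. `DropGate` is a hypothetical name used for illustration; it is not RxJava's implementation of onBackpressureDrop, only an approximation of the behavior under that assumption.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Hypothetical stand-in for a drop strategy; not RxJava's implementation. */
final class DropGate<T> {
    private final AtomicLong requested = new AtomicLong();
    private final AtomicLong dropped = new AtomicLong();
    private final Consumer<T> downstream;

    DropGate(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    /** Downstream signals it can accept n more items; n <= 0 adds no demand. */
    void request(long n) {
        if (n <= 0) {
            return;
        }
        requested.updateAndGet(r -> {
            long sum = r + n;
            return sum < 0 ? Long.MAX_VALUE : sum; // cap on overflow
        });
    }

    /** Called by the push source (e.g. a callback); forwards only while demand remains. */
    void onNext(T item) {
        while (true) {
            long r = requested.get();
            if (r == 0) {
                dropped.incrementAndGet(); // no demand: the item is lost
                return;
            }
            // Long.MAX_VALUE is treated as unbounded demand and never decremented.
            if (r == Long.MAX_VALUE || requested.compareAndSet(r, r - 1)) {
                downstream.accept(item);
                return;
            }
        }
    }

    long droppedCount() {
        return dropped.get();
    }
}
```

Anything the callback delivers before the first request, or after the demand is used up, is silently discarded. That is acceptable for sampled data such as sensor readings, but not for values the user typed in.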

@stealthcode

I don't argue your point as AsyncOnSubscribe is not intended to offer any convenience for non-observable sources. That is why the API expects you to emit an Observable<T>. It expects that you are able to adapt whatever potentially asynchronous data source into an observable (as there are many different ways to do so safely, such as the SyncOnSubscribe).

Are we meant to be creating backpressure-ignorant Observables using create(OnSubscribe) inside of the AsyncOnSubscribe callback?

Just a general question here, why would you use create? In theory using Observable.create should work fine so long as you don't change threads within any single observable to fulfill a single request amount and you don't exceed the requested amount for that observable.

However AsyncOnSubscribe is supposed to be used when you need multiple sources of data to be consumed by the same subscriber in a way that preserves ordering and you need minimal buffering. The request amounts should be used to make a single efficient RPC query for exactly the requested amount and emit it on a single source observable (i.e. a SyncOnSubscribe that reads off the network buffer or a parser). So for each requested amount you would emit an observable that emits exactly the right amount of data. In its usage I would recommend that each observable is synchronous internally (i.e. does not change threads for emitting values).

This feature might be used for pagination over data from a Reactive Streams enabled RPC such as Reactive Sockets or APIs that expose a ListenableFuture (using RxJava-Guava project to adapt it to an Observable).

@akarnokd has struggled to see value here. I don't think that I will have the time to continue arguing for this feature.

@JakeWharton
Contributor Author

@stealthcode Fair enough. The convenience of request handling lured me towards it!

So at this point I know how to write a valid OnSubscribe. It's taken lots of missteps, mistakes, and pain. Currently RxJava offers conveniences for abstracting over synchronous non-Rx APIs in defer+just/from and fromCallable. What I'm looking for is what to recommend people use for abstracting over asynchronous non-Rx APIs. It seems like RxJava doesn't have anything here.

You can wrap a callback-based API with create + onBackpressureX

This won't handle things like request(0) and potentially loses user data.

For an example, let's say I have this API:

interface Prompter {
  void prompt(String message, Callback cb);

  interface Callback {
    void ok(String response);
    void cancel();
  }
}

How would you implement a method with the following signature

Observable<String> prompt(String message);

such that subscribing and request() > 0 calls prompt and a callback to ok() forwards the String and cancel() sends a CancellationException?

This seems ridiculously complicated to do and is why so many turn to Observable.create right away. They don't handle the request(0) case, which can lead to MissingBackpressureExceptions (MBPEs). They're certainly not handling backpressure or unsubscription (although in this case that doesn't matter, but in others it might!).

I really like fromCallable and fromAction for newcomers but it feels like we lack something to handle this case.
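
For reference, the request-gating logic being asked for can be sketched without any library types. This is only an illustration under stated assumptions: `Sink` and `PromptProducer` are hypothetical names standing in for a downstream subscriber and a request handler, and unsubscription is deliberately left out.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicBoolean;

/** The callback API from the comment above. */
interface Prompter {
    void prompt(String message, Callback cb);

    interface Callback {
        void ok(String response);
        void cancel();
    }
}

/** Hypothetical stand-in for the downstream subscriber (not an RxJava type). */
interface Sink<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onCompleted();
}

/** Hypothetical request handler: calls prompt() once, on the first request(n) with n > 0. */
final class PromptProducer {
    private final Prompter prompter;
    private final String message;
    private final Sink<String> sink;
    private final AtomicBoolean started = new AtomicBoolean();

    PromptProducer(Prompter prompter, String message, Sink<String> sink) {
        this.prompter = prompter;
        this.message = message;
        this.sink = sink;
    }

    void request(long n) {
        if (n < 0) {
            throw new IllegalArgumentException("n >= 0 required but it was " + n);
        }
        if (n == 0) {
            return; // request(0): no demand yet, so do not prompt
        }
        if (!started.compareAndSet(false, true)) {
            return; // only one prompt per subscription
        }
        prompter.prompt(message, new Prompter.Callback() {
            @Override public void ok(String response) {
                sink.onNext(response);
                sink.onCompleted();
            }

            @Override public void cancel() {
                sink.onError(new CancellationException());
            }
        });
    }
}
```

Because at most one value is ever emitted, a single flag is enough to honor demand here. A source that emits many values would also need to track the outstanding requested count.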

@akarnokd
Member

akarnokd commented Jul 8, 2016

Sounds like you need what we'd done in Reactor Core: FluxCreate and its use (plus this and this).

@JakeWharton
Contributor Author

Yes. That looks awesome!

Is that something we could adapt for RxJava 1's types? Or at worst for 2?

On Fri, Jul 8, 2016 at 11:49 AM David Karnok [email protected]
wrote:

Sounds like you need what we'd done in Reactor Core: FluxCreate
https://github.com/reactor/reactor-core/blob/master/src/main/java/reactor/core/publisher/FluxCreate.java
and its use
https://github.com/reactor/reactor-core/blob/master/src/main/java/reactor/core/publisher/Flux.java#L431
(plus this
https://github.com/reactor/reactor-core/blob/master/src/main/java/reactor/core/publisher/FluxEmitter.java
and this
https://github.com/reactor/reactor-core/blob/master/src/main/java/reactor/core/subscriber/SignalEmitter.java).


@akarnokd
Member

akarnokd commented Jul 8, 2016

I can port it to RxJava 1.x. In the meantime, could you review #4178?

@JakeWharton
Contributor Author

Fair trade!

@stealthcode

It seems like RxJava doesn't have anything here.

@JakeWharton have you considered using the AsyncOnSubscribe and for every request you onNext an observable built using the SyncOnSubscribe? This should respect back-pressure and have minimal buffering. What am I missing?

@JakeWharton
Contributor Author

Can you show an example? I tried, but failed...

final class Prompter {
  interface Callback {
    void ok(String message);
    void cancel();
  }

  void prompt(String message, Callback cb) {
    new Thread(() -> {
      try {
        Thread.sleep(1000);
      } catch (InterruptedException ignored) {
      }
      if (new Random().nextBoolean()) {
        cb.ok(message + " => Hello!");
      } else {
        cb.cancel();
      }
    }).start();
  }
}
public final class Test {
  private final Prompter prompter;

  Test(Prompter prompter) {
    this.prompter = prompter;
  }

  Observable<String> prompt(String message) {
    return Observable.create(AsyncOnSubscribe.createStateless(
        new Action2<Long, Observer<Observable<? extends String>>>() {
          private final AtomicBoolean run = new AtomicBoolean();

          @Override public void call(Long requested,
              Observer<Observable<? extends String>> observableObserver) {
            if (!run.compareAndSet(false, true)) return;
            observableObserver.onNext(Observable.create(
                SyncOnSubscribe.createStateless(new Action1<Observer<? super String>>() {
                  @Override public void call(Observer<? super String> observer) {
                    prompter.prompt(message, new Prompter.Callback() {
                      @Override public void ok(String message) {
                        observer.onNext(message);
                        observer.onCompleted();
                      }

                      @Override public void cancel() {
                        observer.onError(new CancellationException());
                      }
                    });
                  }
                })));
            observableObserver.onCompleted();
          }
        }));
  }

  public static void main(String... args) {
    Test test = new Test(new Prompter());
    test.prompt("Hello?") //
        .toBlocking() //
        .subscribe(System.out::println);
  }
}

Note: I'm using Java 6 syntax for the actual wrapping code because not everyone can use Java 8 and I think it's important to emphasize the ease/simplicity of wrapping.

@akarnokd
Member

akarnokd commented Jul 8, 2016

See #4179

AsyncOnSubscribe is a source responding to requests and can't deal with push sources in the generator method but only pull-like sources (like pull an element from a queue and create an Observable for it).
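
A rough, library-free sketch of the pull-like shape described here: each request is answered by pulling at most that many elements from a queue and handing them off as one batch. `QueueBatcher` is a hypothetical name for illustration, not part of RxJava.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical pull-style generator: each call answers one request with one batch. */
final class QueueBatcher<T> {
    private final Queue<T> queue;

    QueueBatcher(Queue<T> queue) {
        this.queue = queue;
    }

    /** Pulls at most {@code requested} elements; never returns more than was asked for. */
    List<T> nextBatch(long requested) {
        List<T> batch = new ArrayList<>();
        for (long i = 0; i < requested; i++) {
            T item = queue.poll();
            if (item == null) {
                break; // queue exhausted before the request was filled
            }
            batch.add(item);
        }
        return batch;
    }
}
```

Inside an AsyncOnSubscribe generator, each batch would presumably be wrapped with Observable.from(batch) and passed to the observer's onNext. A push source such as a callback has no equivalent of poll(), which is why it does not fit this model.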

@stealthcode

stealthcode commented Jul 8, 2016

This isn't exactly the example that I wanted to show but here is an example that uses RxJava-Guava for integrating a ListenableFuture based api.

public static void main(String[] args) {
    Observable<ByteBuffer> bytes = Observable.create(AsyncOnSubscribe.createStateful(
            () -> 0L,
            (waterMark, requestAmount, observableObserver) ->  {
                // The next range starts at the previous watermark and spans exactly requestAmount.
                long start = waterMark;
                long end = waterMark + requestAmount;
                ListenableFutureTask<ByteBuffer> future = ListenableFutureTask.create(() -> getData(start, end));
                Observable<ByteBuffer> requestProducer = ListenableFutureObservable.from(future, Schedulers.computation());
                observableObserver.onNext(requestProducer);
                return end;
            }));
}

private static ByteBuffer getData(long start, long end) {
    // this is where you would produce the data (here we just allocate an arbitrary bytebuffer)
    return ByteBuffer.allocate((int)(end - start));
}

SyncOnSubscribe makes it easy to write a parser/chunker so that you could get a ByteBuffer asynchronously (from the future.get()) and chunk it into UTF-8 characters for example before onNexting to the inner observable. Sorry for not fully writing this out as a legitimate example but this is something that @abersnaze was considering implementing in RxJava-Strings.
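
As a loose illustration of the chunking step only, the following library-free sketch decodes a ByteBuffer as UTF-8 and hands out one code point per call. `Utf8Chunker` is a hypothetical name; it assumes the whole buffer is available up front and does not handle a multi-byte sequence split across two buffers.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical pull-style chunker: one UTF-8 code point per call to next(). */
final class Utf8Chunker {
    private final String text;
    private int index;

    Utf8Chunker(ByteBuffer bytes) {
        // Decodes everything eagerly; malformed input is replaced, not reported.
        this.text = StandardCharsets.UTF_8.decode(bytes).toString();
    }

    /** Returns the next code point as a String, or null once the input is exhausted. */
    String next() {
        if (index >= text.length()) {
            return null;
        }
        int codePoint = text.codePointAt(index);
        index += Character.charCount(codePoint); // 2 for surrogate pairs, else 1
        return new String(Character.toChars(codePoint));
    }
}
```

In a SyncOnSubscribe, each non-null result would map to a single onNext call and null to onCompleted, which matches its one-emission-per-invocation contract.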

@akarnokd
Member

Closing via #4179.

@stealthcode

@JakeWharton I'm interested in your feedback on the above approach.

@JakeWharton
Contributor Author

That doesn't appear to be truly asynchronous, but instead just blocks on a different thread (computation in this case) waiting for a notification.

@stealthcode

Where does it block? The example I gave uses a ListenableFuture which will execute the work on the computation scheduler when the future is notified that a value has been set.

@JakeWharton
Contributor Author

I assumed getData was a synchronous call.
