Skip to content

Reimplementation of the library #56

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 18 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refined version of eio client/server
  • Loading branch information
wokalski committed Jun 2, 2024
commit a74c99ff1fe5a3f33f541b80aac9d894012d4405
7 changes: 6 additions & 1 deletion examples/greeter-client-eio/dune
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
(executable
(name greeter_client_eio)
(libraries grpc-client-eio ocaml-protoc-plugin eio_main greeter grpc-eio-net-client-h2))
(libraries
grpc-client-eio
ocaml-protoc-plugin
eio_main
greeter
grpc-eio-net-client-h2))
8 changes: 7 additions & 1 deletion examples/greeter-server-eio/dune
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
(executable
(name greeter_server_eio)
(libraries eio grpc-server-eio ocaml-protoc-plugin eio_main greeter grpc-eio-net-server-h2))
(libraries
eio
grpc-server-eio
ocaml-protoc-plugin
eio_main
greeter
grpc-eio-net-server-h2))
1 change: 1 addition & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
nixfmt
camlPkgs.ocaml-lsp
camlPkgs.ocamlformat
camlPkgs.ocaml-protoc
];
};

Expand Down
38 changes: 21 additions & 17 deletions lib/async/client.mli
Original file line number Diff line number Diff line change
Expand Up @@ -7,38 +7,42 @@ module Rpc : sig
val bidirectional_streaming :
handler:(string Pipe.Writer.t -> string Pipe.Reader.t -> 'a Deferred.t) ->
'a handler
(** [bidirectional_streaming ~handler write read] sets up the sending and receiving
logic using [write] and [read], then calls [handler] with a writer pipe and
a reader pipe, for sending and receiving payloads to and from the server.
(** [bidirectional_streaming ~handler write read] sets up the sending and
receiving logic using [write] and [read], then calls [handler] with a
writer pipe and a reader pipe, for sending and receiving payloads to and
from the server.

The stream is closed when the deferred returned by the handler becomes determined. *)
The stream is closed when the deferred returned by the handler becomes
determined. *)

val client_streaming :
handler:(string Pipe.Writer.t -> string option Deferred.t -> 'a Deferred.t) ->
'a handler
(** [client_streaming ~handler write read] sets up the sending and receiving
logic using [write] and [read], then calls [handler] with a writer pipe to send
payloads to the server.
logic using [write] and [read], then calls [handler] with a writer pipe to
send payloads to the server.

The stream is closed when the deferred returned by the handler becomes determined. *)
The stream is closed when the deferred returned by the handler becomes
determined. *)

val server_streaming :
handler:(string Pipe.Reader.t -> 'a Deferred.t) ->
encoded_request:string ->
'a handler
(** [server_streaming ~handler encoded_request write read] sets up the sending and
receiving logic using [write] and [read], then sends [encoded_request] and calls
[handler] with a pipe of responses.
(** [server_streaming ~handler encoded_request write read] sets up the sending
and receiving logic using [write] and [read], then sends [encoded_request]
and calls [handler] with a pipe of responses.

The stream is closed when the deferred returned by the handler becomes determined. *)
The stream is closed when the deferred returned by the handler becomes
determined. *)

val unary :
handler:(string option -> 'a Deferred.t) ->
encoded_request:string ->
'a handler
(** [unary ~handler ~encoded_request] sends the encoded request to the server . When the
response is received, the handler is called with an option response. The response is
is None if the server sent an empty response. *)
(** [unary ~handler ~encoded_request] sends the encoded request to the server
. When the response is received, the handler is called with an option
response. The response is is None if the server sent an empty response. *)
end

type response_handler = H2.Client_connection.response_handler
Expand All @@ -60,6 +64,6 @@ val call :
?headers:H2.Headers.t ->
unit ->
('a * Grpc.Status.t, H2.Status.t) Core._result Deferred.t
(** [call ~service ~rpc ~handler ~do_request ()] calls the rpc endpoint given
by [service] and [rpc] using the [do_request] function. The [handler] is
called when this request is set up to send and receive data. *)
(** [call ~service ~rpc ~handler ~do_request ()] calls the rpc endpoint given by
[service] and [rpc] using the [do_request] function. The [handler] is called
when this request is set up to send and receive data. *)
30 changes: 20 additions & 10 deletions lib/async/server.mli
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@ module Rpc : sig

type client_streaming =
string Pipe.Reader.t -> (Grpc.Status.t * string option) Deferred.t
(** [client_streaming] is the type for an rpc where the client streams the requests and the server responds once. *)
(** [client_streaming] is the type for an rpc where the client streams the
requests and the server responds once. *)

type server_streaming =
string -> string Pipe.Writer.t -> Grpc.Status.t Deferred.t
(** [server_streaming] is the type for an rpc where the client sends one request and the server sends multiple responses. *)
(** [server_streaming] is the type for an rpc where the client sends one
request and the server sends multiple responses. *)

type bidirectional_streaming =
string Pipe.Reader.t -> string Pipe.Writer.t -> Grpc.Status.t Deferred.t
(** [bidirectional_streaming] is the type for an rpc where both the client and server can send multiple messages. *)
(** [bidirectional_streaming] is the type for an rpc where both the client and
server can send multiple messages. *)

type t =
| Unary of unary
Expand All @@ -25,29 +28,36 @@ module Rpc : sig
(** [t] represents the types of rpcs available in gRPC. *)

val unary : f:unary -> H2.Reqd.t -> unit Deferred.t
(** [unary ~f reqd] calls [f] with the request obtained from [reqd] and handles sending the response. *)
(** [unary ~f reqd] calls [f] with the request obtained from [reqd] and
handles sending the response. *)

val client_streaming : f:client_streaming -> H2.Reqd.t -> unit Deferred.t
(** [client_streaming ~f reqd] calls [f] with a stream to pull requests from and handles sending the response. *)
(** [client_streaming ~f reqd] calls [f] with a stream to pull requests from
and handles sending the response. *)

val server_streaming : f:server_streaming -> H2.Reqd.t -> unit Deferred.t
(** [server_streaming ~f reqd] calls [f] with the request optained from [reqd] and handles sending the responses pushed out. *)
(** [server_streaming ~f reqd] calls [f] with the request optained from [reqd]
and handles sending the responses pushed out. *)

val bidirectional_streaming :
f:bidirectional_streaming -> H2.Reqd.t -> unit Deferred.t
(** [bidirectional_streaming ~f reqd] calls [f] with a stream to pull requests from and andles sending the responses pushed out. *)
(** [bidirectional_streaming ~f reqd] calls [f] with a stream to pull requests
from and andles sending the responses pushed out. *)
end

module Service : sig
type t
(** [t] represents a gRPC service with potentially multiple rpcs and the information needed to route to them. *)
(** [t] represents a gRPC service with potentially multiple rpcs and the
information needed to route to them. *)

val v : unit -> t
(** [v ()] creates a new service *)

val add_rpc : name:string -> rpc:Rpc.t -> t -> t
(** [add_rpc ~name ~rpc t] adds [rpc] to [t] and ensures that [t] can route to it with [name]. *)
(** [add_rpc ~name ~rpc t] adds [rpc] to [t] and ensures that [t] can route to
it with [name]. *)

val handle_request : t -> H2.Reqd.t -> unit
(** [handle_request t reqd] handles routing [reqd] to the correct rpc if available in [t]. *)
(** [handle_request t reqd] handles routing [reqd] to the correct rpc if
available in [t]. *)
end
2 changes: 1 addition & 1 deletion lib/eio/client/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ let call (type headers net_response request response stream_error conn_error)
(match Eio.Promise.await recv_net with
| Error conn_error ->
Eio.Promise.resolve status_notify
(Grpc.Status.v ~error_message:"Connection error"
(Grpc.Status.make ~error_message:"Connection error"
Grpc.Status.Unknown);
Error conn_error
| Ok { response; next; trailers } ->
Expand Down
3 changes: 1 addition & 2 deletions lib/eio/client/dune
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
(library
(name grpc_client_eio)
(public_name grpc-client-eio)
(libraries grpc grpc-core-eio eio grpc-client))

(libraries grpc eio grpc-client))
4 changes: 0 additions & 4 deletions lib/eio/core/dune

This file was deleted.

28 changes: 0 additions & 28 deletions lib/eio/core/seq.ml

This file was deleted.

12 changes: 0 additions & 12 deletions lib/eio/core/seq.mli

This file was deleted.

22 changes: 0 additions & 22 deletions lib/eio/core/stream.ml

This file was deleted.

Empty file removed lib/eio/core/stream.mli
Empty file.
3 changes: 3 additions & 0 deletions lib/eio/io-client-h2-ocaml-protoc/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
(library
(name io_client_h2_ocaml_protoc)
(libraries pbrt pbrt_services grpc-client-eio h2 eio h2-eio))
Loading