Skip to content

Support raw/non-protobuf gRPC. #20

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 20, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 66 additions & 26 deletions proxy_wasm_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ class GrpcStreamHandlerBase {

// NB: with end_of_stream == true, callbacks can still occur: reset() to
// prevent further callbacks.
void send(StringView message, bool end_of_stream);
WasmResult send(StringView message, bool end_of_stream);
void close(); // NB: callbacks can still occur: reset() to prevent further
// callbacks.
void reset();
Expand All @@ -251,13 +251,14 @@ class GrpcStreamHandler : public GrpcStreamHandlerBase {
GrpcStreamHandler() : GrpcStreamHandlerBase() {}
virtual ~GrpcStreamHandler() {}

void send(const Request &message, bool end_of_stream) {
WasmResult send(const Request &message, bool end_of_stream) {
std::string output;
if (!message.SerializeToString(&output)) {
return;
return WasmResult::SerializationFailure;
}
GrpcStreamHandlerBase::send(output, end_of_stream);
local_close_ = local_close_ || end_of_stream;
return WasmResult::Ok;
}

virtual void onReceive(size_t body_size) = 0;
Expand Down Expand Up @@ -335,14 +336,11 @@ class RootContext : public ContextBase {
uint32_t timeout_milliseconds, HttpCallCallback callback);
// NB: the message is the response if status == OK and an error message
// otherwise. Returns false on setup error.
#ifdef PROXY_WASM_PROTOBUF
WasmResult grpcSimpleCall(StringView service, StringView service_name, StringView method_name,
const HeaderStringPairs &initial_metadata,
const google::protobuf::MessageLite &request,
const HeaderStringPairs &initial_metadata, StringView request,
uint32_t timeout_milliseconds, GrpcSimpleCallCallback callback);
WasmResult grpcSimpleCall(StringView service, StringView service_name, StringView method_name,
const HeaderStringPairs &initial_metadata,
const google::protobuf::MessageLite &request,
const HeaderStringPairs &initial_metadata, StringView request,
uint32_t timeout_milliseconds,
std::function<void(size_t body_size)> success_callback,
std::function<void(GrpcStatus status)> failure_callback) {
Expand All @@ -356,12 +354,48 @@ class RootContext : public ContextBase {
return grpcSimpleCall(service, service_name, method_name, initial_metadata, request,
timeout_milliseconds, callback);
}
WasmResult grpcCallHandler(StringView service, StringView service_name, StringView method_name,
const HeaderStringPairs &initial_metadata, StringView request,
uint32_t timeout_milliseconds,
std::unique_ptr<GrpcCallHandlerBase> handler);
#ifdef PROXY_WASM_PROTOBUF
WasmResult grpcSimpleCall(StringView service, StringView service_name, StringView method_name,
const HeaderStringPairs &initial_metadata,
const google::protobuf::MessageLite &request,
uint32_t timeout_milliseconds, GrpcSimpleCallCallback callback) {
std::string serialized_request;
if (!request.SerializeToString(&serialized_request)) {
return WasmResult::SerializationFailure;
}
return grpcSimpleCall(service, service_name, method_name, initial_metadata, serialized_request,
timeout_milliseconds, callback);
}
WasmResult grpcSimpleCall(StringView service, StringView service_name, StringView method_name,
const HeaderStringPairs &initial_metadata,
const google::protobuf::MessageLite &request,
uint32_t timeout_milliseconds,
std::function<void(size_t body_size)> success_callback,
std::function<void(GrpcStatus status)> failure_callback) {
std::string serialized_request;
if (!request.SerializeToString(&serialized_request)) {
return WasmResult::SerializationFailure;
}
return grpcSimpleCall(service, service_name, method_name, initial_metadata, serialized_request,
timeout_milliseconds, success_callback, failure_callback);
}
// Returns false on setup error.
WasmResult grpcCallHandler(StringView service, StringView service_name, StringView method_name,
const HeaderStringPairs &initial_metadata,
const google::protobuf::MessageLite &request,
uint32_t timeout_milliseconds,
std::unique_ptr<GrpcCallHandlerBase> handler);
std::unique_ptr<GrpcCallHandlerBase> handler) {
std::string serialized_request;
if (!request.SerializeToString(&serialized_request)) {
return WasmResult::SerializationFailure;
}
return grpcCallHandler(service, service_name, method_name, initial_metadata, serialized_request,
timeout_milliseconds, std::move(handler));
}
#endif
// Returns false on setup error.
WasmResult grpcStreamHandler(StringView service, StringView service_name, StringView method_name,
Expand Down Expand Up @@ -1185,20 +1219,28 @@ inline Histogram<Tags...> *Histogram<Tags...>::New(StringView name,
std::vector<MetricTag>({toMetricTag(descriptors)...}));
}

#ifdef PROXY_WASM_PROTOBUF
inline WasmResult grpcCall(StringView service, StringView service_name, StringView method_name,
const HeaderStringPairs &initial_metadata,
const google::protobuf::MessageLite &request,
const HeaderStringPairs &initial_metadata, StringView request,
uint32_t timeout_milliseconds, uint32_t *token_ptr) {
void *metadata_ptr = nullptr;
size_t metadata_size = 0;
MakeHeaderStringPairsBuffer(initial_metadata, &metadata_ptr, &metadata_size);
std::string serialized_request;
request.SerializeToString(&serialized_request);
return proxy_grpc_call(service.data(), service.size(), service_name.data(), service_name.size(),
method_name.data(), method_name.size(), metadata_ptr, metadata_size,
serialized_request.data(), serialized_request.size(), timeout_milliseconds,
token_ptr);
request.data(), request.size(), timeout_milliseconds, token_ptr);
}

#ifdef PROXY_WASM_PROTOBUF
inline WasmResult grpcCall(StringView service, StringView service_name, StringView method_name,
const HeaderStringPairs &initial_metadata,
const google::protobuf::MessageLite &request,
uint32_t timeout_milliseconds, uint32_t *token_ptr) {
std::string serialized_request;
if (!request.SerializeToString(&serialized_request)) {
return WasmResult::SerializationFailure;
}
return grpcCall(service, service_name, method_name, initial_metadata, serialized_request,
timeout_milliseconds, token_ptr);
}
#endif

Expand Down Expand Up @@ -1242,12 +1284,10 @@ inline void RootContext::onHttpCallResponse(uint32_t token, uint32_t headers, si
}
}

#ifdef PROXY_WASM_PROTOBUF
inline WasmResult RootContext::grpcSimpleCall(StringView service, StringView service_name,
StringView method_name,
const HeaderStringPairs &initial_metadata,
const google::protobuf::MessageLite &request,
uint32_t timeout_milliseconds,
StringView request, uint32_t timeout_milliseconds,
Context::GrpcSimpleCallCallback callback) {
uint32_t token = 0;
WasmResult result = grpcCall(service, service_name, method_name, initial_metadata, request,
Expand All @@ -1257,7 +1297,6 @@ inline WasmResult RootContext::grpcSimpleCall(StringView service, StringView ser
}
return result;
}
#endif

inline void GrpcCallHandlerBase::cancel() {
grpcCancel(token_);
Expand All @@ -1278,15 +1317,19 @@ inline void GrpcStreamHandlerBase::close() {
// NB: else callbacks can still occur: reset() to prevent further callbacks.
}

inline void GrpcStreamHandlerBase::send(StringView message, bool end_of_stream) {
grpcSend(token_, message, end_of_stream);
inline WasmResult GrpcStreamHandlerBase::send(StringView message, bool end_of_stream) {
WasmResult r = grpcSend(token_, message, end_of_stream);
if (r != WasmResult::Ok) {
return r;
}
if (end_of_stream) {
// NB: callbacks can still occur: reset() to prevent further callbacks.
local_close_ = local_close_ || end_of_stream;
if (local_close_ && remote_close_) {
context_->grpc_streams_.erase(token_);
}
}
return WasmResult::Ok;
}

inline void RootContext::onGrpcReceiveInitialMetadata(uint32_t token, uint32_t headers) {
Expand Down Expand Up @@ -1377,12 +1420,10 @@ inline void RootContext::onGrpcClose(uint32_t token, GrpcStatus status) {
}
}

#ifdef PROXY_WASM_PROTOBUF
inline WasmResult RootContext::grpcCallHandler(StringView service, StringView service_name,
StringView method_name,
const HeaderStringPairs &initial_metadata,
const google::protobuf::MessageLite &request,
uint32_t timeout_milliseconds,
StringView request, uint32_t timeout_milliseconds,
std::unique_ptr<GrpcCallHandlerBase> handler) {
uint32_t token = 0;
auto result = grpcCall(service, service_name, method_name, initial_metadata, request,
Expand All @@ -1394,7 +1435,6 @@ inline WasmResult RootContext::grpcCallHandler(StringView service, StringView se
}
return result;
}
#endif

inline WasmResult RootContext::grpcStreamHandler(StringView service, StringView service_name,
StringView method_name,
Expand Down