Skip to content

Commit 12d62b0

Browse files
committed
feat: add grpc RPC context extractor
1 parent ba2dcf5 commit 12d62b0

File tree

6 files changed

+99
-7
lines changed

6 files changed

+99
-7
lines changed

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,6 @@ tokio = { version = "1.41.1", features = ["full"] }
1717
toml = "0.8.19"
1818
tower = "0.5.1"
1919
tower-service = "0.3.3"
20-
validator = { version = "0.19.0", features = ["derive"] }
20+
validator = { version = "0.19.0", features = ["derive"] }
21+
futures = "0.3.31"
22+
prost = "0.13.3"

examples/services/grpc/src/main.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ pub mod helloworld {
22
include!("generated/helloworld.rs"); // Include generated code
33
}
44

5-
use mikros::service::builder::ServiceBuilder;
5+
use mikros::features;
6+
use mikros::service::{builder::ServiceBuilder, context};
67
use tonic::{Request, Response, Status};
78

89
use helloworld::greeter_server::{Greeter, GreeterServer};
@@ -14,6 +15,13 @@ pub struct MyGreeter {}
1415
#[tonic::async_trait]
1516
impl Greeter for MyGreeter {
1617
async fn say_hello(&self, request: Request<HelloRequest>) -> Result<Response<HelloReply>, Status> {
18+
let ctx = context::from_request(&request);
19+
ctx.logger().info("say hello RPC called");
20+
21+
let _ = features::example::retrieve(&ctx, |api| {
22+
api.do_something();
23+
});
24+
1725
let reply = HelloReply {
1826
message: format!("Hello, {}!", request.into_inner().name),
1927
};
@@ -26,7 +34,7 @@ impl Greeter for MyGreeter {
2634
async fn main() -> Result<(), Box<dyn std::error::Error>> {
2735
let greeter = MyGreeter::default();
2836
let greeter_service = GreeterServer::new(greeter);
29-
37+
3038
let svc = ServiceBuilder::default()
3139
.grpc(greeter_service)
3240
.build();
@@ -39,6 +47,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
3947
},
4048
Err(e) => panic!("{}", e.to_string())
4149
}
42-
50+
4351
Ok(())
4452
}

src/grpc/mod.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
use std::sync::Arc;
2+
use std::task::{Context, Poll};
3+
4+
use tower::{Layer, Service};
5+
6+
use crate::service::context;
7+
8+
#[derive(Clone)]
9+
pub(crate) struct ContextExtractor {
10+
ctx: Arc<context::Context>,
11+
}
12+
13+
impl ContextExtractor {
14+
pub(crate) fn new(ctx: &context::Context) -> Self {
15+
ContextExtractor {
16+
ctx: Arc::new(ctx.clone()),
17+
}
18+
}
19+
}
20+
21+
impl<S> Layer<S> for ContextExtractor {
22+
type Service = ContextExtractorMiddleware<S>;
23+
24+
fn layer(&self, service: S) -> Self::Service {
25+
ContextExtractorMiddleware {
26+
inner: service,
27+
ctx: self.ctx.clone(),
28+
}
29+
}
30+
}
31+
32+
#[derive(Clone)]
33+
pub(crate) struct ContextExtractorMiddleware<S> {
34+
inner: S,
35+
ctx: Arc<context::Context>,
36+
}
37+
38+
impl<S, B> Service<http::Request<B>> for ContextExtractorMiddleware<S>
39+
where
40+
S: Service<http::Request<B>, Response = http::Response<B>>
41+
+ Clone
42+
+ Send
43+
+ 'static,
44+
S::Future: Send,
45+
B: Send + 'static,
46+
{
47+
type Response = S::Response;
48+
type Error = S::Error;
49+
type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
50+
51+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
52+
self.inner.poll_ready(cx)
53+
}
54+
55+
fn call(&mut self, mut req: http::Request<B>) -> Self::Future {
56+
let clone = self.inner.clone();
57+
let mut inner = std::mem::replace(&mut self.inner, clone);
58+
59+
req.extensions_mut().insert(self.ctx.clone());
60+
Box::pin(async move {
61+
let response = inner.call(req).await?;
62+
Ok(response)
63+
})
64+
}
65+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ pub mod plugin;
66
pub mod service;
77

88
mod args;
9+
mod grpc;

src/service/context.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,9 @@ impl Context {
4646
pub fn service_name(&self) -> String {
4747
self.definitions.name.clone()
4848
}
49-
}
49+
}
50+
51+
/// Retrieves the mikros Context from an RPC request argument.
52+
pub fn from_request<B: prost::Message>(request: &tonic::Request<B>) -> Arc<Context> {
53+
request.extensions().get::<Arc<Context>>().unwrap().clone()
54+
}

src/service/grpc/mod.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::collections::HashMap;
22
use std::convert::Infallible;
33
use std::sync::Arc;
4+
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
45

56
use http::{request::Request, response::Response};
67
use logger::fields::FieldValue;
@@ -12,6 +13,7 @@ use tonic::transport::Server;
1213
use crate::{definition, env, plugin};
1314
use crate::service::context::Context;
1415
use crate::errors as merrors;
16+
use crate::grpc;
1517

1618
#[derive(Clone)]
1719
pub(crate) struct Grpc<S> {
@@ -69,16 +71,25 @@ where
6971
]
7072
}
7173

72-
async fn run(&self, _ctx: &Context, shutdown_rx: watch::Receiver<()>) -> merrors::Result<()> {
73-
let addr = format!("127.0.0.1:{}", self.port).parse().unwrap();
74+
async fn run(&self, ctx: &Context, shutdown_rx: watch::Receiver<()>) -> merrors::Result<()> {
75+
let addr = SocketAddr::new(
76+
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
77+
self.port as u16,
78+
);
79+
7480
let shutdown_signal = async {
7581
let mut shutdown_rx = shutdown_rx.clone();
7682

7783
// Wait until the receiver sees the shutdown signal
7884
shutdown_rx.changed().await.ok();
7985
};
8086

87+
let layer = tower::ServiceBuilder::new()
88+
.layer(grpc::ContextExtractor::new(&ctx))
89+
.into_inner();
90+
8191
if let Err(e) = Server::builder()
92+
.layer(layer)
8293
.add_service(self.server.clone())
8394
.serve_with_shutdown(addr, shutdown_signal)
8495
.await

0 commit comments

Comments
 (0)