Skip to content

Commit 3b8c06a

Browse files
committed
feat: initial support for HTTP services
1 parent af786b6 commit 3b8c06a

File tree

8 files changed

+198
-4
lines changed

8 files changed

+198
-4
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ edition = "2021"
55

66
[dependencies]
77
async-trait = "0.1.83"
8+
axum = "0.7.7"
89
env-settings = "0.1.11"
910
env-settings-derive = "0.1.11"
1011
env-settings-utils = "0.1.11"

examples/services/http/Cargo.toml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
[package]
2+
name = "http"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
axum = "0.7.7"
8+
mikros = { path = "../../../" }
9+
tokio = { version = "1.41.1", features = ["full"] }
10+
tonic = "0.12.3"

examples/services/http/service.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
name = "http-example"
2+
version = "v0.1.0"
3+
language = "rust"
4+
product = "examples"
5+
types = ["http"]

examples/services/http/src/main.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
use std::sync::Arc;
2+
3+
use axum::extract::State;
4+
use axum::routing::get;
5+
use mikros::service::builder::ServiceBuilder;
6+
use mikros::service::context::Context;
7+
8+
#[derive(Clone)]
9+
pub struct Service {
10+
}
11+
12+
impl Service {
13+
pub fn new() -> Self {
14+
Self {}
15+
}
16+
}
17+
18+
// Handler method for the first endpoint
19+
//async fn handler_one(ctx: Option<State<Arc<Context>>>) -> String {
20+
async fn handler_one() -> String {
21+
println!("Handler One");
22+
format!("Handler One")
23+
}
24+
25+
// Handler method for the second endpoint
26+
async fn handler_two(State(ctx): State<Arc<Context>>) -> String {
27+
println!("Handler Two");
28+
format!("Handler Two")
29+
}
30+
31+
#[tokio::main]
32+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
33+
let x = axum::Router::new()
34+
.route("/one", get(handler_one));
35+
// .route("/two", get(handler_two));
36+
37+
let service = Service::new();
38+
let svc = ServiceBuilder::default()
39+
.http(x)
40+
.build();
41+
42+
match svc {
43+
Ok(mut svc) => {
44+
if let Err(e) = svc.start().await {
45+
println!("application error: {}", e);
46+
}
47+
},
48+
Err(e) => panic!("{}", e.to_string())
49+
}
50+
51+
Ok(())
52+
}

src/definition/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ impl Definitions {
121121
}
122122

123123
pub(crate) fn get_service_type(&self, kind: ServiceKind) -> merrors::Result<&service::Service> {
124-
match self.types.iter().find(|t| t.0 == definition::ServiceKind::Grpc) {
124+
match self.types.iter().find(|t| t.0 == kind) {
125125
Some(t) => Ok(t),
126126
None => Err(merrors::Error::NotFound(format!("could not find service kind '{}'", kind)))
127127
}

src/service/builder.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::collections::HashMap;
22
use std::sync::{Arc};
33
use std::convert::Infallible;
44

5+
use axum::Router;
56
use futures::lock::Mutex;
67
use http::{request::Request, response::Response};
78
use tonic::body::BoxBody;
@@ -11,6 +12,7 @@ use crate::{definition, errors as merrors, plugin};
1112
use crate::service::native::{NativeService, Native};
1213
use crate::service::script::{ScriptService, Script};
1314
use crate::service::grpc::Grpc;
15+
use crate::service::http::Http;
1416
use crate::service::lifecycle::Lifecycle;
1517
use crate::service::Service;
1618

@@ -43,7 +45,7 @@ impl ServiceBuilder {
4345
self
4446
}
4547

46-
/// Initializes the grpc service type with the required structure
48+
/// Initializes the gRPC service type with the required structure
4749
/// implementing its API.
4850
pub fn grpc<S>(mut self, server: S) -> Self
4951
where
@@ -59,7 +61,7 @@ impl ServiceBuilder {
5961
self
6062
}
6163

62-
/// Initializes the grpc service type with the required structure
64+
/// Initializes the gRPC service type with the required structure
6365
/// implementing its API and another with implementing the Lifecycle
6466
/// API.
6567
pub fn grpc_with_lifecycle<S, B: Lifecycle + 'static>(mut self, server: S, lifecycle: Arc<Mutex<B>>) -> Self
@@ -76,6 +78,21 @@ impl ServiceBuilder {
7678
self
7779
}
7880

81+
/// Initializes the HTTP service type with the required structure
82+
/// implementing the service endpoint handlers.
83+
pub fn http(mut self, router: Router) -> Self {
84+
self.servers.insert(definition::ServiceKind::Http.to_string(), Box::new(Http::new(router)));
85+
self
86+
}
87+
88+
/// Initializes the HTTP service type with the required structure
89+
/// implementing the service endpoint handlers and another with
90+
/// implementing the Lifecycle API.
91+
pub fn http_with_lifecycle<B: Lifecycle + 'static>(mut self, router: Router, lifecycle: Arc<Mutex<B>>) -> Self {
92+
self.servers.insert(definition::ServiceKind::Http.to_string(), Box::new(Http::new_with_lifecycle(router, lifecycle)));
93+
self
94+
}
95+
7996
/// Adds external features into the current service environment.
8097
pub fn with_features(mut self, features: Vec<Box<dyn plugin::feature::Feature>>) -> Self {
8198
self.features.extend(features);

src/service/http/mod.rs

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
use std::collections::HashMap;
2+
use std::sync::Arc;
3+
4+
use axum::Router;
5+
use futures::lock::Mutex;
6+
use logger::fields::FieldValue;
7+
use tokio::net::TcpListener;
8+
use tokio::sync::watch::Receiver;
9+
10+
use crate::{definition, env, errors as merrors, plugin};
11+
use crate::service::context::Context;
12+
use crate::service::lifecycle::Lifecycle;
13+
14+
#[derive(Clone)]
15+
pub(crate) struct Http {
16+
port: i32,
17+
router: Router,
18+
lifecycle: Option<Box<Arc<Mutex<dyn Lifecycle>>>>,
19+
}
20+
21+
impl Http {
22+
pub fn new(router: Router) -> Self {
23+
Self {
24+
port: 0,
25+
router,
26+
lifecycle: None,
27+
}
28+
}
29+
30+
pub fn new_with_lifecycle<B: Lifecycle + 'static>(router: Router, lifecycle: Arc<Mutex<B>>) -> Self {
31+
Self {
32+
port: 0,
33+
router,
34+
lifecycle: Some(Box::new(lifecycle)),
35+
}
36+
}
37+
}
38+
39+
#[async_trait::async_trait]
40+
impl Lifecycle for Http {
41+
async fn on_start(&mut self) -> merrors::Result<()> {
42+
if let Some(lifecycle) = &self.lifecycle {
43+
return lifecycle.lock().await.on_start().await
44+
}
45+
46+
Ok(())
47+
}
48+
49+
async fn on_finish(&self) -> merrors::Result<()> {
50+
if let Some(lifecycle) = &self.lifecycle {
51+
return lifecycle.lock().await.on_finish().await
52+
}
53+
54+
Ok(())
55+
}
56+
}
57+
58+
#[async_trait::async_trait]
59+
impl plugin::service::Service for Http {
60+
fn kind(&self) -> definition::ServiceKind {
61+
definition::ServiceKind::Http
62+
}
63+
64+
fn initialize(&mut self, envs: Arc<env::Env>, definitions: Arc<definition::Definitions>) -> merrors::Result<()> {
65+
let service_type = definitions.get_service_type(definition::ServiceKind::Http)?;
66+
self.port = match service_type.1 {
67+
None => envs.http_port,
68+
Some(port) => port,
69+
};
70+
71+
Ok(())
72+
}
73+
74+
fn info(&self) -> HashMap<String, FieldValue> {
75+
logger::fields![
76+
"svc.port" => FieldValue::Number(self.port as i64),
77+
"svc.mode" => FieldValue::String(definition::ServiceKind::Http.to_string()),
78+
]
79+
}
80+
81+
async fn run(&self, ctx: &Context, shutdown_rx: Receiver<()>) -> merrors::Result<()> {
82+
let addr = format!("0.0.0.0:{}", self.port);
83+
let shutdown_signal = async move {
84+
let mut shutdown_rx = shutdown_rx.clone();
85+
86+
// Wait until the receiver sees the shutdown signal
87+
shutdown_rx.changed().await.ok();
88+
};
89+
90+
let shared_ctx = Arc::new(ctx.clone());
91+
let app = Router::new().with_state(shared_ctx).merge(self.router.clone());
92+
93+
match TcpListener::bind(addr).await {
94+
Ok(incoming) => {
95+
if let Err(e) = axum::serve(incoming, app).with_graceful_shutdown(shutdown_signal).await {
96+
return Err(merrors::Error::InternalServiceError(format!("could not initialize http server: {}", e)))
97+
}
98+
99+
Ok(())
100+
}
101+
Err(e) => Err(merrors::Error::InternalServiceError(format!("could not initialize http server: {}", e)))
102+
}
103+
}
104+
105+
async fn stop(&self, _ctx: &Context) {
106+
// noop
107+
}
108+
}

src/service/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
pub mod builder;
22
pub mod context;
33
pub mod grpc;
4+
pub mod http;
5+
pub mod lifecycle;
46
pub mod native;
57
pub mod script;
6-
pub mod lifecycle;
78

89
use std::collections::HashMap;
910
use std::sync::Arc;

0 commit comments

Comments
 (0)