Skip to content

Commit 64ac01a

Browse files
committed
feat: add lifecycle trait calls for services
1 parent 12d62b0 commit 64ac01a

File tree

12 files changed

+107
-32
lines changed

12 files changed

+107
-32
lines changed

Cargo.toml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@ async-trait = "0.1.83"
88
env-settings = "0.1.11"
99
env-settings-derive = "0.1.11"
1010
env-settings-utils = "0.1.11"
11-
logger = { git = "https://github.com/rsfreitas/logger" }
11+
futures = "0.3.31"
1212
http = "1.1.0"
13+
logger = { git = "https://github.com/rsfreitas/logger" }
14+
prost = "0.13.3"
1315
serde = "1.0.214"
1416
serde_derive = "1.0.214"
15-
tonic = { version = "0.12.3", features = ["transport"]}
1617
tokio = { version = "1.41.1", features = ["full"] }
1718
toml = "0.8.19"
19+
tonic = { version = "0.12.3", features = ["transport"]}
1820
tower = "0.5.1"
1921
tower-service = "0.3.3"
20-
validator = { version = "0.19.0", features = ["derive"] }
21-
futures = "0.3.31"
22-
prost = "0.13.3"
22+
validator = { version = "0.19.0", features = ["derive"] }

examples/services/native/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,6 @@ version = "0.1.0"
44
edition = "2021"
55

66
[dependencies]
7+
async-trait = "0.1.83"
78
mikros = { path = "../../../" }
89
tokio = { version = "1.41.1", features = ["full"] }

examples/services/native/src/main.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@ mod service;
22

33
extern crate mikros;
44

5-
use std::sync::{Arc, Mutex};
5+
use std::sync::Arc;
66

7+
use mikros::FutureMutex;
78
use mikros::service::builder::{ServiceBuilder};
89
use service::Service as AppService;
910

1011
#[tokio::main]
1112
async fn main() {
12-
let s = Arc::new(Mutex::new(AppService::new()));
13+
let s = Arc::new(FutureMutex::new(AppService::new()));
1314
let svc = ServiceBuilder::default()
1415
.script(s.clone())
1516
.native(s.clone())

examples/services/native/src/service.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,19 @@ impl Service {
1111
}
1212
}
1313

14+
#[async_trait::async_trait]
15+
impl mikros::service::lifecycle::Lifecycle for Service {
16+
async fn on_start(&mut self) -> merrors::Result<()> {
17+
println!("lifecycle on_start");
18+
Ok(())
19+
}
20+
21+
async fn on_finish(&self) -> merrors::Result<()> {
22+
println!("lifecycle on_finish");
23+
Ok(())
24+
}
25+
}
26+
1427
impl mikros::service::native::NativeService for Service {
1528
fn start(&self, ctx: &Context) -> merrors::Result<()> {
1629
ctx.logger().info("Start native service");

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,6 @@ pub mod service;
77

88
mod args;
99
mod grpc;
10+
11+
// Forward some declarations for services
12+
pub use futures::lock::Mutex as FutureMutex;

src/plugin/service.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ use tokio::sync::watch;
55

66
use crate::{definition, env, errors as merrors};
77
use crate::service::context::Context;
8+
use crate::service::lifecycle::Lifecycle;
89

910
#[async_trait::async_trait]
10-
pub trait Service: Send + ServiceClone {
11+
pub trait Service: ServiceClone + Lifecycle {
1112
fn kind(&self) -> definition::ServiceKind;
1213
fn initialize(&mut self, envs: Arc<env::Env>, definitions: Arc<definition::Definitions>) -> merrors::Result<()>;
1314
fn info(&self) -> HashMap<String, logger::fields::FieldValue>;

src/service/builder.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use std::collections::HashMap;
2-
use std::sync::{Arc, Mutex};
2+
use std::sync::{Arc};
33
use std::convert::Infallible;
44

5+
use futures::lock::Mutex;
56
use http::{request::Request, response::Response};
67
use tonic::body::BoxBody;
78
use tonic::server::NamedService;
@@ -10,6 +11,7 @@ use crate::{definition, errors as merrors, plugin};
1011
use crate::service::native::{NativeService, Native};
1112
use crate::service::script::{ScriptService, Script};
1213
use crate::service::grpc::Grpc;
14+
use crate::service::lifecycle::Lifecycle;
1315
use crate::service::Service;
1416

1517
pub struct ServiceBuilder {
@@ -42,8 +44,9 @@ impl ServiceBuilder {
4244
pub fn grpc<S>(mut self, svc: S) -> Self
4345
where
4446
S: tonic::codegen::Service<Request<BoxBody>, Response = Response<BoxBody>, Error = Infallible>
45-
+ Clone
4647
+ NamedService
48+
+ Lifecycle
49+
+ Clone
4750
+ Send
4851
+ Sync
4952
+ 'static,

src/service/grpc/mod.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use crate::{definition, env, plugin};
1414
use crate::service::context::Context;
1515
use crate::errors as merrors;
1616
use crate::grpc;
17+
use crate::service::lifecycle::Lifecycle;
1718

1819
#[derive(Clone)]
1920
pub(crate) struct Grpc<S> {
@@ -25,6 +26,7 @@ impl<S> Grpc<S>
2526
where
2627
S: tonic::codegen::Service<Request<BoxBody>, Response = Response<BoxBody>, Error = Infallible>
2728
+ NamedService
29+
+ Lifecycle
2830
+ Clone
2931
+ Send
3032
+ Sync
@@ -39,11 +41,33 @@ where
3941
}
4042
}
4143

44+
#[async_trait::async_trait]
45+
impl<S> Lifecycle for Grpc<S>
46+
where
47+
S: tonic::codegen::Service<Request<BoxBody>, Response = Response<BoxBody>, Error = Infallible>
48+
+ NamedService
49+
+ Lifecycle
50+
+ Clone
51+
+ Send
52+
+ Sync
53+
+ 'static,
54+
S::Future: 'static + Send,
55+
{
56+
async fn on_start(&mut self) -> merrors::Result<()> {
57+
self.server.on_start().await
58+
}
59+
60+
async fn on_finish(&self) -> merrors::Result<()> {
61+
self.server.on_finish().await
62+
}
63+
}
64+
4265
#[async_trait::async_trait]
4366
impl<S> plugin::service::Service for Grpc<S>
4467
where
4568
S: tonic::codegen::Service<Request<BoxBody>, Response = Response<BoxBody>, Error = Infallible>
4669
+ NamedService
70+
+ Lifecycle
4771
+ Clone
4872
+ Send
4973
+ Sync
@@ -85,7 +109,7 @@ where
85109
};
86110

87111
let layer = tower::ServiceBuilder::new()
88-
.layer(grpc::ContextExtractor::new(&ctx))
112+
.layer(grpc::ContextExtractor::new(ctx))
89113
.into_inner();
90114

91115
if let Err(e) = Server::builder()
@@ -94,7 +118,7 @@ where
94118
.serve_with_shutdown(addr, shutdown_signal)
95119
.await
96120
{
97-
return Err(merrors::Error::InternalServiceError(format!("could not initialize grpc server: {}", e.to_string())))
121+
return Err(merrors::Error::InternalServiceError(format!("could not initialize grpc server: {}", e)))
98122
}
99123

100124
Ok(())

src/service/lifecycle.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
use crate::errors as merrors;
22

33
#[async_trait::async_trait]
4-
pub trait Lifecycle {
5-
async fn on_start(&self) -> merrors::Result<()> {
6-
println!("default on_start");
4+
pub trait Lifecycle: Send + Sync {
5+
async fn on_start(&mut self) -> merrors::Result<()> {
76
Ok(())
87
}
98

109
async fn on_finish(&self) -> merrors::Result<()> {
11-
println!("default on_finish");
1210
Ok(())
1311
}
1412
}

src/service/mod.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@ pub mod context;
33
pub mod grpc;
44
pub mod native;
55
pub mod script;
6-
7-
mod lifecycle;
6+
pub mod lifecycle;
87

98
use std::collections::HashMap;
109
use std::sync::Arc;
@@ -81,7 +80,7 @@ impl Service {
8180
self.logger.info("service starting");
8281
self.validate_definitions()?;
8382
self.start_features()?;
84-
self.initialize_service_internals()?;
83+
self.initialize_service_internals().await?;
8584
self.print_service_resources();
8685
self.run().await
8786
}
@@ -97,7 +96,7 @@ impl Service {
9796
}
9897

9998
for t in &self.definitions.types {
100-
if self.servers.get(&t.0.to_string()).is_none() {
99+
if !self.servers.contains_key(&t.0.to_string()) {
101100
return Err(merrors::Error::ServiceKindUninitialized(t.0.clone()))
102101
}
103102
}
@@ -110,7 +109,7 @@ impl Service {
110109
Ok(())
111110
}
112111

113-
fn initialize_service_internals(&mut self) -> merrors::Result<()> {
112+
async fn initialize_service_internals(&mut self) -> merrors::Result<()> {
114113
let definitions = self.definitions.clone();
115114
let envs = self.envs.clone();
116115

@@ -119,8 +118,12 @@ impl Service {
119118
svc.initialize(envs.clone(), definitions.clone())?
120119
}
121120

122-
// couple clients
123-
// call lifecycle on_start
121+
// TODO couple clients
122+
123+
for s in &definitions.types {
124+
let svc = self.get_server(&s.0)?;
125+
svc.on_start().await?;
126+
}
124127

125128
Ok(())
126129
}
@@ -200,6 +203,7 @@ impl Service {
200203
for s in &definitions.types {
201204
let svc = self.get_server(&s.0)?;
202205
svc.stop(&context).await;
206+
svc.on_finish().await?;
203207
}
204208

205209
// Then stops our task runner

src/service/native/mod.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
use std::collections::HashMap;
2-
use std::sync::{Arc, Mutex};
2+
use std::sync::{Arc};
33

4+
use futures::lock::Mutex;
45
use logger::fields::FieldValue;
56
use tokio::sync::watch;
67

78
use crate::{definition, env, errors as merrors, plugin};
89
use crate::service::context::Context;
10+
use crate::service::lifecycle::Lifecycle;
911

1012
#[async_trait::async_trait]
11-
pub trait NativeService: Send + NativeServiceClone {
13+
pub trait NativeService: NativeServiceClone + Lifecycle + Send + Sync {
1214
/// This is the place where the service/application must be initialized. It
1315
/// should do the required initialization, put any job to execute in background
1416
/// and leave. It shouldn't block.
@@ -52,6 +54,17 @@ impl Native {
5254
}
5355
}
5456

57+
#[async_trait::async_trait]
58+
impl Lifecycle for Native {
59+
async fn on_start(&mut self) -> merrors::Result<()> {
60+
self.svc.lock().await.on_start().await
61+
}
62+
63+
async fn on_finish(&self) -> merrors::Result<()> {
64+
self.svc.lock().await.on_finish().await
65+
}
66+
}
67+
5568
#[async_trait::async_trait]
5669
impl plugin::service::Service for Native {
5770
fn kind(&self) -> definition::ServiceKind {
@@ -69,10 +82,10 @@ impl plugin::service::Service for Native {
6982
}
7083

7184
async fn run(&self, ctx: &Context, _: watch::Receiver<()>) -> merrors::Result<()> {
72-
self.svc.lock().unwrap().start(ctx)
85+
self.svc.lock().await.start(ctx)
7386
}
7487

7588
async fn stop(&self, ctx: &Context) {
76-
self.svc.lock().unwrap().stop(ctx)
89+
self.svc.lock().await.stop(ctx)
7790
}
7891
}

src/service/script/mod.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
use std::collections::HashMap;
2-
use std::sync::{Arc, Mutex};
2+
use std::sync::{Arc};
33

4+
use futures::lock::Mutex;
45
use logger::fields::FieldValue;
56
use tokio::sync::watch;
67

78
use crate::{definition, env, errors as merrors, plugin};
89
use crate::service::context::Context;
10+
use crate::service::lifecycle::Lifecycle;
911

10-
pub trait ScriptService: Send + ScriptServiceClone {
12+
#[async_trait::async_trait]
13+
pub trait ScriptService: ScriptServiceClone + Lifecycle + Send {
1114
fn run(&self, ctx: &Context) -> merrors::Result<()>;
1215
fn cleanup(&self, ctx: &Context);
1316
}
@@ -44,6 +47,17 @@ impl Script {
4447
}
4548
}
4649

50+
#[async_trait::async_trait]
51+
impl Lifecycle for Script {
52+
async fn on_start(&mut self) -> merrors::Result<()> {
53+
self.svc.lock().await.on_start().await
54+
}
55+
56+
async fn on_finish(&self) -> merrors::Result<()> {
57+
self.svc.lock().await.on_finish().await
58+
}
59+
}
60+
4761
#[async_trait::async_trait]
4862
impl plugin::service::Service for Script {
4963
fn kind(&self) -> definition::ServiceKind {
@@ -61,10 +75,10 @@ impl plugin::service::Service for Script {
6175
}
6276

6377
async fn run(&self, ctx: &Context, _: watch::Receiver<()>) -> merrors::Result<()> {
64-
self.svc.lock().unwrap().run(ctx)
78+
self.svc.lock().await.run(ctx)
6579
}
6680

6781
async fn stop(&self, ctx: &Context) {
68-
self.svc.lock().unwrap().cleanup(ctx)
82+
self.svc.lock().await.cleanup(ctx)
6983
}
70-
}
84+
}

0 commit comments

Comments
 (0)