Skip to content

Commit f88aa66

Browse files
committed
refactor: systemd timeout handling and change events
Signed-off-by: Jan Zachmann <[email protected]>
1 parent a03e2d3 commit f88aa66

File tree

7 files changed

+62
-132
lines changed

7 files changed

+62
-132
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ license = "MIT OR Apache-2.0"
88
name = "omnect-device-service"
99
readme = "README.md"
1010
repository = "https://github.com/omnect/omnect-device-service.git"
11-
version = "0.40.0"
11+
version = "0.40.1"
1212

1313
[dependencies]
1414
actix-server = { version = "2.3", default-features = false }

src/systemd/mod.rs

Lines changed: 25 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@ pub mod networkd;
22
pub mod unit;
33
pub mod watchdog;
44

5-
use anyhow::{bail, Result};
6-
use log::{debug, info};
5+
use anyhow::{Context, Result};
6+
use log::info;
77
use sd_notify::NotifyState;
8-
use std::{sync::Once, thread, time, time::Duration};
8+
use std::sync::Once;
99
use systemd_zbus::ManagerProxy;
10-
use tokio::time::{timeout_at, Instant};
10+
use tokio_stream::StreamExt;
1111

1212
pub fn sd_notify_ready() {
1313
static SD_NOTIFY_ONCE: Once = Once::new();
@@ -20,11 +20,11 @@ pub fn sd_notify_ready() {
2020
#[cfg(not(feature = "mock"))]
2121
pub async fn reboot() -> Result<()> {
2222
use anyhow::Context;
23-
use log::error;
23+
use log::{debug, error};
2424
use std::process::Command;
2525

2626
info!("systemd::reboot");
27-
27+
2828
//journalctl seems not to have a dbus api
2929
match Command::new("sudo").args(["journalctl", "--sync"]).status() {
3030
Ok(status) if !status.success() => error!("reboot: failed to execute 'journalctl --sync'"),
@@ -47,48 +47,27 @@ pub async fn reboot() -> Result<()> {
4747
Ok(())
4848
}
4949

50-
pub async fn wait_for_system_running(timeout: Duration) -> Result<()> {
51-
let begin = Instant::now();
52-
let deadline = begin + timeout;
53-
let system = timeout_at(deadline, zbus::Connection::system()).await??;
54-
let manager = timeout_at(
55-
deadline,
56-
// here we use manager which explicitly doesn't cache the system state
57-
ManagerProxy::builder(&system)
58-
.uncached_properties(&["SystemState"])
59-
.build(),
60-
)
61-
.await??;
62-
63-
loop {
64-
// timeout_at doesn't return error/timeout if the future returns immediately, so we have to check.
65-
// e.g. manager.system_state() returns immediately if using a caching ManagerProxy
66-
if begin.elapsed() >= timeout {
67-
bail!("wait_for_system_running timeout occurred");
68-
}
69-
70-
let system_state = timeout_at(deadline, manager.system_state()).await??;
50+
pub async fn wait_for_system_running() -> Result<()> {
51+
let connection = zbus::Connection::system()
52+
.await
53+
.context("wait_for_system_running: failed to create connection")?;
54+
// here we use manager which explicitly doesn't cache the system state
55+
let manager = ManagerProxy::builder(&connection)
56+
.uncached_properties(&["SystemState"])
57+
.build()
58+
.await
59+
.context("wait_for_system_running: failed to create manager")?;
7160

72-
match system_state.as_str() {
73-
"running" => {
74-
debug!("wait_for_system_running: system_state == running");
75-
return Ok(());
76-
}
77-
"initializing" | "starting" => {
78-
/*
79-
* ToDo https://github.com/omnect/omnect-device-service/pull/39#discussion_r1142147564
80-
* This is tricky because you have to receive system state signals, which blocks if you
81-
* are already in state "running". if you receive the system state on condition after
82-
* you got state "starting" by polling, there would by a race which can result in a
83-
* deadlock of receiving the state signal again.
84-
* So a solution would be to poll, start signal receiving, poll again and stop
85-
* possibly stop receiving.
86-
*/
87-
thread::sleep(time::Duration::from_millis(100));
88-
}
89-
_ => bail!("system in error state: \"{system_state}\""),
90-
}
61+
if manager.system_state().await? != "running" {
62+
manager
63+
.receive_system_state_changed()
64+
.await
65+
.filter(|p| p.name() == "running")
66+
.next()
67+
.await;
9168
}
69+
70+
Ok(())
9271
}
9372

9473
#[cfg(feature = "mock")]

src/systemd/unit.rs

Lines changed: 32 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,6 @@
11
use anyhow::Result;
2-
use std::time::Duration;
32
pub use systemd_zbus::Mode;
43

5-
#[cfg(not(any(feature = "mock", test)))]
6-
use {
7-
anyhow::{ensure, Context},
8-
futures_util::{join, StreamExt},
9-
log::debug,
10-
systemd_zbus::ManagerProxy,
11-
tokio::time::{timeout_at, Instant},
12-
};
13-
144
#[derive(Copy, Clone, Debug)]
155
pub enum UnitAction {
166
Reload,
@@ -20,17 +10,24 @@ pub enum UnitAction {
2010
}
2111

2212
#[cfg(not(feature = "mock"))]
23-
pub async fn unit_action(
24-
unit: &str,
25-
unit_action: UnitAction,
26-
timeout: Duration,
27-
mode: Mode,
28-
) -> Result<()> {
29-
let deadline = Instant::now() + timeout;
30-
let system = timeout_at(deadline, zbus::Connection::system()).await??;
31-
let manager = timeout_at(deadline, ManagerProxy::new(&system)).await??;
32-
33-
let mut job_removed_stream = timeout_at(deadline, manager.receive_job_removed()).await??;
13+
pub async fn unit_action(unit: &str, unit_action: UnitAction, mode: Mode) -> Result<()> {
14+
use anyhow::Context;
15+
use log::debug;
16+
use systemd_zbus::ManagerProxy;
17+
use tokio_stream::StreamExt;
18+
19+
debug!("unit_action: {unit} {unit_action:?} {mode:?}");
20+
21+
let connection = zbus::Connection::system()
22+
.await
23+
.context("failed to create connection")?;
24+
let manager: ManagerProxy<'_> = ManagerProxy::new(&connection)
25+
.await
26+
.context("failed to create manager")?;
27+
let job_removed_stream = manager
28+
.receive_job_removed()
29+
.await
30+
.context("failed to create job_removed_stream")?;
3431

3532
let action = |&unit_action| async move {
3633
match unit_action {
@@ -41,55 +38,25 @@ pub async fn unit_action(
4138
}
4239
};
4340

44-
let (job_removed, job) = join!(
45-
timeout_at(deadline, job_removed_stream.next()),
46-
timeout_at(deadline, action(&unit_action))
47-
);
48-
49-
let job = job
50-
.context(format!("systemd unit {unit_action:?} \"{unit}\" failed"))??
41+
let job = action(&unit_action)
42+
.await
43+
.context("unit action failed")?
5144
.into_inner();
5245

53-
let job_removed = job_removed
54-
.context("systemd job_removed_stream")?
55-
.context("failed to get next item in job removed stream")?;
56-
57-
let job_removed_args = job_removed.args().context("get removed args")?;
46+
job_removed_stream
47+
.filter(|job_removed| {
48+
if let Ok(args) = job_removed.args() {
49+
return args.job() == &job && args.result == "done";
50+
};
51+
false
52+
})
53+
.next()
54+
.await;
5855

59-
debug!("job removed: {job_removed_args:?}");
60-
if job_removed_args.job() == &job {
61-
ensure!(
62-
job_removed_args.result == "done",
63-
"systemd (1) unit {unit_action:?} \"{unit}\" isn't done: {}",
64-
job_removed_args.result
65-
);
66-
return Ok(());
67-
}
68-
69-
loop {
70-
let job_removed = timeout_at(deadline, job_removed_stream.next())
71-
.await?
72-
.context("no job_removed signal received")?;
73-
74-
let job_removed_args = job_removed.args()?;
75-
debug!("job removed: {job_removed_args:?}");
76-
if job_removed_args.job() == &job {
77-
ensure!(
78-
job_removed_args.result == "done",
79-
"systemd (2) unit {unit_action:?} \"{unit}\" isn't done: {}",
80-
job_removed_args.result
81-
);
82-
return Ok(());
83-
}
84-
}
56+
Ok(())
8557
}
8658

8759
#[cfg(feature = "mock")]
88-
pub async fn unit_action(
89-
_unit: &str,
90-
_unit_action: UnitAction,
91-
_timeout: Duration,
92-
_mode: Mode,
93-
) -> Result<()> {
60+
pub async fn unit_action(_unit: &str, _unit_action: UnitAction, _mode: Mode) -> Result<()> {
9461
Ok(())
9562
}

src/twin/firmware_update/mod.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ use update_validation::UpdateValidation;
2828

2929
static LOAD_UPDATE_WDT_INTERVAL_SECS: u64 = 120;
3030
static RUN_UPDATE_WDT_INTERVAL_SECS: u64 = 600;
31-
static UNIT_ACTION_TIMEOUT_SECS: u64 = 30;
3231

3332
struct LoadUpdateGuard {
3433
wdt: Option<Duration>,
@@ -77,15 +76,12 @@ impl RunUpdateGuard {
7776
systemd::unit::unit_action(
7877
IOT_HUB_DEVICE_UPDATE_SERVICE_TIMER,
7978
UnitAction::Stop,
80-
Duration::from_secs(UNIT_ACTION_TIMEOUT_SECS),
8179
systemd_zbus::Mode::Replace,
8280
)
8381
.await?;
84-
8582
systemd::unit::unit_action(
8683
IOT_HUB_DEVICE_UPDATE_SERVICE,
8784
UnitAction::Stop,
88-
Duration::from_secs(UNIT_ACTION_TIMEOUT_SECS),
8985
systemd_zbus::Mode::Replace,
9086
)
9187
.await?;
@@ -117,7 +113,6 @@ impl Drop for RunUpdateGuard {
117113
if let Err(e) = systemd::unit::unit_action(
118114
IOT_HUB_DEVICE_UPDATE_SERVICE,
119115
UnitAction::Start,
120-
Duration::from_secs(UNIT_ACTION_TIMEOUT_SECS),
121116
systemd_zbus::Mode::Fail,
122117
)
123118
.await
@@ -128,7 +123,6 @@ impl Drop for RunUpdateGuard {
128123
if let Err(e) = systemd::unit::unit_action(
129124
IOT_HUB_DEVICE_UPDATE_SERVICE_TIMER,
130125
UnitAction::Start,
131-
Duration::from_secs(UNIT_ACTION_TIMEOUT_SECS),
132126
systemd_zbus::Mode::Fail,
133127
)
134128
.await

src/twin/firmware_update/update_validation.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -165,11 +165,9 @@ impl UpdateValidation {
165165

166166
async fn validate(&mut self) -> Result<()> {
167167
debug!("started");
168-
let now = Duration::from(nix::time::clock_gettime(
169-
nix::time::ClockId::CLOCK_MONOTONIC,
170-
)?);
171-
let timeout = self.validation_timeout - (now - self.start_monotonic_time);
172-
systemd::wait_for_system_running(timeout).await?;
168+
systemd::wait_for_system_running()
169+
.await
170+
.context("validate: wait_for_system_running timed out")?;
173171

174172
/* ToDo: if it returns with an error, we may want to handle the state
175173
* "degraded" and possibly ignore certain failed services via configuration
@@ -180,19 +178,13 @@ impl UpdateValidation {
180178
debug!("starting {IOT_HUB_DEVICE_UPDATE_SERVICE}");
181179
fs::remove_file(UPDATE_VALIDATION_FILE).context("remove UPDATE_VALIDATION_FILE")?;
182180

183-
let now = Duration::from(nix::time::clock_gettime(
184-
nix::time::ClockId::CLOCK_MONOTONIC,
185-
)?);
186-
let timeout = self.validation_timeout - (now - self.start_monotonic_time);
187-
188181
// in case of local update we don't take care of starting deviceupdate-agent.service,
189182
// since it might fail because of missing iothub connection.
190183
// instead we let deviceupdate-agent.timer doing the job periodically
191184
if !self.local_update {
192185
systemd::unit::unit_action(
193186
IOT_HUB_DEVICE_UPDATE_SERVICE,
194187
UnitAction::Start,
195-
timeout,
196188
systemd_zbus::Mode::Fail,
197189
)
198190
.await?;

src/twin/network.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ lazy_static! {
2323
}
2424

2525
static NETWORK_SERVICE: &str = "systemd-networkd.service";
26-
static NETWORK_SERVICE_RELOAD_TIMEOUT_IN_SECS: u64 = 15;
2726

2827
#[derive(PartialEq, Serialize)]
2928
pub struct Address {
@@ -93,7 +92,6 @@ impl Feature for Network {
9392
unit::unit_action(
9493
NETWORK_SERVICE,
9594
unit::UnitAction::Reload,
96-
Duration::from_secs(NETWORK_SERVICE_RELOAD_TIMEOUT_IN_SECS),
9795
systemd_zbus::Mode::Fail,
9896
)
9997
.await?

0 commit comments

Comments
 (0)