Skip to content

Commit c81e16f

Browse files
committed
Unit test for panick, and handling a post-mortem observation.
1 parent 731deb4 commit c81e16f

File tree

5 files changed

+79
-40
lines changed

5 files changed

+79
-40
lines changed

quickwit-actors/src/actor_handle.rs

Lines changed: 7 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -235,10 +235,7 @@ mod tests {
235235
_ctx: &ActorContext<Self>,
236236
) -> Result<(), ActorExitStatus> {
237237
self.count += 1;
238-
if self.count == 2 {
239-
panic!("Oops");
240-
}
241-
Ok(())
238+
panic!("Oops");
242239
}
243240
}
244241

@@ -250,10 +247,7 @@ mod tests {
250247
_ctx: &ActorContext<Self>,
251248
) -> Result<(), ActorExitStatus> {
252249
self.count += 1;
253-
if self.count == 2 {
254-
panic!("Oops");
255-
}
256-
Ok(())
250+
panic!("Oops");
257251
}
258252
}
259253

@@ -277,10 +271,7 @@ mod tests {
277271
_ctx: &ActorContext<Self>,
278272
) -> Result<(), ActorExitStatus> {
279273
self.count += 1;
280-
if self.count == 2 {
281-
return Err(ActorExitStatus::DownstreamClosed);
282-
}
283-
Ok(())
274+
Err(ActorExitStatus::DownstreamClosed)
284275
}
285276
}
286277

@@ -292,10 +283,7 @@ mod tests {
292283
_ctx: &ActorContext<Self>,
293284
) -> Result<(), ActorExitStatus> {
294285
self.count += 1;
295-
if self.count == 2 {
296-
return Err(ActorExitStatus::DownstreamClosed);
297-
}
298-
Ok(())
286+
Err(ActorExitStatus::DownstreamClosed)
299287
}
300288
}
301289

@@ -304,8 +292,6 @@ mod tests {
304292
let universe = Universe::new();
305293
let (mailbox, handle) = universe.spawn(PanickingActor::default());
306294
universe.send_message(&mailbox, ()).await?;
307-
assert_eq!(handle.process_pending_and_observe().await.state, 1);
308-
universe.send_message(&mailbox, ()).await?;
309295
let (exit_status, count) = handle.join().await;
310296
assert!(matches!(exit_status, ActorExitStatus::Panicked));
311297
assert!(matches!(count, 1)); //< Upon panick we cannot get a post mortem state.
@@ -317,11 +303,9 @@ mod tests {
317303
let universe = Universe::new();
318304
let (mailbox, handle) = universe.spawn_sync_actor(PanickingActor::default());
319305
universe.send_message(&mailbox, ()).await?;
320-
assert_eq!(handle.process_pending_and_observe().await.state, 1);
321-
universe.send_message(&mailbox, ()).await?;
322306
let (exit_status, count) = handle.join().await;
323307
assert!(matches!(exit_status, ActorExitStatus::Panicked));
324-
assert!(matches!(count, 1)); //< Upon panick we cannot get a post mortem state.
308+
assert!(matches!(count, 1));
325309
Ok(())
326310
}
327311

@@ -330,11 +314,9 @@ mod tests {
330314
let universe = Universe::new();
331315
let (mailbox, handle) = universe.spawn(ExitActor::default());
332316
universe.send_message(&mailbox, ()).await?;
333-
assert_eq!(handle.process_pending_and_observe().await.state, 1);
334-
universe.send_message(&mailbox, ()).await?;
335317
let (exit_status, count) = handle.join().await;
336318
assert!(matches!(exit_status, ActorExitStatus::DownstreamClosed));
337-
assert!(matches!(count, 2)); //< Upon panick we cannot get a post mortem state.
319+
assert!(matches!(count, 1)); //< Upon panick we cannot get a post mortem state.
338320
Ok(())
339321
}
340322

@@ -343,11 +325,9 @@ mod tests {
343325
let universe = Universe::new();
344326
let (mailbox, handle) = universe.spawn_sync_actor(ExitActor::default());
345327
universe.send_message(&mailbox, ()).await?;
346-
assert_eq!(handle.process_pending_and_observe().await.state, 1);
347-
universe.send_message(&mailbox, ()).await?;
348328
let (exit_status, count) = handle.join().await;
349329
assert!(matches!(exit_status, ActorExitStatus::DownstreamClosed));
350-
assert!(matches!(count, 2));
330+
assert!(matches!(count, 1));
351331
Ok(())
352332
}
353333
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
use tokio::sync::watch::Sender;
2+
3+
use crate::Actor;
4+
5+
// Quickwit
6+
// Copyright (C) 2021 Quickwit Inc.
7+
//
8+
// Quickwit is offered under the AGPL v3.0 and as commercial software.
9+
// For commercial licensing, contact us at [email protected].
10+
//
11+
// AGPL:
12+
// This program is free software: you can redistribute it and/or modify
13+
// it under the terms of the GNU Affero General Public License as
14+
// published by the Free Software Foundation, either version 3 of the
15+
// License, or (at your option) any later version.
16+
//
17+
// This program is distributed in the hope that it will be useful,
18+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
19+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20+
// GNU Affero General Public License for more details.
21+
//
22+
// You should have received a copy of the GNU Affero General Public License
23+
// along with this program. If not, see <http://www.gnu.org/licenses/>.
24+
25+
pub struct ActorWithStateTx<A: Actor> {
26+
pub actor: A,
27+
pub state_tx: Sender<A::ObservableState>,
28+
}
29+
30+
impl<A: Actor> Drop for ActorWithStateTx<A> {
31+
fn drop(&mut self) {
32+
let final_state = self.actor.observable_state();
33+
// We ignore the result here. An error only marks the absence of an observer.
34+
let _ = self.state_tx.send(final_state);
35+
}
36+
}

quickwit-actors/src/async_actor.rs

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
use crate::actor::{process_command, ActorExitStatus};
2222
use crate::actor_handle::ActorHandle;
2323
use crate::actor_state::ActorState;
24+
use crate::actor_with_state_tx::ActorWithStateTx;
2425
use crate::mailbox::{create_mailbox, CommandOrMessage, Inbox};
2526
use crate::scheduler::SchedulerMessage;
2627
use crate::{Actor, ActorContext, KillSwitch, Mailbox, RecvError};
@@ -135,29 +136,40 @@ async fn process_msg<A: Actor + AsyncActor>(
135136
}
136137

137138
async fn async_actor_loop<A: AsyncActor>(
138-
mut actor: A,
139+
actor: A,
139140
mut inbox: Inbox<A::Message>,
140141
mut ctx: ActorContext<A>,
141142
state_tx: Sender<A::ObservableState>,
142143
) -> ActorExitStatus {
143-
let mut exit_status_opt: Option<ActorExitStatus> = actor.initialize(&ctx).await.err();
144+
// We rely on this object internally to fetch a post-mortem state,
145+
// even in case of a panic.
146+
let mut actor_with_state_tx = ActorWithStateTx { actor, state_tx };
147+
148+
let mut exit_status_opt: Option<ActorExitStatus> =
149+
actor_with_state_tx.actor.initialize(&ctx).await.err();
150+
144151
let exit_status: ActorExitStatus = loop {
145152
tokio::task::yield_now().await;
146153
if let Some(exit_status) = exit_status_opt {
147154
break exit_status;
148155
}
149-
exit_status_opt = process_msg(&mut actor, &mut inbox, &mut ctx, &state_tx).await;
156+
exit_status_opt = process_msg(
157+
&mut actor_with_state_tx.actor,
158+
&mut inbox,
159+
&mut ctx,
160+
&actor_with_state_tx.state_tx,
161+
)
162+
.await;
150163
};
151164
ctx.exit(&exit_status);
152165

153-
if let Err(finalize_error) = actor
166+
if let Err(finalize_error) = actor_with_state_tx
167+
.actor
154168
.finalize(&exit_status, &ctx)
155169
.await
156-
.with_context(|| format!("Finalization of actor {}", actor.name()))
170+
.with_context(|| format!("Finalization of actor {}", actor_with_state_tx.actor.name()))
157171
{
158172
error!(err=?finalize_error, "finalize_error");
159173
}
160-
let final_state = actor.observable_state();
161-
let _ = state_tx.send(final_state);
162174
exit_status
163175
}

quickwit-actors/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use tokio::time::Duration;
3333
mod actor;
3434
mod actor_handle;
3535
mod actor_state;
36+
mod actor_with_state_tx;
3637
mod async_actor;
3738
pub(crate) mod channel_with_priority;
3839
mod kill_switch;

quickwit-actors/src/sync_actor.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use tracing::{debug, error, info};
2424

2525
use crate::actor::{process_command, ActorExitStatus};
2626
use crate::actor_state::ActorState;
27+
use crate::actor_with_state_tx::ActorWithStateTx;
2728
use crate::mailbox::{create_mailbox, CommandOrMessage, Inbox};
2829
use crate::scheduler::SchedulerMessage;
2930
use crate::{Actor, ActorContext, ActorHandle, KillSwitch, Mailbox, RecvError};
@@ -132,23 +133,32 @@ fn process_msg<A: Actor + SyncActor>(
132133
}
133134

134135
fn sync_actor_loop<A: SyncActor>(
135-
mut actor: A,
136+
actor: A,
136137
mut inbox: Inbox<A::Message>,
137138
mut ctx: ActorContext<A>,
138139
state_tx: Sender<A::ObservableState>,
139140
) -> ActorExitStatus {
140-
let mut exit_status_opt: Option<ActorExitStatus> = actor.initialize(&ctx).err();
141+
// We rely on this object internally to fetch a post-mortem state,
142+
// even in case of a panic.
143+
let mut actor_with_state_tx = ActorWithStateTx { actor, state_tx };
144+
145+
let mut exit_status_opt: Option<ActorExitStatus> =
146+
actor_with_state_tx.actor.initialize(&ctx).err();
147+
141148
let exit_status: ActorExitStatus = loop {
142149
if let Some(exit_status) = exit_status_opt {
143150
break exit_status;
144151
}
145-
exit_status_opt = process_msg(&mut actor, &mut inbox, &mut ctx, &state_tx);
152+
exit_status_opt = process_msg(
153+
&mut actor_with_state_tx.actor,
154+
&mut inbox,
155+
&mut ctx,
156+
&actor_with_state_tx.state_tx,
157+
);
146158
};
147159
ctx.exit(&exit_status);
148-
if let Err(error) = actor.finalize(&exit_status, &ctx) {
160+
if let Err(error) = actor_with_state_tx.actor.finalize(&exit_status, &ctx) {
149161
error!(error=?error, "Finalizing failed");
150162
}
151-
let final_state = actor.observable_state();
152-
let _ = state_tx.send(final_state);
153163
exit_status
154164
}

0 commit comments

Comments
 (0)