#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
#![doc = include_str!("../README.md")]
#![cfg_attr(not(ci_arti_stable), allow(renamed_and_removed_lints))]
#![cfg_attr(not(ci_arti_nightly), allow(unknown_lints))]
#![deny(missing_docs)]
#![warn(noop_method_call)]
#![deny(unreachable_pub)]
#![warn(clippy::all)]
#![deny(clippy::await_holding_lock)]
#![deny(clippy::cargo_common_metadata)]
#![deny(clippy::cast_lossless)]
#![deny(clippy::checked_conversions)]
#![warn(clippy::cognitive_complexity)]
#![deny(clippy::debug_assert_with_mut_call)]
#![deny(clippy::exhaustive_enums)]
#![deny(clippy::exhaustive_structs)]
#![deny(clippy::expl_impl_clone_on_copy)]
#![deny(clippy::fallible_impl_from)]
#![deny(clippy::implicit_clone)]
#![deny(clippy::large_stack_arrays)]
#![warn(clippy::manual_ok_or)]
#![deny(clippy::missing_docs_in_private_items)]
#![deny(clippy::missing_panics_doc)]
#![warn(clippy::needless_borrow)]
#![warn(clippy::needless_pass_by_value)]
#![warn(clippy::option_option)]
#![warn(clippy::rc_buffer)]
#![deny(clippy::ref_option_ref)]
#![warn(clippy::semicolon_if_nothing_returned)]
#![warn(clippy::trait_duplication_in_bounds)]
#![deny(clippy::unnecessary_wraps)]
#![warn(clippy::unseparated_literal_suffix)]
#![deny(clippy::unwrap_used)]
#![allow(clippy::let_unit_value)] #![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::result_large_err)]
pub mod events;
use crate::events::{TorEvent, TorEventKind};
use async_broadcast::{InactiveReceiver, Receiver, Sender, TrySendError};
use futures::channel::mpsc;
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::future::Either;
use futures::StreamExt;
use once_cell::sync::OnceCell;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use thiserror::Error;
use tracing::{error, warn};
static EVENT_SENDER: OnceCell<UnboundedSender<TorEvent>> = OnceCell::new();
static CURRENT_RECEIVER: OnceCell<InactiveReceiver<TorEvent>> = OnceCell::new();
const EVENT_KIND_COUNT: usize = 1;
static EVENT_SUBSCRIBERS: [AtomicUsize; EVENT_KIND_COUNT] = [AtomicUsize::new(0); EVENT_KIND_COUNT];
pub static BROADCAST_CAPACITY: usize = 512;
pub struct EventReactor {
receiver: UnboundedReceiver<TorEvent>,
broadcast: Sender<TorEvent>,
}
impl EventReactor {
pub fn new() -> Option<Self> {
let (tx, rx) = mpsc::unbounded();
if EVENT_SENDER.set(tx).is_ok() {
let (btx, brx) = async_broadcast::broadcast(BROADCAST_CAPACITY);
CURRENT_RECEIVER
.set(brx.deactivate())
.expect("CURRENT_RECEIVER can't be set if EVENT_SENDER is unset!");
Some(Self {
receiver: rx,
broadcast: btx,
})
} else {
None
}
}
pub fn receiver() -> Option<TorEventReceiver> {
CURRENT_RECEIVER
.get()
.map(|rx| TorEventReceiver::wrap(rx.clone()))
}
pub async fn run(mut self) {
while let Some(event) = self.receiver.next().await {
match self.broadcast.try_broadcast(event) {
Ok(_) => {}
Err(TrySendError::Closed(_)) => break,
Err(TrySendError::Full(event)) => {
warn!("TorEventReceivers aren't receiving events fast enough!");
if self.broadcast.broadcast(event).await.is_err() {
break;
}
}
Err(TrySendError::Inactive(_)) => {
}
}
}
error!("event reactor shutting down; this shouldn't ever happen");
}
}
#[derive(Clone, Debug, Error)]
#[non_exhaustive]
pub enum ReceiverError {
#[error("No event subscriptions")]
NoSubscriptions,
#[error("Internal event broadcast channel closed")]
ChannelClosed,
}
#[derive(Clone, Debug)]
pub struct TorEventReceiver {
inner: Either<Receiver<TorEvent>, InactiveReceiver<TorEvent>>,
subscribed: [bool; EVENT_KIND_COUNT],
}
impl futures::stream::Stream for TorEventReceiver {
type Item = TorEvent;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
match this.inner {
Either::Left(ref mut active) => loop {
match Pin::new(&mut *active).poll_next(cx) {
Poll::Ready(Some(e)) => {
if this.subscribed[e.kind() as usize] {
return Poll::Ready(Some(e));
}
}
x => return x,
}
},
Either::Right(_) => {
warn!("TorEventReceiver::poll_next() called without subscriptions!");
Poll::Ready(None)
}
}
}
}
impl TorEventReceiver {
pub(crate) fn wrap(rx: InactiveReceiver<TorEvent>) -> Self {
Self {
inner: Either::Right(rx),
subscribed: [false; EVENT_KIND_COUNT],
}
}
pub fn subscribe(&mut self, kind: TorEventKind) {
if !self.subscribed[kind as usize] {
EVENT_SUBSCRIBERS[kind as usize].fetch_add(1, Ordering::SeqCst);
self.subscribed[kind as usize] = true;
}
if let Either::Right(inactive) = self.inner.clone() {
self.inner = Either::Left(inactive.activate());
}
}
pub fn unsubscribe(&mut self, kind: TorEventKind) {
if self.subscribed[kind as usize] {
EVENT_SUBSCRIBERS[kind as usize].fetch_sub(1, Ordering::SeqCst);
self.subscribed[kind as usize] = false;
}
if self.subscribed.iter().all(|x| !*x) {
if let Either::Left(active) = self.inner.clone() {
self.inner = Either::Right(active.deactivate());
}
}
}
}
impl Drop for TorEventReceiver {
fn drop(&mut self) {
for (i, subscribed) in self.subscribed.iter().enumerate() {
if *subscribed {
EVENT_SUBSCRIBERS[i].fetch_sub(1, Ordering::SeqCst);
}
}
}
}
pub fn event_has_subscribers(kind: TorEventKind) -> bool {
EVENT_SUBSCRIBERS[kind as usize].load(Ordering::SeqCst) > 0
}
pub fn broadcast(event: TorEvent) {
if !event_has_subscribers(event.kind()) {
return;
}
if let Some(sender) = EVENT_SENDER.get() {
let _ = sender.unbounded_send(event);
}
}
#[cfg(test)]
mod test {
#![allow(clippy::unwrap_used)]
use crate::{
broadcast, event_has_subscribers, EventReactor, StreamExt, TorEvent, TorEventKind,
};
use once_cell::sync::OnceCell;
use std::sync::{Mutex, MutexGuard};
use std::time::Duration;
use tokio::runtime::Runtime;
static TEST_MUTEX: OnceCell<Mutex<Runtime>> = OnceCell::new();
fn test_setup() -> MutexGuard<'static, Runtime> {
let mutex = TEST_MUTEX.get_or_init(|| Mutex::new(Runtime::new().unwrap()));
let runtime = mutex
.lock()
.expect("mutex poisoned, probably by other failing tests");
if let Some(reactor) = EventReactor::new() {
runtime.handle().spawn(reactor.run());
}
runtime
}
#[test]
fn subscriptions() {
let rt = test_setup();
rt.block_on(async move {
assert!(!event_has_subscribers(TorEventKind::Empty));
let mut rx = EventReactor::receiver().unwrap();
assert!(!event_has_subscribers(TorEventKind::Empty));
rx.subscribe(TorEventKind::Empty);
assert!(event_has_subscribers(TorEventKind::Empty));
rx.unsubscribe(TorEventKind::Empty);
assert!(!event_has_subscribers(TorEventKind::Empty));
rx.subscribe(TorEventKind::Empty);
rx.subscribe(TorEventKind::Empty);
rx.subscribe(TorEventKind::Empty);
assert!(event_has_subscribers(TorEventKind::Empty));
rx.unsubscribe(TorEventKind::Empty);
assert!(!event_has_subscribers(TorEventKind::Empty));
rx.subscribe(TorEventKind::Empty);
assert!(event_has_subscribers(TorEventKind::Empty));
std::mem::drop(rx);
assert!(!event_has_subscribers(TorEventKind::Empty));
});
}
#[test]
fn empty_recv() {
let rt = test_setup();
rt.block_on(async move {
let mut rx = EventReactor::receiver().unwrap();
let result = rx.next().await;
assert!(result.is_none());
});
}
#[test]
fn receives_events() {
let rt = test_setup();
rt.block_on(async move {
let mut rx = EventReactor::receiver().unwrap();
rx.subscribe(TorEventKind::Empty);
tokio::time::sleep(Duration::from_millis(100)).await;
broadcast(TorEvent::Empty);
let result = rx.next().await;
assert_eq!(result, Some(TorEvent::Empty));
});
}
#[test]
fn does_not_send_to_no_subscribers() {
let rt = test_setup();
rt.block_on(async move {
broadcast(TorEvent::Empty);
let mut rx = EventReactor::receiver().unwrap();
rx.subscribe(TorEventKind::Empty);
let result = tokio::time::timeout(Duration::from_millis(100), rx.next()).await;
assert!(result.is_err());
});
}
}