use super::context::Context;
use crate::Error;
use bytes::Bytes;
use futures::future::TryFutureExt;
use futures::StreamExt;
use time::OffsetDateTime;
/// A [`crate::Message`] paired with the [`Context`] used to acknowledge it.
///
/// The wrapper dereferences to the inner [`crate::Message`], so its fields
/// (for example `reply`) can be read directly on this type.
#[derive(Debug)]
pub struct Message {
/// The wrapped message.
pub message: crate::Message,
/// Context whose `client` publishes acknowledgements for this message.
pub context: Context,
}
impl std::ops::Deref for Message {
type Target = crate::Message;
fn deref(&self) -> &Self::Target {
&self.message
}
}
impl From<Message> for crate::Message {
fn from(source: Message) -> crate::Message {
source.message
}
}
impl Message {
    /// Acknowledges the message by publishing an empty payload to its reply
    /// subject.
    ///
    /// # Errors
    ///
    /// Returns an error when the message has no reply subject, or when the
    /// publish call fails.
    pub async fn ack(&self) -> Result<(), Error> {
        if let Some(ref reply) = self.reply {
            self.context
                .client
                .publish(reply.to_string(), "".into())
                .map_err(Error::from)
                .await
        } else {
            Err(Box::new(std::io::Error::new(
                std::io::ErrorKind::Other,
                "No reply subject, not a JetStream message",
            )))
        }
    }

    /// Acknowledges the message with an explicit [`AckKind`], publishing the
    /// kind's payload to the reply subject.
    ///
    /// # Errors
    ///
    /// Returns an error when the message has no reply subject, or when the
    /// publish call fails.
    pub async fn ack_with(&self, kind: AckKind) -> Result<(), Error> {
        if let Some(ref reply) = self.reply {
            self.context
                .client
                .publish(reply.to_string(), kind.into())
                .map_err(Error::from)
                .await
        } else {
            Err(Box::new(std::io::Error::new(
                std::io::ErrorKind::Other,
                "No reply subject, not a JetStream message",
            )))
        }
    }

    /// Acknowledges the message and waits for a response to that
    /// acknowledgement.
    ///
    /// Subscribes to a fresh inbox, publishes [`AckKind::Ack`] to the reply
    /// subject with that inbox as the reply-to, then waits up to
    /// `self.context.timeout` for any message to arrive on the inbox.
    ///
    /// # Errors
    ///
    /// Returns an error when the message has no reply subject, when
    /// subscribing or publishing fails, when the wait times out
    /// (`ErrorKind::TimedOut`), or when the subscription ends without
    /// yielding a message.
    pub async fn double_ack(&self) -> Result<(), Error> {
        if let Some(ref reply) = self.reply {
            let inbox = self.context.client.new_inbox();
            let mut subscription = self.context.client.subscribe(inbox.clone()).await?;
            self.context
                .client
                .publish_with_reply(reply.to_string(), inbox, AckKind::Ack.into())
                .await?;
            match tokio::time::timeout(self.context.timeout, subscription.next())
                .await
                .map_err(|_| {
                    std::io::Error::new(
                        std::io::ErrorKind::TimedOut,
                        "double ack response timed out",
                    )
                })? {
                Some(_) => Ok(()),
                None => Err(Box::new(std::io::Error::new(
                    std::io::ErrorKind::Other,
                    "subscription dropped",
                ))),
            }
        } else {
            Err(Box::new(std::io::Error::new(
                std::io::ErrorKind::Other,
                "No reply subject, not a JetStream message",
            )))
        }
    }

    /// Parses the `$JS.ACK.`-prefixed reply subject into an [`Info`].
    ///
    /// Two layouts of the dot-separated remainder are accepted:
    ///
    /// * 7 tokens: `stream.consumer.delivered.stream_seq.consumer_seq.published.pending`
    /// * 9 or more tokens: the same fields preceded by `domain.acc_hash`, and
    ///   optionally followed by one trailing token. A domain of `_` is
    ///   reported as `None`.
    ///
    /// Only the first ten tokens are inspected; anything beyond is ignored.
    ///
    /// # Errors
    ///
    /// Returns an error when there is no reply subject, when the prefix is
    /// missing, when the token count is neither 7 nor at least 9, when a
    /// numeric token fails to parse, or when the published timestamp is out of
    /// range.
    // Struct-literal fields are evaluated in the order they are written and
    // every `try_parse!` advances `token_index`; that deliberate ordering is
    // what this lint flags.
    #[allow(clippy::mixed_read_write_in_expression)]
    pub fn info(&self) -> Result<Info<'_>, Error> {
        const PREFIX: &str = "$JS.ACK.";

        let reply: &str = self.reply.as_ref().ok_or_else(|| {
            std::io::Error::new(std::io::ErrorKind::NotFound, "did not found reply subject")
        })?;
        let reply = reply.strip_prefix(PREFIX).ok_or_else(|| {
            std::io::Error::new(std::io::ErrorKind::Other, "did not found proper prefix")
        })?;

        // Collect up to ten tokens into a fixed array to avoid allocating.
        let mut tokens: [Option<&str>; 10] = [None; 10];
        let mut n_tokens = 0;
        for (slot, token) in tokens.iter_mut().zip(reply.split('.')) {
            *slot = Some(token);
            n_tokens += 1;
        }

        let mut token_index = 0;
        // `try_parse!(str)` yields the next raw token; `try_parse!()` also
        // parses it into whatever type the surrounding expression expects.
        // Both return early from `info` on failure.
        macro_rules! try_parse {
            () => {
                match str::parse(try_parse!(str)) {
                    Ok(parsed) => parsed,
                    Err(e) => {
                        return Err(Box::new(e));
                    }
                }
            };
            (str) => {
                if let Some(next) = tokens[token_index].take() {
                    // The increment after the final token is never read.
                    #[allow(unused)]
                    {
                        token_index += 1;
                    }
                    next
                } else {
                    return Err(Box::new(std::io::Error::new(
                        std::io::ErrorKind::Other,
                        "too few tokens",
                    )));
                }
            };
        }

        if n_tokens >= 9 {
            Ok(Info {
                domain: {
                    let domain: &str = try_parse!(str);
                    if domain == "_" {
                        None
                    } else {
                        Some(domain)
                    }
                },
                acc_hash: Some(try_parse!(str)),
                stream: try_parse!(str),
                consumer: try_parse!(str),
                delivered: try_parse!(),
                stream_sequence: try_parse!(),
                consumer_sequence: try_parse!(),
                published: {
                    let nanos: i128 = try_parse!();
                    OffsetDateTime::from_unix_timestamp_nanos(nanos)?
                },
                pending: try_parse!(),
                // The nine fields above consume tokens 0..=8, so the trailing
                // token exists only when a tenth token was present. Checking
                // `>= 9` here would reject a valid nine-token subject with
                // "too few tokens".
                token: if n_tokens >= 10 {
                    Some(try_parse!(str))
                } else {
                    None
                },
            })
        } else if n_tokens == 7 {
            Ok(Info {
                domain: None,
                acc_hash: None,
                stream: try_parse!(str),
                consumer: try_parse!(str),
                delivered: try_parse!(),
                stream_sequence: try_parse!(),
                consumer_sequence: try_parse!(),
                published: {
                    let nanos: i128 = try_parse!();
                    OffsetDateTime::from_unix_timestamp_nanos(nanos)?
                },
                pending: try_parse!(),
                token: None,
            })
        } else {
            Err(Box::new(std::io::Error::new(
                std::io::ErrorKind::Other,
                "bad token number",
            )))
        }
    }
}
/// Kinds of acknowledgement that can be published for a message.
///
/// Each variant converts into a fixed payload through the
/// `From<AckKind> for Bytes` implementation.
///
/// NOTE(review): the protocol meaning of each payload is not visible in this
/// file; the per-variant descriptions list only the payload sent. Confirm
/// semantics against the JetStream documentation.
#[derive(Debug, Clone, Copy)]
pub enum AckKind {
/// Sent as the payload `+ACK`.
Ack,
/// Sent as the payload `-NAK`.
Nak,
/// Sent as the payload `+WPI`.
Progress,
/// Sent as the payload `+NXT`.
Next,
/// Sent as the payload `+TERM`.
Term,
}
impl From<AckKind> for Bytes {
fn from(kind: AckKind) -> Self {
use AckKind::*;
match kind {
Ack => Bytes::from_static(b"+ACK"),
Nak => Bytes::from_static(b"-NAK"),
Progress => Bytes::from_static(b"+WPI"),
Next => Bytes::from_static(b"+NXT"),
Term => Bytes::from_static(b"+TERM"),
}
}
}
/// Metadata parsed from a message's `$JS.ACK.`-prefixed reply subject.
///
/// String fields borrow directly from the reply subject they were parsed from.
#[derive(Debug, Clone)]
pub struct Info<'a> {
/// Domain token; `None` when the subject carries `_` in that position or
/// uses the 7-token layout, which has no domain.
pub domain: Option<&'a str>,
/// Account-hash token; `None` for the 7-token layout.
pub acc_hash: Option<&'a str>,
/// Stream token, taken verbatim from the subject (presumably the stream name).
pub stream: &'a str,
/// Consumer token, taken verbatim from the subject (presumably the consumer name).
pub consumer: &'a str,
/// Stream-sequence token parsed as `u64`.
pub stream_sequence: u64,
/// Consumer-sequence token parsed as `u64`.
pub consumer_sequence: u64,
/// Delivered token parsed as `i64` (presumably the delivery count — confirm).
pub delivered: i64,
/// Pending token parsed as `u64`.
pub pending: u64,
/// Timestamp built from the published token, interpreted as Unix nanoseconds.
pub published: time::OffsetDateTime,
/// Optional trailing token; `None` for the 7-token layout.
pub token: Option<&'a str>,
}