diff --git a/Cargo.toml b/Cargo.toml index 9f71b3239..16ffc0a8c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,7 +61,7 @@ version = "1.0.98" features = ["derive"] [dependencies.tokio] -version = "0.2.15" +version = "0.2.18" features = ["io-util", "sync"] [dependencies.tokio-rustls] diff --git a/src/cursor.rs b/src/cursor.rs new file mode 100644 index 000000000..dac197a12 --- /dev/null +++ b/src/cursor.rs @@ -0,0 +1,165 @@ +use std::{ + collections::VecDeque, + future::Future, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +use bson::{doc, Document}; +use derivative::Derivative; +use futures::{future::BoxFuture, stream::Stream}; + +use crate::{ + error::Result, + operation::GetMore, + options::StreamAddress, + results::GetMoreResult, + Client, + Namespace, + RUNTIME, +}; + +#[derive(Debug)] +pub(crate) struct CursorSpecification { + pub(crate) ns: Namespace, + pub(crate) address: StreamAddress, + pub(crate) id: i64, + pub(crate) batch_size: Option, + pub(crate) max_time: Option, + pub(crate) buffer: VecDeque, +} + +#[derive(Debug)] +pub struct Cursor { + client: Client, + get_more: GetMore, + exhausted: bool, + state: State, +} + +type GetMoreFuture = BoxFuture<'static, Result>; + +/// Describes the current state of the Cursor. If the state is Executing, then a getMore operation +/// is in progress. If the state is Buffer, then there are documents available from the current +/// batch. 
+#[derive(Derivative)] +#[derivative(Debug)] +enum State { + Executing(#[derivative(Debug = "ignore")] GetMoreFuture), + Buffer(VecDeque), + Exhausted, +} + +impl Cursor { + pub(crate) fn new(client: Client, spec: CursorSpecification) -> Self { + let get_more = GetMore::new( + spec.ns, + spec.id, + spec.address, + spec.batch_size, + spec.max_time, + ); + + Self { + client, + get_more, + exhausted: spec.id == 0, + state: State::Buffer(spec.buffer), + } + } +} + +impl Drop for Cursor { + fn drop(&mut self) { + if self.exhausted { + return; + } + + let namespace = self.get_more.namespace().clone(); + let client = self.client.clone(); + let cursor_id = self.get_more.cursor_id(); + + RUNTIME.execute(async move { + let _: Result<_> = client + .database(&namespace.db) + .run_command( + doc! { + "killCursors": &namespace.coll, + "cursors": [cursor_id] + }, + None, + ) + .await; + }) + } +} + +impl Stream for Cursor { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + loop { + match self.state { + // If the current state is Executing, then we check the progress of the getMore + // operation. + State::Executing(ref mut future) => { + match Pin::new(future).poll(cx) { + // If the getMore is finished and successful, then we pop off the first + // document from the batch, set the poll state to + // Buffer, record whether the cursor is exhausted, + // and return the popped document. + Poll::Ready(Ok(get_more_result)) => { + let mut buffer: VecDeque<_> = + get_more_result.batch.into_iter().collect(); + let next_doc = buffer.pop_front(); + + self.state = State::Buffer(buffer); + self.exhausted = get_more_result.exhausted; + + match next_doc { + Some(doc) => return Poll::Ready(Some(Ok(doc))), + None => continue, + } + } + + // If the getMore finished with an error, return that error. + Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))), + + // If the getMore has not finished, keep the state as Executing and return. 
+ Poll::Pending => return Poll::Pending, + } + } + + State::Buffer(ref mut buffer) => { + // If there is a document ready, return it. + if let Some(doc) = buffer.pop_front() { + return Poll::Ready(Some(Ok(doc))); + } + + // If no documents are left in the batch and the cursor is exhausted, set the + // state to Exhausted. + if self.exhausted { + self.state = State::Exhausted; + return Poll::Ready(None); + // If the batch is empty and the cursor is not exhausted, start a new operation + // and set the state to Executing. + } else { + let future = Box::pin( + self.client + .clone() + .execute_operation_owned(self.get_more.clone()), + ); + + self.state = State::Executing(future as GetMoreFuture); + continue; + } + } + + // If the state is Exhausted, then the cursor has already yielded all its results, so + // just signal the end of the stream. + State::Exhausted => return Poll::Ready(None), + } + } + } +} diff --git a/src/cursor/impatient.rs b/src/cursor/impatient.rs deleted file mode 100644 index d9c7a2893..000000000 --- a/src/cursor/impatient.rs +++ /dev/null @@ -1,148 +0,0 @@ -use std::{ - collections::VecDeque, - future::Future, - pin::Pin, - task::{Context, Poll}, -}; - -use bson::{doc, Document}; -use derivative::Derivative; -use futures::{future::BoxFuture, stream::Stream}; - -use super::CursorSpecification; -use crate::{error::Result, operation::GetMore, results::GetMoreResult, Client, RUNTIME}; - -#[derive(Debug)] -pub struct ImpatientCursor { - client: Client, - get_more: GetMore, - exhausted: bool, - state: State, -} - -/// Describes the current state of the Cursor. If the state is Executing, then a getMore operation -/// is in progress. If the state is Buffer, then there are documents available from the current -/// batch. 
-#[derive(Derivative)] -#[derivative(Debug)] -enum State { - Executing(#[derivative(Debug = "ignore")] BoxFuture<'static, Result>), - Buffer(VecDeque), - Exhausted, -} - -impl ImpatientCursor { - pub(crate) fn new(client: Client, spec: CursorSpecification) -> Self { - let get_more = GetMore::new( - spec.ns, - spec.id, - spec.address, - spec.batch_size, - spec.max_time, - ); - - Self { - client, - get_more, - exhausted: spec.id == 0, - state: State::Buffer(spec.buffer), - } - } - - pub(super) fn exhausted(&self) -> bool { - self.exhausted - } -} - -impl Drop for ImpatientCursor { - fn drop(&mut self) { - if self.exhausted { - return; - } - - let namespace = self.get_more.namespace().clone(); - let client = self.client.clone(); - let cursor_id = self.get_more.cursor_id(); - - RUNTIME.execute(async move { - let _: Result<_> = client - .database(&namespace.db) - .run_command( - doc! { - "killCursors": &namespace.coll, - "cursors": [cursor_id] - }, - None, - ) - .await; - }) - } -} - -impl Stream for ImpatientCursor { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let exhausted = self.exhausted; - - match self.state { - // If the current state is Executing, then we check the progress of the getMore - // operation. - State::Executing(ref mut future) => match Pin::new(future).poll(cx) { - // If the getMore is finished and successful, then we pop off the first document - // from the batch, set the poll state to Buffer, record whether the cursor is - // exhausted, and return the popped document. - Poll::Ready(Ok(get_more_result)) => { - let mut buffer: VecDeque<_> = get_more_result.batch.into_iter().collect(); - let next_doc = buffer.pop_front(); - - self.state = State::Buffer(buffer); - self.exhausted = get_more_result.exhausted; - Poll::Ready(next_doc.map(Ok)) - } - - // If the getMore finished with an error, return that error. 
- Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))), - - // If the getMore has not finished, keep the state as Executing and return. - Poll::Pending => Poll::Pending, - }, - - State::Buffer(ref mut buffer) => { - // If there is a document ready, return it. - let poll = if let Some(doc) = buffer.pop_front() { - Poll::Ready(Some(Ok(doc))) - // If the cursor is exhausted, return None. - } else if exhausted { - Poll::Ready(None) - // Since no document is ready and the cursor isn't exhausted, we need to start a new - // getMore operation, so return that the operation is pending. - } else { - Poll::Pending - }; - - // If no documents are left and the batch and the cursor is exhausted, set the state - // to None. - if buffer.is_empty() && exhausted { - self.state = State::Exhausted; - // If the batch is empty and the cursor is not exhausted, start a new operation and - // set the state to Executing. - } else if buffer.is_empty() { - let future = Box::pin( - self.client - .clone() - .execute_operation_owned(self.get_more.clone()), - ); - - self.state = State::Executing(future); - }; - - poll - } - - // If the state is None, then the cursor has already exhausted all its results, so do - // nothing. - State::Exhausted => Poll::Ready(None), - } - } -} diff --git a/src/cursor/mod.rs b/src/cursor/mod.rs deleted file mode 100644 index 23cccd7a1..000000000 --- a/src/cursor/mod.rs +++ /dev/null @@ -1,114 +0,0 @@ -mod impatient; - -use std::{ - collections::VecDeque, - pin::Pin, - task::{Context, Poll}, - time::Duration, -}; - -use bson::Document; -use futures::stream::Stream; - -use crate::{error::Result, options::StreamAddress, Client, Namespace}; -pub use impatient::ImpatientCursor; - -/// A `Cursor` streams the result of a query. When a query is made, a `Cursor` will be returned with -/// the first batch of results from the server; the documents will be returned as the `Cursor` is -/// iterated. 
When the batch is exhausted and if there are more results, the `Cursor` will fetch the -/// next batch of documents, and so forth until the results are exhausted. Note that because of this -/// batching, additional network I/O may occur on any given call to `Cursor::next`. Because of this, -/// a `Cursor` iterates over `Result` items rather than simply `Document` items. -/// -/// The batch size of the `Cursor` can be configured using the options to the method that returns -/// it. For example, setting the `batch_size` field of -/// [`FindOptions`](options/struct.FindOptions.html) will set the batch size of the -/// `Cursor` returned by [`Collection::find`](struct.Collection.html#method.find). -/// -/// Note that the batch size determines both the number of documents stored in memory by the -/// `Cursor` at a given time as well as the total number of network round-trips needed to fetch all -/// results from the server; both of these factors should be taken into account when choosing the -/// optimal batch size. -/// -/// A cursor can be used like any other [`Stream`](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html). The simplest way is just to iterate over the -/// documents it yields: -/// -/// ```rust -/// # use futures::stream::StreamExt; -/// # use mongodb::{Client, error::Result}; -/// # -/// # async fn do_stuff() -> Result<()> { -/// # let client = Client::with_uri_str("mongodb://example.com").await?; -/// # let coll = client.database("foo").collection("bar"); -/// # let mut cursor = coll.find(None, None).await?; -/// # -/// while let Some(doc) = cursor.next().await { -/// println!("{}", doc?) -/// } -/// # -/// # Ok(()) -/// # } -/// ``` -/// -/// Additionally, all the other methods that an [`Stream`](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html) has are available on `Cursor` as well. 
-/// This includes all of the functionality provided by [`StreamExt`](https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html), which provides similar functionality to the standard library `Iterator` trait. -/// For instance, if the number of results from a query is known to be small, it might make sense -/// to collect them into a vector: -/// -/// ```rust -/// # use bson::{doc, bson, Document}; -/// # use futures::stream::StreamExt; -/// # use mongodb::{Client, error::Result}; -/// # -/// # async fn do_stuff() -> Result<()> { -/// # let client = Client::with_uri_str("mongodb://example.com").await?; -/// # let coll = client.database("foo").collection("bar"); -/// # let cursor = coll.find(Some(doc! { "x": 1 }), None).await?; -/// # -/// let results: Vec> = cursor.collect().await; -/// # Ok(()) -/// # } -/// ``` -#[derive(Debug)] -pub struct Cursor { - inner: ImpatientCursor, -} - -impl Cursor { - pub(crate) fn new(client: Client, spec: CursorSpecification) -> Self { - Self { - inner: ImpatientCursor::new(client, spec), - } - } - - pub fn into_impatent(self) -> ImpatientCursor { - self.inner - } -} - -impl Stream for Cursor { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - loop { - match Pin::new(&mut self.inner).poll_next(cx) { - Poll::Ready(Some(result)) => return Poll::Ready(Some(result)), - Poll::Ready(None) if self.inner.exhausted() => break, - Poll::Ready(None) => {} - Poll::Pending => return Poll::Pending, - } - } - - Poll::Ready(None) - } -} - -#[derive(Debug)] -pub(crate) struct CursorSpecification { - pub(crate) ns: Namespace, - pub(crate) address: StreamAddress, - pub(crate) id: i64, - pub(crate) batch_size: Option, - pub(crate) max_time: Option, - pub(crate) buffer: VecDeque, -} diff --git a/src/test/cursor.rs b/src/test/cursor.rs new file mode 100644 index 000000000..ac8b6201a --- /dev/null +++ b/src/test/cursor.rs @@ -0,0 +1,79 @@ +use std::time::Duration; + +use bson::doc; +use 
futures::{future::Either, StreamExt}; + +use crate::{ + options::{CreateCollectionOptions, CursorType, FindOptions}, + test::{TestClient, LOCK}, + RUNTIME, +}; + +#[cfg_attr(feature = "tokio-runtime", tokio::test)] +#[cfg_attr(feature = "async-std-runtime", async_std::test)] +#[function_name::named] +async fn tailable_cursor() { + let _guard = LOCK.run_concurrently().await; + + let client = TestClient::new().await; + let coll = client + .create_fresh_collection( + function_name!(), + function_name!(), + CreateCollectionOptions::builder() + .capped(true) + .max(5) + .size(1_000_000) + .build(), + ) + .await; + + coll.insert_many((0..5).map(|i| doc! { "_id": i }), None) + .await + .unwrap(); + + let await_time = Duration::from_millis(500); + + let mut cursor = coll + .find( + None, + FindOptions::builder() + .cursor_type(CursorType::TailableAwait) + .max_await_time(await_time) + .build(), + ) + .await + .unwrap(); + + for i in 0..5 { + assert_eq!( + cursor.next().await.transpose().unwrap(), + Some(doc! { "_id": i }) + ); + } + + let delay = RUNTIME.delay_for(await_time); + let next_doc = cursor.next(); + + let next_doc = match futures::future::select(Box::pin(delay), Box::pin(next_doc)).await { + Either::Left((_, next_doc)) => next_doc, + Either::Right((next_doc, _)) => panic!( + "should have timed out before getting next document, but instead immediately got + {:?}", + next_doc + ), + }; + + coll.insert_one(doc! { "_id": 5 }, None).await.unwrap(); + + let delay = RUNTIME.delay_for(await_time); + + match futures::future::select(Box::pin(delay), Box::pin(next_doc)).await { + Either::Left((_, next_doc)) => { + panic!("should have gotten next document, but instead timed") + } + Either::Right((next_doc, _)) => { + assert_eq!(next_doc.transpose().unwrap(), Some(doc! 
{ "_id": 5 })) + } + }; +} diff --git a/src/test/mod.rs b/src/test/mod.rs index 46eeac229..f04749534 100644 --- a/src/test/mod.rs +++ b/src/test/mod.rs @@ -1,6 +1,7 @@ mod atlas_connectivity; mod client; mod coll; +mod cursor; mod db; mod spec; mod util; diff --git a/src/test/spec/crud/aggregate.rs b/src/test/spec/crud/aggregate.rs index d4268c520..4b792527c 100644 --- a/src/test/spec/crud/aggregate.rs +++ b/src/test/spec/crud/aggregate.rs @@ -62,9 +62,14 @@ async fn run_aggregate_test(test_file: TestFile) { .await .expect(&test_case.description); + let results = cursor + .try_collect::>() + .await + .expect(&test_case.description); + assert_eq!( outcome.result.unwrap_or_default(), - cursor.try_collect::>().await.unwrap(), + results, "{}", test_case.description, ); diff --git a/src/test/util/mod.rs b/src/test/util/mod.rs index 03b7c070d..b5265bbb4 100644 --- a/src/test/util/mod.rs +++ b/src/test/util/mod.rs @@ -18,7 +18,7 @@ use self::event::EventHandler; use super::CLIENT_OPTIONS; use crate::{ error::{CommandError, ErrorKind, Result}, - options::{auth::AuthMechanism, ClientOptions}, + options::{auth::AuthMechanism, ClientOptions, CreateCollectionOptions}, Client, Collection, }; @@ -107,6 +107,21 @@ impl TestClient { coll } + pub async fn create_fresh_collection( + &self, + db_name: &str, + coll_name: &str, + options: impl Into>, + ) -> Collection { + self.drop_collection(db_name, coll_name).await; + self.database(db_name) + .create_collection(coll_name, options) + .await + .unwrap(); + + self.get_coll(db_name, coll_name) + } + pub fn auth_enabled(&self) -> bool { self.options.credential.is_some() }