Skip to content

Commit c747515

Browse files
authored
RUST-354 Fix handling of tailable cursors (#162)
1 parent 9ab0e15 commit c747515

File tree

8 files changed

+268
-265
lines changed

8 files changed

+268
-265
lines changed

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ version = "1.0.98"
6262
features = ["derive"]
6363

6464
[dependencies.tokio]
65-
version = "0.2.15"
65+
version = "0.2.18"
6666
features = ["io-util", "sync"]
6767

6868
[dependencies.tokio-rustls]

src/cursor.rs

+165
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
use std::{
2+
collections::VecDeque,
3+
future::Future,
4+
pin::Pin,
5+
task::{Context, Poll},
6+
time::Duration,
7+
};
8+
9+
use bson::{doc, Document};
10+
use derivative::Derivative;
11+
use futures::{future::BoxFuture, stream::Stream};
12+
13+
use crate::{
14+
error::Result,
15+
operation::GetMore,
16+
options::StreamAddress,
17+
results::GetMoreResult,
18+
Client,
19+
Namespace,
20+
RUNTIME,
21+
};
22+
23+
#[derive(Debug)]
24+
pub(crate) struct CursorSpecification {
25+
pub(crate) ns: Namespace,
26+
pub(crate) address: StreamAddress,
27+
pub(crate) id: i64,
28+
pub(crate) batch_size: Option<u32>,
29+
pub(crate) max_time: Option<Duration>,
30+
pub(crate) buffer: VecDeque<Document>,
31+
}
32+
33+
#[derive(Debug)]
34+
pub struct Cursor {
35+
client: Client,
36+
get_more: GetMore,
37+
exhausted: bool,
38+
state: State,
39+
}
40+
41+
type GetMoreFuture = BoxFuture<'static, Result<GetMoreResult>>;
42+
43+
/// Describes the current state of the Cursor. If the state is Executing, then a getMore operation
44+
/// is in progress. If the state is Buffer, then there are documents available from the current
45+
/// batch.
46+
#[derive(Derivative)]
47+
#[derivative(Debug)]
48+
enum State {
49+
Executing(#[derivative(Debug = "ignore")] GetMoreFuture),
50+
Buffer(VecDeque<Document>),
51+
Exhausted,
52+
}
53+
54+
impl Cursor {
55+
pub(crate) fn new(client: Client, spec: CursorSpecification) -> Self {
56+
let get_more = GetMore::new(
57+
spec.ns,
58+
spec.id,
59+
spec.address,
60+
spec.batch_size,
61+
spec.max_time,
62+
);
63+
64+
Self {
65+
client,
66+
get_more,
67+
exhausted: spec.id == 0,
68+
state: State::Buffer(spec.buffer),
69+
}
70+
}
71+
}
72+
73+
impl Drop for Cursor {
74+
fn drop(&mut self) {
75+
if self.exhausted {
76+
return;
77+
}
78+
79+
let namespace = self.get_more.namespace().clone();
80+
let client = self.client.clone();
81+
let cursor_id = self.get_more.cursor_id();
82+
83+
RUNTIME.execute(async move {
84+
let _: Result<_> = client
85+
.database(&namespace.db)
86+
.run_command(
87+
doc! {
88+
"killCursors": &namespace.coll,
89+
"cursors": [cursor_id]
90+
},
91+
None,
92+
)
93+
.await;
94+
})
95+
}
96+
}
97+
98+
impl Stream for Cursor {
99+
type Item = Result<Document>;
100+
101+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
102+
loop {
103+
match self.state {
104+
// If the current state is Executing, then we check the progress of the getMore
105+
// operation.
106+
State::Executing(ref mut future) => {
107+
match Pin::new(future).poll(cx) {
108+
// If the getMore is finished and successful, then we pop off the first
109+
// document from the batch, set the poll state to
110+
// Buffer, record whether the cursor is exhausted,
111+
// and return the popped document.
112+
Poll::Ready(Ok(get_more_result)) => {
113+
let mut buffer: VecDeque<_> =
114+
get_more_result.batch.into_iter().collect();
115+
let next_doc = buffer.pop_front();
116+
117+
self.state = State::Buffer(buffer);
118+
self.exhausted = get_more_result.exhausted;
119+
120+
match next_doc {
121+
Some(doc) => return Poll::Ready(Some(Ok(doc))),
122+
None => continue,
123+
}
124+
}
125+
126+
// If the getMore finished with an error, return that error.
127+
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
128+
129+
// If the getMore has not finished, keep the state as Executing and return.
130+
Poll::Pending => return Poll::Pending,
131+
}
132+
}
133+
134+
State::Buffer(ref mut buffer) => {
135+
// If there is a document ready, return it.
136+
if let Some(doc) = buffer.pop_front() {
137+
return Poll::Ready(Some(Ok(doc)));
138+
}
139+
140+
// If no documents are left and the batch and the cursor is exhausted, set the
141+
// state to None.
142+
if self.exhausted {
143+
self.state = State::Exhausted;
144+
return Poll::Ready(None);
145+
// If the batch is empty and the cursor is not exhausted, start a new operation
146+
// and set the state to Executing.
147+
} else {
148+
let future = Box::pin(
149+
self.client
150+
.clone()
151+
.execute_operation_owned(self.get_more.clone()),
152+
);
153+
154+
self.state = State::Executing(future as GetMoreFuture);
155+
continue;
156+
}
157+
}
158+
159+
// If the state is None, then the cursor has already exhausted all its results, so
160+
// do nothing.
161+
State::Exhausted => return Poll::Ready(None),
162+
}
163+
}
164+
}
165+
}

src/cursor/impatient.rs

-148
This file was deleted.

0 commit comments

Comments
 (0)