Skip to content

Commit fdb09a1

Browse files
committed
WIP
1 parent ba4eb2d commit fdb09a1

File tree

5 files changed

+132
-3
lines changed

5 files changed

+132
-3
lines changed

quickwit/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-grpc-clients/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@ documentation = "https://quickwit.io/docs/"
1212
[dependencies]
1313
anyhow = { workspace = true }
1414
async-trait = { workspace = true }
15+
bytes = { workspace = true }
1516
tokio = { workspace = true }
1617
tokio-stream = { workspace = true }
1718
tonic = { workspace = true }
1819
tower = { workspace = true }
1920
tracing = { workspace = true }
2021

22+
quickwit-actors = { workspace = true }
2123
quickwit-common = { workspace = true }
2224
quickwit-config = { workspace = true }
2325
quickwit-proto = { workspace = true }

quickwit/quickwit-grpc-clients/src/balance_channel.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,19 @@
1818
// along with this program. If not, see <http://www.gnu.org/licenses/>.
1919

2020
use std::collections::HashSet;
21+
use std::marker::PhantomData;
2122
use std::net::SocketAddr;
2223
use std::ops::Sub;
24+
use std::pin::Pin;
25+
use std::task::{Poll, Context};
2326
use std::time::Duration;
2427

2528
use quickwit_cluster::ClusterMember;
2629
use quickwit_config::service::QuickwitService;
2730
use tokio::sync::mpsc::Sender;
2831
use tokio::sync::watch;
29-
use tokio::sync::watch::Receiver;
30-
use tokio_stream::wrappers::WatchStream;
32+
use tokio::sync::mpsc::Receiver;
33+
use tokio_stream::{wrappers::WatchStream, Stream};
3134
use tokio_stream::StreamExt;
3235
use tonic::transport::{Channel, Endpoint, Uri};
3336
use tower::discover::Change;
@@ -51,7 +54,7 @@ const CLIENT_TIMEOUT_DURATION: Duration = if cfg!(any(test, feature = "testsuite
5154
pub async fn create_balance_channel_from_watched_members(
5255
mut members_watch_channel: WatchStream<Vec<ClusterMember>>,
5356
service: QuickwitService,
54-
) -> anyhow::Result<(Timeout<Channel>, Receiver<usize>)> {
57+
) -> anyhow::Result<(Timeout<Channel>, tokio::sync::watch::Receiver<usize>)> {
5558
// Create a balance channel whose endpoint can be updated thanks to a sender.
5659
let (channel, channel_tx) = Channel::balance_channel(10);
5760

quickwit/quickwit-grpc-clients/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
pub mod balance_channel;
2121
pub mod control_plane_client;
22+
pub mod service_discovery;
2223

2324
pub use balance_channel::create_balance_channel_from_watched_members;
2425
pub use control_plane_client::ControlPlaneGrpcClient;
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
// Copyright (C) 2022 Quickwit, Inc.
2+
//
3+
// Quickwit is offered under the AGPL v3.0 and as commercial software.
4+
// For commercial licensing, contact us at [email protected].
5+
//
6+
// AGPL:
7+
// This program is free software: you can redistribute it and/or modify
8+
// it under the terms of the GNU Affero General Public License as
9+
// published by the Free Software Foundation, either version 3 of the
10+
// License, or (at your option) any later version.
11+
//
12+
// This program is distributed in the hope that it will be useful,
13+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
// GNU Affero General Public License for more details.
16+
//
17+
// You should have received a copy of the GNU Affero General Public License
18+
// along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
20+
use std::marker::PhantomData;
21+
use std::net::SocketAddr;
22+
use std::pin::Pin;
23+
use std::task::{Poll, Context};
24+
25+
use async_trait::async_trait;
26+
use quickwit_actors::{Mailbox, Actor};
27+
use quickwit_cluster::ClusterMember;
28+
use quickwit_config::service::QuickwitService;
29+
use quickwit_proto::search_service_client::SearchServiceClient;
30+
use quickwit_proto::{SearchRequest, SearchResponse};
31+
use tokio::sync::mpsc::Receiver;
32+
use tokio_stream::Stream;
33+
use tonic::transport::Channel;
34+
use tower::discover::Change;
35+
36+
type DiscoverResult<K, S, E> = Result<Change<K, S>, E>;
37+
38+
pub trait Service: Clone + Send + Sync + 'static {
39+
/// Returns the [`QuickwitService`] of the client.
40+
fn qw_service() -> QuickwitService;
41+
/// Builds a client from a [`SocketAddr`].
42+
fn build(addr: SocketAddr) -> anyhow::Result<Self>;
43+
}
44+
45+
pub struct DiscoverServiceStream<T: Service> {
46+
changes: Receiver<Change<SocketAddr, ClusterMember>>,
47+
_phantom: PhantomData<T>,
48+
}
49+
50+
impl<T: Service> DiscoverServiceStream<T> {
51+
pub fn new(changes: Receiver<Change<SocketAddr, ClusterMember>>) -> Self {
52+
Self { changes, _phantom: PhantomData }
53+
}
54+
}
55+
56+
impl<T: Service> Stream for DiscoverServiceStream<T> {
57+
type Item = DiscoverResult<SocketAddr, T, anyhow::Error>;
58+
59+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
60+
let c = &mut self.changes;
61+
match Pin::new(&mut *c).poll_recv(cx) {
62+
Poll::Pending | Poll::Ready(None) => Poll::Pending,
63+
Poll::Ready(Some(change)) => match change {
64+
Change::Insert(address, cluster_member) => {
65+
let new_opt_service = if cluster_member.enabled_services.contains(&T::qw_service()) {
66+
let service = T::build(address)
67+
.map(|client| Change::Insert(address, client));
68+
Some(service)
69+
} else {
70+
None
71+
};
72+
Poll::Ready(new_opt_service)
73+
}
74+
Change::Remove(k) => Poll::Ready(Some(Ok(Change::Remove(k)))),
75+
},
76+
}
77+
}
78+
}
79+
80+
// Are we sure about that????
81+
impl<T: Service> Unpin for DiscoverServiceStream<T> {}
82+
83+
#[async_trait]
84+
pub trait SearchService {
85+
async fn root_search(
86+
&mut self,
87+
request: impl tonic::IntoRequest<SearchRequest> + Send + Sync,
88+
) -> Result<tonic::Response<SearchResponse>, tonic::Status>;
89+
}
90+
91+
#[derive(Clone)]
92+
struct SearchClient {
93+
inner: SearchServiceClient<Channel>,
94+
}
95+
96+
#[async_trait]
97+
impl SearchService for SearchClient where
98+
{
99+
async fn root_search(
100+
&mut self,
101+
request: impl tonic::IntoRequest<SearchRequest> + Send + Sync,
102+
) -> Result<tonic::Response<SearchResponse>, tonic::Status> {
103+
self.inner.root_search(request).await
104+
}
105+
}
106+
107+
108+
struct LocalSearchClient<T: Actor> {
109+
inner: Mailbox<T>
110+
}
111+
112+
#[async_trait]
113+
impl<T: Actor> SearchService for LocalSearchClient<T> {
114+
async fn root_search(
115+
&mut self,
116+
request: impl tonic::IntoRequest<SearchRequest> + Send + Sync,
117+
) -> Result<tonic::Response<SearchResponse>, tonic::Status> {
118+
let response = SearchResponse::default();
119+
Ok(tonic::Response::new(response))
120+
}
121+
}

0 commit comments

Comments
 (0)