Skip to content

Commit af429b4

Browse files
authored
feat(pageserver): observability for feature flags (neondatabase#12034)
## Problem Part of neondatabase#11813. This pull request adds miscellaneous observability improvements for the feature flag functionality. ## Summary of changes * Info span for the PostHog feature background loop. * New evaluate feature flag API. * Put the request error into the error message. * Log when the feature flags get updated. --------- Signed-off-by: Alex Chi Z <[email protected]>
1 parent 3b4d4eb commit af429b4

File tree

5 files changed

+102
-24
lines changed

5 files changed

+102
-24
lines changed

libs/posthog_client_lite/src/background_loop.rs

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::{sync::Arc, time::Duration};
44

55
use arc_swap::ArcSwap;
66
use tokio_util::sync::CancellationToken;
7+
use tracing::{Instrument, info_span};
78

89
use crate::{FeatureStore, PostHogClient, PostHogClientConfig};
910

@@ -26,31 +27,35 @@ impl FeatureResolverBackgroundLoop {
2627
pub fn spawn(self: Arc<Self>, handle: &tokio::runtime::Handle, refresh_period: Duration) {
2728
let this = self.clone();
2829
let cancel = self.cancel.clone();
29-
handle.spawn(async move {
30-
tracing::info!("Starting PostHog feature resolver");
31-
let mut ticker = tokio::time::interval(refresh_period);
32-
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
33-
loop {
34-
tokio::select! {
35-
_ = ticker.tick() => {}
36-
_ = cancel.cancelled() => break
37-
}
38-
let resp = match this
39-
.posthog_client
40-
.get_feature_flags_local_evaluation()
41-
.await
42-
{
43-
Ok(resp) => resp,
44-
Err(e) => {
45-
tracing::warn!("Cannot get feature flags: {}", e);
46-
continue;
30+
handle.spawn(
31+
async move {
32+
tracing::info!("Starting PostHog feature resolver");
33+
let mut ticker = tokio::time::interval(refresh_period);
34+
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
35+
loop {
36+
tokio::select! {
37+
_ = ticker.tick() => {}
38+
_ = cancel.cancelled() => break
4739
}
48-
};
49-
let feature_store = FeatureStore::new_with_flags(resp.flags);
50-
this.feature_store.store(Arc::new(feature_store));
40+
let resp = match this
41+
.posthog_client
42+
.get_feature_flags_local_evaluation()
43+
.await
44+
{
45+
Ok(resp) => resp,
46+
Err(e) => {
47+
tracing::warn!("Cannot get feature flags: {}", e);
48+
continue;
49+
}
50+
};
51+
let feature_store = FeatureStore::new_with_flags(resp.flags);
52+
this.feature_store.store(Arc::new(feature_store));
53+
tracing::info!("Feature flag updated");
54+
}
55+
tracing::info!("PostHog feature resolver stopped");
5156
}
52-
tracing::info!("PostHog feature resolver stopped");
53-
});
57+
.instrument(info_span!("posthog_feature_resolver")),
58+
);
5459
}
5560

5661
pub fn feature_store(&self) -> Arc<FeatureStore> {

libs/posthog_client_lite/src/lib.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,18 @@ impl FeatureStore {
448448
)))
449449
}
450450
}
451+
452+
/// Infer whether a feature flag is a boolean flag by checking if it has a multivariate filter.
453+
pub fn is_feature_flag_boolean(&self, flag_key: &str) -> Result<bool, PostHogEvaluationError> {
454+
if let Some(flag_config) = self.flags.get(flag_key) {
455+
Ok(flag_config.filters.multivariate.is_none())
456+
} else {
457+
Err(PostHogEvaluationError::NotAvailable(format!(
458+
"Not found in the local evaluation spec: {}",
459+
flag_key
460+
)))
461+
}
462+
}
451463
}
452464

453465
pub struct PostHogClientConfig {
@@ -528,7 +540,15 @@ impl PostHogClient {
528540
.bearer_auth(&self.config.server_api_key)
529541
.send()
530542
.await?;
543+
let status = response.status();
531544
let body = response.text().await?;
545+
if !status.is_success() {
546+
return Err(anyhow::anyhow!(
547+
"Failed to get feature flags: {}, {}",
548+
status,
549+
body
550+
));
551+
}
532552
Ok(serde_json::from_str(&body)?)
533553
}
534554

pageserver/src/feature_resolver.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,4 +91,14 @@ impl FeatureResolver {
9191
))
9292
}
9393
}
94+
95+
pub fn is_feature_flag_boolean(&self, flag_key: &str) -> Result<bool, PostHogEvaluationError> {
96+
if let Some(inner) = &self.inner {
97+
inner.feature_store().is_feature_flag_boolean(flag_key)
98+
} else {
99+
Err(PostHogEvaluationError::NotAvailable(
100+
"PostHog integration is not enabled".to_string(),
101+
))
102+
}
103+
}
94104
}

pageserver/src/http/routes.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3663,6 +3663,46 @@ async fn read_tar_eof(mut reader: (impl tokio::io::AsyncRead + Unpin)) -> anyhow
36633663
Ok(())
36643664
}
36653665

3666+
async fn tenant_evaluate_feature_flag(
3667+
request: Request<Body>,
3668+
_cancel: CancellationToken,
3669+
) -> Result<Response<Body>, ApiError> {
3670+
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
3671+
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
3672+
3673+
let flag: String = must_parse_query_param(&request, "flag")?;
3674+
let as_type: String = must_parse_query_param(&request, "as")?;
3675+
3676+
let state = get_state(&request);
3677+
3678+
async {
3679+
let tenant = state
3680+
.tenant_manager
3681+
.get_attached_tenant_shard(tenant_shard_id)?;
3682+
if as_type == "boolean" {
3683+
let result = tenant.feature_resolver.evaluate_boolean(&flag, tenant_shard_id.tenant_id);
3684+
let result = result.map(|_| true).map_err(|e| e.to_string());
3685+
json_response(StatusCode::OK, result)
3686+
} else if as_type == "multivariate" {
3687+
let result = tenant.feature_resolver.evaluate_multivariate(&flag, tenant_shard_id.tenant_id).map_err(|e| e.to_string());
3688+
json_response(StatusCode::OK, result)
3689+
} else {
3690+
// Auto infer the type of the feature flag.
3691+
let is_boolean = tenant.feature_resolver.is_feature_flag_boolean(&flag).map_err(|e| ApiError::InternalServerError(anyhow::anyhow!("{e}")))?;
3692+
if is_boolean {
3693+
let result = tenant.feature_resolver.evaluate_boolean(&flag, tenant_shard_id.tenant_id);
3694+
let result = result.map(|_| true).map_err(|e| e.to_string());
3695+
json_response(StatusCode::OK, result)
3696+
} else {
3697+
let result = tenant.feature_resolver.evaluate_multivariate(&flag, tenant_shard_id.tenant_id).map_err(|e| e.to_string());
3698+
json_response(StatusCode::OK, result)
3699+
}
3700+
}
3701+
}
3702+
.instrument(info_span!("tenant_evaluate_feature_flag", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
3703+
.await
3704+
}
3705+
36663706
/// Common functionality of all the HTTP API handlers.
36673707
///
36683708
/// - Adds a tracing span to each request (by `request_span`)
@@ -4039,5 +4079,8 @@ pub fn make_router(
40394079
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/activate_post_import",
40404080
|r| api_handler(r, activate_post_import_handler),
40414081
)
4082+
.get("/v1/tenant/:tenant_shard_id/feature_flag", |r| {
4083+
api_handler(r, tenant_evaluate_feature_flag)
4084+
})
40424085
.any(handler_404))
40434086
}

pageserver/src/tenant.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ pub struct TenantShard {
383383

384384
l0_flush_global_state: L0FlushGlobalState,
385385

386-
feature_resolver: FeatureResolver,
386+
pub(crate) feature_resolver: FeatureResolver,
387387
}
388388
impl std::fmt::Debug for TenantShard {
389389
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {

0 commit comments

Comments
 (0)