Skip to content

Remote IP available through context-object #132

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/application/src/cron_jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,7 @@ impl<RT: Runtime> CronJobContext<RT> {
request_id.clone().unwrap_or_else(RequestId::new),
execution_id.clone().unwrap_or_else(ExecutionId::new),
caller.parent_scheduled_job(),
caller.remote_ip(),
caller.is_root(),
);
sentry::configure_scope(|scope| context.add_sentry_tags(scope));
Expand Down
1 change: 1 addition & 0 deletions crates/application/src/scheduled_jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,7 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
request_id.clone().unwrap_or_else(RequestId::new),
execution_id.clone().unwrap_or_else(ExecutionId::new),
caller.parent_scheduled_job(),
caller.remote_ip(),
caller.is_root(),
);
sentry::configure_scope(|scope| context.add_sentry_tags(scope));
Expand Down
14 changes: 7 additions & 7 deletions crates/application/src/tests/http_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async fn test_http_action_basic(rt: TestRuntime) -> anyhow::Result<()> {
common::RequestId::new(),
http_request,
Identity::system(),
FunctionCaller::HttpEndpoint,
FunctionCaller::HttpEndpoint(None),
response_streamer,
)
.await?;
Expand Down Expand Up @@ -134,7 +134,7 @@ async fn test_http_action_error(rt: TestRuntime) -> anyhow::Result<()> {
common::RequestId::new(),
http_request,
Identity::system(),
FunctionCaller::HttpEndpoint,
FunctionCaller::HttpEndpoint(None),
response_streamer,
)
.await?;
Expand Down Expand Up @@ -191,7 +191,7 @@ async fn test_http_action_not_found(rt: TestRuntime) -> anyhow::Result<()> {
common::RequestId::new(),
http_request,
Identity::system(),
FunctionCaller::HttpEndpoint,
FunctionCaller::HttpEndpoint(None),
response_streamer,
)
.await?;
Expand Down Expand Up @@ -243,7 +243,7 @@ async fn test_http_action_disconnect_before_head(
common::RequestId::new(),
http_request,
Identity::system(),
FunctionCaller::HttpEndpoint,
FunctionCaller::HttpEndpoint(None),
response_streamer,
));
select! {
Expand Down Expand Up @@ -307,7 +307,7 @@ async fn test_http_action_disconnect_while_streaming(
common::RequestId::new(),
http_request,
Identity::system(),
FunctionCaller::HttpEndpoint,
FunctionCaller::HttpEndpoint(None),
response_streamer,
)
.await
Expand Down Expand Up @@ -382,7 +382,7 @@ async fn test_http_action_continues_after_client_disconnects(
common::RequestId::new(),
http_request,
Identity::system(),
FunctionCaller::HttpEndpoint,
FunctionCaller::HttpEndpoint(None),
response_streamer,
)
.await
Expand Down Expand Up @@ -414,7 +414,7 @@ async fn test_http_action_continues_after_client_disconnects(
udf_path: "functions:didWrite".parse()?,
},
vec![json!({})],
FunctionCaller::HttpEndpoint,
FunctionCaller::HttpEndpoint(None),
ExecuteQueryTimestamp::Latest,
None,
)
Expand Down
2 changes: 1 addition & 1 deletion crates/application/src/tests/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ async fn test_udf_logs(rt: TestRuntime) -> anyhow::Result<()> {
PublicFunctionPath::Component(path),
vec![],
Identity::system(),
FunctionCaller::SyncWorker(ClientVersion::unknown()),
FunctionCaller::SyncWorker(ClientVersion::unknown(), None),
)
.await?;
assert!(result.result.is_ok());
Expand Down
6 changes: 3 additions & 3 deletions crates/application/src/tests/returns_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async fn run_zero_arg_mutation(
vec![obj],
Identity::user(UserIdentity::test()),
None,
FunctionCaller::HttpEndpoint,
FunctionCaller::HttpEndpoint(None),
None,
)
.await
Expand All @@ -60,7 +60,7 @@ async fn run_zero_arg_query(
}),
vec![obj],
Identity::user(UserIdentity::test()),
FunctionCaller::HttpEndpoint,
FunctionCaller::HttpEndpoint(None),
)
.await
}
Expand All @@ -79,7 +79,7 @@ async fn run_zero_arg_action(
}),
vec![obj],
Identity::user(UserIdentity::test()),
FunctionCaller::HttpEndpoint,
FunctionCaller::HttpEndpoint(None),
)
.await
}
Expand Down
34 changes: 34 additions & 0 deletions crates/common/src/execution_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
Display,
Formatter,
},
net::SocketAddr,
str::FromStr,
};

Expand Down Expand Up @@ -39,6 +40,8 @@ pub struct ExecutionContext {
pub execution_id: ExecutionId,
/// The id of the scheduled job that triggered this UDF, if any.
pub parent_scheduled_job: Option<(ComponentId, DeveloperDocumentId)>,
/// The remote IP address of the client that initiated this request
pub remote_ip: Option<SocketAddr>,
/// False if this function was called as part of a request (e.g. action
/// calling a mutation) TODO: This is a stop gap solution. The richer
/// version of this would be something like parent_execution_id:
Expand All @@ -48,10 +51,26 @@ pub struct ExecutionContext {

impl ExecutionContext {
pub fn new(request_id: RequestId, caller: &FunctionCaller) -> Self {
let remote_ip = caller.remote_ip();
Self {
request_id,
execution_id: ExecutionId::new(),
parent_scheduled_job: caller.parent_scheduled_job(),
remote_ip,
is_root: caller.is_root(),
}
}

pub fn new_with_remote_ip(
request_id: RequestId,
caller: &FunctionCaller,
remote_ip: Option<SocketAddr>,
) -> Self {
Self {
request_id,
execution_id: ExecutionId::new(),
parent_scheduled_job: caller.parent_scheduled_job(),
remote_ip,
is_root: caller.is_root(),
}
}
Expand All @@ -60,12 +79,14 @@ impl ExecutionContext {
request_id: RequestId,
execution_id: ExecutionId,
parent_scheduled_job: Option<(ComponentId, DeveloperDocumentId)>,
remote_ip: Option<SocketAddr>,
is_root: bool,
) -> Self {
Self {
request_id,
execution_id,
parent_scheduled_job,
remote_ip,
is_root,
}
}
Expand All @@ -80,6 +101,7 @@ impl ExecutionContext {
request_id: RequestId::new(),
execution_id: ExecutionId::new(),
parent_scheduled_job: None,
remote_ip: None,
is_root: true,
}
}
Expand All @@ -97,6 +119,9 @@ impl HeapSize for ExecutionContext {
+ self
.parent_scheduled_job
.map_or(0, |(_, document_id)| document_id.heap_size())
+ self
.remote_ip
.map_or(0, |_| std::mem::size_of::<std::net::SocketAddr>())
+ self.is_root.heap_size()
}
}
Expand Down Expand Up @@ -259,6 +284,7 @@ impl From<ExecutionContext> for pb::common::ExecutionContext {
pb::common::ExecutionContext {
request_id: Some(value.request_id.into()),
execution_id: Some(value.execution_id.to_string()),
remote_ip: value.remote_ip.map(|addr| addr.to_string()),
parent_scheduled_job_component_id: parent_component_id
.and_then(|id| id.serialize_to_string()),
parent_scheduled_job: parent_document_id.map(Into::into),
Expand All @@ -275,13 +301,19 @@ impl TryFrom<pb::common::ExecutionContext> for ExecutionContext {
value.parent_scheduled_job_component_id.as_deref(),
)?;
let parent_document_id = value.parent_scheduled_job.map(|s| s.parse()).transpose()?;
let remote_ip = value
.remote_ip
.map(|s| s.parse())
.transpose()
.context("Invalid remote IP address")?;
Ok(Self {
request_id: RequestId::from_str(&value.request_id.context("Missing request id")?)?,
execution_id: match &value.execution_id {
Some(e) => ExecutionId::from_str(e)?,
None => ExecutionId::new(),
},
parent_scheduled_job: parent_document_id.map(|id| (parent_component_id, id)),
remote_ip,
is_root: value.is_root.unwrap_or_default(),
})
}
Expand All @@ -290,10 +322,12 @@ impl TryFrom<pb::common::ExecutionContext> for ExecutionContext {
impl From<ExecutionContext> for JsonValue {
fn from(value: ExecutionContext) -> Self {
let (parent_component_id, parent_document_id) = value.parent_scheduled_job.unzip();
let remote_ip_str = value.remote_ip.map(|addr| addr.to_string());
json!({
"requestId": String::from(value.request_id),
"executionId": value.execution_id.to_string(),
"isRoot": value.is_root,
"remoteIp": remote_ip_str,
"parentScheduledJob": parent_document_id.map(|id| id.to_string()),
"parentScheduledJobComponentId": parent_component_id.unwrap_or(ComponentId::Root).serialize_to_string(),
})
Expand Down
68 changes: 42 additions & 26 deletions crates/common/src/types/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
self,
Debug,
},
net::SocketAddr,
str::FromStr,
};

Expand Down Expand Up @@ -145,13 +146,13 @@ pub enum AllowedVisibility {
#[derive(Clone, Debug, PartialEq, Eq, Ord, PartialOrd)]
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
pub enum FunctionCaller {
SyncWorker(ClientVersion),
HttpApi(ClientVersion),
SyncWorker(ClientVersion, Option<SocketAddr>),
HttpApi(ClientVersion, Option<SocketAddr>),
/// Used by function tester in the dashboard
Tester(ClientVersion),
// This is a user defined http actions called externally. If the http action
// calls other functions, their caller would be `Action`.
HttpEndpoint,
HttpEndpoint(Option<SocketAddr>),
Cron,
Scheduler {
job_id: DeveloperDocumentId,
Expand All @@ -168,10 +169,10 @@ pub enum FunctionCaller {
impl FunctionCaller {
pub fn client_version(&self) -> Option<ClientVersion> {
match self {
FunctionCaller::SyncWorker(c) => Some(c),
FunctionCaller::HttpApi(c) => Some(c),
FunctionCaller::SyncWorker(c, _) => Some(c),
FunctionCaller::HttpApi(c, _) => Some(c),
FunctionCaller::Tester(c) => Some(c),
FunctionCaller::HttpEndpoint
FunctionCaller::HttpEndpoint(_)
| FunctionCaller::Cron
| FunctionCaller::Scheduler { .. }
| FunctionCaller::Action { .. } => None,
Expand All @@ -183,10 +184,10 @@ impl FunctionCaller {

pub fn parent_scheduled_job(&self) -> Option<(ComponentId, DeveloperDocumentId)> {
match self {
FunctionCaller::SyncWorker(_)
| FunctionCaller::HttpApi(_)
FunctionCaller::SyncWorker(_, _)
| FunctionCaller::HttpApi(_, _)
| FunctionCaller::Tester(_)
| FunctionCaller::HttpEndpoint
| FunctionCaller::HttpEndpoint(_)
| FunctionCaller::Cron => None,
#[cfg(any(test, feature = "testing"))]
FunctionCaller::Test => None,
Expand All @@ -200,12 +201,27 @@ impl FunctionCaller {
}
}

pub fn remote_ip(&self) -> Option<SocketAddr> {
let remote_ip = match self {
FunctionCaller::SyncWorker(_, remote_ip)
| FunctionCaller::HttpApi(_, remote_ip)
| FunctionCaller::HttpEndpoint(remote_ip) => *remote_ip,
FunctionCaller::Tester(_)
| FunctionCaller::Cron
| FunctionCaller::Scheduler { .. }
| FunctionCaller::Action { .. } => None,
#[cfg(any(test, feature = "testing"))]
FunctionCaller::Test => None,
};
remote_ip
}

pub fn is_root(&self) -> bool {
match self {
FunctionCaller::SyncWorker(_)
| FunctionCaller::HttpApi(_)
FunctionCaller::SyncWorker(_, _)
| FunctionCaller::HttpApi(_, _)
| FunctionCaller::Tester(_)
| FunctionCaller::HttpEndpoint
| FunctionCaller::HttpEndpoint(_)
| FunctionCaller::Cron
| FunctionCaller::Scheduler { .. } => true,
FunctionCaller::Action { .. } => false,
Expand All @@ -219,9 +235,9 @@ impl FunctionCaller {
// to run it even if the client goes away. However, we preserve the right
// to interrupt actions if the backend restarts.
match self {
FunctionCaller::SyncWorker(_)
| FunctionCaller::HttpApi(_)
| FunctionCaller::HttpEndpoint
FunctionCaller::SyncWorker(_, _)
| FunctionCaller::HttpApi(_, _)
| FunctionCaller::HttpEndpoint(_)
| FunctionCaller::Tester(_) => true,
FunctionCaller::Cron
| FunctionCaller::Scheduler { .. }
Expand All @@ -233,13 +249,13 @@ impl FunctionCaller {

pub fn allowed_visibility(&self) -> AllowedVisibility {
match self {
FunctionCaller::SyncWorker(_) | FunctionCaller::HttpApi(_) => {
FunctionCaller::SyncWorker(_, _) | FunctionCaller::HttpApi(_, _) => {
AllowedVisibility::PublicOnly
},
// NOTE: Allowed visibility doesn't make sense in the context of an
// user defined http action since all http actions are public, and
// we shouldn't be checking visibility. We define this for completeness.
FunctionCaller::HttpEndpoint => AllowedVisibility::PublicOnly,
FunctionCaller::HttpEndpoint(_) => AllowedVisibility::PublicOnly,
FunctionCaller::Tester(_)
| FunctionCaller::Cron
| FunctionCaller::Scheduler { .. }
Expand All @@ -253,10 +269,10 @@ impl FunctionCaller {
impl fmt::Display for FunctionCaller {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let s = match self {
FunctionCaller::SyncWorker(_) => "SyncWorker",
FunctionCaller::HttpApi(_) => "HttpApi",
FunctionCaller::SyncWorker(_, _) => "SyncWorker",
FunctionCaller::HttpApi(_, _) => "HttpApi",
FunctionCaller::Tester(_) => "Tester",
FunctionCaller::HttpEndpoint => "HttpEndpoint",
FunctionCaller::HttpEndpoint(_) => "HttpEndpoint",
FunctionCaller::Cron => "Cron",
FunctionCaller::Scheduler { .. } => "Scheduler",
FunctionCaller::Action { .. } => "Action",
Expand All @@ -270,16 +286,16 @@ impl fmt::Display for FunctionCaller {
impl From<FunctionCaller> for pb::common::FunctionCaller {
fn from(caller: FunctionCaller) -> Self {
let caller = match caller {
FunctionCaller::SyncWorker(client_version) => {
FunctionCaller::SyncWorker(client_version, _) => {
pb::common::function_caller::Caller::SyncWorker(client_version.into())
},
FunctionCaller::HttpApi(client_version) => {
FunctionCaller::HttpApi(client_version, _) => {
pb::common::function_caller::Caller::HttpApi(client_version.into())
},
FunctionCaller::Tester(client_version) => {
pb::common::function_caller::Caller::Tester(client_version.into())
},
FunctionCaller::HttpEndpoint => pb::common::function_caller::Caller::HttpEndpoint(()),
FunctionCaller::HttpEndpoint(_) => pb::common::function_caller::Caller::HttpEndpoint(()),
FunctionCaller::Cron => pb::common::function_caller::Caller::Cron(()),
FunctionCaller::Scheduler {
job_id,
Expand Down Expand Up @@ -318,16 +334,16 @@ impl TryFrom<pb::common::FunctionCaller> for FunctionCaller {
fn try_from(msg: pb::common::FunctionCaller) -> anyhow::Result<Self> {
let caller = match msg.caller {
Some(pb::common::function_caller::Caller::SyncWorker(client_version)) => {
FunctionCaller::SyncWorker(client_version.try_into()?)
FunctionCaller::SyncWorker(client_version.try_into()?, None)
},
Some(pb::common::function_caller::Caller::HttpApi(client_version)) => {
FunctionCaller::HttpApi(client_version.try_into()?)
FunctionCaller::HttpApi(client_version.try_into()?, None)
},
Some(pb::common::function_caller::Caller::Tester(client_version)) => {
FunctionCaller::Tester(client_version.try_into()?)
},
Some(pb::common::function_caller::Caller::HttpEndpoint(())) => {
FunctionCaller::HttpEndpoint
FunctionCaller::HttpEndpoint(None)
},
Some(pb::common::function_caller::Caller::Cron(())) => FunctionCaller::Cron,
Some(pb::common::function_caller::Caller::Scheduler(caller)) => {
Expand Down
Loading