Skip to content

Commit 9f086a9

Browse files
authored
Merge pull request JanKaul#188 from splitgraph/load-tabular-gcp-object-store
feat: vend credentials when loading a tabular
2 parents 78cd606 + 634dc6b commit 9f086a9

File tree

3 files changed

+207
-49
lines changed

3 files changed

+207
-49
lines changed

catalogs/iceberg-rest-catalog/src/catalog.rs

Lines changed: 24 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
1+
use crate::{
2+
apis::{
3+
self,
4+
catalog_api_api::{self, NamespaceExistsError},
5+
configuration::Configuration,
6+
},
7+
models::{self, StorageCredential},
8+
};
19
use async_trait::async_trait;
210
use futures::{FutureExt, TryFutureExt};
11+
use iceberg_rust::object_store::parse::object_store_from_config;
312
/**
413
Iceberg rest catalog implementation
514
*/
@@ -25,21 +34,13 @@ use iceberg_rust::{
2534
table::Table,
2635
view::View,
2736
};
28-
use object_store::{aws::AmazonS3Builder, ObjectStore};
37+
use object_store::{aws::AmazonS3Builder, ObjectStore, ObjectStoreScheme};
2938
use std::{
3039
collections::HashMap,
3140
path::Path,
3241
sync::{Arc, RwLock},
3342
};
34-
35-
use crate::{
36-
apis::{
37-
self,
38-
catalog_api_api::{self, NamespaceExistsError},
39-
configuration::Configuration,
40-
},
41-
models::{self, StorageCredential},
42-
};
43+
use url::Url;
4344

4445
#[derive(Debug)]
4546
pub struct RestCatalog {
@@ -287,7 +288,7 @@ impl Catalog for RestCatalog {
287288
self.name.as_deref(),
288289
&identifier.namespace().to_string(),
289290
identifier.name(),
290-
None,
291+
Some("vended-credentials"),
291292
None,
292293
)
293294
.await
@@ -609,48 +610,24 @@ impl CatalogList for RestNoPrefixCatalogList {
609610
}
610611
}
611612

612-
const CLIENT_REGION: &str = "client.region";
613-
const AWS_ACCESS_KEY_ID: &str = "s3.access-key-id";
614-
const AWS_SECRET_ACCESS_KEY: &str = "s3.secret-access-key";
615-
const AWS_SESSION_TOKEN: &str = "s3.session-token";
616-
617613
fn object_store_from_response(
618614
response: &models::LoadTableResult,
619615
) -> Result<Option<Arc<dyn ObjectStore>>, Error> {
620616
let config = match (&response.storage_credentials, &response.config) {
621-
(Some(credentials), _) => Some(&credentials[0].config),
622-
(None, Some(config)) => Some(config),
623-
(None, None) => None,
624-
};
625-
626-
let Some(config) = config else {
627-
return Ok(None);
628-
};
629-
630-
let region = config.get(CLIENT_REGION);
631-
if config.contains_key(AWS_ACCESS_KEY_ID) {
632-
let access_key_id = config.get(AWS_ACCESS_KEY_ID);
633-
let secret_access_key = config.get(AWS_SECRET_ACCESS_KEY);
634-
let session_token = config.get(AWS_SESSION_TOKEN);
635-
let mut builder = AmazonS3Builder::new();
636-
637-
if let Some(region) = region {
638-
builder = builder.with_region(region)
639-
}
640-
if let Some(access_key_id) = access_key_id {
641-
builder = builder.with_access_key_id(access_key_id)
642-
}
643-
if let Some(secret_access_key) = secret_access_key {
644-
builder = builder.with_secret_access_key(secret_access_key)
645-
}
646-
if let Some(session_token) = session_token {
647-
builder = builder.with_token(session_token)
617+
(Some(credentials), Some(config)) => {
618+
// Enrich credentials with other options that might only be found in the config (e.g.
619+
// a custom endpoint)
620+
let mut options = credentials[0].config.clone();
621+
options.extend(config.clone());
622+
options
648623
}
624+
(Some(credentials), None) => credentials[0].config.clone(),
625+
(None, Some(config)) => config.clone(),
626+
(None, None) => return Ok(None),
627+
};
649628

650-
Ok(Some(Arc::new(builder.build()?)))
651-
} else {
652-
Ok(None)
653-
}
629+
let url = Url::parse(&response.metadata.location)?;
630+
Ok(Some(object_store_from_config(url, config)?))
654631
}
655632

656633
#[cfg(test)]

iceberg-rust/src/object_store/mod.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use object_store::{
1414

1515
use crate::error::Error;
1616

17+
pub mod parse;
1718
pub mod store;
1819

1920
/// Type for buckets for different cloud providers
@@ -41,14 +42,22 @@ impl Bucket<'_> {
4142
/// Get the bucket and coud provider from the location string
4243
pub fn from_path(path: &str) -> Result<Bucket, Error> {
4344
if path.starts_with("s3://") || path.starts_with("s3a://") {
44-
let prefix = if path.starts_with("s3://") { "s3://" } else { "s3a://" };
45+
let prefix = if path.starts_with("s3://") {
46+
"s3://"
47+
} else {
48+
"s3a://"
49+
};
4550
path.trim_start_matches(prefix)
4651
.split('/')
4752
.next()
4853
.map(Bucket::S3)
4954
.ok_or(Error::NotFound(format!("Bucket in path {path}")))
5055
} else if path.starts_with("gcs://") || path.starts_with("gs://") {
51-
let prefix = if path.starts_with("gcs://") { "gcs://" } else { "gs://" };
56+
let prefix = if path.starts_with("gcs://") {
57+
"gcs://"
58+
} else {
59+
"gs://"
60+
};
5261
path.trim_start_matches(prefix)
5362
.split('/')
5463
.next()
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
/*! Utils for converting standard Iceberg config formats to equivalent `object_store` options
2+
*/
3+
4+
use crate::error::Error;
5+
use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey};
6+
use object_store::gcp::{GcpCredential, GoogleCloudStorageBuilder, GoogleConfigKey};
7+
use object_store::{parse_url_opts, ObjectStore, ObjectStoreScheme, StaticCredentialProvider};
8+
use std::collections::HashMap;
9+
use std::sync::Arc;
10+
use url::Url;
11+
12+
/// AWS configs
13+
const CLIENT_REGION: &str = "client.region";
14+
const AWS_ACCESS_KEY_ID: &str = "s3.access-key-id";
15+
const AWS_SECRET_ACCESS_KEY: &str = "s3.secret-access-key";
16+
const AWS_SESSION_TOKEN: &str = "s3.session-token";
17+
const AWS_REGION: &str = "s3.region";
18+
const AWS_ENDPOINT: &str = "s3.endpoint";
19+
const AWS_ALLOW_ANONYMOUS: &str = "s3.allow-anonymous";
20+
21+
/// GCP configs
22+
const GCS_BUCKET: &str = "gcs.bucket";
23+
const GCS_CREDENTIALS_JSON: &str = "gcs.credentials-json";
24+
const GCS_TOKEN: &str = "gcs.oauth2.token";
25+
26+
/// Parse the url and Iceberg format of variuos storage options into the equivalent `object_store`
27+
/// options and build the corresponding `ObjectStore`.
28+
pub fn object_store_from_config(
29+
url: Url,
30+
config: HashMap<String, String>,
31+
) -> Result<Arc<dyn ObjectStore>, Error> {
32+
let store = match ObjectStoreScheme::parse(&url).map_err(object_store::Error::from)? {
33+
(ObjectStoreScheme::AmazonS3, _) => {
34+
let mut builder = AmazonS3Builder::new().with_url(url);
35+
for (key, option) in config {
36+
let s3_key = match key.as_str() {
37+
AWS_ACCESS_KEY_ID => AmazonS3ConfigKey::AccessKeyId,
38+
AWS_SECRET_ACCESS_KEY => AmazonS3ConfigKey::SecretAccessKey,
39+
AWS_SESSION_TOKEN => AmazonS3ConfigKey::Token,
40+
CLIENT_REGION | AWS_REGION => AmazonS3ConfigKey::Region,
41+
AWS_ENDPOINT => {
42+
if option.starts_with("http://") {
43+
// This is mainly used for testing, e.g. against MinIO
44+
builder = builder.with_allow_http(true);
45+
}
46+
AmazonS3ConfigKey::Endpoint
47+
}
48+
AWS_ALLOW_ANONYMOUS => AmazonS3ConfigKey::SkipSignature,
49+
_ => continue,
50+
};
51+
builder = builder.with_config(s3_key, option);
52+
}
53+
Arc::new(builder.build()?) as Arc<dyn ObjectStore>
54+
}
55+
56+
(ObjectStoreScheme::GoogleCloudStorage, _) => {
57+
let mut builder = GoogleCloudStorageBuilder::new().with_url(url);
58+
for (key, option) in config {
59+
let gcs_key = match key.as_str() {
60+
GCS_CREDENTIALS_JSON => GoogleConfigKey::ServiceAccountKey,
61+
GCS_BUCKET => GoogleConfigKey::Bucket,
62+
GCS_TOKEN => {
63+
let credential = GcpCredential { bearer: option };
64+
let credential_provider =
65+
Arc::new(StaticCredentialProvider::new(credential)) as _;
66+
builder = builder.with_credentials(credential_provider);
67+
continue;
68+
}
69+
_ => continue,
70+
};
71+
builder = builder.with_config(gcs_key, option);
72+
}
73+
Arc::new(builder.build()?) as Arc<dyn ObjectStore>
74+
}
75+
76+
_ => {
77+
let (store, _path) = parse_url_opts(&url, config)?;
78+
store.into()
79+
}
80+
};
81+
82+
Ok(store)
83+
}
84+
85+
#[cfg(test)]
86+
mod tests {
87+
use super::*;
88+
use serde_json::json;
89+
use std::collections::HashMap;
90+
use url::Url;
91+
92+
#[test]
93+
fn test_s3_config_basic() {
94+
let url = Url::parse("s3://test-bucket/path").unwrap();
95+
let mut config = HashMap::new();
96+
config.insert(AWS_ACCESS_KEY_ID.to_string(), "test-key".to_string());
97+
config.insert(AWS_SECRET_ACCESS_KEY.to_string(), "test-secret".to_string());
98+
config.insert(AWS_SESSION_TOKEN.to_string(), "test-session".to_string());
99+
config.insert(AWS_REGION.to_string(), "us-east-1".to_string());
100+
101+
let store = object_store_from_config(url, config).unwrap();
102+
let store_repr = format!("{:?}", store);
103+
104+
assert!(store_repr.contains("region: \"us-east-1\""));
105+
assert!(store_repr.contains("bucket: \"test-bucket\""));
106+
assert!(store_repr.contains("key_id: \"test-key\""));
107+
assert!(store_repr.contains("secret_key: \"test-secret\""));
108+
assert!(store_repr.contains("token: Some(\"test-session\")"));
109+
assert!(store_repr.contains("endpoint: None"));
110+
assert!(store_repr.contains("allow_http: Parsed(false)"));
111+
assert!(store_repr.contains("skip_signature: false"));
112+
}
113+
114+
#[test]
115+
fn test_s3_config_with_http_endpoint() {
116+
let url = Url::parse("s3://test-bucket/").unwrap();
117+
let mut config = HashMap::new();
118+
config.insert(
119+
AWS_ENDPOINT.to_string(),
120+
"http://localhost:9000".to_string(),
121+
);
122+
config.insert(AWS_ALLOW_ANONYMOUS.to_string(), "true".to_string());
123+
124+
let store = object_store_from_config(url, config).unwrap();
125+
let store_repr = format!("{:?}", store);
126+
127+
assert!(store_repr.contains("region: \"us-east-1\""));
128+
assert!(store_repr.contains("bucket: \"test-bucket\""));
129+
assert!(!store_repr.contains("key_id: "));
130+
assert!(!store_repr.contains("secret_key: "));
131+
assert!(!store_repr.contains("token: "));
132+
assert!(store_repr.contains("endpoint: Some(\"http://localhost:9000\")"));
133+
assert!(store_repr.contains("allow_http: Parsed(true)"));
134+
assert!(store_repr.contains("skip_signature: true"));
135+
}
136+
137+
#[test]
138+
fn test_gcs_config_with_service_account() {
139+
let url = Url::parse("gs://test-bucket/").unwrap();
140+
let mut config = HashMap::new();
141+
config.insert(
142+
GCS_CREDENTIALS_JSON.to_string(),
143+
json!(
144+
{
145+
"disable_oauth": true, "client_email": "", "private_key": "", "private_key_id": ""
146+
}
147+
)
148+
.to_string(),
149+
);
150+
config.insert(GCS_BUCKET.to_string(), "test-bucket".to_string());
151+
152+
let store = object_store_from_config(url, config).unwrap();
153+
let store_repr = format!("{:?}", store);
154+
155+
assert!(store_repr.contains("bearer: \"\""));
156+
assert!(store_repr.contains("bucket_name: \"test-bucket\""));
157+
}
158+
159+
#[test]
160+
fn test_gcs_config_with_oauth_token() {
161+
let url = Url::parse("gs://test-bucket/").unwrap();
162+
let mut config = HashMap::new();
163+
config.insert(GCS_TOKEN.to_string(), "oauth-token-123".to_string());
164+
config.insert(GCS_BUCKET.to_string(), "test-bucket".to_string());
165+
166+
let store = object_store_from_config(url, config).unwrap();
167+
let store_repr = format!("{:?}", store);
168+
169+
assert!(store_repr.contains("bearer: \"oauth-token-123\""));
170+
assert!(store_repr.contains("bucket_name: \"test-bucket\""));
171+
}
172+
}

0 commit comments

Comments
 (0)