Skip to content

feat: Glue Catalog - namespace operations (2/3) #304

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Mar 27, 2024
Prev Previous commit
Next Next commit
impl update_namespace
  • Loading branch information
marvinlanhenke committed Mar 25, 2024
commit cae3446ba0b68c84e3fdd3e8b48e569ad3b46d36
32 changes: 25 additions & 7 deletions crates/catalog/glue/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use std::{collections::HashMap, fmt::Debug};
use typed_builder::TypedBuilder;

use crate::error::from_sdk_error;
use crate::utils::{convert_to_database, create_sdk_config, validate_namespace};
use crate::utils::{
convert_to_database, convert_to_namespace, create_sdk_config, validate_namespace,
};
use crate::with_catalog_id;

#[derive(Debug, TypedBuilder)]
Expand Down Expand Up @@ -113,14 +115,14 @@ impl Catalog for GlueCatalog {
namespace: &NamespaceIdent,
properties: HashMap<String, String>,
) -> Result<Namespace> {
let db_input = convert_to_database(namespace, &self.config.uri, &properties)?;
let db_input = convert_to_database(namespace, &properties)?;

let builder = self.client.0.create_database().database_input(db_input);
let builder = with_catalog_id!(builder, self.config);

builder.send().await.map_err(from_sdk_error)?;

Ok(Namespace::new(namespace.clone()))
Ok(Namespace::with_properties(namespace.clone(), properties))
}

async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
Expand All @@ -132,7 +134,10 @@ impl Catalog for GlueCatalog {
let resp = builder.send().await.map_err(from_sdk_error)?;

match resp.database() {
Some(db) => Ok(Namespace::new(NamespaceIdent::new(db.name().to_string()))),
Some(db) => {
let namespace = convert_to_namespace(&db);
Ok(namespace)
}
None => Err(Error::new(
ErrorKind::DataInvalid,
format!("Database with name: {} does not exist", db_name),
Expand Down Expand Up @@ -165,10 +170,23 @@ impl Catalog for GlueCatalog {

async fn update_namespace(
&self,
_namespace: &NamespaceIdent,
_properties: HashMap<String, String>,
namespace: &NamespaceIdent,
properties: HashMap<String, String>,
) -> Result<()> {
todo!()
let db_name = validate_namespace(&namespace)?;
let db_input = convert_to_database(&namespace, &properties)?;

let builder = self
.client
.0
.update_database()
.name(&db_name)
.database_input(db_input);
let builder = with_catalog_id!(builder, self.config);

builder.send().await.map_err(from_sdk_error)?;

Ok(())
}

async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> {
Expand Down
65 changes: 54 additions & 11 deletions crates/catalog/glue/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
use std::collections::HashMap;

use aws_config::{BehaviorVersion, Region, SdkConfig};
use aws_sdk_glue::{config::Credentials, types::DatabaseInput};
use iceberg::{Error, ErrorKind, NamespaceIdent, Result};
use aws_sdk_glue::{
config::Credentials,
types::{Database, DatabaseInput},
};
use iceberg::{Error, ErrorKind, Namespace, NamespaceIdent, Result};

use crate::error::from_build_error;

Expand All @@ -39,6 +42,8 @@ pub const AWS_SECRET_ACCESS_KEY: &str = "aws_secret_access_key";
pub const AWS_SESSION_TOKEN: &str = "aws_session_token";
/// Parameter namespace description
const DESCRIPTION: &str = "description";
/// Parameter namespace description
const LOCATION: &str = "location_uri";

/// Creates an AWS SDK configuration (SdkConfig) based on
/// provided properties and an optional endpoint URL.
Expand Down Expand Up @@ -82,21 +87,19 @@ pub(crate) async fn create_sdk_config(
/// Create `DatabaseInput` from name, uri and properties
pub(crate) fn convert_to_database(
namespace: &NamespaceIdent,
location_uri: &Option<String>,
properties: &HashMap<String, String>,
) -> Result<DatabaseInput> {
let db_name = validate_namespace(namespace)?;
let mut builder = DatabaseInput::builder().name(db_name);

if let Some(location_uri) = location_uri {
builder = builder.location_uri(location_uri);
}

for (k, v) in properties.iter() {
match k.as_ref() {
DESCRIPTION => {
builder = builder.description(v);
}
LOCATION => {
builder = builder.location_uri(v);
}
_ => {
builder = builder.parameters(k, v);
}
Expand All @@ -106,6 +109,24 @@ pub(crate) fn convert_to_database(
builder.build().map_err(from_build_error)
}

/// Create `Namespace` from aws sdk glue `Database`
pub(crate) fn convert_to_namespace(database: &Database) -> Namespace {
let db_name = database.name().to_string();
let mut properties = database
.parameters()
.map_or_else(HashMap::new, |p| p.clone());

if let Some(location_uri) = database.location_uri() {
properties.insert(LOCATION.to_string(), location_uri.to_string());
};

if let Some(description) = database.description() {
properties.insert(DESCRIPTION.to_string(), description.to_string());
}

Namespace::with_properties(NamespaceIdent::new(db_name), properties)
}

/// Checks if provided `NamespaceIdent` is valid.
pub(crate) fn validate_namespace(namespace: &NamespaceIdent) -> Result<String> {
let name = namespace.as_ref();
Expand Down Expand Up @@ -151,16 +172,38 @@ mod tests {

use super::*;

#[test]
fn test_convert_to_namespace() -> Result<()> {
let db = Database::builder()
.name("my_db")
.location_uri("my_location")
.description("my_description")
.build()
.map_err(from_build_error)?;

let properties = HashMap::from([
(DESCRIPTION.to_string(), "my_description".to_string()),
(LOCATION.to_string(), "my_location".to_string()),
]);

let expected =
Namespace::with_properties(NamespaceIdent::new("my_db".to_string()), properties);
let result = convert_to_namespace(&db);

assert_eq!(result, expected);

Ok(())
}

#[test]
fn test_convert_to_database() -> Result<()> {
let namespace = NamespaceIdent::new("my_database".to_string());
let location_uri = Some("my_location".to_string());
let properties = HashMap::new();
let properties = HashMap::from([(LOCATION.to_string(), "my_location".to_string())]);

let result = convert_to_database(&namespace, &location_uri, &properties)?;
let result = convert_to_database(&namespace, &properties)?;

assert_eq!("my_database", result.name());
assert_eq!(location_uri, result.location_uri);
assert_eq!(Some("my_location".to_string()), result.location_uri);

Ok(())
}
Expand Down
33 changes: 33 additions & 0 deletions crates/catalog/glue/tests/glue_catalog_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,39 @@ async fn set_test_fixture(func: &str) -> TestFixture {
}
}

#[tokio::test]
async fn test_update_namespace() -> Result<()> {
let fixture = set_test_fixture("test_update_namespace").await;

let properties = HashMap::new();
let namespace = NamespaceIdent::new("my_database".into());

fixture
.glue_catalog
.create_namespace(&namespace, properties)
.await?;

let before_update = fixture.glue_catalog.get_namespace(&namespace).await?;
let before_update = before_update.properties().get("description");
assert_eq!(before_update, None);

let properties = HashMap::from([("description".to_string(), "my_update".to_string())]);

fixture
.glue_catalog
.update_namespace(&namespace, properties)
.await?;

let after_update = fixture.glue_catalog.get_namespace(&namespace).await?;
let after_update = after_update.properties().get("description");
assert_eq!(
after_update.as_deref(),
Some("my_update".to_string()).as_ref()
);

Ok(())
}

#[tokio::test]
async fn test_namespace_exists() -> Result<()> {
let fixture = set_test_fixture("test_namespace_exists").await;
Expand Down