Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import logging
- import boto3
- from botocore.exceptions import ClientError
- from boto3.dynamodb.conditions import Key, Attr
- logging.basicConfig(level=logging.INFO)
class DynamoDBDemo:
    """Demonstration wrapper around the boto3 DynamoDB resource API.

    Each public method opens its own ``boto3.resource("dynamodb")`` handle for
    the requested region, prints a progress line, and reports the outcome as a
    boolean: ``True`` when the call completed, ``False`` when botocore raised
    a ``ClientError`` (the error is logged with ``logging.error``).
    """

    # Sort-key comparison modes understood by query_items. Each name is also
    # the name of the boto3 ``Key`` condition method that implements it.
    _SORT_KEY_MODES = ("eq", "begins_with", "between", "gt", "gte", "lt", "lte")

    @staticmethod
    def _get_table(region, table_name):
        """Return the boto3 ``Table`` resource for ``table_name`` in ``region``."""
        return boto3.resource("dynamodb", region_name=region).Table(table_name)

    @staticmethod
    def _iter_items(operation, request):
        """Yield every item from a paginated ``Table.query``/``Table.scan`` call.

        ``operation`` is the bound table method and ``request`` its keyword
        arguments. The call is repeated with ``ExclusiveStartKey`` for as long
        as the response carries a ``LastEvaluatedKey``. ``request`` itself is
        not modified.
        """
        page_request = dict(request)
        while True:
            response = operation(**page_request)
            yield from response.get("Items", [])
            if "LastEvaluatedKey" not in response:
                return
            page_request["ExclusiveStartKey"] = response["LastEvaluatedKey"]

    @staticmethod
    def _build_update_expression(set_attrs=None, remove_attrs=None, add_attrs=None):
        """Build the UpdateExpression text and its placeholder maps.

        Returns a ``(expression, names, values)`` tuple where ``expression`` is
        the ``SET ... REMOVE ... ADD ...`` string (``None`` when nothing was
        requested), ``names`` maps ``#attr<i>`` tokens to attribute names and
        ``values`` maps ``:val<i>`` tokens to attribute values.

        The ``#attr`` / ``:val`` prefixes are deliberate: when a condition is
        supplied as an ``Attr``/``Key`` object, boto3 generates its own
        ``#n<i>`` / ``:v<i>`` placeholders and merges them over the caller's
        maps, so reusing that scheme here would let the two sets of tokens
        overwrite each other.
        """
        names = {}
        values = {}
        clauses = []

        def name_token(attribute):
            token = f"#attr{len(names)}"
            names[token] = attribute
            return token

        def value_token(value):
            token = f":val{len(values)}"
            values[token] = value
            return token

        if set_attrs:
            assignments = [f"{name_token(k)} = {value_token(v)}" for k, v in set_attrs.items()]
            clauses.append("SET " + ", ".join(assignments))
        if remove_attrs:
            removals = [name_token(k) for k in remove_attrs]
            clauses.append("REMOVE " + ", ".join(removals))
        if add_attrs:
            additions = [f"{name_token(k)} {value_token(v)}" for k, v in add_attrs.items()]
            clauses.append("ADD " + ", ".join(additions))

        expression = " ".join(clauses) if clauses else None
        return expression, names, values

    def create_table(self, table_name, key_schema, attribute_definitions, provisioned_throughput, region):
        """Create a table and block until DynamoDB reports that it exists.

        The created ``Table`` resource is kept on ``self.table``.
        Returns ``True`` on success, ``False`` if a ``ClientError`` was raised.
        """
        try:
            dynamodb_resource = boto3.resource("dynamodb", region_name=region)
            print("\ncreating the table {} ...".format(table_name))
            self.table = dynamodb_resource.create_table(
                TableName=table_name,
                KeySchema=key_schema,
                AttributeDefinitions=attribute_definitions,
                ProvisionedThroughput=provisioned_throughput,
            )
            # create_table returns before the table is usable; wait for it.
            self.table.meta.client.get_waiter('table_exists').wait(TableName=table_name)
        except ClientError as e:
            logging.error(e)
            return False
        return True

    def store_an_item(self, region, table_name, item):
        """Write ``item`` (a dict of attributes) to the table with ``put_item``.

        Returns ``True`` on success, ``False`` if a ``ClientError`` was raised.
        """
        try:
            print("\nstoring the item {} in the table {} ...".format(item, table_name))
            table = self._get_table(region, table_name)
            table.put_item(Item=item)
        except ClientError as e:
            logging.error(e)
            return False
        return True

    def get_an_item(self, region, table_name, key):
        """Fetch the item identified by ``key`` and print it.

        Prints ``None`` when the response carries no ``Item``.
        Returns ``True`` when the call completed, ``False`` on ``ClientError``.
        """
        try:
            print("\nretrieving the item with the key {} from the table {} ...".format(key, table_name))
            table = self._get_table(region, table_name)
            response = table.get_item(Key=key)
            print(response.get('Item'))
        except ClientError as e:
            logging.error(e)
            return False
        return True

    def update_an_item(self, region, table_name, key, set_attrs=None, remove_attrs=None, add_attrs=None, condition=None, return_values="UPDATED_NEW"):
        """Update one item with optional SET / REMOVE / ADD clauses.

        set_attrs: dict of {attr: value} to SET
        remove_attrs: list of attribute names to REMOVE
        add_attrs: dict of {attr: value} for ADD (numbers or sets)
        condition: optional ConditionExpression, e.g. the text
            "attribute_exists(artist)"
        return_values: e.g., "NONE", "ALL_OLD", "UPDATED_OLD", "ALL_NEW", "UPDATED_NEW"

        Prints the ``Attributes`` of the response.
        Returns ``True`` on success, ``False`` if a ``ClientError`` was raised
        (which includes a failed condition check).
        """
        try:
            table = self._get_table(region, table_name)
            update_expr, expr_names, expr_values = self._build_update_expression(
                set_attrs, remove_attrs, add_attrs
            )
            kwargs = {
                "Key": key,
                "ReturnValues": return_values,
            }
            # Only send the optional parameters that actually carry content.
            if update_expr:
                kwargs["UpdateExpression"] = update_expr
            if expr_values:
                kwargs["ExpressionAttributeValues"] = expr_values
            if expr_names:
                kwargs["ExpressionAttributeNames"] = expr_names
            if condition:
                kwargs["ConditionExpression"] = condition
            print("\nupdating the item with key {} in table {} ...".format(key, table_name))
            resp = table.update_item(**kwargs)
            print(resp.get("Attributes"))
        except ClientError as e:
            logging.error(e)
            return False
        return True

    def delete_an_item(self, region, table_name, key, require_exists=False, return_values="ALL_OLD"):
        """Delete the item identified by ``key`` and print the returned attributes.

        require_exists: when True, a ConditionExpression requires the item to
            exist; a missing item then surfaces as a ``ClientError`` and the
            method returns ``False``.
        return_values: consider 'ALL_OLD' to get the deleted item back if it existed

        Returns ``True`` on success, ``False`` if a ``ClientError`` was raised.
        """
        try:
            table = self._get_table(region, table_name)
            kwargs = {
                "Key": key,
                "ReturnValues": return_values,
            }
            if require_exists:
                # The existence check is made on the first attribute named in
                # ``key``.
                first_key_attr = next(iter(key))
                kwargs["ConditionExpression"] = Attr(first_key_attr).exists()
            print("\ndeleting the item with key {} from table {} ...".format(key, table_name))
            resp = table.delete_item(**kwargs)
            print(resp.get("Attributes"))
        except ClientError as e:
            logging.error(e)
            return False
        return True

    def query_items(self, region, table_name, partition_key_name, partition_key_value,
                    sort_key_name=None, mode="all", sort_value=None, sort_value2=None,
                    index_name=None, filter_expression=None, projection=None, expr_attr_names=None):
        """Query by partition key, optionally narrowed by a sort-key condition.

        mode:
        - "all": all items for partition key
        - "eq": sort equals sort_value
        - "begins_with": sort begins_with(sort_value)
        - "between": sort BETWEEN sort_value AND sort_value2
        - "gt","gte","lt","lte": inequality on sort key vs sort_value
        The sort-key condition is only applied when ``sort_key_name`` is given.
        index_name: optional GSI/LSI name
        filter_expression: optional Attr(...) conditions post-key selection
        projection: optional ProjectionExpression string
        expr_attr_names: optional dict for reserved names in projection

        All result pages are fetched and every item is printed.
        Returns ``True`` on success; ``False`` if ``mode`` is not one of the
        values above or a ``ClientError`` was raised.
        """
        # Reject an unknown mode up front: silently ignoring it would return
        # the whole partition when the caller asked for a narrower result.
        if mode != "all" and mode not in self._SORT_KEY_MODES:
            logging.error(
                "unsupported query mode %r; expected 'all' or one of %s",
                mode, ", ".join(self._SORT_KEY_MODES),
            )
            return False
        try:
            table = self._get_table(region, table_name)
            key_cond = Key(partition_key_name).eq(partition_key_value)
            if sort_key_name and mode != "all":
                sort_key = Key(sort_key_name)
                if mode == "between":
                    key_cond &= sort_key.between(sort_value, sort_value2)
                else:
                    # Every other mode is a one-argument Key method of the same name.
                    key_cond &= getattr(sort_key, mode)(sort_value)
            kwargs = {
                "KeyConditionExpression": key_cond,
            }
            if index_name:
                kwargs["IndexName"] = index_name
            if filter_expression:
                kwargs["FilterExpression"] = filter_expression
            if projection:
                kwargs["ProjectionExpression"] = projection
            if expr_attr_names:
                kwargs["ExpressionAttributeNames"] = expr_attr_names
            print("\nquerying table {} ...".format(table_name))
            items = list(self._iter_items(table.query, kwargs))
            print("query returned {} items".format(len(items)))
            for it in items:
                print(it)
        except ClientError as e:
            logging.error(e)
            return False
        return True

    def list_songs_by_artist(self, region, table_name, artist, titles_only=True):
        """Print every song stored under the partition key ``artist``.

        With ``titles_only`` the query projects only the ``artist`` and
        ``song`` attributes and prints the list of song titles; otherwise each
        full item is printed.
        Returns ``True`` on success, ``False`` if a ``ClientError`` was raised.
        """
        try:
            table = self._get_table(region, table_name)
            kwargs = {
                "KeyConditionExpression": Key("artist").eq(artist),
            }
            if titles_only:
                kwargs["ProjectionExpression"] = "#a, #s"
                kwargs["ExpressionAttributeNames"] = {"#a": "artist", "#s": "song"}
            print("\nlisting songs for artist '{}' in table {} ...".format(artist, table_name))
            items = list(self._iter_items(table.query, kwargs))
            if titles_only:
                songs = [it.get("song") for it in items if "song" in it]
                print("songs ({}): {}".format(len(songs), songs))
            else:
                print("items ({}):".format(len(items)))
                for it in items:
                    print(it)
        except ClientError as e:
            logging.error(e)
            return False
        return True

    def list_all_artists(self, region, table_name):
        """Print the sorted set of distinct ``artist`` values in the table.

        Scans the table projecting only 'artist', then de-duplicates client-side.
        Note: Scan reads the whole table; consider GSIs or precomputed lists for large datasets.
        Returns ``True`` on success, ``False`` if a ``ClientError`` was raised.
        """
        try:
            table = self._get_table(region, table_name)
            print("\nscanning table {} to list all artists ...".format(table_name))
            scan_kwargs = {
                "ProjectionExpression": "#a",
                "ExpressionAttributeNames": {"#a": "artist"},
            }
            artists = {
                it["artist"]
                for it in self._iter_items(table.scan, scan_kwargs)
                if it.get("artist") is not None
            }
            artists_list = sorted(artists)
            print("artists ({}): {}".format(len(artists_list), artists_list))
        except ClientError as e:
            logging.error(e)
            return False
        return True

    def delete_table(self, region, table_name):
        """Delete the table and block until DynamoDB reports that it is gone.

        Returns ``True`` on success, ``False`` if a ``ClientError`` was raised.
        """
        try:
            table = self._get_table(region, table_name)
            print("\ndeleting the table {} ...".format(table_name))
            table.delete()
            # delete() returns immediately; wait until the table is really gone.
            table.meta.client.get_waiter('table_not_exists').wait(TableName=table_name)
            print("table {} deleted".format(table_name))
        except ClientError as e:
            logging.error(e)
            return False
        return True
def seed_sample_data(d, region, table_name):
    """Store four sample song items in ``table_name`` through ``d.store_an_item``.

    ``d`` is any object exposing ``store_an_item(region, table_name, item)``.
    The items are written one at a time, in the order listed below; the
    result of each individual write is not inspected.
    """
    sample_songs = (
        {"artist": "Pink Floyd", "song": "Us and Them", "album": "The Dark Side of the Moon", "year": 1973},
        {"artist": "Pink Floyd", "song": "Time", "album": "The Dark Side of the Moon", "year": 1973},
        {"artist": "Michael Jackson", "song": "Billie Jean", "album": "Thriller", "length_seconds": 294},
        {"artist": "Michael Jackson", "song": "Beat It", "album": "Thriller", "length_seconds": 258},
    )
    for record in sample_songs:
        d.store_an_item(region, table_name, record)
def main():
    """Create and seed the ``music`` table, then serve an interactive menu.

    If using AWS Academy Learner Lab, region access may be limited to
    us-east-1 and us-west-2 unless noted otherwise.
    """

    def ask(prompt):
        # All menu input is read the same way: prompt, then strip whitespace.
        return input(prompt).strip()

    region = 'us-east-1'
    table_name = "music"
    d = DynamoDBDemo()

    # Table keyed by artist (partition key) and song (sort key), both strings.
    d.create_table(
        table_name,
        [
            {"AttributeName": "artist", "KeyType": "HASH"},
            {"AttributeName": "song", "KeyType": "RANGE"},
        ],
        [
            {"AttributeName": "artist", "AttributeType": "S"},
            {"AttributeName": "song", "AttributeType": "S"},
        ],
        {"ReadCapacityUnits": 1, "WriteCapacityUnits": 1},
        region,
    )

    # Seed some example data, then read one of the seeded items back.
    seed_sample_data(d, region, table_name)
    d.get_an_item(region, table_name, {"artist": "Pink Floyd", "song": "Us and Them"})

    menu_lines = (
        "\n=== DynamoDB Menu ===",
        "1) Update an item",
        "2) Delete an item",
        "3) Query (partition only)",
        "4) Query (partition + sort eq)",
        "5) Query (partition + begins_with sort)",
        "6) Query (partition + sort between)",
        "7) List songs by artist",
        "8) List all artists",
        "9) Delete table",
        "10) Exit",
    )
    # Options 4 and 5 differ only in the query mode and the second prompt.
    single_value_queries = {
        "4": ("eq", "Song (exact): "),
        "5": ("begins_with", "Song title prefix: "),
    }

    # --- Interactive menu ---
    while True:
        for line in menu_lines:
            print(line)
        choice = ask("Select an option: ")

        if choice == "10":
            break

        if choice == "1":
            # Example update on Michael Jackson - Billie Jean.
            d.update_an_item(
                region,
                table_name,
                {"artist": "Michael Jackson", "song": "Billie Jean"},
                set_attrs={"album": "Thriller (Special Edition)", "album_remastered": True},
                remove_attrs=["length_seconds"],
                add_attrs={"play_count": 1},
                condition="attribute_exists(artist) AND attribute_exists(song)",
            )
        elif choice == "2":
            artist = ask("Artist to delete from: ")
            song = ask("Song to delete: ")
            d.delete_an_item(region, table_name, {"artist": artist, "song": song}, require_exists=True)
        elif choice == "3":
            # Partition-only query.
            artist = ask("Artist to query: ")
            d.query_items(region, table_name, partition_key_name="artist", partition_key_value=artist)
        elif choice in single_value_queries:
            query_mode, value_prompt = single_value_queries[choice]
            artist = ask("Artist: ")
            wanted = ask(value_prompt)
            d.query_items(region, table_name, partition_key_name="artist", partition_key_value=artist,
                          sort_key_name="song", mode=query_mode, sort_value=wanted)
        elif choice == "6":
            artist = ask("Artist: ")
            lower = ask("Song sort-key start (e.g., A): ")
            upper = ask("Song sort-key end (e.g., M): ")
            d.query_items(region, table_name, partition_key_name="artist", partition_key_value=artist,
                          sort_key_name="song", mode="between", sort_value=lower, sort_value2=upper)
        elif choice == "7":
            artist = ask("Artist to list songs for: ")
            titles_only = ask("Titles only? (y/n): ").lower() == "y"
            d.list_songs_by_artist(region, table_name, artist, titles_only)
        elif choice == "8":
            d.list_all_artists(region, table_name)
        elif choice == "9":
            # Dropping the table requires typing the exact word DELETE.
            if ask("Type 'DELETE' to drop the table: ") == "DELETE":
                d.delete_table(region, table_name)
            else:
                print("Delete cancelled.")
        else:
            print("Invalid choice. Try again.")
# Script entry point: start the demo only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment