brandblox

DBS_Complex

Oct 7th, 2025
69
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 17.74 KB | None | 0 0
import logging

import boto3
from boto3.dynamodb.conditions import Attr, Key
from botocore.exceptions import ClientError, WaiterError
  5.  
  6. logging.basicConfig(level=logging.INFO)
  7.  
  8. class DynamoDBDemo:
  9.     def create_table(self, table_name, key_schema, attribute_definitions, provisioned_throughput, region):
  10.         try:
  11.             dynamodb_resource = boto3.resource("dynamodb", region_name=region)
  12.             print("\ncreating the table {} ...".format(table_name))
  13.             self.table = dynamodb_resource.create_table(
  14.                 TableName=table_name,
  15.                 KeySchema=key_schema,
  16.                 AttributeDefinitions=attribute_definitions,
  17.                 ProvisionedThroughput=provisioned_throughput
  18.             )
  19.             self.table.meta.client.get_waiter('table_exists').wait(TableName=table_name)
  20.         except ClientError as e:
  21.             logging.error(e)
  22.             return False
  23.         return True  # Create/wait pattern and resource API usage [web:3]
  24.  
  25.     def store_an_item(self, region, table_name, item):
  26.         try:
  27.             print("\nstoring the item {} in the table {} ...".format(item, table_name))
  28.             dynamodb_resource = boto3.resource("dynamodb", region_name=region)
  29.             table = dynamodb_resource.Table(table_name)
  30.             table.put_item(Item=item)
  31.         except ClientError as e:
  32.             logging.error(e)
  33.             return False
  34.         return True  # Put item via Table API [web:3]
  35.  
  36.     def get_an_item(self, region, table_name, key):
  37.         try:
  38.             print("\nretrieving the item with the key {} from the table {} ...".format(key, table_name))
  39.             dynamodb_resource = boto3.resource("dynamodb", region_name=region)
  40.             table = dynamodb_resource.Table(table_name)
  41.             response = table.get_item(Key=key)
  42.             item = response.get('Item')
  43.             print(item)
  44.         except ClientError as e:
  45.             logging.error(e)
  46.             return False
  47.         return True  # Get item via Table API [web:3]
  48.  
  49.     # Update an item
  50.     def update_an_item(self, region, table_name, key, set_attrs=None, remove_attrs=None, add_attrs=None, condition=None, return_values="UPDATED_NEW"):
  51.         """
  52.        set_attrs: dict of {attr: value} to SET
  53.        remove_attrs: list of attribute names to REMOVE
  54.        add_attrs: dict of {attr: value} for ADD (numbers or sets)
  55.        condition: optional ConditionExpression text like "attribute_exists(artist)"
  56.        return_values: e.g., "NONE", "ALL_OLD", "UPDATED_OLD", "ALL_NEW", "UPDATED_NEW"
  57.        """
  58.         try:
  59.             dynamodb_resource = boto3.resource("dynamodb", region_name=region)
  60.             table = dynamodb_resource.Table(table_name)
  61.  
  62.             expr_parts = []
  63.             expr_values = {}
  64.             expr_names = {}
  65.  
  66.             # SET
  67.             if set_attrs:
  68.                 set_clauses = []
  69.                 for i, (k, v) in enumerate(set_attrs.items()):
  70.                     name_token = f"#n{i}"
  71.                     value_token = f":v{i}"
  72.                     expr_names[name_token] = k
  73.                     expr_values[value_token] = v
  74.                     set_clauses.append(f"{name_token} = {value_token}")
  75.                 if set_clauses:
  76.                     expr_parts.append("SET " + ", ".join(set_clauses))
  77.  
  78.             # REMOVE
  79.             if remove_attrs:
  80.                 rem_names = []
  81.                 base_idx = len(expr_names)
  82.                 for j, k in enumerate(remove_attrs):
  83.                     name_token = f"#n{base_idx + j}"
  84.                     expr_names[name_token] = k
  85.                     rem_names.append(name_token)
  86.                 if rem_names:
  87.                     expr_parts.append("REMOVE " + ", ".join(rem_names))
  88.  
  89.             # ADD
  90.             if add_attrs:
  91.                 add_clauses = []
  92.                 base_idx = len(expr_names)
  93.                 base_val_idx = len(expr_values)
  94.                 for t, (k, v) in enumerate(add_attrs.items()):
  95.                     name_token = f"#n{base_idx + t}"
  96.                     value_token = f":v{base_val_idx + t}"
  97.                     expr_names[name_token] = k
  98.                     expr_values[value_token] = v
  99.                     add_clauses.append(f"{name_token} {value_token}")
  100.                 if add_clauses:
  101.                     expr_parts.append("ADD " + ", ".join(add_clauses))
  102.  
  103.             update_expr = " ".join(expr_parts) if expr_parts else None
  104.             kwargs = {
  105.                 "Key": key,
  106.                 "ReturnValues": return_values
  107.             }
  108.             if update_expr:
  109.                 kwargs["UpdateExpression"] = update_expr
  110.             if expr_values:
  111.                 kwargs["ExpressionAttributeValues"] = expr_values
  112.             if expr_names:
  113.                 kwargs["ExpressionAttributeNames"] = expr_names
  114.             if condition:
  115.                 kwargs["ConditionExpression"] = condition
  116.  
  117.             print("\nupdating the item with key {} in table {} ...".format(key, table_name))
  118.             resp = table.update_item(**kwargs)
  119.             print(resp.get("Attributes"))
  120.         except ClientError as e:
  121.             logging.error(e)
  122.             return False
  123.         return True  # UpdateExpression/ConditionExpression patterns [web:3]
  124.  
  125.     # Delete an item
  126.     def delete_an_item(self, region, table_name, key, require_exists=False, return_values="ALL_OLD"):
  127.         """
  128.        require_exists: when True, use ConditionExpression to ensure the item exists; failing will raise ConditionalCheckFailedException
  129.        return_values: consider 'ALL_OLD' to get the deleted item back if it existed
  130.        """
  131.         try:
  132.             dynamodb_resource = boto3.resource("dynamodb", region_name=region)
  133.             table = dynamodb_resource.Table(table_name)
  134.             kwargs = {
  135.                 "Key": key,
  136.                 "ReturnValues": return_values
  137.             }
  138.             if require_exists:
  139.                 part_key_attr = list(key.keys())[0]
  140.                 kwargs["ConditionExpression"] = Attr(part_key_attr).exists()
  141.             print("\ndeleting the item with key {} from table {} ...".format(key, table_name))
  142.             resp = table.delete_item(**kwargs)
  143.             print(resp.get("Attributes"))
  144.         except ClientError as e:
  145.             logging.error(e)
  146.             return False
  147.         return True  # Delete with optional existence condition [web:3]
  148.  
  149.     # Query patterns (by keys)
  150.     def query_items(self, region, table_name, partition_key_name, partition_key_value,
  151.                     sort_key_name=None, mode="all", sort_value=None, sort_value2=None,
  152.                     index_name=None, filter_expression=None, projection=None, expr_attr_names=None):
  153.         """
  154.        mode:
  155.          - "all": all items for partition key
  156.          - "eq": sort equals sort_value
  157.          - "begins_with": sort begins_with(sort_value)
  158.          - "between": sort BETWEEN sort_value AND sort_value2
  159.          - "gt","gte","lt","lte": inequality on sort key vs sort_value
  160.        index_name: optional GSI/LSI name
  161.        filter_expression: optional Attr(...) conditions post-key selection
  162.        projection: optional ProjectionExpression string
  163.        expr_attr_names: optional dict for reserved names in projection
  164.        """
  165.         try:
  166.             dynamodb_resource = boto3.resource("dynamodb", region_name=region)
  167.             table = dynamodb_resource.Table(table_name)
  168.  
  169.             key_cond = Key(partition_key_name).eq(partition_key_value)
  170.  
  171.             if sort_key_name and mode != "all":
  172.                 if mode == "eq":
  173.                     key_cond &= Key(sort_key_name).eq(sort_value)
  174.                 elif mode == "begins_with":
  175.                     key_cond &= Key(sort_key_name).begins_with(sort_value)
  176.                 elif mode == "between":
  177.                     key_cond &= Key(sort_key_name).between(sort_value, sort_value2)
  178.                 elif mode == "gt":
  179.                     key_cond &= Key(sort_key_name).gt(sort_value)
  180.                 elif mode == "gte":
  181.                     key_cond &= Key(sort_key_name).gte(sort_value)
  182.                 elif mode == "lt":
  183.                     key_cond &= Key(sort_key_name).lt(sort_value)
  184.                 elif mode == "lte":
  185.                     key_cond &= Key(sort_key_name).lte(sort_value)
  186.  
  187.             kwargs = {
  188.                 "KeyConditionExpression": key_cond
  189.             }
  190.             if index_name:
  191.                 kwargs["IndexName"] = index_name
  192.             if filter_expression:
  193.                 kwargs["FilterExpression"] = filter_expression
  194.             if projection:
  195.                 kwargs["ProjectionExpression"] = projection
  196.             if expr_attr_names:
  197.                 kwargs["ExpressionAttributeNames"] = expr_attr_names
  198.  
  199.             print("\nquerying table {} ...".format(table_name))
  200.             items = []
  201.             resp = table.query(**kwargs)
  202.             items.extend(resp.get("Items", []))
  203.             while "LastEvaluatedKey" in resp:
  204.                 kwargs["ExclusiveStartKey"] = resp["LastEvaluatedKey"]
  205.                 resp = table.query(**kwargs)
  206.                 items.extend(resp.get("Items", []))
  207.  
  208.             print("query returned {} items".format(len(items)))
  209.             for it in items:
  210.                 print(it)
  211.         except ClientError as e:
  212.             logging.error(e)
  213.             return False
  214.         return True  # Table.query with pagination and expressions [web:18]
  215.  
  216.     # List all songs for an artist
  217.     def list_songs_by_artist(self, region, table_name, artist, titles_only=True):
  218.         try:
  219.             dynamodb_resource = boto3.resource("dynamodb", region_name=region)
  220.             table = dynamodb_resource.Table(table_name)
  221.  
  222.             kwargs = {
  223.                 "KeyConditionExpression": Key("artist").eq(artist)
  224.             }
  225.             if titles_only:
  226.                 kwargs["ProjectionExpression"] = "#a, #s"
  227.                 kwargs["ExpressionAttributeNames"] = {"#a": "artist", "#s": "song"}
  228.  
  229.             print("\nlisting songs for artist '{}' in table {} ...".format(artist, table_name))
  230.             items = []
  231.             resp = table.query(**kwargs)
  232.             items.extend(resp.get("Items", []))
  233.             while "LastEvaluatedKey" in resp:
  234.                 kwargs["ExclusiveStartKey"] = resp["LastEvaluatedKey"]
  235.                 resp = table.query(**kwargs)
  236.                 items.extend(resp.get("Items", []))
  237.  
  238.             if titles_only:
  239.                 songs = [it.get("song") for it in items if "song" in it]
  240.                 print("songs ({}): {}".format(len(songs), songs))
  241.             else:
  242.                 print("items ({}):".format(len(items)))
  243.                 for it in items:
  244.                     print(it)
  245.         except ClientError as e:
  246.             logging.error(e)
  247.             return False
  248.         return True  # Query by partition key, ProjectionExpression to reduce RUs [web:18][web:22]
  249.  
  250.     # List all distinct artists (scan-and-dedupe)
  251.     def list_all_artists(self, region, table_name):
  252.         """
  253.        Scans the table projecting only 'artist', then returns unique artists.
  254.        Note: Scan reads the whole table; consider GSIs or precomputed lists for large datasets.
  255.        """
  256.         try:
  257.             dynamodb_resource = boto3.resource("dynamodb", region_name=region)
  258.             table = dynamodb_resource.Table(table_name)
  259.  
  260.             print("\nscanning table {} to list all artists ...".format(table_name))
  261.             artists = set()
  262.             scan_kwargs = {
  263.                 "ProjectionExpression": "#a",
  264.                 "ExpressionAttributeNames": {"#a": "artist"},
  265.             }
  266.  
  267.             resp = table.scan(**scan_kwargs)
  268.             for it in resp.get("Items", []):
  269.                 val = it.get("artist")
  270.                 if val is not None:
  271.                     artists.add(val)
  272.  
  273.             while "LastEvaluatedKey" in resp:
  274.                 scan_kwargs["ExclusiveStartKey"] = resp["LastEvaluatedKey"]
  275.                 resp = table.scan(**scan_kwargs)
  276.                 for it in resp.get("Items", []):
  277.                     val = it.get("artist")
  278.                     if val is not None:
  279.                         artists.add(val)
  280.  
  281.             artists_list = sorted(artists)
  282.             print("artists ({}): {}".format(len(artists_list), artists_list))
  283.         except ClientError as e:
  284.             logging.error(e)
  285.             return False
  286.         return True  # Scan with projection, pagination, client-side distinct [web:23]
  287.  
  288.     # Delete table
  289.     def delete_table(self, region, table_name):
  290.         try:
  291.             dynamodb_resource = boto3.resource("dynamodb", region_name=region)
  292.             table = dynamodb_resource.Table(table_name)
  293.             print("\ndeleting the table {} ...".format(table_name))
  294.             table.delete()
  295.             table.meta.client.get_waiter('table_not_exists').wait(TableName=table_name)
  296.             print("table {} deleted".format(table_name))
  297.         except ClientError as e:
  298.             logging.error(e)
  299.             return False
  300.         return True  # Delete and wait pattern [web:3]
  301.  
  302. def seed_sample_data(d, region, table_name):
  303.     # Sample items
  304.     items = [
  305.         {"artist": "Pink Floyd", "song": "Us and Them", "album": "The Dark Side of the Moon", "year": 1973},
  306.         {"artist": "Pink Floyd", "song": "Time", "album": "The Dark Side of the Moon", "year": 1973},
  307.         {"artist": "Michael Jackson", "song": "Billie Jean", "album": "Thriller", "length_seconds": 294},
  308.         {"artist": "Michael Jackson", "song": "Beat It", "album": "Thriller", "length_seconds": 258},
  309.     ]
  310.     for it in items:
  311.         d.store_an_item(region, table_name, it)
  312.  
  313. def main():
  314.     '''
  315.    If using AWS Academy Learner Lab, region access may be limited to us-east-1 and us-west-2 unless noted otherwise.
  316.    '''
  317.     region = 'us-east-1'
  318.     d = DynamoDBDemo()
  319.     table_name = "music"
  320.  
  321.     key_schema = [
  322.         {"AttributeName": "artist", "KeyType": "HASH"},
  323.         {"AttributeName": "song", "KeyType": "RANGE"}
  324.     ]
  325.     attribute_definitions = [
  326.         {"AttributeName": "artist", "AttributeType": "S"},
  327.         {"AttributeName": "song", "AttributeType": "S"}
  328.     ]
  329.     provisioned_throughput = {"ReadCapacityUnits": 1, "WriteCapacityUnits": 1}
  330.  
  331.     d.create_table(table_name, key_schema, attribute_definitions, provisioned_throughput, region)
  332.  
  333.     # Seed some example data
  334.     seed_sample_data(d, region, table_name)
  335.  
  336.     # Example get
  337.     key_info = {"artist": "Pink Floyd", "song": "Us and Them"}
  338.     d.get_an_item(region, table_name, key_info)
  339.  
  340.     # --- Interactive Menu ---
  341.     while True:
  342.         print("\n=== DynamoDB Menu ===")
  343.         print("1) Update an item")
  344.         print("2) Delete an item")
  345.         print("3) Query (partition only)")
  346.         print("4) Query (partition + sort eq)")
  347.         print("5) Query (partition + begins_with sort)")
  348.         print("6) Query (partition + sort between)")
  349.         print("7) List songs by artist")
  350.         print("8) List all artists")
  351.         print("9) Delete table")
  352.         print("10) Exit")
  353.         choice = input("Select an option: ").strip()
  354.  
  355.         if choice == "1":
  356.             # Example update on MJ - Billlie Jean
  357.             key = {"artist": "Michael Jackson", "song": "Billie Jean"}
  358.             set_attrs = {"album": "Thriller (Special Edition)", "album_remastered": True}
  359.             remove_attrs = ["length_seconds"]
  360.             add_attrs = {"play_count": 1}
  361.             condition = "attribute_exists(artist) AND attribute_exists(song)"
  362.             d.update_an_item(region, table_name, key, set_attrs=set_attrs, remove_attrs=remove_attrs, add_attrs=add_attrs, condition=condition)
  363.         elif choice == "2":
  364.             artist = input("Artist to delete from: ").strip()
  365.             song = input("Song to delete: ").strip()
  366.             key = {"artist": artist, "song": song}
  367.             d.delete_an_item(region, table_name, key, require_exists=True)
  368.         elif choice == "3":
  369.             # Partition-only query
  370.             artist = input("Artist to query: ").strip()
  371.             d.query_items(region, table_name, partition_key_name="artist", partition_key_value=artist)
  372.         elif choice == "4":
  373.             artist = input("Artist: ").strip()
  374.             song = input("Song (exact): ").strip()
  375.             d.query_items(region, table_name, partition_key_name="artist", partition_key_value=artist,
  376.                           sort_key_name="song", mode="eq", sort_value=song)
  377.         elif choice == "5":
  378.             artist = input("Artist: ").strip()
  379.             prefix = input("Song title prefix: ").strip()
  380.             d.query_items(region, table_name, partition_key_name="artist", partition_key_value=artist,
  381.                           sort_key_name="song", mode="begins_with", sort_value=prefix)
  382.         elif choice == "6":
  383.             artist = input("Artist: ").strip()
  384.             start = input("Song sort-key start (e.g., A): ").strip()
  385.             end = input("Song sort-key end (e.g., M): ").strip()
  386.             d.query_items(region, table_name, partition_key_name="artist", partition_key_value=artist,
  387.                           sort_key_name="song", mode="between", sort_value=start, sort_value2=end)
  388.         elif choice == "7":
  389.             artist = input("Artist to list songs for: ").strip()
  390.             titles_only = input("Titles only? (y/n): ").strip().lower() == "y"
  391.             d.list_songs_by_artist(region, table_name, artist, titles_only)
  392.         elif choice == "8":
  393.             d.list_all_artists(region, table_name)
  394.         elif choice == "9":
  395.             confirm = input("Type 'DELETE' to drop the table: ").strip()
  396.             if confirm == "DELETE":
  397.                 d.delete_table(region, table_name)
  398.             else:
  399.                 print("Delete cancelled.")
  400.         elif choice == "10":
  401.             break
  402.         else:
  403.             print("Invalid choice. Try again.")
  404.  
  405. if __name__ == '__main__':
  406.     main()
  407.  
Advertisement
Add Comment
Please, Sign In to add comment