diff --git a/examples/image-inspect.py b/examples/image-inspect.py new file mode 100644 index 0000000..406998f --- /dev/null +++ b/examples/image-inspect.py @@ -0,0 +1,27 @@ +###### +# Hack +# +# Make sibling modules visible to this nested executable +import os, sys +sys.path.insert( + 0, + os.path.dirname( + os.path.dirname( + os.path.realpath(__file__) + ) + ) +) +# End Hack +###### + +from image.containerimage import ContainerImage + +# Initialize a ContainerImage given a tag reference +my_image = ContainerImage("registry.k8s.io/pause:3.5") + +# Display the inspect information for the container image +my_image_inspect = my_image.inspect(auth={}) +print( + f"Inspect of {str(my_image)}: \n" + \ + str(my_image_inspect) +) diff --git a/image/config.py b/image/config.py index 7560c6c..1823307 100644 --- a/image/config.py +++ b/image/config.py @@ -3,7 +3,7 @@ configuration for a container image. """ -from typing import Dict, Any, Tuple, Type, Union +from typing import Dict, Any, Tuple, Type, Union, List from jsonschema import validate, ValidationError from image.configschema import CONTAINER_IMAGE_CONFIG_SCHEMA from image.platform import ContainerImagePlatform @@ -105,4 +105,50 @@ def get_platform(self) -> Type[ContainerImagePlatform]: variant = self.get_variant() if variant != None: platform_dict["variant"] = variant - return ContainerImagePlatform(platform_dict) \ No newline at end of file + return ContainerImagePlatform(platform_dict) + + def get_labels(self) -> Dict[str, str]: + """ + Returns the container image labels from the config + + Returns: + Dict[str, str]: The labels from the config + """ + return self.config.get("Labels", {}) + + def get_created_date(self) -> str: + """ + Returns the created date of the container image from the config + + Returns: + str: The created date, as a string + """ + return self.config.get("created", "") + + def get_runtime_config(self) -> Dict[str, Any]: + """ + Returns the runtime config for the container image from its config + + Returns: + Dict[str, Any]: The container image runtime config + """ + return self.config.get("config", {}) + + def get_env(self) -> List[str]: + """ + Returns the list of environment variables set for the container image + at build time from the container image runtime config + + Returns: + List[str]: The list of environment variables + """ + return self.get_runtime_config().get("Env", []) + + def get_author(self) -> str: + """ + Returns the author of the container image from its config + + Returns: + str: The container image author + """ + return self.config.get("Author", "") diff --git a/image/containerimage.py b/image/containerimage.py index 0ce409a..efe68f5 100644 --- a/image/containerimage.py +++ b/image/containerimage.py @@ -9,20 +9,21 @@ from __future__ import annotations import json import requests -from typing import List, Dict, Any, \ - Union, Type, Iterator -from image.byteunit import ByteUnit -from image.client import ContainerImageRegistryClient -from image.config import ContainerImageConfig -from image.errors import ContainerImageError -from image.manifestfactory import ContainerImageManifestFactory -from image.manifestlist import ContainerImageManifestList -from image.oci import ContainerImageManifestOCI, \ - ContainerImageIndexOCI -from image.platform import ContainerImagePlatform -from image.reference import ContainerImageReference -from image.v2s2 import ContainerImageManifestV2S2, \ - ContainerImageManifestListV2S2 +from typing import List, Dict, Any, \ + Union, Type, Iterator +from image.byteunit import ByteUnit +from image.client import ContainerImageRegistryClient +from image.config import ContainerImageConfig +from image.containerimageinspect import ContainerImageInspect +from image.errors import ContainerImageError +from image.manifestfactory import ContainerImageManifestFactory +from image.manifestlist import ContainerImageManifestList +from image.oci import ContainerImageManifestOCI, \ + ContainerImageIndexOCI +from image.platform import ContainerImagePlatform +from image.reference import ContainerImageReference +from image.v2s2 import ContainerImageManifestV2S2, \ + ContainerImageManifestListV2S2 ######################################### # Classes for managing container images # @@ -85,6 +86,103 @@ def is_oci_static( return isinstance(manifest, ContainerImageManifestOCI) or \ isinstance(manifest, ContainerImageIndexOCI) + @staticmethod + def get_host_platform_manifest_static( + ref: ContainerImageReference, + manifest: Union[ + ContainerImageManifestV2S2, + ContainerImageManifestListV2S2, + ContainerImageManifestOCI, + ContainerImageIndexOCI + ], + auth: Dict[str, Any] + ) -> Union[ + ContainerImageManifestV2S2, + ContainerImageManifestOCI + ]: + """ + Given an image's reference and manifest, this static method checks if + the manifest is a manifest list, and attempts to get the manifest from + the list matching the host platform. + + Args: + ref (ContainerImageReference): The image reference corresponding to the manifest + manifest (Union[ContainerImageManifestV2S2,ContainerImageManifestListV2S2,ContainerImageManifestOCI,ContainerImageIndexOCI]): The manifest object, generally from get_manifest method + auth (Dict[str, Any]): A valid docker config JSON with auth into the ref's registry + + Returns: + Union[ContainerImageManifestV2S2,ContainerImageManifestOCI]: The manifest response from the registry API + + Raises: + ContainerImageError: Error if the image is a manifest list without a manifest matching the host platform + """ + host_manifest = manifest + + # If manifest list, get the manifest matching the host platform + if ContainerImage.is_manifest_list_static(manifest): + found = False + host_entry_digest = None + host_plt = ContainerImagePlatform.get_host_platform() + entries = manifest.get_entries() + for entry in entries: + if entry.get_platform() == host_plt: + found = True + host_entry_digest = entry.get_digest() + if not found: + raise ContainerImageError( + "no image found in manifest list for platform: " + \ + f"{str(host_plt)}" + ) + host_ref = ContainerImage( + f"{ref.get_name()}@{host_entry_digest}" + ) + host_manifest = host_ref.get_manifest(auth=auth) + + # Return the manifest matching the host platform + return host_manifest + + @staticmethod + def get_config_static( + ref: ContainerImageReference, + manifest: Union[ + ContainerImageManifestV2S2, + ContainerImageManifestListV2S2, + ContainerImageManifestOCI, + ContainerImageIndexOCI + ], + auth: Dict[str, Any] + ) -> ContainerImageConfig: + """ + Given an image's manifest, this static method fetches that image's + config from the distribution registry API. If the image is a manifest + list, then it gets the config corresponding to the manifest matching + the host platform. + + Args: + ref (ContainerImageReference): The image reference corresponding to the manifest + manifest (Union[ContainerImageManifestV2S2,ContainerImageManifestListV2S2,ContainerImageManifestOCI,ContainerImageIndexOCI]): The manifest object, generally from get_manifest method + auth (Dict[str, Any]): A valid docker config JSON with auth into this image's registry + + Returns: + ContainerImageConfig: The config for this image + + Raises: + ContainerImageError: Error if the image is a manifest list without a manifest matching the host platform + """ + # If manifest list, get the manifest matching the host platform + manifest = ContainerImage.get_host_platform_manifest_static( + ref, manifest, auth + ) + + # Get the image's config + return ContainerImageConfig( + ContainerImageRegistryClient.get_config( + ref, + manifest.get_config_descriptor(), + auth=auth + ) + ) + def __init__(self, ref: str): """ Constructor for the ContainerImage class @@ -179,6 +277,61 @@ def get_manifest(self, auth: Dict[str, Any]) -> Union[ ContainerImageRegistryClient.get_manifest(self, auth) ) + def get_host_platform_manifest(self, auth: Dict[str, Any]) -> Union[ + ContainerImageManifestOCI, + ContainerImageManifestV2S2 + ]: + """ + Fetches the manifest from the distribution registry API. If the + manifest is a manifest list, then it attempts to fetch the manifest + in the list matching the host platform. If not found, an exception is + raised. + + Args: + auth (Dict[str, Any]): A valid docker config JSON with auth into this image's registry + + Returns: + Union[ContainerImageManifestV2S2,ContainerImageManifestOCI]: The manifest response from the registry API + + Raises: + ContainerImageError: Error if the image is a manifest list without a manifest matching the host platform + """ + # Get the container image's manifest + manifest = self.get_manifest(auth=auth) + + # Return the host platform manifest + return ContainerImage.get_host_platform_manifest_static( + self, + manifest, + auth + ) + + def get_config(self, auth: Dict[str, Any]) -> ContainerImageConfig: + """ + Fetches the image's config from the distribution registry API. If the + image is a manifest list, then it gets the config corresponding to the + manifest matching the host platform. + + Args: + auth (Dict[str, Any]): A valid docker config JSON with auth into this image's registry + + Returns: + ContainerImageConfig: The config for this image + + Raises: + ContainerImageError: Error if the image is a manifest list without a manifest matching the host platform + """ + # Get the image's manifest + manifest = self.get_manifest(auth=auth) + + # Use the image's manifest to get the image's config + config = ContainerImage.get_config_static( + self, manifest, auth + ) + + # Return the image's config + return config + def exists(self, auth: Dict[str, Any]) -> bool: """ Determine if the image reference corresponds to an image in the remote @@ -266,6 +419,60 @@ def get_size_formatted(self, auth: Dict[str, Any]) -> str: """ return ByteUnit.format_size_bytes(self.get_size(auth)) + def inspect(self, auth: Dict[str, Any]) -> ContainerImageInspect: + """ + Returns a collection of basic information about the image, equivalent + to skopeo inspect. + + Args: + auth (Dict[str, Any]): A valid docker config JSON loaded into a dict + + Returns: + ContainerImageInspect: A collection of information about the image + """ + # Get the image's manifest + manifest = self.get_host_platform_manifest(auth=auth) + + # Use the image's manifest to get the image's config + config = ContainerImage.get_config_static( + self, manifest, auth + ) + + # Format the inspect dictionary + inspect = { + "Name": self.get_name(), + "Digest": self.get_digest(auth=auth), + # TODO: Implement v2s1 manifest extension - only v2s1 manifests use this value + "DockerVersion": "", + "Created": config.get_created_date(), + "Labels": config.get_labels(), + "Architecture": config.get_architecture(), + "Variant": config.get_variant() or "", + "Os": config.get_os(), + "Layers": [ + layer.get_digest() \ + for layer \ + in manifest.get_layer_descriptors() + ], + "LayersData": [ + { + "MIMEType": layer.get_media_type(), + "Digest": layer.get_digest(), + "Size": layer.get_size(), + "Annotations": layer.get_annotations() or {} + } for layer in manifest.get_layer_descriptors() + ], + "Env": config.get_env(), + "Author": config.get_author() + } + + # Set the tag in the inspect dict + if self.is_tag_ref(): + inspect["Tag"] = self.get_identifier() + + # TODO: Get the RepoTags for the image + return ContainerImageInspect(inspect) + def delete(self, auth: Dict[str, Any]): """ Deletes the image from the registry. diff --git a/image/containerimageinspect.py b/image/containerimageinspect.py new file mode 100644 index 0000000..693b5d1 --- /dev/null +++ b/image/containerimageinspect.py @@ -0,0 +1,81 @@ +import json +from image.errors import ContainerImageError +from image.inspectschema import CONTAINER_IMAGE_INSPECT_SCHEMA +from jsonschema import validate +from typing import Dict, Any, Tuple + +class ContainerImageInspect: + """ + Represents a collection of basic informataion about a container image. + This object is equivalent to the output of skopeo inspect. + """ + @staticmethod + def validate_static(inspect: Dict[str, Any]) -> Tuple[bool, str]: + """ + Validate a container image inspect dict using its json schema + + Args: + inspect (dict): The container image inspect dict to validate + + Returns: + bool: Whether the container image inspect dict was valid + str: Error message if it was invalid + """ + # Validate the container image inspect dict + try: + validate( + instance=inspect, + schema=CONTAINER_IMAGE_INSPECT_SCHEMA + ) + except Exception as e: + return False, str(e) + return True, "" + + def __init__(self, inspect: Dict[str, Any]) -> "ContainerImageInspect": + """ + Constructor for the ContainerImageInspect class + + Args: + inspect (dict): The container image inspect dict + + Returns: + ContainerImageInspect: The ContainerImageInspect instance + """ + valid, err = ContainerImageInspect.validate_static(inspect) + if not valid: + raise ContainerImageError(f"Invalid inspect dictionary: {err}") + self.inspect = inspect + + def validate(self) -> Tuple[bool, str]: + """ + Validate a container image inspect instance + + Returns: + bool: Whether the container image inspect dict was valid + str: Error message if it was invalid + """ + return ContainerImageInspect.validate_static(self.inspect) + + def __str__(self) -> str: + """ + Stringifies a ContainerImageInspect instance + + Args: + None + + Returns: + str: The stringified inspect + """ + return json.dumps(self.inspect, indent=2, sort_keys=False) + + def __json__(self) -> Dict[str, Any]: + """ + JSONifies a ContainerImageInspect instance + + Args: + None + + Returns: + Dict[str, Any]: The JSONified descriptor + """ + return self.inspect diff --git a/image/inspectschema.py b/image/inspectschema.py new file mode 100644 index 0000000..3916395 --- /dev/null +++ b/image/inspectschema.py @@ -0,0 +1,135 @@ +CONTAINER_IMAGE_LAYER_INSPECT_SCHEMA = { + "type": "object", + "description": "The JSON schema for a container image layer inspect " + \ + "dictionary.", + "required": [ "MIMEType", "Digest", "Size" ], + "additionalProperties": False, + "properties": { + "MIMEType": { + "type": "string", + "description": "This REQUIRED property is the MIME type, or " + \ + "media type, of this container image layer" + }, + "Digest": { + "type": "string", + "description": "This REQUIRED property is the digest of this " + \ + "container image layer" + }, + "Size": { + "type": "integer", + "description": "This REQUIRED property is the size of this " + \ + "container image layer measured in bytes" + }, + "Annotations": { + "type": "object", + "description": "This OPTIONAL property is the set of " + \ + "annotations belonging to this container image", + "patternProperties": { + "(^.*$)": { "type": "string" } + } + } + } +} +""" +The JSON schema for validating a container image layer inspect dictionary +Ref: https://github.com/containers/image/blob/main/types/types.go#L491-L497 + +:meta hide-value: +""" + +CONTAINER_IMAGE_INSPECT_SCHEMA = { + "type": "object", + "description": "The JSON schema for a container image inspect dictionary", + "required": [ + "Digest", "Created", "DockerVersion", "Labels", "Architecture", "Os", + "Layers", "LayersData", "Env" + ], + "additionalProperties": False, + "properties": { + "Name": { + "type": "string", + "description": "This OPTIONAL property is the name of this " + \ + "container image" + }, + "Digest": { + "type": "string", + "description": "This REQUIRED property is the digest of this " + \ + "container image" + }, + "Tag": { + "type": "string", + "description": "This OPTIONAL property is the tag of this " + \ + "container image" + }, + # TODO: Add RepoTags property + "Created": { + "type": "string", + "description": "This REQUIRED property is the date this " + \ + "container image was built" + }, + "DockerVersion": { + "type": "string", + "description": "This REQUIRED property is the version of " + \ + "docker used to build this container image" + }, + "Labels": { + "type": "object", + "description": "This REQUIRED property is the set of labels " + \ + "applied to this container image at build time", + "patternProperties": { + "(^.*$)": { "type": "string" } + } + }, + "Architecture": { + "type": "string", + "description": "This REQUIRED property is the architecture " + \ + "for which this container image was built" + }, + "Variant": { + "type": "string", + "description": "This OPTIONAL property is the variant of the " + \ + "OS and architecture for which this container image was built" + }, + "Os": { + "type": "string", + "description": "This REQUIRED property is the operating system" + \ + "for which this container image was built" + }, + "Layers": { + "type": "array", + "description": "This REQUIRED property contains information " + \ + "on the set of layers belonging to this container image", + "items": { + "type": "string" + } + }, + "LayersData": { + "type": "array", + "description": "This REQUIRED property contains information " + \ + "on the set of layers belonging to this container image", + "items": CONTAINER_IMAGE_LAYER_INSPECT_SCHEMA + }, + "Env": { + "type": "array", + "description": "This REQUIRED property contains information " + \ + "on the set of environment variables set at build time " + \ + "in this container image", + "items": { + "type": "string" + } + }, + "Author": { + "type": "string", + "description": "This OPTIONAL property is the author who " + \ + "built the container image" + } + } +} +""" +The JSON schema for validating a container image inspect dictionary +Ref: +- https://github.com/containers/image/blob/main/types/types.go#L474-L489 +- https://github.com/containers/image/blob/main/types/types.go#L474-L489 + +:meta hide-value: +""" diff --git a/image/platform.py b/image/platform.py index 4c1792a..93fd048 100644 --- a/image/platform.py +++ b/image/platform.py @@ -4,10 +4,39 @@ to run """ +import os +import platform from typing import Dict, Any, Tuple, Union, List from jsonschema import validate, ValidationError from image.manifestschema import IMAGE_INDEX_ENTRY_PLATFORM_SCHEMA +PLATFORM_ARCHITECTURE_MAP = { + 'x86_64': 'amd64', + 'amd64': 'amd64', + 'i386': '386', + 'i686': '386', + 'arm64': 'arm64', + 'aarch64': 'arm64', + 'armv7l': 'arm', + 'armv6l': 'arm', +} +""" +A map used to transform the output of the platform.machine method such that it +matches the expected value of GoLang's GOARCH environment variable +""" + +DEFAULT_HOST_OS = platform.system().lower() +DEFAULT_HOST_ARCH = PLATFORM_ARCHITECTURE_MAP.get( + platform.machine().lower(), + platform.machine().lower() +) +HOST_OS = os.environ.get("HOST_OS", DEFAULT_HOST_OS) +HOST_ARCH = os.environ.get("HOST_ARCH", DEFAULT_HOST_ARCH) +HOST_PLATFORM = { + "os": HOST_OS, + "architecture": HOST_ARCH +} + class ContainerImagePlatform: """ Represents platform metadata, which is generally specified in an OCI image @@ -17,6 +46,16 @@ class ContainerImagePlatform: Note that the OCI and v2s2 specifications do not diverge in their schema for platform metadata, hence we reuse this class across both scenarios. """ + @staticmethod + def get_host_platform() -> "ContainerImagePlatform": + """ + Get the platform of the host machine + + Returns: + ContainerImagePlatform: The host machine platform + """ + return ContainerImagePlatform(HOST_PLATFORM) + @staticmethod def validate_static(platform: Dict[str, Any]) -> Tuple[bool, str]: """