6666from typing import Dict , Iterator , Optional
6767
6868from tuf import exceptions
69- from tuf .api .metadata import Metadata
69+ from tuf .api .metadata import Metadata , Root , Snapshot , Targets , Timestamp
7070from tuf .api .serialization import DeserializationError
7171
7272logger = logging .getLogger (__name__ )
@@ -92,13 +92,13 @@ def __init__(self, root_data: bytes):
9292 RepositoryError: Metadata failed to load or verify. The actual
9393 error type and content will contain more details.
9494 """
95- self ._trusted_set = {} # type : Dict[str: Metadata]
95+ self ._trusted_set : Dict [str , Metadata ] = {}
9696 self .reference_time = datetime .utcnow ()
9797
9898 # Load and validate the local root metadata. Valid initial trusted root
9999 # metadata is required
100100 logger .debug ("Updating initial trusted root" )
101- self .update_root (root_data )
101+ self ._load_trusted_root (root_data )
102102
103103 def __getitem__ (self , role : str ) -> Metadata :
104104 """Returns current Metadata for 'role'"""
@@ -114,27 +114,27 @@ def __iter__(self) -> Iterator[Metadata]:
114114
115115 # Helper properties for top level metadata
116116 @property
117- def root (self ) -> Optional [ Metadata ]:
118- """Current root Metadata or None """
119- return self ._trusted_set . get ( "root" )
117+ def root (self ) -> Metadata [ Root ]:
118+ """Current root Metadata"""
119+ return self ._trusted_set [ "root" ]
120120
121121 @property
122- def timestamp (self ) -> Optional [Metadata ]:
122+ def timestamp (self ) -> Optional [Metadata [ Timestamp ] ]:
123123 """Current timestamp Metadata or None"""
124124 return self ._trusted_set .get ("timestamp" )
125125
126126 @property
127- def snapshot (self ) -> Optional [Metadata ]:
127+ def snapshot (self ) -> Optional [Metadata [ Snapshot ] ]:
128128 """Current snapshot Metadata or None"""
129129 return self ._trusted_set .get ("snapshot" )
130130
131131 @property
132- def targets (self ) -> Optional [Metadata ]:
132+ def targets (self ) -> Optional [Metadata [ Targets ] ]:
133133 """Current targets Metadata or None"""
134134 return self ._trusted_set .get ("targets" )
135135
136136 # Methods for updating metadata
137- def update_root (self , data : bytes ):
137+ def update_root (self , data : bytes ) -> None :
138138 """Verifies and loads 'data' as new root metadata.
139139
140140 Note that an expired intermediate root is considered valid: expiry is
@@ -152,7 +152,7 @@ def update_root(self, data: bytes):
152152 logger .debug ("Updating root" )
153153
154154 try :
155- new_root = Metadata .from_bytes (data )
155+ new_root = Metadata [ Root ] .from_bytes (data )
156156 except DeserializationError as e :
157157 raise exceptions .RepositoryError ("Failed to load root" ) from e
158158
@@ -161,21 +161,21 @@ def update_root(self, data: bytes):
161161 f"Expected 'root', got '{ new_root .signed .type } '"
162162 )
163163
164- if self .root is not None :
165- # We are not loading initial trusted root: verify the new one
166- self .root .verify_delegate ("root" , new_root )
164+ # Verify that new root is signed by trusted root
165+ self .root .verify_delegate ("root" , new_root )
167166
168- if new_root .signed .version != self .root .signed .version + 1 :
169- raise exceptions .ReplayedMetadataError (
170- "root" , new_root .signed .version , self .root .signed .version
171- )
167+ if new_root .signed .version != self .root .signed .version + 1 :
168+ raise exceptions .ReplayedMetadataError (
169+ "root" , new_root .signed .version , self .root .signed .version
170+ )
172171
172+ # Verify that new root is signed by itself
173173 new_root .verify_delegate ("root" , new_root )
174174
175175 self ._trusted_set ["root" ] = new_root
176176 logger .debug ("Updated root" )
177177
178- def update_timestamp (self , data : bytes ):
178+ def update_timestamp (self , data : bytes ) -> None :
179179 """Verifies and loads 'data' as new timestamp metadata.
180180
181181 Note that an expired intermediate timestamp is considered valid so it
@@ -199,7 +199,7 @@ def update_timestamp(self, data: bytes):
199199 # timestamp/snapshot can not yet be loaded at this point
200200
201201 try :
202- new_timestamp = Metadata .from_bytes (data )
202+ new_timestamp = Metadata [ Timestamp ] .from_bytes (data )
203203 except DeserializationError as e :
204204 raise exceptions .RepositoryError ("Failed to load timestamp" ) from e
205205
@@ -237,7 +237,7 @@ def update_timestamp(self, data: bytes):
237237 self ._trusted_set ["timestamp" ] = new_timestamp
238238 logger .debug ("Updated timestamp" )
239239
240- def update_snapshot (self , data : bytes ):
240+ def update_snapshot (self , data : bytes ) -> None :
241241 """Verifies and loads 'data' as new snapshot metadata.
242242
243243 Note that intermediate snapshot is considered valid even if it is
@@ -276,7 +276,7 @@ def update_snapshot(self, data: bytes):
276276 ) from e
277277
278278 try :
279- new_snapshot = Metadata .from_bytes (data )
279+ new_snapshot = Metadata [ Snapshot ] .from_bytes (data )
280280 except DeserializationError as e :
281281 raise exceptions .RepositoryError ("Failed to load snapshot" ) from e
282282
@@ -314,7 +314,11 @@ def update_snapshot(self, data: bytes):
314314 self ._trusted_set ["snapshot" ] = new_snapshot
315315 logger .debug ("Updated snapshot" )
316316
317- def _check_final_snapshot (self ):
317+ def _check_final_snapshot (self ) -> None :
318+ """Check snapshot expiry and version before targets is updated"""
319+
320+ assert self .snapshot is not None # nosec
321+ assert self .timestamp is not None # nosec
318322 if self .snapshot .signed .is_expired (self .reference_time ):
319323 raise exceptions .ExpiredMetadataError ("snapshot.json is expired" )
320324
@@ -328,7 +332,7 @@ def _check_final_snapshot(self):
328332 f"got { self .snapshot .signed .version } "
329333 )
330334
331- def update_targets (self , data : bytes ):
335+ def update_targets (self , data : bytes ) -> None :
332336 """Verifies and loads 'data' as new top-level targets metadata.
333337
334338 Args:
@@ -342,7 +346,7 @@ def update_targets(self, data: bytes):
342346
343347 def update_delegated_targets (
344348 self , data : bytes , role_name : str , delegator_name : str
345- ):
349+ ) -> None :
346350 """Verifies and loads 'data' as new metadata for target 'role_name'.
347351
348352 Args:
@@ -383,7 +387,7 @@ def update_delegated_targets(
383387 ) from e
384388
385389 try :
386- new_delegate = Metadata .from_bytes (data )
390+ new_delegate = Metadata [ Targets ] .from_bytes (data )
387391 except DeserializationError as e :
388392 raise exceptions .RepositoryError ("Failed to load snapshot" ) from e
389393
@@ -405,3 +409,24 @@ def update_delegated_targets(
405409
406410 self ._trusted_set [role_name ] = new_delegate
407411 logger .debug ("Updated %s delegated by %s" , role_name , delegator_name )
412+
413+ def _load_trusted_root (self , data : bytes ) -> None :
414+ """Verifies and loads 'data' as trusted root metadata.
415+
416+ Note that an expired initial root is considered valid: expiry is
417+ only checked for the final root in update_timestamp().
418+ """
419+ try :
420+ new_root = Metadata [Root ].from_bytes (data )
421+ except DeserializationError as e :
422+ raise exceptions .RepositoryError ("Failed to load root" ) from e
423+
424+ if new_root .signed .type != "root" :
425+ raise exceptions .RepositoryError (
426+ f"Expected 'root', got '{ new_root .signed .type } '"
427+ )
428+
429+ new_root .verify_delegate ("root" , new_root )
430+
431+ self ._trusted_set ["root" ] = new_root
432+ logger .debug ("Loaded trusted root" )
0 commit comments