1919concurrent uploads the artifact must appear atomically for unrelated readers.
2020"""
2121
22+ from .audit import Audit
2223from .errors import BuildError
2324from .tty import stepAction , stepMessage , \
2425 SKIPPED , EXECUTED , WARNING , INFO , TRACE , ERROR , IMPORTANT
25- from .utils import asHexStr , removePath , isWindows , getBashPath , tarfileOpen
26+ from .utils import asHexStr , removePath , isWindows , getBashPath , tarfileOpen , binStat
2627from .webdav import WebDav , HTTPException , HttpDownloadError , HttpUploadError , HttpNotFoundError , HttpAlreadyExistsError
27- from shlex import quote
2828from tempfile import mkstemp , NamedTemporaryFile , TemporaryFile , gettempdir
2929import asyncio
3030import concurrent .futures
3636import os .path
3737import shutil
3838import signal
39+ import struct
3940import subprocess
4041import tarfile
4142import urllib .parse
43+ import hashlib
4244
4345ARCHIVE_GENERATION = '-1'
4446ARTIFACT_SUFFIX = ".tgz"
@@ -65,6 +67,9 @@ def writeFileOrHandle(name, fileobj, content):
6567class DummyArchive :
6668 """Archive that does nothing"""
6769
70+ def canManage (self ):
71+ return False
72+
6873 def wantDownloadLocal (self , enable ):
6974 pass
7075
@@ -156,6 +161,26 @@ def _extract(self, fileobj, audit, content):
156161 os .makedirs (content )
157162 self .__extractPackage (tar , audit , content )
158163
164+ def _extractAudit (self , filename = None , fileobj = None ):
165+ with tarfileOpen (name = filename , mode = "r|*" , fileobj = fileobj , errorlevel = 1 ) as tar :
166+ # validate
167+ if tar .pax_headers .get ('bob-archive-vsn' ) != "1" :
168+ return
169+
170+ # find audit trail
171+ f = tar .next ()
172+ while f :
173+ if f .name == "meta/audit.json.gz" : break
174+ f = tar .next ()
175+ else :
176+ raise BuildError ("Missing audit trail!" )
177+
178+ # read audit trail
179+ auditJsonGz = tar .extractfile (f )
180+ auditJson = gzip .GzipFile (fileobj = auditJsonGz )
181+
182+ return Audit .fromByteStream (auditJson , filename )
183+
159184 def _pack (self , name , fileobj , audit , content ):
160185 pax = { 'bob-archive-vsn' : "1" }
161186 with gzip .open (name or fileobj , 'wb' , 6 ) as gzf :
@@ -192,6 +217,9 @@ def canUpload(self):
192217 def canCache (self ):
193218 return True
194219
220+ def canManage (self ):
221+ return False
222+
195223 async def uploadPackage (self , step , buildId , audit , content , executor = None ):
196224 if not audit :
197225 raise BuildError ("Missing audit trail! Cannot proceed without one." )
@@ -362,6 +390,9 @@ def canCache(self):
362390 def _openDownloadFile (self , buildId , suffix ):
363391 raise ArtifactNotFoundError ()
364392
393+ def canManage (self ):
394+ return False
395+
365396 async def downloadPackage (self , step , buildId , audit , content , caches = [],
366397 executor = None ):
367398 if not self .canDownload ():
@@ -557,6 +588,77 @@ async def downloadLocalFingerprint(self, step, key, executor=None):
557588 except (concurrent .futures .CancelledError , concurrent .futures .process .BrokenProcessPool ):
558589 raise BuildError ("Download of fingerprint interrupted." )
559590
591+ def deleteFile (self , filepath ):
592+ try :
593+ self ._delete (filepath )
594+ except (ArtifactDownloadError , OSError ) as e :
595+ if self .__ignoreErrors :
596+ return ("error (" + str (e )+ ")" , ERROR )
597+ else :
598+ raise BuildError ("Could not delete file: " + str (e ))
599+
600+ def _delete (self , filepath ):
601+ raise ArtifactDownloadError ("not implemented" )
602+
603+ def listDir (self , path ):
604+ try :
605+ return self ._listDir (path )
606+ except (ArtifactDownloadError , OSError ) as e :
607+ if self .__ignoreErrors :
608+ return ("error (" + str (e ) + ")" , ERROR )
609+ else :
610+ raise BuildError ("Could not list dir: " + str (e ))
611+
612+ def _listDir (self , path ):
613+ raise ArtifactDownloadError ("not implemented" )
614+
615+ def stat (self , filepath ):
616+ try :
617+ return self ._stat (filepath )
618+ except (ArtifactDownloadError , OSError ) as e :
619+ if self .__ignoreErrors :
620+ return ("error (" + str (e ) + ")" , ERROR )
621+ else :
622+ raise BuildError ("Could not stat file: " + str (e ))
623+
624+ def _stat (self , filepath ):
625+ raise ArtifactDownloadError ("not implemented" )
626+
627+ def getAudit (self , filepath ):
628+ try :
629+ return self ._getAudit (filepath )
630+ except (ArtifactDownloadError , OSError ) as e :
631+ if self .__ignoreErrors :
632+ return ("error (" + str (e ) + ")" , ERROR )
633+ else :
634+ raise BuildError ("Could not get audit from file: " + str (e ))
635+
636+ def _getAudit (self , filepath ):
637+ raise ArtifactDownloadError ("not implemented" )
638+
639+ def getArchiveUri (self ):
640+ try :
641+ return self ._getArchiveUri ()
642+ except (ArtifactDownloadError , OSError ) as e :
643+ if self .__ignoreErrors :
644+ return ("error (" + str (e ) + ")" , ERROR )
645+ else :
646+ raise BuildError ("Could not get archive hash: " + str (e ))
647+
648+ def _getArchiveUri (self ):
649+ raise ArtifactDownloadError ("not implemented" )
650+
651+ def getArchiveName (self ):
652+ try :
653+ return self ._getArchiveName ()
654+ except (ArtifactDownloadError , OSError ) as e :
655+ if self .__ignoreErrors :
656+ return ("error (" + str (e ) + ")" , ERROR )
657+ else :
658+ raise BuildError ("Could not get archive hash: " + str (e ))
659+
660+ def _getArchiveName (self ):
661+ raise ArtifactDownloadError ("not implemented" )
560662
561663class Tee :
562664 def __init__ (self , fileName , fileObj , buildId , caches , workspace ):
@@ -650,6 +752,9 @@ def __init__(self, spec):
650752 self .__fileMode = spec .get ("fileMode" )
651753 self .__dirMode = spec .get ("directoryMode" )
652754
755+ def canManage (self ):
756+ return True
757+
653758 def _getPath (self , buildId , suffix ):
654759 packageResultId = buildIdToName (buildId )
655760 packageResultPath = os .path .join (self .__basePath , packageResultId [0 :2 ],
@@ -686,6 +791,29 @@ def _openUploadFile(self, buildId, suffix, overwrite):
686791 NamedTemporaryFile (dir = packageResultPath , delete = False ),
687792 self .__fileMode , packageResultFile , overwrite )
688793
794+ def _delete (self , filename ):
795+ try :
796+ os .unlink (os .path .join (self .__basePath , filename ))
797+ except FileNotFoundError :
798+ pass
799+ except OSError as e :
800+ raise BuildError ("Cannot remove {}: {}" .format (filename , str (e )))
801+
802+ def _listDir (self , path ):
803+ return os .listdir (os .path .join (self .__basePath , path ))
804+
805+ def _stat (self , filename ):
806+ return binStat (os .path .join (self .__basePath , filename ))
807+
808+ def _getAudit (self , filename ):
809+ return self ._extractAudit (filename = os .path .join (self .__basePath , filename ))
810+
811+ def _getArchiveUri (self ):
812+ return self .__basePath
813+
814+ def _getArchiveName (self ):
815+ return "local archive {}" .format (self .__basePath )
816+
689817class LocalArchiveDownloader :
690818 def __init__ (self , name ):
691819 try :
@@ -751,6 +879,9 @@ def __retry(self, request):
751879 if not retry : return (False , e )
752880 retry = False
753881
882+ def canManage (self ):
883+ return True
884+
754885 def _resetConnection (self ):
755886 self ._webdav ._resetConnection ()
756887
@@ -813,6 +944,42 @@ def _putUploadFile(self, path, tmp, overwrite):
813944 else :
814945 raise result
815946
947+ def _listDir (self , path ):
948+ path_info = self ._webdav .listdir (path )
949+ entries = []
950+ for info in path_info :
951+ if info ["path" ]:
952+ entries .append (info ["path" ].removeprefix (path + "/" ))
953+ return entries
954+
955+ def _delete (self , filename ):
956+ self ._webdav .delete (filename )
957+
958+ def _stat (self , filename ):
959+ stats = self ._webdav .stat (filename )
960+ if stats ['etag' ] is not None and not stats ['etag' ].startswith ('W/' ):
961+ return stats ['etag' ]
962+ if not stats ['mdate' ] or not stats ['len' ]:
963+ raise ArtifactDownloadError ("Missing stats for file " + filename )
964+ from email .utils import parsedate_to_datetime
965+ return struct .pack ('=dL' , parsedate_to_datetime (stats ['mdate' ]).timestamp (), stats ['len' ])
966+
967+ def _getAudit (self , filename ):
968+ downloader = self ._webdav .getPartialDownloader ("/" .join ([self .__url .path , filename ]))
969+ while True :
970+ try :
971+ file = io .BytesIO (downloader .get ())
972+ return self ._extractAudit (fileobj = file )
973+ except (EOFError , tarfile .ReadError ):
974+ # partial downloader reached EOF or could not extract the audit from the tarfile, so we get more data
975+ downloader .more ()
976+ pass
977+
978+ def _getArchiveUri (self ):
979+ return self .__url .netloc + self .__url .path
980+
981+ def _getArchiveName (self ):
982+ return "http archive {}" .format (self .__url .netloc + self .__url .path )
816983
817984class HttpDownloader :
818985 def __init__ (self , archiver , response ):
0 commit comments