archive_util.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. """Utilities for extracting common archive formats"""
  2. import zipfile
  3. import tarfile
  4. import os
  5. import shutil
  6. import posixpath
  7. import contextlib
  8. from distutils.errors import DistutilsError
  9. from pkg_resources import ensure_directory
  10. __all__ = [
  11. "unpack_archive", "unpack_zipfile", "unpack_tarfile", "default_filter",
  12. "UnrecognizedFormat", "extraction_drivers", "unpack_directory",
  13. ]
  14. class UnrecognizedFormat(DistutilsError):
  15. """Couldn't recognize the archive type"""
  16. def default_filter(src, dst):
  17. """The default progress/filter callback; returns True for all files"""
  18. return dst
  19. def unpack_archive(
  20. filename, extract_dir, progress_filter=default_filter,
  21. drivers=None):
  22. """Unpack `filename` to `extract_dir`, or raise ``UnrecognizedFormat``
  23. `progress_filter` is a function taking two arguments: a source path
  24. internal to the archive ('/'-separated), and a filesystem path where it
  25. will be extracted. The callback must return the desired extract path
  26. (which may be the same as the one passed in), or else ``None`` to skip
  27. that file or directory. The callback can thus be used to report on the
  28. progress of the extraction, as well as to filter the items extracted or
  29. alter their extraction paths.
  30. `drivers`, if supplied, must be a non-empty sequence of functions with the
  31. same signature as this function (minus the `drivers` argument), that raise
  32. ``UnrecognizedFormat`` if they do not support extracting the designated
  33. archive type. The `drivers` are tried in sequence until one is found that
  34. does not raise an error, or until all are exhausted (in which case
  35. ``UnrecognizedFormat`` is raised). If you do not supply a sequence of
  36. drivers, the module's ``extraction_drivers`` constant will be used, which
  37. means that ``unpack_zipfile`` and ``unpack_tarfile`` will be tried, in that
  38. order.
  39. """
  40. for driver in drivers or extraction_drivers:
  41. try:
  42. driver(filename, extract_dir, progress_filter)
  43. except UnrecognizedFormat:
  44. continue
  45. else:
  46. return
  47. else:
  48. raise UnrecognizedFormat(
  49. "Not a recognized archive type: %s" % filename
  50. )
  51. def unpack_directory(filename, extract_dir, progress_filter=default_filter):
  52. """"Unpack" a directory, using the same interface as for archives
  53. Raises ``UnrecognizedFormat`` if `filename` is not a directory
  54. """
  55. if not os.path.isdir(filename):
  56. raise UnrecognizedFormat("%s is not a directory" % filename)
  57. paths = {
  58. filename: ('', extract_dir),
  59. }
  60. for base, dirs, files in os.walk(filename):
  61. src, dst = paths[base]
  62. for d in dirs:
  63. paths[os.path.join(base, d)] = src + d + '/', os.path.join(dst, d)
  64. for f in files:
  65. target = os.path.join(dst, f)
  66. target = progress_filter(src + f, target)
  67. if not target:
  68. # skip non-files
  69. continue
  70. ensure_directory(target)
  71. f = os.path.join(base, f)
  72. shutil.copyfile(f, target)
  73. shutil.copystat(f, target)
  74. def unpack_zipfile(filename, extract_dir, progress_filter=default_filter):
  75. """Unpack zip `filename` to `extract_dir`
  76. Raises ``UnrecognizedFormat`` if `filename` is not a zipfile (as determined
  77. by ``zipfile.is_zipfile()``). See ``unpack_archive()`` for an explanation
  78. of the `progress_filter` argument.
  79. """
  80. if not zipfile.is_zipfile(filename):
  81. raise UnrecognizedFormat("%s is not a zip file" % (filename,))
  82. with zipfile.ZipFile(filename) as z:
  83. for info in z.infolist():
  84. name = info.filename
  85. # don't extract absolute paths or ones with .. in them
  86. if name.startswith('/') or '..' in name.split('/'):
  87. continue
  88. target = os.path.join(extract_dir, *name.split('/'))
  89. target = progress_filter(name, target)
  90. if not target:
  91. continue
  92. if name.endswith('/'):
  93. # directory
  94. ensure_directory(target)
  95. else:
  96. # file
  97. ensure_directory(target)
  98. data = z.read(info.filename)
  99. with open(target, 'wb') as f:
  100. f.write(data)
  101. unix_attributes = info.external_attr >> 16
  102. if unix_attributes:
  103. os.chmod(target, unix_attributes)
  104. def _resolve_tar_file_or_dir(tar_obj, tar_member_obj):
  105. """Resolve any links and extract link targets as normal files."""
  106. while tar_member_obj is not None and (
  107. tar_member_obj.islnk() or tar_member_obj.issym()):
  108. linkpath = tar_member_obj.linkname
  109. if tar_member_obj.issym():
  110. base = posixpath.dirname(tar_member_obj.name)
  111. linkpath = posixpath.join(base, linkpath)
  112. linkpath = posixpath.normpath(linkpath)
  113. tar_member_obj = tar_obj._getmember(linkpath)
  114. is_file_or_dir = (
  115. tar_member_obj is not None and
  116. (tar_member_obj.isfile() or tar_member_obj.isdir())
  117. )
  118. if is_file_or_dir:
  119. return tar_member_obj
  120. raise LookupError('Got unknown file type')
  121. def _iter_open_tar(tar_obj, extract_dir, progress_filter):
  122. """Emit member-destination pairs from a tar archive."""
  123. # don't do any chowning!
  124. tar_obj.chown = lambda *args: None
  125. with contextlib.closing(tar_obj):
  126. for member in tar_obj:
  127. name = member.name
  128. # don't extract absolute paths or ones with .. in them
  129. if name.startswith('/') or '..' in name.split('/'):
  130. continue
  131. prelim_dst = os.path.join(extract_dir, *name.split('/'))
  132. try:
  133. member = _resolve_tar_file_or_dir(tar_obj, member)
  134. except LookupError:
  135. continue
  136. final_dst = progress_filter(name, prelim_dst)
  137. if not final_dst:
  138. continue
  139. if final_dst.endswith(os.sep):
  140. final_dst = final_dst[:-1]
  141. yield member, final_dst
  142. def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
  143. """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`
  144. Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined
  145. by ``tarfile.open()``). See ``unpack_archive()`` for an explanation
  146. of the `progress_filter` argument.
  147. """
  148. try:
  149. tarobj = tarfile.open(filename)
  150. except tarfile.TarError as e:
  151. raise UnrecognizedFormat(
  152. "%s is not a compressed or uncompressed tar file" % (filename,)
  153. ) from e
  154. for member, final_dst in _iter_open_tar(
  155. tarobj, extract_dir, progress_filter,
  156. ):
  157. try:
  158. # XXX Ugh
  159. tarobj._extract_member(member, final_dst)
  160. except tarfile.ExtractError:
  161. # chown/chmod/mkfifo/mknode/makedev failed
  162. pass
  163. return True
  164. extraction_drivers = unpack_directory, unpack_zipfile, unpack_tarfile