# _compat.py (18 KB)
  1. import codecs
  2. import io
  3. import os
  4. import re
  5. import sys
  6. import typing as t
  7. from weakref import WeakKeyDictionary
  # Platform flags, evaluated once at import time.
  CYGWIN = sys.platform.startswith("cygwin")
  # A "win*" platform whose interpreter version string mentions GCC.
  MSYS2 = sys.platform.startswith("win") and ("GCC" in sys.version)
  # Determine local App Engine environment, per Google's own suggestion
  APP_ENGINE = "APPENGINE_RUNTIME" in os.environ and "Development/" in os.environ.get(
      "SERVER_SOFTWARE", ""
  )
  WIN = sys.platform.startswith("win") and not APP_ENGINE and not MSYS2
  # Placeholder; a real implementation is bound further down when WIN is set.
  auto_wrap_for_ansi: t.Optional[t.Callable[[t.TextIO], t.TextIO]] = None
  # ESC "[" followed by parameter characters (";", "?", digits) and a
  # single terminating letter.
  _ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]")
  def get_filesystem_encoding() -> str:
      """Return the filesystem encoding, falling back to the interpreter's
      default encoding when the former is empty or unset.
      """
      return sys.getfilesystemencoding() or sys.getdefaultencoding()
  def _make_text_stream(
      stream: t.BinaryIO,
      encoding: t.Optional[str],
      errors: t.Optional[str],
      force_readable: bool = False,
      force_writable: bool = False,
  ) -> t.TextIO:
      """Wrap a binary stream in a line-buffered ``_NonClosingTextIOWrapper``.

      :param stream: The binary stream to wrap.
      :param encoding: Text encoding. When ``None`` it is chosen with
          :func:`get_best_encoding`.
      :param errors: Codec error handler. When ``None``, ``"replace"``
          is used.
      :param force_readable: Forwarded to the wrapper.
      :param force_writable: Forwarded to the wrapper.
      :return: The wrapping text stream.
      """
      if encoding is None:
          encoding = get_best_encoding(stream)
      if errors is None:
          errors = "replace"
      return _NonClosingTextIOWrapper(
          stream,
          encoding,
          errors,
          line_buffering=True,
          force_readable=force_readable,
          force_writable=force_writable,
      )
  def is_ascii_encoding(encoding: str) -> bool:
      """Check whether ``encoding`` names the ASCII codec.

      Aliases are resolved through :func:`codecs.lookup`; an unknown
      encoding name yields ``False`` instead of raising.
      """
      try:
          return codecs.lookup(encoding).name == "ascii"
      except LookupError:
          return False
  def get_best_encoding(stream: t.IO) -> str:
      """Return the encoding to use for ``stream``.

      The stream's own ``encoding`` attribute is used when it is set,
      otherwise the interpreter default. If that resolves to ASCII,
      ``"utf-8"`` is returned instead.
      """
      rv = getattr(stream, "encoding", None) or sys.getdefaultencoding()
      if is_ascii_encoding(rv):
          return "utf-8"
      return rv
  class _NonClosingTextIOWrapper(io.TextIOWrapper):
      """A ``TextIOWrapper`` that detaches from the wrapped stream on
      garbage collection, so the underlying stream is left open.
      """

      def __init__(
          self,
          stream: t.BinaryIO,
          encoding: t.Optional[str],
          errors: t.Optional[str],
          force_readable: bool = False,
          force_writable: bool = False,
          **extra: t.Any,
      ) -> None:
          # Route the stream through _FixupStream so the methods the io
          # layer asks for (readable/writable/seekable/read1) exist.
          self._stream = stream = t.cast(
              t.BinaryIO, _FixupStream(stream, force_readable, force_writable)
          )
          super().__init__(stream, encoding, errors, **extra)

      def __del__(self) -> None:
          # Best effort: detaching may fail (e.g. already detached), and
          # a finalizer must not raise.
          try:
              self.detach()
          except Exception:
              pass

      def isatty(self) -> bool:
          # https://bitbucket.org/pypy/pypy/issue/1803
          return self._stream.isatty()
  class _FixupStream:
      """The new io interface needs more from streams than streams
      traditionally implement. As such, this fix-up code is necessary in
      some circumstances.

      The forcing of the readable and writable flags is there because
      some tools put badly patched objects on sys (certain versions of
      Jupyter notebook are one such offender).
      """

      def __init__(
          self,
          stream: t.BinaryIO,
          force_readable: bool = False,
          force_writable: bool = False,
      ):
          self._stream = stream
          self._force_readable = force_readable
          self._force_writable = force_writable

      def __getattr__(self, name: str) -> t.Any:
          # Anything not defined here is delegated to the wrapped stream.
          return getattr(self._stream, name)

      def read1(self, size: int) -> bytes:
          """Use the stream's ``read1`` if it has one, else ``read``."""
          f = getattr(self._stream, "read1", None)

          if f is not None:
              return t.cast(bytes, f(size))

          return self._stream.read(size)

      def readable(self) -> bool:
          """Report readability: forced flag, then the stream's own
          ``readable()``, then a zero-length read probe.
          """
          if self._force_readable:
              return True
          x = getattr(self._stream, "readable", None)
          if x is not None:
              return t.cast(bool, x())
          try:
              self._stream.read(0)
          except Exception:
              return False
          return True

      def writable(self) -> bool:
          """Report writability: forced flag, then the stream's own
          ``writable()``, then an empty-write probe (text, then bytes).
          """
          if self._force_writable:
              return True
          x = getattr(self._stream, "writable", None)
          if x is not None:
              return t.cast(bool, x())
          try:
              self._stream.write("")  # type: ignore
          except Exception:
              try:
                  self._stream.write(b"")
              except Exception:
                  return False
          return True

      def seekable(self) -> bool:
          """Report seekability: the stream's own ``seekable()``, or a
          seek-to-current-position probe.
          """
          x = getattr(self._stream, "seekable", None)
          if x is not None:
              return t.cast(bool, x())
          try:
              self._stream.seek(self._stream.tell())
          except Exception:
              return False
          return True
  def _is_binary_reader(stream: t.IO, default: bool = False) -> bool:
      """Return whether a zero-length read from ``stream`` yields bytes.

      :param default: Returned when the probe read raises.
      """
      try:
          return isinstance(stream.read(0), bytes)
      except Exception:
          # This happens in some cases where the stream was already
          # closed. In this case, we assume the default.
          return default
  def _is_binary_writer(stream: t.IO, default: bool = False) -> bool:
      """Return whether ``stream`` accepts bytes on ``write``.

      An empty bytes write is attempted first. If that raises, an empty
      text write is attempted: success means the stream is a text
      writer (``False``); failure of both probes yields ``default``.
      """
      try:
          stream.write(b"")
      except Exception:
          try:
              stream.write("")
              return False
          except Exception:
              pass
          return default
      return True
  def _find_binary_reader(stream: t.IO) -> t.Optional[t.BinaryIO]:
      """Return a binary reader for ``stream``: the stream itself if it
      already reads bytes, else its ``buffer`` attribute, else ``None``.
      """
      # We need to figure out if the given stream is already binary.
      # This can happen because the official docs recommend detaching
      # the streams to get binary streams. Some code might do this, so
      # we need to deal with this case explicitly.
      if _is_binary_reader(stream, False):
          return t.cast(t.BinaryIO, stream)

      buf = getattr(stream, "buffer", None)

      # Same situation here; this time we assume that the buffer is
      # actually binary in case it's closed.
      if buf is not None and _is_binary_reader(buf, True):
          return t.cast(t.BinaryIO, buf)

      return None
  def _find_binary_writer(stream: t.IO) -> t.Optional[t.BinaryIO]:
      """Return a binary writer for ``stream``: the stream itself if it
      already accepts bytes, else its ``buffer`` attribute, else ``None``.
      """
      # We need to figure out if the given stream is already binary.
      # This can happen because the official docs recommend detaching
      # the streams to get binary streams. Some code might do this, so
      # we need to deal with this case explicitly.
      if _is_binary_writer(stream, False):
          return t.cast(t.BinaryIO, stream)

      buf = getattr(stream, "buffer", None)

      # Same situation here; this time we assume that the buffer is
      # actually binary in case it's closed.
      if buf is not None and _is_binary_writer(buf, True):
          return t.cast(t.BinaryIO, buf)

      return None
  def _stream_is_misconfigured(stream: t.TextIO) -> bool:
      """A stream is misconfigured if its encoding is ASCII."""
      # If the stream does not have an encoding set, we assume it's set
      # to ASCII. This appears to happen in certain unittest
      # environments. It's not quite clear what the correct behavior is
      # but this at least will force Click to recover somehow.
      return is_ascii_encoding(getattr(stream, "encoding", None) or "ascii")
  def _is_compat_stream_attr(stream: t.TextIO, attr: str, value: t.Optional[str]) -> bool:
      """A stream attribute is compatible if it is equal to the
      desired value or the desired value is unset and the attribute
      has a value.

      A missing attribute is treated as ``None``.
      """
      stream_value = getattr(stream, attr, None)
      return stream_value == value or (value is None and stream_value is not None)
  def _is_compatible_text_stream(
      stream: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str]
  ) -> bool:
      """Check if a stream's encoding and errors attributes are
      compatible with the desired values.
      """
      return _is_compat_stream_attr(
          stream, "encoding", encoding
      ) and _is_compat_stream_attr(stream, "errors", errors)
  def _force_correct_text_stream(
      text_stream: t.IO,
      encoding: t.Optional[str],
      errors: t.Optional[str],
      is_binary: t.Callable[[t.IO, bool], bool],
      find_binary: t.Callable[[t.IO], t.Optional[t.BinaryIO]],
      force_readable: bool = False,
      force_writable: bool = False,
  ) -> t.TextIO:
      """Return a text stream over ``text_stream`` with the requested
      ``encoding`` and ``errors``.

      :param text_stream: A text or binary stream.
      :param encoding: Desired encoding, or ``None`` for no preference.
      :param errors: Desired error handler, or ``None`` for no preference.
      :param is_binary: Predicate ``(stream, default) -> bool`` used to
          detect an already-binary stream.
      :param find_binary: Function returning the binary stream that
          underlies a text stream, or ``None``.
      :param force_readable: Forwarded to :func:`_make_text_stream`.
      :param force_writable: Forwarded to :func:`_make_text_stream`.
      :return: ``text_stream`` itself when it is already suitable or no
          binary stream can be found; otherwise a new wrapper.
      """
      if is_binary(text_stream, False):
          binary_reader = t.cast(t.BinaryIO, text_stream)
      else:
          text_stream = t.cast(t.TextIO, text_stream)
          # If the stream looks compatible, and won't default to a
          # misconfigured ascii encoding, return it as-is.
          if _is_compatible_text_stream(text_stream, encoding, errors) and not (
              encoding is None and _stream_is_misconfigured(text_stream)
          ):
              return text_stream

          # Otherwise, get the underlying binary reader.
          possible_binary_reader = find_binary(text_stream)

          # If that's not possible, silently use the original reader
          # and get mojibake instead of exceptions.
          if possible_binary_reader is None:
              return text_stream

          binary_reader = possible_binary_reader

      # Default errors to replace instead of strict in order to get
      # something that works.
      if errors is None:
          errors = "replace"

      # Wrap the binary stream in a text stream with the correct
      # encoding parameters.
      return _make_text_stream(
          binary_reader,
          encoding,
          errors,
          force_readable=force_readable,
          force_writable=force_writable,
      )
  def _force_correct_text_reader(
      text_reader: t.IO,
      encoding: t.Optional[str],
      errors: t.Optional[str],
      force_readable: bool = False,
  ) -> t.TextIO:
      """Reader variant of :func:`_force_correct_text_stream`, using the
      binary-reader detection helpers.
      """
      return _force_correct_text_stream(
          text_reader,
          encoding,
          errors,
          _is_binary_reader,
          _find_binary_reader,
          force_readable=force_readable,
      )
  def _force_correct_text_writer(
      text_writer: t.IO,
      encoding: t.Optional[str],
      errors: t.Optional[str],
      force_writable: bool = False,
  ) -> t.TextIO:
      """Writer variant of :func:`_force_correct_text_stream`, using the
      binary-writer detection helpers.
      """
      return _force_correct_text_stream(
          text_writer,
          encoding,
          errors,
          _is_binary_writer,
          _find_binary_writer,
          force_writable=force_writable,
      )
  def get_binary_stdin() -> t.BinaryIO:
      """Return a binary reader for ``sys.stdin``.

      :raises RuntimeError: If no binary stream can be determined.
      """
      reader = _find_binary_reader(sys.stdin)
      if reader is None:
          raise RuntimeError("Was not able to determine binary stream for sys.stdin.")
      return reader
  def get_binary_stdout() -> t.BinaryIO:
      """Return a binary writer for ``sys.stdout``.

      :raises RuntimeError: If no binary stream can be determined.
      """
      writer = _find_binary_writer(sys.stdout)
      if writer is None:
          raise RuntimeError("Was not able to determine binary stream for sys.stdout.")
      return writer
  def get_binary_stderr() -> t.BinaryIO:
      """Return a binary writer for ``sys.stderr``.

      :raises RuntimeError: If no binary stream can be determined.
      """
      writer = _find_binary_writer(sys.stderr)
      if writer is None:
          raise RuntimeError("Was not able to determine binary stream for sys.stderr.")
      return writer
  def get_text_stdin(
      encoding: t.Optional[str] = None, errors: t.Optional[str] = None
  ) -> t.TextIO:
      """Return a text reader for ``sys.stdin``.

      The Windows console stream is preferred when
      ``_get_windows_console_stream`` provides one; otherwise
      ``sys.stdin`` is adapted to the requested ``encoding``/``errors``.
      """
      rv = _get_windows_console_stream(sys.stdin, encoding, errors)
      if rv is not None:
          return rv
      return _force_correct_text_reader(sys.stdin, encoding, errors, force_readable=True)
  def get_text_stdout(
      encoding: t.Optional[str] = None, errors: t.Optional[str] = None
  ) -> t.TextIO:
      """Return a text writer for ``sys.stdout``.

      The Windows console stream is preferred when
      ``_get_windows_console_stream`` provides one; otherwise
      ``sys.stdout`` is adapted to the requested ``encoding``/``errors``.
      """
      rv = _get_windows_console_stream(sys.stdout, encoding, errors)
      if rv is not None:
          return rv
      return _force_correct_text_writer(sys.stdout, encoding, errors, force_writable=True)
  def get_text_stderr(
      encoding: t.Optional[str] = None, errors: t.Optional[str] = None
  ) -> t.TextIO:
      """Return a text writer for ``sys.stderr``.

      The Windows console stream is preferred when
      ``_get_windows_console_stream`` provides one; otherwise
      ``sys.stderr`` is adapted to the requested ``encoding``/``errors``.
      """
      rv = _get_windows_console_stream(sys.stderr, encoding, errors)
      if rv is not None:
          return rv
      return _force_correct_text_writer(sys.stderr, encoding, errors, force_writable=True)
  def _wrap_io_open(
      file: t.Union[str, os.PathLike, int],
      mode: str,
      encoding: t.Optional[str],
      errors: t.Optional[str],
  ) -> t.IO:
      """Handles not passing ``encoding`` and ``errors`` in binary mode."""
      # open() rejects encoding/errors arguments for binary modes, so
      # they are only forwarded for text modes.
      if "b" in mode:
          return open(file, mode)

      return open(file, mode, encoding=encoding, errors=errors)
  def open_stream(
      filename: str,
      mode: str = "r",
      encoding: t.Optional[str] = None,
      errors: t.Optional[str] = "strict",
      atomic: bool = False,
  ) -> t.Tuple[t.IO, bool]:
      """Open ``filename``, or a standard stream when it is ``"-"``.

      :param filename: Path to open; ``"-"`` selects stdin or stdout
          depending on ``mode``.
      :param mode: An ``open()``-style mode string.
      :param encoding: Text encoding (ignored for binary modes).
      :param errors: Codec error handler (ignored for binary modes).
      :param atomic: Write to a temporary file in the same directory
          and move it over ``filename`` on close.
      :return: ``(stream, flag)`` where ``flag`` is ``False`` for the
          standard streams and ``True`` for files opened here.
      :raises ValueError: If ``atomic`` is combined with a mode that
          contains ``a`` or ``x``, or lacks ``w``.
      """
      binary = "b" in mode

      # Standard streams first. These are simple because they ignore the
      # atomic flag. Use fsdecode to handle Path("-").
      if os.fsdecode(filename) == "-":
          if any(m in mode for m in ["w", "a", "x"]):
              if binary:
                  return get_binary_stdout(), False
              return get_text_stdout(encoding=encoding, errors=errors), False
          if binary:
              return get_binary_stdin(), False
          return get_text_stdin(encoding=encoding, errors=errors), False

      # Non-atomic writes directly go out through the regular open functions.
      if not atomic:
          return _wrap_io_open(filename, mode, encoding, errors), True

      # Some usability stuff for atomic writes
      if "a" in mode:
          raise ValueError(
              "Appending to an existing file is not supported, because that"
              " would involve an expensive `copy`-operation to a temporary"
              " file. Open the file in normal `w`-mode and copy explicitly"
              " if that's what you're after."
          )
      if "x" in mode:
          raise ValueError("Use the `overwrite`-parameter instead.")
      if "w" not in mode:
          raise ValueError("Atomic writes only make sense with `w`-mode.")

      # Atomic writes are more complicated. They work by opening a file
      # as a proxy in the same folder and then using the fdopen
      # functionality to wrap it in a Python file. Then we wrap it in an
      # atomic file that moves the file over on close.
      import errno
      import random

      # Remember the existing file's mode bits, if it exists, so the
      # replacement can carry the same permissions.
      try:
          perm: t.Optional[int] = os.stat(filename).st_mode
      except OSError:
          perm = None

      flags = os.O_RDWR | os.O_CREAT | os.O_EXCL

      if binary:
          flags |= getattr(os, "O_BINARY", 0)

      # Retry with a fresh random name until exclusive creation succeeds.
      while True:
          tmp_filename = os.path.join(
              os.path.dirname(filename),
              f".__atomic-write{random.randrange(1 << 32):08x}",
          )
          try:
              fd = os.open(tmp_filename, flags, 0o666 if perm is None else perm)
              break
          except OSError as e:
              if e.errno == errno.EEXIST or (
                  os.name == "nt"
                  and e.errno == errno.EACCES
                  and os.path.isdir(e.filename)
                  and os.access(e.filename, os.W_OK)
              ):
                  continue
              raise

      if perm is not None:
          os.chmod(tmp_filename, perm)  # in case perm includes bits in umask

      f = _wrap_io_open(fd, mode, encoding, errors)
      af = _AtomicFile(f, tmp_filename, os.path.realpath(filename))
      return t.cast(t.IO, af), True
  class _AtomicFile:
      """File proxy that writes to a temporary file and moves it over
      the real file name when closed.

      Attribute access not defined here is delegated to the wrapped
      file object.
      """

      def __init__(self, f: t.IO, tmp_filename: str, real_filename: str) -> None:
          self._f = f
          self._tmp_filename = tmp_filename
          self._real_filename = real_filename
          self.closed = False

      @property
      def name(self) -> str:
          """The destination file name (not the temporary one)."""
          return self._real_filename

      def close(self, delete: bool = False) -> None:
          """Close the wrapped file and finish the atomic write.

          :param delete: When true, discard the temporary file and
              leave the destination untouched; otherwise the temporary
              file replaces the destination.
          """
          if self.closed:
              return
          self._f.close()
          if delete:
              # The write was abandoned (e.g. the ``with`` body raised):
              # do not overwrite the destination with partial data.
              try:
                  os.unlink(self._tmp_filename)
              except FileNotFoundError:
                  pass
          else:
              os.replace(self._tmp_filename, self._real_filename)
          self.closed = True

      def __getattr__(self, name: str) -> t.Any:
          return getattr(self._f, name)

      def __enter__(self) -> "_AtomicFile":
          return self

      def __exit__(self, exc_type, exc_value, tb):  # type: ignore
          # Discard the temporary file when leaving on an exception.
          self.close(delete=exc_type is not None)

      def __repr__(self) -> str:
          return repr(self._f)
  def strip_ansi(value: str) -> str:
      """Return ``value`` with every match of ``_ansi_re`` removed."""
      return _ansi_re.sub("", value)
  def _is_jupyter_kernel_output(stream: t.IO) -> bool:
      """Return whether the innermost stream's class is defined in a
      module whose name starts with ``ipykernel.``.
      """
      # Peel off this module's own wrappers to reach the real stream.
      while isinstance(stream, (_FixupStream, _NonClosingTextIOWrapper)):
          stream = stream._stream

      return stream.__class__.__module__.startswith("ipykernel.")
  def should_strip_ansi(
      stream: t.Optional[t.IO] = None, color: t.Optional[bool] = None
  ) -> bool:
      """Decide whether ANSI codes should be stripped from output.

      :param stream: Stream to inspect when ``color`` is ``None``;
          defaults to ``sys.stdin``.
      :param color: Explicit override. When given, the result is simply
          ``not color``.
      :return: With no override, ``True`` unless the stream is a TTY or
          Jupyter kernel output.
      """
      if color is None:
          if stream is None:
              stream = sys.stdin
          return not isatty(stream) and not _is_jupyter_kernel_output(stream)
      return not color
  # On Windows, wrap the output streams with colorama to support ANSI
  # color codes.
  # NOTE: double check is needed so mypy does not analyze this on Linux
  if sys.platform.startswith("win") and WIN:
      from ._winconsole import _get_windows_console_stream

      def _get_argv_encoding() -> str:
          """Return the locale's preferred encoding."""
          import locale

          return locale.getpreferredencoding()

      # Cache of already wrapped streams, keyed weakly by the original.
      _ansi_stream_wrappers: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()

      def auto_wrap_for_ansi(
          stream: t.TextIO, color: t.Optional[bool] = None
      ) -> t.TextIO:
          """Support ANSI color and style codes on Windows by wrapping a
          stream with colorama.
          """
          # The cache lookup can fail for streams that cannot be used
          # as weak keys; treat that as a miss.
          try:
              cached = _ansi_stream_wrappers.get(stream)
          except Exception:
              cached = None

          if cached is not None:
              return cached

          import colorama

          strip = should_strip_ansi(stream, color)
          ansi_wrapper = colorama.AnsiToWin32(stream, strip=strip)
          rv = t.cast(t.TextIO, ansi_wrapper.stream)
          _write = rv.write

          def _safe_write(s):
              # Reset the console attributes if a write is interrupted,
              # then let the exception propagate.
              try:
                  return _write(s)
              except BaseException:
                  ansi_wrapper.reset_all()
                  raise

          rv.write = _safe_write

          try:
              _ansi_stream_wrappers[stream] = rv
          except Exception:
              pass

          return rv

  else:

      def _get_argv_encoding() -> str:
          """Return stdin's encoding, or the filesystem encoding."""
          return getattr(sys.stdin, "encoding", None) or get_filesystem_encoding()

      def _get_windows_console_stream(
          f: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str]
      ) -> t.Optional[t.TextIO]:
          """Stub used when the Windows branch is not taken; always ``None``."""
          return None
  def term_len(x: str) -> int:
      """Return the length of ``x`` after removing ANSI codes."""
      return len(strip_ansi(x))
  def isatty(stream: t.IO) -> bool:
      """Return ``stream.isatty()``, or ``False`` if that call raises
      (including when the object has no such method).
      """
      try:
          return stream.isatty()
      except Exception:
          return False
  def _make_cached_stream_func(
      src_func: t.Callable[[], t.TextIO], wrapper_func: t.Callable[[], t.TextIO]
  ) -> t.Callable[[], t.TextIO]:
      """Build a zero-argument function returning a cached wrapped stream.

      :param src_func: Returns the current source stream; it is the
          (weak) cache key.
      :param wrapper_func: Produces the wrapped stream on a cache miss.
      :return: A function that returns the cached wrapper for the
          current source stream, creating it when needed.
      """
      cache: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()

      def func() -> t.TextIO:
          stream = src_func()
          # Lookup and store can fail for objects that cannot be used
          # as weak keys; fall back to wrapping without caching.
          try:
              rv = cache.get(stream)
          except Exception:
              rv = None
          if rv is not None:
              return rv
          rv = wrapper_func()
          try:
              cache[stream] = rv
          except Exception:
              pass
          return rv

      return func
  # Cached text wrappers for the standard streams, keyed on whatever
  # object is currently bound to the corresponding sys attribute.
  _default_text_stdin = _make_cached_stream_func(lambda: sys.stdin, get_text_stdin)
  _default_text_stdout = _make_cached_stream_func(lambda: sys.stdout, get_text_stdout)
  _default_text_stderr = _make_cached_stream_func(lambda: sys.stderr, get_text_stderr)

  # Factories for the binary standard streams, by name.
  binary_streams: t.Mapping[str, t.Callable[[], t.BinaryIO]] = {
      "stdin": get_binary_stdin,
      "stdout": get_binary_stdout,
      "stderr": get_binary_stderr,
  }

  # Factories for the text standard streams, by name; each accepts
  # ``(encoding, errors)``.
  text_streams: t.Mapping[
      str, t.Callable[[t.Optional[str], t.Optional[str]], t.TextIO]
  ] = {
      "stdin": get_text_stdin,
      "stdout": get_text_stdout,
      "stderr": get_text_stderr,
  }