_internal.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548
  1. import logging
  2. import operator
  3. import re
  4. import string
  5. import sys
  6. import typing
  7. import typing as t
  8. from datetime import date
  9. from datetime import datetime
  10. from datetime import timezone
  11. from itertools import chain
  12. from weakref import WeakKeyDictionary
  13. if t.TYPE_CHECKING:
  14. from _typeshed.wsgi import StartResponse
  15. from _typeshed.wsgi import WSGIApplication
  16. from _typeshed.wsgi import WSGIEnvironment
  17. from .wrappers.request import Request # noqa: F401
  18. _logger: t.Optional[logging.Logger] = None
  19. _signature_cache = WeakKeyDictionary() # type: ignore
  20. _epoch_ord = date(1970, 1, 1).toordinal()
  21. _legal_cookie_chars = frozenset(
  22. c.encode("ascii")
  23. for c in f"{string.ascii_letters}{string.digits}/=!#$%&'*+-.^_`|~:"
  24. )
  25. _cookie_quoting_map = {b",": b"\\054", b";": b"\\073", b'"': b'\\"', b"\\": b"\\\\"}
  26. for _i in chain(range(32), range(127, 256)):
  27. _cookie_quoting_map[_i.to_bytes(1, sys.byteorder)] = f"\\{_i:03o}".encode("latin1")
  28. _octal_re = re.compile(rb"\\[0-3][0-7][0-7]")
  29. _quote_re = re.compile(rb"[\\].")
  30. _legal_cookie_chars_re = rb"[\w\d!#%&\'~_`><@,:/\$\*\+\-\.\^\|\)\(\?\}\{\=]"
  31. _cookie_re = re.compile(
  32. rb"""
  33. (?P<key>[^=;]+)
  34. (?:\s*=\s*
  35. (?P<val>
  36. "(?:[^\\"]|\\.)*" |
  37. (?:.*?)
  38. )
  39. )?
  40. \s*;
  41. """,
  42. flags=re.VERBOSE,
  43. )
  44. class _Missing:
  45. def __repr__(self) -> str:
  46. return "no value"
  47. def __reduce__(self) -> str:
  48. return "_missing"
  49. _missing = _Missing()
  50. @typing.overload
  51. def _make_encode_wrapper(reference: str) -> t.Callable[[str], str]:
  52. ...
  53. @typing.overload
  54. def _make_encode_wrapper(reference: bytes) -> t.Callable[[str], bytes]:
  55. ...
  56. def _make_encode_wrapper(reference: t.AnyStr) -> t.Callable[[str], t.AnyStr]:
  57. """Create a function that will be called with a string argument. If
  58. the reference is bytes, values will be encoded to bytes.
  59. """
  60. if isinstance(reference, str):
  61. return lambda x: x
  62. return operator.methodcaller("encode", "latin1")
  63. def _check_str_tuple(value: t.Tuple[t.AnyStr, ...]) -> None:
  64. """Ensure tuple items are all strings or all bytes."""
  65. if not value:
  66. return
  67. item_type = str if isinstance(value[0], str) else bytes
  68. if any(not isinstance(item, item_type) for item in value):
  69. raise TypeError(f"Cannot mix str and bytes arguments (got {value!r})")
  70. _default_encoding = sys.getdefaultencoding()
  71. def _to_bytes(
  72. x: t.Union[str, bytes], charset: str = _default_encoding, errors: str = "strict"
  73. ) -> bytes:
  74. if x is None or isinstance(x, bytes):
  75. return x
  76. if isinstance(x, (bytearray, memoryview)):
  77. return bytes(x)
  78. if isinstance(x, str):
  79. return x.encode(charset, errors)
  80. raise TypeError("Expected bytes")
  81. @typing.overload
  82. def _to_str( # type: ignore
  83. x: None,
  84. charset: t.Optional[str] = ...,
  85. errors: str = ...,
  86. allow_none_charset: bool = ...,
  87. ) -> None:
  88. ...
  89. @typing.overload
  90. def _to_str(
  91. x: t.Any,
  92. charset: t.Optional[str] = ...,
  93. errors: str = ...,
  94. allow_none_charset: bool = ...,
  95. ) -> str:
  96. ...
  97. def _to_str(
  98. x: t.Optional[t.Any],
  99. charset: t.Optional[str] = _default_encoding,
  100. errors: str = "strict",
  101. allow_none_charset: bool = False,
  102. ) -> t.Optional[t.Union[str, bytes]]:
  103. if x is None or isinstance(x, str):
  104. return x
  105. if not isinstance(x, (bytes, bytearray)):
  106. return str(x)
  107. if charset is None:
  108. if allow_none_charset:
  109. return x
  110. return x.decode(charset, errors) # type: ignore
  111. def _wsgi_decoding_dance(
  112. s: str, charset: str = "utf-8", errors: str = "replace"
  113. ) -> str:
  114. return s.encode("latin1").decode(charset, errors)
  115. def _wsgi_encoding_dance(
  116. s: str, charset: str = "utf-8", errors: str = "replace"
  117. ) -> str:
  118. if isinstance(s, bytes):
  119. return s.decode("latin1", errors)
  120. return s.encode(charset).decode("latin1", errors)
  121. def _get_environ(obj: t.Union["WSGIEnvironment", "Request"]) -> "WSGIEnvironment":
  122. env = getattr(obj, "environ", obj)
  123. assert isinstance(
  124. env, dict
  125. ), f"{type(obj).__name__!r} is not a WSGI environment (has to be a dict)"
  126. return env
  127. def _has_level_handler(logger: logging.Logger) -> bool:
  128. """Check if there is a handler in the logging chain that will handle
  129. the given logger's effective level.
  130. """
  131. level = logger.getEffectiveLevel()
  132. current = logger
  133. while current:
  134. if any(handler.level <= level for handler in current.handlers):
  135. return True
  136. if not current.propagate:
  137. break
  138. current = current.parent # type: ignore
  139. return False
  140. class _ColorStreamHandler(logging.StreamHandler):
  141. """On Windows, wrap stream with Colorama for ANSI style support."""
  142. def __init__(self) -> None:
  143. try:
  144. import colorama
  145. except ImportError:
  146. stream = None
  147. else:
  148. stream = colorama.AnsiToWin32(sys.stderr)
  149. super().__init__(stream)
  150. def _log(type: str, message: str, *args: t.Any, **kwargs: t.Any) -> None:
  151. """Log a message to the 'werkzeug' logger.
  152. The logger is created the first time it is needed. If there is no
  153. level set, it is set to :data:`logging.INFO`. If there is no handler
  154. for the logger's effective level, a :class:`logging.StreamHandler`
  155. is added.
  156. """
  157. global _logger
  158. if _logger is None:
  159. _logger = logging.getLogger("werkzeug")
  160. if _logger.level == logging.NOTSET:
  161. _logger.setLevel(logging.INFO)
  162. if not _has_level_handler(_logger):
  163. _logger.addHandler(_ColorStreamHandler())
  164. getattr(_logger, type)(message.rstrip(), *args, **kwargs)
  165. @typing.overload
  166. def _dt_as_utc(dt: None) -> None:
  167. ...
  168. @typing.overload
  169. def _dt_as_utc(dt: datetime) -> datetime:
  170. ...
  171. def _dt_as_utc(dt: t.Optional[datetime]) -> t.Optional[datetime]:
  172. if dt is None:
  173. return dt
  174. if dt.tzinfo is None:
  175. return dt.replace(tzinfo=timezone.utc)
  176. elif dt.tzinfo != timezone.utc:
  177. return dt.astimezone(timezone.utc)
  178. return dt
  179. _TAccessorValue = t.TypeVar("_TAccessorValue")
  180. class _DictAccessorProperty(t.Generic[_TAccessorValue]):
  181. """Baseclass for `environ_property` and `header_property`."""
  182. read_only = False
  183. def __init__(
  184. self,
  185. name: str,
  186. default: t.Optional[_TAccessorValue] = None,
  187. load_func: t.Optional[t.Callable[[str], _TAccessorValue]] = None,
  188. dump_func: t.Optional[t.Callable[[_TAccessorValue], str]] = None,
  189. read_only: t.Optional[bool] = None,
  190. doc: t.Optional[str] = None,
  191. ) -> None:
  192. self.name = name
  193. self.default = default
  194. self.load_func = load_func
  195. self.dump_func = dump_func
  196. if read_only is not None:
  197. self.read_only = read_only
  198. self.__doc__ = doc
  199. def lookup(self, instance: t.Any) -> t.MutableMapping[str, t.Any]:
  200. raise NotImplementedError
  201. @typing.overload
  202. def __get__(
  203. self, instance: None, owner: type
  204. ) -> "_DictAccessorProperty[_TAccessorValue]":
  205. ...
  206. @typing.overload
  207. def __get__(self, instance: t.Any, owner: type) -> _TAccessorValue:
  208. ...
  209. def __get__(
  210. self, instance: t.Optional[t.Any], owner: type
  211. ) -> t.Union[_TAccessorValue, "_DictAccessorProperty[_TAccessorValue]"]:
  212. if instance is None:
  213. return self
  214. storage = self.lookup(instance)
  215. if self.name not in storage:
  216. return self.default # type: ignore
  217. value = storage[self.name]
  218. if self.load_func is not None:
  219. try:
  220. return self.load_func(value)
  221. except (ValueError, TypeError):
  222. return self.default # type: ignore
  223. return value # type: ignore
  224. def __set__(self, instance: t.Any, value: _TAccessorValue) -> None:
  225. if self.read_only:
  226. raise AttributeError("read only property")
  227. if self.dump_func is not None:
  228. self.lookup(instance)[self.name] = self.dump_func(value)
  229. else:
  230. self.lookup(instance)[self.name] = value
  231. def __delete__(self, instance: t.Any) -> None:
  232. if self.read_only:
  233. raise AttributeError("read only property")
  234. self.lookup(instance).pop(self.name, None)
  235. def __repr__(self) -> str:
  236. return f"<{type(self).__name__} {self.name}>"
  237. def _cookie_quote(b: bytes) -> bytes:
  238. buf = bytearray()
  239. all_legal = True
  240. _lookup = _cookie_quoting_map.get
  241. _push = buf.extend
  242. for char_int in b:
  243. char = char_int.to_bytes(1, sys.byteorder)
  244. if char not in _legal_cookie_chars:
  245. all_legal = False
  246. char = _lookup(char, char)
  247. _push(char)
  248. if all_legal:
  249. return bytes(buf)
  250. return bytes(b'"' + buf + b'"')
  251. def _cookie_unquote(b: bytes) -> bytes:
  252. if len(b) < 2:
  253. return b
  254. if b[:1] != b'"' or b[-1:] != b'"':
  255. return b
  256. b = b[1:-1]
  257. i = 0
  258. n = len(b)
  259. rv = bytearray()
  260. _push = rv.extend
  261. while 0 <= i < n:
  262. o_match = _octal_re.search(b, i)
  263. q_match = _quote_re.search(b, i)
  264. if not o_match and not q_match:
  265. rv.extend(b[i:])
  266. break
  267. j = k = -1
  268. if o_match:
  269. j = o_match.start(0)
  270. if q_match:
  271. k = q_match.start(0)
  272. if q_match and (not o_match or k < j):
  273. _push(b[i:k])
  274. _push(b[k + 1 : k + 2])
  275. i = k + 2
  276. else:
  277. _push(b[i:j])
  278. rv.append(int(b[j + 1 : j + 4], 8))
  279. i = j + 4
  280. return bytes(rv)
  281. def _cookie_parse_impl(b: bytes) -> t.Iterator[t.Tuple[bytes, bytes]]:
  282. """Lowlevel cookie parsing facility that operates on bytes."""
  283. i = 0
  284. n = len(b)
  285. while i < n:
  286. match = _cookie_re.search(b + b";", i)
  287. if not match:
  288. break
  289. key = match.group("key").strip()
  290. value = match.group("val") or b""
  291. i = match.end(0)
  292. yield key, _cookie_unquote(value)
  293. def _encode_idna(domain: str) -> bytes:
  294. # If we're given bytes, make sure they fit into ASCII
  295. if isinstance(domain, bytes):
  296. domain.decode("ascii")
  297. return domain
  298. # Otherwise check if it's already ascii, then return
  299. try:
  300. return domain.encode("ascii")
  301. except UnicodeError:
  302. pass
  303. # Otherwise encode each part separately
  304. return b".".join(p.encode("idna") for p in domain.split("."))
  305. def _decode_idna(domain: t.Union[str, bytes]) -> str:
  306. # If the input is a string try to encode it to ascii to do the idna
  307. # decoding. If that fails because of a unicode error, then we
  308. # already have a decoded idna domain.
  309. if isinstance(domain, str):
  310. try:
  311. domain = domain.encode("ascii")
  312. except UnicodeError:
  313. return domain # type: ignore
  314. # Decode each part separately. If a part fails, try to decode it
  315. # with ascii and silently ignore errors. This makes sense because
  316. # the idna codec does not have error handling.
  317. def decode_part(part: bytes) -> str:
  318. try:
  319. return part.decode("idna")
  320. except UnicodeError:
  321. return part.decode("ascii", "ignore")
  322. return ".".join(decode_part(p) for p in domain.split(b"."))
  323. @typing.overload
  324. def _make_cookie_domain(domain: None) -> None:
  325. ...
  326. @typing.overload
  327. def _make_cookie_domain(domain: str) -> bytes:
  328. ...
  329. def _make_cookie_domain(domain: t.Optional[str]) -> t.Optional[bytes]:
  330. if domain is None:
  331. return None
  332. domain = _encode_idna(domain)
  333. if b":" in domain:
  334. domain = domain.split(b":", 1)[0]
  335. if b"." in domain:
  336. return domain
  337. raise ValueError(
  338. "Setting 'domain' for a cookie on a server running locally (ex: "
  339. "localhost) is not supported by complying browsers. You should "
  340. "have something like: '127.0.0.1 localhost dev.localhost' on "
  341. "your hosts file and then point your server to run on "
  342. "'dev.localhost' and also set 'domain' for 'dev.localhost'"
  343. )
  344. def _easteregg(app: t.Optional["WSGIApplication"] = None) -> "WSGIApplication":
  345. """Like the name says. But who knows how it works?"""
  346. def bzzzzzzz(gyver: bytes) -> str:
  347. import base64
  348. import zlib
  349. return zlib.decompress(base64.b64decode(gyver)).decode("ascii")
  350. gyver = "\n".join(
  351. [
  352. x + (77 - len(x)) * " "
  353. for x in bzzzzzzz(
  354. b"""
  355. eJyFlzuOJDkMRP06xRjymKgDJCDQStBYT8BCgK4gTwfQ2fcFs2a2FzvZk+hvlcRvRJD148efHt9m
  356. 9Xz94dRY5hGt1nrYcXx7us9qlcP9HHNh28rz8dZj+q4rynVFFPdlY4zH873NKCexrDM6zxxRymzz
  357. 4QIxzK4bth1PV7+uHn6WXZ5C4ka/+prFzx3zWLMHAVZb8RRUxtFXI5DTQ2n3Hi2sNI+HK43AOWSY
  358. jmEzE4naFp58PdzhPMdslLVWHTGUVpSxImw+pS/D+JhzLfdS1j7PzUMxij+mc2U0I9zcbZ/HcZxc
  359. q1QjvvcThMYFnp93agEx392ZdLJWXbi/Ca4Oivl4h/Y1ErEqP+lrg7Xa4qnUKu5UE9UUA4xeqLJ5
  360. jWlPKJvR2yhRI7xFPdzPuc6adXu6ovwXwRPXXnZHxlPtkSkqWHilsOrGrvcVWXgGP3daXomCj317
  361. 8P2UOw/NnA0OOikZyFf3zZ76eN9QXNwYdD8f8/LdBRFg0BO3bB+Pe/+G8er8tDJv83XTkj7WeMBJ
  362. v/rnAfdO51d6sFglfi8U7zbnr0u9tyJHhFZNXYfH8Iafv2Oa+DT6l8u9UYlajV/hcEgk1x8E8L/r
  363. XJXl2SK+GJCxtnyhVKv6GFCEB1OO3f9YWAIEbwcRWv/6RPpsEzOkXURMN37J0PoCSYeBnJQd9Giu
  364. LxYQJNlYPSo/iTQwgaihbART7Fcyem2tTSCcwNCs85MOOpJtXhXDe0E7zgZJkcxWTar/zEjdIVCk
  365. iXy87FW6j5aGZhttDBoAZ3vnmlkx4q4mMmCdLtnHkBXFMCReqthSGkQ+MDXLLCpXwBs0t+sIhsDI
  366. tjBB8MwqYQpLygZ56rRHHpw+OAVyGgaGRHWy2QfXez+ZQQTTBkmRXdV/A9LwH6XGZpEAZU8rs4pE
  367. 1R4FQ3Uwt8RKEtRc0/CrANUoes3EzM6WYcFyskGZ6UTHJWenBDS7h163Eo2bpzqxNE9aVgEM2CqI
  368. GAJe9Yra4P5qKmta27VjzYdR04Vc7KHeY4vs61C0nbywFmcSXYjzBHdiEjraS7PGG2jHHTpJUMxN
  369. Jlxr3pUuFvlBWLJGE3GcA1/1xxLcHmlO+LAXbhrXah1tD6Ze+uqFGdZa5FM+3eHcKNaEarutAQ0A
  370. QMAZHV+ve6LxAwWnXbbSXEG2DmCX5ijeLCKj5lhVFBrMm+ryOttCAeFpUdZyQLAQkA06RLs56rzG
  371. 8MID55vqr/g64Qr/wqwlE0TVxgoiZhHrbY2h1iuuyUVg1nlkpDrQ7Vm1xIkI5XRKLedN9EjzVchu
  372. jQhXcVkjVdgP2O99QShpdvXWoSwkp5uMwyjt3jiWCqWGSiaaPAzohjPanXVLbM3x0dNskJsaCEyz
  373. DTKIs+7WKJD4ZcJGfMhLFBf6hlbnNkLEePF8Cx2o2kwmYF4+MzAxa6i+6xIQkswOqGO+3x9NaZX8
  374. MrZRaFZpLeVTYI9F/djY6DDVVs340nZGmwrDqTCiiqD5luj3OzwpmQCiQhdRYowUYEA3i1WWGwL4
  375. GCtSoO4XbIPFeKGU13XPkDf5IdimLpAvi2kVDVQbzOOa4KAXMFlpi/hV8F6IDe0Y2reg3PuNKT3i
  376. RYhZqtkQZqSB2Qm0SGtjAw7RDwaM1roESC8HWiPxkoOy0lLTRFG39kvbLZbU9gFKFRvixDZBJmpi
  377. Xyq3RE5lW00EJjaqwp/v3EByMSpVZYsEIJ4APaHmVtpGSieV5CALOtNUAzTBiw81GLgC0quyzf6c
  378. NlWknzJeCsJ5fup2R4d8CYGN77mu5vnO1UqbfElZ9E6cR6zbHjgsr9ly18fXjZoPeDjPuzlWbFwS
  379. pdvPkhntFvkc13qb9094LL5NrA3NIq3r9eNnop9DizWOqCEbyRBFJTHn6Tt3CG1o8a4HevYh0XiJ
  380. sR0AVVHuGuMOIfbuQ/OKBkGRC6NJ4u7sbPX8bG/n5sNIOQ6/Y/BX3IwRlTSabtZpYLB85lYtkkgm
  381. p1qXK3Du2mnr5INXmT/78KI12n11EFBkJHHp0wJyLe9MvPNUGYsf+170maayRoy2lURGHAIapSpQ
  382. krEDuNoJCHNlZYhKpvw4mspVWxqo415n8cD62N9+EfHrAvqQnINStetek7RY2Urv8nxsnGaZfRr/
  383. nhXbJ6m/yl1LzYqscDZA9QHLNbdaSTTr+kFg3bC0iYbX/eQy0Bv3h4B50/SGYzKAXkCeOLI3bcAt
  384. mj2Z/FM1vQWgDynsRwNvrWnJHlespkrp8+vO1jNaibm+PhqXPPv30YwDZ6jApe3wUjFQobghvW9p
  385. 7f2zLkGNv8b191cD/3vs9Q833z8t"""
  386. ).splitlines()
  387. ]
  388. )
  389. def easteregged(
  390. environ: "WSGIEnvironment", start_response: "StartResponse"
  391. ) -> t.Iterable[bytes]:
  392. def injecting_start_response(
  393. status: str, headers: t.List[t.Tuple[str, str]], exc_info: t.Any = None
  394. ) -> t.Callable[[bytes], t.Any]:
  395. headers.append(("X-Powered-By", "Werkzeug"))
  396. return start_response(status, headers, exc_info)
  397. if app is not None and environ.get("QUERY_STRING") != "macgybarchakku":
  398. return app(environ, injecting_start_response)
  399. injecting_start_response("200 OK", [("Content-Type", "text/html")])
  400. return [
  401. f"""\
  402. <!doctype html>
  403. <html lang=en>
  404. <head>
  405. <title>About Werkzeug</title>
  406. <style type="text/css">
  407. body {{ font: 15px Georgia, serif; text-align: center; }}
  408. a {{ color: #333; text-decoration: none; }}
  409. h1 {{ font-size: 30px; margin: 20px 0 10px 0; }}
  410. p {{ margin: 0 0 30px 0; }}
  411. pre {{ font: 11px 'Consolas', 'Monaco', monospace; line-height: 0.95; }}
  412. </style>
  413. </head>
  414. <body>
  415. <h1><a href="http://werkzeug.pocoo.org/">Werkzeug</a></h1>
  416. <p>the Swiss Army knife of Python web development.</p>
  417. <pre>{gyver}\n\n\n</pre>
  418. </body>
  419. </html>""".encode(
  420. "latin1"
  421. )
  422. ]
  423. return easteregged