timed.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. import time
  2. import typing
  3. import typing as _t
  4. from datetime import datetime
  5. from datetime import timezone
  6. from .encoding import base64_decode
  7. from .encoding import base64_encode
  8. from .encoding import bytes_to_int
  9. from .encoding import int_to_bytes
  10. from .encoding import want_bytes
  11. from .exc import BadSignature
  12. from .exc import BadTimeSignature
  13. from .exc import SignatureExpired
  14. from .serializer import Serializer
  15. from .signer import Signer
  16. _t_str_bytes = _t.Union[str, bytes]
  17. _t_opt_str_bytes = _t.Optional[_t_str_bytes]
  18. _t_opt_int = _t.Optional[int]
  19. if _t.TYPE_CHECKING:
  20. import typing_extensions as _te
  21. class TimestampSigner(Signer):
  22. """Works like the regular :class:`.Signer` but also records the time
  23. of the signing and can be used to expire signatures. The
  24. :meth:`unsign` method can raise :exc:`.SignatureExpired` if the
  25. unsigning failed because the signature is expired.
  26. """
  27. def get_timestamp(self) -> int:
  28. """Returns the current timestamp. The function must return an
  29. integer.
  30. """
  31. return int(time.time())
  32. def timestamp_to_datetime(self, ts: int) -> datetime:
  33. """Convert the timestamp from :meth:`get_timestamp` into an
  34. aware :class`datetime.datetime` in UTC.
  35. .. versionchanged:: 2.0
  36. The timestamp is returned as a timezone-aware ``datetime``
  37. in UTC rather than a naive ``datetime`` assumed to be UTC.
  38. """
  39. return datetime.fromtimestamp(ts, tz=timezone.utc)
  40. def sign(self, value: _t_str_bytes) -> bytes:
  41. """Signs the given string and also attaches time information."""
  42. value = want_bytes(value)
  43. timestamp = base64_encode(int_to_bytes(self.get_timestamp()))
  44. sep = want_bytes(self.sep)
  45. value = value + sep + timestamp
  46. return value + sep + self.get_signature(value)
  47. # Ignore overlapping signatures check, return_timestamp is the only
  48. # parameter that affects the return type.
  49. @typing.overload
  50. def unsign( # type: ignore
  51. self,
  52. signed_value: _t_str_bytes,
  53. max_age: _t_opt_int = None,
  54. return_timestamp: "_te.Literal[False]" = False,
  55. ) -> bytes:
  56. ...
  57. @typing.overload
  58. def unsign(
  59. self,
  60. signed_value: _t_str_bytes,
  61. max_age: _t_opt_int = None,
  62. return_timestamp: "_te.Literal[True]" = True,
  63. ) -> _t.Tuple[bytes, datetime]:
  64. ...
  65. def unsign(
  66. self,
  67. signed_value: _t_str_bytes,
  68. max_age: _t_opt_int = None,
  69. return_timestamp: bool = False,
  70. ) -> _t.Union[_t.Tuple[bytes, datetime], bytes]:
  71. """Works like the regular :meth:`.Signer.unsign` but can also
  72. validate the time. See the base docstring of the class for
  73. the general behavior. If ``return_timestamp`` is ``True`` the
  74. timestamp of the signature will be returned as an aware
  75. :class:`datetime.datetime` object in UTC.
  76. .. versionchanged:: 2.0
  77. The timestamp is returned as a timezone-aware ``datetime``
  78. in UTC rather than a naive ``datetime`` assumed to be UTC.
  79. """
  80. try:
  81. result = super().unsign(signed_value)
  82. sig_error = None
  83. except BadSignature as e:
  84. sig_error = e
  85. result = e.payload or b""
  86. sep = want_bytes(self.sep)
  87. # If there is no timestamp in the result there is something
  88. # seriously wrong. In case there was a signature error, we raise
  89. # that one directly, otherwise we have a weird situation in
  90. # which we shouldn't have come except someone uses a time-based
  91. # serializer on non-timestamp data, so catch that.
  92. if sep not in result:
  93. if sig_error:
  94. raise sig_error
  95. raise BadTimeSignature("timestamp missing", payload=result)
  96. value, ts_bytes = result.rsplit(sep, 1)
  97. ts_int: _t_opt_int = None
  98. ts_dt: _t.Optional[datetime] = None
  99. try:
  100. ts_int = bytes_to_int(base64_decode(ts_bytes))
  101. except Exception:
  102. pass
  103. # Signature is *not* okay. Raise a proper error now that we have
  104. # split the value and the timestamp.
  105. if sig_error is not None:
  106. if ts_int is not None:
  107. try:
  108. ts_dt = self.timestamp_to_datetime(ts_int)
  109. except (ValueError, OSError, OverflowError) as exc:
  110. # Windows raises OSError
  111. # 32-bit raises OverflowError
  112. raise BadTimeSignature(
  113. "Malformed timestamp", payload=value
  114. ) from exc
  115. raise BadTimeSignature(str(sig_error), payload=value, date_signed=ts_dt)
  116. # Signature was okay but the timestamp is actually not there or
  117. # malformed. Should not happen, but we handle it anyway.
  118. if ts_int is None:
  119. raise BadTimeSignature("Malformed timestamp", payload=value)
  120. # Check timestamp is not older than max_age
  121. if max_age is not None:
  122. age = self.get_timestamp() - ts_int
  123. if age > max_age:
  124. raise SignatureExpired(
  125. f"Signature age {age} > {max_age} seconds",
  126. payload=value,
  127. date_signed=self.timestamp_to_datetime(ts_int),
  128. )
  129. if age < 0:
  130. raise SignatureExpired(
  131. f"Signature age {age} < 0 seconds",
  132. payload=value,
  133. date_signed=self.timestamp_to_datetime(ts_int),
  134. )
  135. if return_timestamp:
  136. return value, self.timestamp_to_datetime(ts_int)
  137. return value
  138. def validate(self, signed_value: _t_str_bytes, max_age: _t_opt_int = None) -> bool:
  139. """Only validates the given signed value. Returns ``True`` if
  140. the signature exists and is valid."""
  141. try:
  142. self.unsign(signed_value, max_age=max_age)
  143. return True
  144. except BadSignature:
  145. return False
  146. class TimedSerializer(Serializer):
  147. """Uses :class:`TimestampSigner` instead of the default
  148. :class:`.Signer`.
  149. """
  150. default_signer: _t.Type[TimestampSigner] = TimestampSigner
  151. def iter_unsigners(
  152. self, salt: _t_opt_str_bytes = None
  153. ) -> _t.Iterator[TimestampSigner]:
  154. return _t.cast("_t.Iterator[TimestampSigner]", super().iter_unsigners(salt))
  155. # TODO: Signature is incompatible because parameters were added
  156. # before salt.
  157. def loads( # type: ignore
  158. self,
  159. s: _t_str_bytes,
  160. max_age: _t_opt_int = None,
  161. return_timestamp: bool = False,
  162. salt: _t_opt_str_bytes = None,
  163. ) -> _t.Any:
  164. """Reverse of :meth:`dumps`, raises :exc:`.BadSignature` if the
  165. signature validation fails. If a ``max_age`` is provided it will
  166. ensure the signature is not older than that time in seconds. In
  167. case the signature is outdated, :exc:`.SignatureExpired` is
  168. raised. All arguments are forwarded to the signer's
  169. :meth:`~TimestampSigner.unsign` method.
  170. """
  171. s = want_bytes(s)
  172. last_exception = None
  173. for signer in self.iter_unsigners(salt):
  174. try:
  175. base64d, timestamp = signer.unsign(
  176. s, max_age=max_age, return_timestamp=True
  177. )
  178. payload = self.load_payload(base64d)
  179. if return_timestamp:
  180. return payload, timestamp
  181. return payload
  182. except SignatureExpired:
  183. # The signature was unsigned successfully but was
  184. # expired. Do not try the next signer.
  185. raise
  186. except BadSignature as err:
  187. last_exception = err
  188. raise _t.cast(BadSignature, last_exception)
  189. def loads_unsafe( # type: ignore
  190. self,
  191. s: _t_str_bytes,
  192. max_age: _t_opt_int = None,
  193. salt: _t_opt_str_bytes = None,
  194. ) -> _t.Tuple[bool, _t.Any]:
  195. return self._loads_unsafe_impl(s, salt, load_kwargs={"max_age": max_age})