123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704 |
- import typing as t
- from datetime import datetime
- from datetime import timedelta
- from datetime import timezone
- from http import HTTPStatus
- from .._internal import _to_str
- from ..datastructures import Headers
- from ..datastructures import HeaderSet
- from ..http import dump_cookie
- from ..http import HTTP_STATUS_CODES
- from ..utils import get_content_type
- from werkzeug.datastructures import CallbackDict
- from werkzeug.datastructures import ContentRange
- from werkzeug.datastructures import ContentSecurityPolicy
- from werkzeug.datastructures import ResponseCacheControl
- from werkzeug.datastructures import WWWAuthenticate
- from werkzeug.http import COEP
- from werkzeug.http import COOP
- from werkzeug.http import dump_age
- from werkzeug.http import dump_header
- from werkzeug.http import dump_options_header
- from werkzeug.http import http_date
- from werkzeug.http import parse_age
- from werkzeug.http import parse_cache_control_header
- from werkzeug.http import parse_content_range_header
- from werkzeug.http import parse_csp_header
- from werkzeug.http import parse_date
- from werkzeug.http import parse_options_header
- from werkzeug.http import parse_set_header
- from werkzeug.http import parse_www_authenticate_header
- from werkzeug.http import quote_etag
- from werkzeug.http import unquote_etag
- from werkzeug.utils import header_property
- def _set_property(name: str, doc: t.Optional[str] = None) -> property:
- def fget(self: "Response") -> HeaderSet:
- def on_update(header_set: HeaderSet) -> None:
- if not header_set and name in self.headers:
- del self.headers[name]
- elif header_set:
- self.headers[name] = header_set.to_header()
- return parse_set_header(self.headers.get(name), on_update)
- def fset(
- self: "Response",
- value: t.Optional[
- t.Union[str, t.Dict[str, t.Union[str, int]], t.Iterable[str]]
- ],
- ) -> None:
- if not value:
- del self.headers[name]
- elif isinstance(value, str):
- self.headers[name] = value
- else:
- self.headers[name] = dump_header(value)
- return property(fget, fset, doc=doc)
- class Response:
- """Represents the non-IO parts of an HTTP response, specifically the
- status and headers but not the body.
- This class is not meant for general use. It should only be used when
- implementing WSGI, ASGI, or another HTTP application spec. Werkzeug
- provides a WSGI implementation at :cls:`werkzeug.wrappers.Response`.
- :param status: The status code for the response. Either an int, in
- which case the default status message is added, or a string in
- the form ``{code} {message}``, like ``404 Not Found``. Defaults
- to 200.
- :param headers: A :class:`~werkzeug.datastructures.Headers` object,
- or a list of ``(key, value)`` tuples that will be converted to a
- ``Headers`` object.
- :param mimetype: The mime type (content type without charset or
- other parameters) of the response. If the value starts with
- ``text/`` (or matches some other special cases), the charset
- will be added to create the ``content_type``.
- :param content_type: The full content type of the response.
- Overrides building the value from ``mimetype``.
- .. versionadded:: 2.0
- """
- #: the charset of the response.
- charset = "utf-8"
- #: the default status if none is provided.
- default_status = 200
- #: the default mimetype if none is provided.
- default_mimetype: t.Optional[str] = "text/plain"
- #: Warn if a cookie header exceeds this size. The default, 4093, should be
- #: safely `supported by most browsers <cookie_>`_. A cookie larger than
- #: this size will still be sent, but it may be ignored or handled
- #: incorrectly by some browsers. Set to 0 to disable this check.
- #:
- #: .. versionadded:: 0.13
- #:
- #: .. _`cookie`: http://browsercookielimits.squawky.net/
- max_cookie_size = 4093
- # A :class:`Headers` object representing the response headers.
- headers: Headers
- def __init__(
- self,
- status: t.Optional[t.Union[int, str, HTTPStatus]] = None,
- headers: t.Optional[
- t.Union[
- t.Mapping[str, t.Union[str, int, t.Iterable[t.Union[str, int]]]],
- t.Iterable[t.Tuple[str, t.Union[str, int]]],
- ]
- ] = None,
- mimetype: t.Optional[str] = None,
- content_type: t.Optional[str] = None,
- ) -> None:
- if isinstance(headers, Headers):
- self.headers = headers
- elif not headers:
- self.headers = Headers()
- else:
- self.headers = Headers(headers)
- if content_type is None:
- if mimetype is None and "content-type" not in self.headers:
- mimetype = self.default_mimetype
- if mimetype is not None:
- mimetype = get_content_type(mimetype, self.charset)
- content_type = mimetype
- if content_type is not None:
- self.headers["Content-Type"] = content_type
- if status is None:
- status = self.default_status
- self.status = status # type: ignore
- def __repr__(self) -> str:
- return f"<{type(self).__name__} [{self.status}]>"
- @property
- def status_code(self) -> int:
- """The HTTP status code as a number."""
- return self._status_code
- @status_code.setter
- def status_code(self, code: int) -> None:
- self.status = code # type: ignore
- @property
- def status(self) -> str:
- """The HTTP status code as a string."""
- return self._status
- @status.setter
- def status(self, value: t.Union[str, int, HTTPStatus]) -> None:
- if not isinstance(value, (str, bytes, int, HTTPStatus)):
- raise TypeError("Invalid status argument")
- self._status, self._status_code = self._clean_status(value)
- def _clean_status(self, value: t.Union[str, int, HTTPStatus]) -> t.Tuple[str, int]:
- if isinstance(value, HTTPStatus):
- value = int(value)
- status = _to_str(value, self.charset)
- split_status = status.split(None, 1)
- if len(split_status) == 0:
- raise ValueError("Empty status argument")
- try:
- status_code = int(split_status[0])
- except ValueError:
- # only message
- return f"0 {status}", 0
- if len(split_status) > 1:
- # code and message
- return status, status_code
- # only code, look up message
- try:
- status = f"{status_code} {HTTP_STATUS_CODES[status_code].upper()}"
- except KeyError:
- status = f"{status_code} UNKNOWN"
- return status, status_code
- def set_cookie(
- self,
- key: str,
- value: str = "",
- max_age: t.Optional[t.Union[timedelta, int]] = None,
- expires: t.Optional[t.Union[str, datetime, int, float]] = None,
- path: t.Optional[str] = "/",
- domain: t.Optional[str] = None,
- secure: bool = False,
- httponly: bool = False,
- samesite: t.Optional[str] = None,
- ) -> None:
- """Sets a cookie.
- A warning is raised if the size of the cookie header exceeds
- :attr:`max_cookie_size`, but the header will still be set.
- :param key: the key (name) of the cookie to be set.
- :param value: the value of the cookie.
- :param max_age: should be a number of seconds, or `None` (default) if
- the cookie should last only as long as the client's
- browser session.
- :param expires: should be a `datetime` object or UNIX timestamp.
- :param path: limits the cookie to a given path, per default it will
- span the whole domain.
- :param domain: if you want to set a cross-domain cookie. For example,
- ``domain=".example.com"`` will set a cookie that is
- readable by the domain ``www.example.com``,
- ``foo.example.com`` etc. Otherwise, a cookie will only
- be readable by the domain that set it.
- :param secure: If ``True``, the cookie will only be available
- via HTTPS.
- :param httponly: Disallow JavaScript access to the cookie.
- :param samesite: Limit the scope of the cookie to only be
- attached to requests that are "same-site".
- """
- self.headers.add(
- "Set-Cookie",
- dump_cookie(
- key,
- value=value,
- max_age=max_age,
- expires=expires,
- path=path,
- domain=domain,
- secure=secure,
- httponly=httponly,
- charset=self.charset,
- max_size=self.max_cookie_size,
- samesite=samesite,
- ),
- )
- def delete_cookie(
- self,
- key: str,
- path: str = "/",
- domain: t.Optional[str] = None,
- secure: bool = False,
- httponly: bool = False,
- samesite: t.Optional[str] = None,
- ) -> None:
- """Delete a cookie. Fails silently if key doesn't exist.
- :param key: the key (name) of the cookie to be deleted.
- :param path: if the cookie that should be deleted was limited to a
- path, the path has to be defined here.
- :param domain: if the cookie that should be deleted was limited to a
- domain, that domain has to be defined here.
- :param secure: If ``True``, the cookie will only be available
- via HTTPS.
- :param httponly: Disallow JavaScript access to the cookie.
- :param samesite: Limit the scope of the cookie to only be
- attached to requests that are "same-site".
- """
- self.set_cookie(
- key,
- expires=0,
- max_age=0,
- path=path,
- domain=domain,
- secure=secure,
- httponly=httponly,
- samesite=samesite,
- )
- @property
- def is_json(self) -> bool:
- """Check if the mimetype indicates JSON data, either
- :mimetype:`application/json` or :mimetype:`application/*+json`.
- """
- mt = self.mimetype
- return mt is not None and (
- mt == "application/json"
- or mt.startswith("application/")
- and mt.endswith("+json")
- )
- # Common Descriptors
- @property
- def mimetype(self) -> t.Optional[str]:
- """The mimetype (content type without charset etc.)"""
- ct = self.headers.get("content-type")
- if ct:
- return ct.split(";")[0].strip()
- else:
- return None
- @mimetype.setter
- def mimetype(self, value: str) -> None:
- self.headers["Content-Type"] = get_content_type(value, self.charset)
- @property
- def mimetype_params(self) -> t.Dict[str, str]:
- """The mimetype parameters as dict. For example if the
- content type is ``text/html; charset=utf-8`` the params would be
- ``{'charset': 'utf-8'}``.
- .. versionadded:: 0.5
- """
- def on_update(d: CallbackDict) -> None:
- self.headers["Content-Type"] = dump_options_header(self.mimetype, d)
- d = parse_options_header(self.headers.get("content-type", ""))[1]
- return CallbackDict(d, on_update)
- location = header_property[str](
- "Location",
- doc="""The Location response-header field is used to redirect
- the recipient to a location other than the Request-URI for
- completion of the request or identification of a new
- resource.""",
- )
- age = header_property(
- "Age",
- None,
- parse_age,
- dump_age, # type: ignore
- doc="""The Age response-header field conveys the sender's
- estimate of the amount of time since the response (or its
- revalidation) was generated at the origin server.
- Age values are non-negative decimal integers, representing time
- in seconds.""",
- )
- content_type = header_property[str](
- "Content-Type",
- doc="""The Content-Type entity-header field indicates the media
- type of the entity-body sent to the recipient or, in the case of
- the HEAD method, the media type that would have been sent had
- the request been a GET.""",
- )
- content_length = header_property(
- "Content-Length",
- None,
- int,
- str,
- doc="""The Content-Length entity-header field indicates the size
- of the entity-body, in decimal number of OCTETs, sent to the
- recipient or, in the case of the HEAD method, the size of the
- entity-body that would have been sent had the request been a
- GET.""",
- )
- content_location = header_property[str](
- "Content-Location",
- doc="""The Content-Location entity-header field MAY be used to
- supply the resource location for the entity enclosed in the
- message when that entity is accessible from a location separate
- from the requested resource's URI.""",
- )
- content_encoding = header_property[str](
- "Content-Encoding",
- doc="""The Content-Encoding entity-header field is used as a
- modifier to the media-type. When present, its value indicates
- what additional content codings have been applied to the
- entity-body, and thus what decoding mechanisms must be applied
- in order to obtain the media-type referenced by the Content-Type
- header field.""",
- )
- content_md5 = header_property[str](
- "Content-MD5",
- doc="""The Content-MD5 entity-header field, as defined in
- RFC 1864, is an MD5 digest of the entity-body for the purpose of
- providing an end-to-end message integrity check (MIC) of the
- entity-body. (Note: a MIC is good for detecting accidental
- modification of the entity-body in transit, but is not proof
- against malicious attacks.)""",
- )
- date = header_property(
- "Date",
- None,
- parse_date,
- http_date,
- doc="""The Date general-header field represents the date and
- time at which the message was originated, having the same
- semantics as orig-date in RFC 822.
- .. versionchanged:: 2.0
- The datetime object is timezone-aware.
- """,
- )
- expires = header_property(
- "Expires",
- None,
- parse_date,
- http_date,
- doc="""The Expires entity-header field gives the date/time after
- which the response is considered stale. A stale cache entry may
- not normally be returned by a cache.
- .. versionchanged:: 2.0
- The datetime object is timezone-aware.
- """,
- )
- last_modified = header_property(
- "Last-Modified",
- None,
- parse_date,
- http_date,
- doc="""The Last-Modified entity-header field indicates the date
- and time at which the origin server believes the variant was
- last modified.
- .. versionchanged:: 2.0
- The datetime object is timezone-aware.
- """,
- )
- @property
- def retry_after(self) -> t.Optional[datetime]:
- """The Retry-After response-header field can be used with a
- 503 (Service Unavailable) response to indicate how long the
- service is expected to be unavailable to the requesting client.
- Time in seconds until expiration or date.
- .. versionchanged:: 2.0
- The datetime object is timezone-aware.
- """
- value = self.headers.get("retry-after")
- if value is None:
- return None
- try:
- seconds = int(value)
- except ValueError:
- return parse_date(value)
- return datetime.now(timezone.utc) + timedelta(seconds=seconds)
- @retry_after.setter
- def retry_after(self, value: t.Optional[t.Union[datetime, int, str]]) -> None:
- if value is None:
- if "retry-after" in self.headers:
- del self.headers["retry-after"]
- return
- elif isinstance(value, datetime):
- value = http_date(value)
- else:
- value = str(value)
- self.headers["Retry-After"] = value
- vary = _set_property(
- "Vary",
- doc="""The Vary field value indicates the set of request-header
- fields that fully determines, while the response is fresh,
- whether a cache is permitted to use the response to reply to a
- subsequent request without revalidation.""",
- )
- content_language = _set_property(
- "Content-Language",
- doc="""The Content-Language entity-header field describes the
- natural language(s) of the intended audience for the enclosed
- entity. Note that this might not be equivalent to all the
- languages used within the entity-body.""",
- )
- allow = _set_property(
- "Allow",
- doc="""The Allow entity-header field lists the set of methods
- supported by the resource identified by the Request-URI. The
- purpose of this field is strictly to inform the recipient of
- valid methods associated with the resource. An Allow header
- field MUST be present in a 405 (Method Not Allowed)
- response.""",
- )
- # ETag
- @property
- def cache_control(self) -> ResponseCacheControl:
- """The Cache-Control general-header field is used to specify
- directives that MUST be obeyed by all caching mechanisms along the
- request/response chain.
- """
- def on_update(cache_control: ResponseCacheControl) -> None:
- if not cache_control and "cache-control" in self.headers:
- del self.headers["cache-control"]
- elif cache_control:
- self.headers["Cache-Control"] = cache_control.to_header()
- return parse_cache_control_header(
- self.headers.get("cache-control"), on_update, ResponseCacheControl
- )
- def set_etag(self, etag: str, weak: bool = False) -> None:
- """Set the etag, and override the old one if there was one."""
- self.headers["ETag"] = quote_etag(etag, weak)
- def get_etag(self) -> t.Union[t.Tuple[str, bool], t.Tuple[None, None]]:
- """Return a tuple in the form ``(etag, is_weak)``. If there is no
- ETag the return value is ``(None, None)``.
- """
- return unquote_etag(self.headers.get("ETag"))
- accept_ranges = header_property[str](
- "Accept-Ranges",
- doc="""The `Accept-Ranges` header. Even though the name would
- indicate that multiple values are supported, it must be one
- string token only.
- The values ``'bytes'`` and ``'none'`` are common.
- .. versionadded:: 0.7""",
- )
- @property
- def content_range(self) -> ContentRange:
- """The ``Content-Range`` header as a
- :class:`~werkzeug.datastructures.ContentRange` object. Available
- even if the header is not set.
- .. versionadded:: 0.7
- """
- def on_update(rng: ContentRange) -> None:
- if not rng:
- del self.headers["content-range"]
- else:
- self.headers["Content-Range"] = rng.to_header()
- rv = parse_content_range_header(self.headers.get("content-range"), on_update)
- # always provide a content range object to make the descriptor
- # more user friendly. It provides an unset() method that can be
- # used to remove the header quickly.
- if rv is None:
- rv = ContentRange(None, None, None, on_update=on_update)
- return rv
- @content_range.setter
- def content_range(self, value: t.Optional[t.Union[ContentRange, str]]) -> None:
- if not value:
- del self.headers["content-range"]
- elif isinstance(value, str):
- self.headers["Content-Range"] = value
- else:
- self.headers["Content-Range"] = value.to_header()
- # Authorization
- @property
- def www_authenticate(self) -> WWWAuthenticate:
- """The ``WWW-Authenticate`` header in a parsed form."""
- def on_update(www_auth: WWWAuthenticate) -> None:
- if not www_auth and "www-authenticate" in self.headers:
- del self.headers["www-authenticate"]
- elif www_auth:
- self.headers["WWW-Authenticate"] = www_auth.to_header()
- header = self.headers.get("www-authenticate")
- return parse_www_authenticate_header(header, on_update)
- # CSP
- @property
- def content_security_policy(self) -> ContentSecurityPolicy:
- """The ``Content-Security-Policy`` header as a
- :class:`~werkzeug.datastructures.ContentSecurityPolicy` object. Available
- even if the header is not set.
- The Content-Security-Policy header adds an additional layer of
- security to help detect and mitigate certain types of attacks.
- """
- def on_update(csp: ContentSecurityPolicy) -> None:
- if not csp:
- del self.headers["content-security-policy"]
- else:
- self.headers["Content-Security-Policy"] = csp.to_header()
- rv = parse_csp_header(self.headers.get("content-security-policy"), on_update)
- if rv is None:
- rv = ContentSecurityPolicy(None, on_update=on_update)
- return rv
- @content_security_policy.setter
- def content_security_policy(
- self, value: t.Optional[t.Union[ContentSecurityPolicy, str]]
- ) -> None:
- if not value:
- del self.headers["content-security-policy"]
- elif isinstance(value, str):
- self.headers["Content-Security-Policy"] = value
- else:
- self.headers["Content-Security-Policy"] = value.to_header()
- @property
- def content_security_policy_report_only(self) -> ContentSecurityPolicy:
- """The ``Content-Security-policy-report-only`` header as a
- :class:`~werkzeug.datastructures.ContentSecurityPolicy` object. Available
- even if the header is not set.
- The Content-Security-Policy-Report-Only header adds a csp policy
- that is not enforced but is reported thereby helping detect
- certain types of attacks.
- """
- def on_update(csp: ContentSecurityPolicy) -> None:
- if not csp:
- del self.headers["content-security-policy-report-only"]
- else:
- self.headers["Content-Security-policy-report-only"] = csp.to_header()
- rv = parse_csp_header(
- self.headers.get("content-security-policy-report-only"), on_update
- )
- if rv is None:
- rv = ContentSecurityPolicy(None, on_update=on_update)
- return rv
- @content_security_policy_report_only.setter
- def content_security_policy_report_only(
- self, value: t.Optional[t.Union[ContentSecurityPolicy, str]]
- ) -> None:
- if not value:
- del self.headers["content-security-policy-report-only"]
- elif isinstance(value, str):
- self.headers["Content-Security-policy-report-only"] = value
- else:
- self.headers["Content-Security-policy-report-only"] = value.to_header()
- # CORS
- @property
- def access_control_allow_credentials(self) -> bool:
- """Whether credentials can be shared by the browser to
- JavaScript code. As part of the preflight request it indicates
- whether credentials can be used on the cross origin request.
- """
- return "Access-Control-Allow-Credentials" in self.headers
- @access_control_allow_credentials.setter
- def access_control_allow_credentials(self, value: t.Optional[bool]) -> None:
- if value is True:
- self.headers["Access-Control-Allow-Credentials"] = "true"
- else:
- self.headers.pop("Access-Control-Allow-Credentials", None)
- access_control_allow_headers = header_property(
- "Access-Control-Allow-Headers",
- load_func=parse_set_header,
- dump_func=dump_header,
- doc="Which headers can be sent with the cross origin request.",
- )
- access_control_allow_methods = header_property(
- "Access-Control-Allow-Methods",
- load_func=parse_set_header,
- dump_func=dump_header,
- doc="Which methods can be used for the cross origin request.",
- )
- access_control_allow_origin = header_property[str](
- "Access-Control-Allow-Origin",
- doc="The origin or '*' for any origin that may make cross origin requests.",
- )
- access_control_expose_headers = header_property(
- "Access-Control-Expose-Headers",
- load_func=parse_set_header,
- dump_func=dump_header,
- doc="Which headers can be shared by the browser to JavaScript code.",
- )
- access_control_max_age = header_property(
- "Access-Control-Max-Age",
- load_func=int,
- dump_func=str,
- doc="The maximum age in seconds the access control settings can be cached for.",
- )
- cross_origin_opener_policy = header_property[COOP](
- "Cross-Origin-Opener-Policy",
- load_func=lambda value: COOP(value),
- dump_func=lambda value: value.value,
- default=COOP.UNSAFE_NONE,
- doc="""Allows control over sharing of browsing context group with cross-origin
- documents. Values must be a member of the :class:`werkzeug.http.COOP` enum.""",
- )
- cross_origin_embedder_policy = header_property[COEP](
- "Cross-Origin-Embedder-Policy",
- load_func=lambda value: COEP(value),
- dump_func=lambda value: value.value,
- default=COEP.UNSAFE_NONE,
- doc="""Prevents a document from loading any cross-origin resources that do not
- explicitly grant the document permission. Values must be a member of the
- :class:`werkzeug.http.COEP` enum.""",
- )
|