123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419 |
- import hashlib
- import typing as t
- import warnings
- from collections.abc import MutableMapping
- from datetime import datetime
- from datetime import timezone
- from itsdangerous import BadSignature
- from itsdangerous import URLSafeTimedSerializer
- from werkzeug.datastructures import CallbackDict
- from .helpers import is_ip
- from .json.tag import TaggedJSONSerializer
- if t.TYPE_CHECKING: # pragma: no cover
- import typing_extensions as te
- from .app import Flask
- from .wrappers import Request, Response
class SessionMixin(MutableMapping):
    """Expands a basic dictionary with session attributes.

    The mixin stores nothing itself: :attr:`permanent` is read from and
    written to the ``'_permanent'`` key of the mapping it is mixed into,
    while :attr:`new`, :attr:`modified` and :attr:`accessed` are plain
    class-level defaults that concrete session classes may override.
    """

    @property
    def permanent(self) -> bool:
        """This reflects the ``'_permanent'`` key in the dict.

        Evaluates to ``False`` when the key is not present.
        """
        return self.get("_permanent", False)

    @permanent.setter
    def permanent(self, value: bool) -> None:
        # Coerce so the stored flag is always a real ``bool``.
        self["_permanent"] = bool(value)

    #: Some implementations can detect whether a session is newly
    #: created, but that is not guaranteed. Use with caution. The mixin
    #: default is hard-coded ``False``.
    new = False

    #: Some implementations can detect changes to the session and set
    #: this when that happens. The mixin default is hard coded to
    #: ``True``.
    modified = True

    #: Some implementations can detect when session data is read or
    #: written and set this when that happens. The mixin default is hard
    #: coded to ``True``.
    accessed = True
class SecureCookieSession(CallbackDict, SessionMixin):
    """Base class for sessions based on signed cookies.

    This session backend will set the :attr:`modified` and
    :attr:`accessed` attributes. It cannot reliably track whether a
    session is new (vs. empty), so :attr:`new` remains hard coded to
    ``False``.
    """

    #: When data is changed, this is set to ``True``. Only the session
    #: dictionary itself is tracked; if the session contains mutable
    #: data (for example a nested dict) then this must be set to
    #: ``True`` manually when modifying that data. The session cookie
    #: will only be written to the response if this is ``True``.
    modified = False

    #: When data is read or written, this is set to ``True``. Used by
    #: :class:`.SecureCookieSessionInterface` to add a ``Vary: Cookie``
    #: header, which allows caching proxies to cache different pages for
    #: different users.
    accessed = False

    def __init__(self, initial: t.Any = None) -> None:
        """Create the session, optionally pre-populated from ``initial``.

        :param initial: Initial contents, forwarded unchanged to
            ``CallbackDict``.
        """

        def on_update(session: "SecureCookieSession") -> None:
            # Registered as the ``CallbackDict`` update callback; it
            # receives the session and flags it as both changed and
            # touched.
            session.modified = True
            session.accessed = True

        super().__init__(initial, on_update)

    # The three overrides below flag the session as accessed and then
    # defer to the parent implementation. Only these read paths are
    # overridden in this class.

    def __getitem__(self, key: str) -> t.Any:
        self.accessed = True
        return super().__getitem__(key)

    def get(self, key: str, default: t.Any = None) -> t.Any:
        self.accessed = True
        return super().get(key, default)

    def setdefault(self, key: str, default: t.Any = None) -> t.Any:
        self.accessed = True
        return super().setdefault(key, default)
class NullSession(SecureCookieSession):
    """Class used to generate nicer error messages if sessions are not
    available.  Will still allow read-only access to the empty session
    but fail on setting.
    """

    def _fail(self, *args: t.Any, **kwargs: t.Any) -> "te.NoReturn":
        # Shared stand-in for every mutating method: any arguments are
        # accepted and ignored, and the call always raises.
        message = (
            "The session is unavailable because no secret "
            "key was set. Set the secret_key on the "
            "application to something unique and secret."
        )
        raise RuntimeError(message)

    # Route each mutating mapping operation to the failing stub.
    __setitem__ = _fail  # type: ignore
    __delitem__ = _fail  # type: ignore
    clear = _fail  # type: ignore
    pop = _fail  # type: ignore
    popitem = _fail  # type: ignore
    update = _fail  # type: ignore
    setdefault = _fail  # type: ignore

    # The helper is only needed under the aliases above.
    del _fail
class SessionInterface:
    """The basic interface you have to implement in order to replace the
    default session interface, :class:`SecureCookieSessionInterface`,
    which stores sessions in signed cookies.  The only methods you have
    to implement are :meth:`open_session` and :meth:`save_session`, the
    others have useful defaults which you don't need to change.

    The session object returned by the :meth:`open_session` method has to
    provide a dictionary like interface plus the properties and methods
    from the :class:`SessionMixin`.  We recommend just subclassing a dict
    and adding that mixin::

        class Session(dict, SessionMixin):
            pass

    If :meth:`open_session` returns ``None`` Flask will call into
    :meth:`make_null_session` to create a session that acts as replacement
    if the session support cannot work because some requirement is not
    fulfilled.  The default :class:`NullSession` class that is created
    will complain that the secret key was not set.

    To replace the session interface on an application all you have to do
    is to assign :attr:`flask.Flask.session_interface`::

        app = Flask(__name__)
        app.session_interface = MySessionInterface()

    Multiple requests with the same session may be sent and handled
    concurrently. When implementing a new session interface, consider
    whether reads or writes to the backing store must be synchronized.
    There is no guarantee on the order in which the session for each
    request is opened or saved, it will occur in the order that requests
    begin and end processing.

    .. versionadded:: 0.8
    """

    #: :meth:`make_null_session` will look here for the class that should
    #: be created when a null session is requested.  Likewise the
    #: :meth:`is_null_session` method will perform a typecheck against
    #: this type.
    null_session_class = NullSession

    #: A flag that indicates if the session interface is pickle based.
    #: This can be used by Flask extensions to make a decision in regards
    #: to how to deal with the session object.
    #:
    #: .. versionadded:: 0.10
    pickle_based = False

    def make_null_session(self, app: "Flask") -> NullSession:
        """Creates a null session which acts as a replacement object if the
        real session support could not be loaded due to a configuration
        error.  This mainly aids the user experience because the job of the
        null session is to still support lookup without complaining but
        modifications are answered with a helpful error message of what
        failed.

        This creates an instance of :attr:`null_session_class` by default.
        """
        return self.null_session_class()

    def is_null_session(self, obj: object) -> bool:
        """Checks if a given object is a null session.  Null sessions are
        not asked to be saved.

        This checks if the object is an instance of :attr:`null_session_class`
        by default.
        """
        return isinstance(obj, self.null_session_class)

    def get_cookie_name(self, app: "Flask") -> str:
        """The name of the session cookie. Uses
        ``app.config["SESSION_COOKIE_NAME"]``.
        """
        return app.config["SESSION_COOKIE_NAME"]

    def get_cookie_domain(self, app: "Flask") -> t.Optional[str]:
        """Returns the domain that should be set for the session cookie.

        Uses ``SESSION_COOKIE_DOMAIN`` if it is configured, otherwise
        falls back to detecting the domain based on ``SERVER_NAME``.

        Once detected (or if not set at all), ``SESSION_COOKIE_DOMAIN`` is
        updated to avoid re-running the logic.

        :return: The cookie domain, or ``None`` when no domain should be
            set on the cookie.
        """
        rv = app.config["SESSION_COOKIE_DOMAIN"]

        # set explicitly, or cached from SERVER_NAME detection
        # if False, return None
        if rv is not None:
            return rv if rv else None

        rv = app.config["SERVER_NAME"]

        # server name not set, cache False to return none next time
        if not rv:
            app.config["SESSION_COOKIE_DOMAIN"] = False
            return None

        # chop off the port which is usually not supported by browsers
        # remove any leading '.' since we'll add that later
        rv = rv.rsplit(":", 1)[0].lstrip(".")

        if "." not in rv:
            # Chrome doesn't allow names without a '.'. This should only
            # come up with localhost. Hack around this by not setting
            # the name, and show a warning.
            warnings.warn(
                f"{rv!r} is not a valid cookie domain, it must contain"
                " a '.'. Add an entry to your hosts file, for example"
                f" '{rv}.localdomain', and use that instead."
            )
            app.config["SESSION_COOKIE_DOMAIN"] = False
            return None

        ip = is_ip(rv)

        if ip:
            warnings.warn(
                "The session cookie domain is an IP address. This may not work"
                " as intended in some browsers. Add an entry to your hosts"
                ' file, for example "localhost.localdomain", and use that'
                " instead."
            )

        # if this is not an ip and app is mounted at the root, allow subdomain
        # matching by adding a '.' prefix
        if self.get_cookie_path(app) == "/" and not ip:
            rv = f".{rv}"

        # cache the detected value so the detection only runs once
        app.config["SESSION_COOKIE_DOMAIN"] = rv
        return rv

    def get_cookie_path(self, app: "Flask") -> str:
        """Returns the path for which the cookie should be valid.  The
        default implementation uses the value from the ``SESSION_COOKIE_PATH``
        config var if it's set, and falls back to ``APPLICATION_ROOT`` or
        uses ``/`` if it's ``None``.
        """
        # The trailing ``or "/"`` enforces the documented final fallback
        # when both config values are unset or empty.
        return (
            app.config["SESSION_COOKIE_PATH"]
            or app.config["APPLICATION_ROOT"]
            or "/"
        )

    def get_cookie_httponly(self, app: "Flask") -> bool:
        """Returns True if the session cookie should be httponly.  This
        currently just returns the value of the ``SESSION_COOKIE_HTTPONLY``
        config var.
        """
        return app.config["SESSION_COOKIE_HTTPONLY"]

    def get_cookie_secure(self, app: "Flask") -> bool:
        """Returns True if the cookie should be secure.  This currently
        just returns the value of the ``SESSION_COOKIE_SECURE`` setting.
        """
        return app.config["SESSION_COOKIE_SECURE"]

    def get_cookie_samesite(self, app: "Flask") -> t.Optional[str]:
        """Return ``'Strict'`` or ``'Lax'`` if the cookie should use the
        ``SameSite`` attribute. This currently just returns the value of
        the :data:`SESSION_COOKIE_SAMESITE` setting, whatever it is set to.
        """
        return app.config["SESSION_COOKIE_SAMESITE"]

    def get_expiration_time(
        self, app: "Flask", session: SessionMixin
    ) -> t.Optional[datetime]:
        """A helper method that returns an expiration date for the session
        or ``None`` if the session is linked to the browser session.  The
        default implementation returns now + the permanent session
        lifetime configured on the application.
        """
        if session.permanent:
            # timezone-aware "now" in UTC
            return datetime.now(timezone.utc) + app.permanent_session_lifetime

        return None

    def should_set_cookie(self, app: "Flask", session: SessionMixin) -> bool:
        """Used by session backends to determine if a ``Set-Cookie`` header
        should be set for this session cookie for this response. If the session
        has been modified, the cookie is set. If the session is permanent and
        the ``SESSION_REFRESH_EACH_REQUEST`` config is true, the cookie is
        always set.

        This check is usually skipped if the session was deleted.

        .. versionadded:: 0.11
        """
        return session.modified or (
            session.permanent and app.config["SESSION_REFRESH_EACH_REQUEST"]
        )

    def open_session(
        self, app: "Flask", request: "Request"
    ) -> t.Optional[SessionMixin]:
        """This is called at the beginning of each request, after
        pushing the request context, before matching the URL.

        This must return an object which implements a dictionary-like
        interface as well as the :class:`SessionMixin` interface.

        This will return ``None`` to indicate that loading failed in
        some way that is not immediately an error. The request
        context will fall back to using :meth:`make_null_session`
        in this case.

        :raises NotImplementedError: Always, in this base class;
            subclasses must override.
        """
        raise NotImplementedError()

    def save_session(
        self, app: "Flask", session: SessionMixin, response: "Response"
    ) -> None:
        """This is called at the end of each request, after generating
        a response, before removing the request context. It is skipped
        if :meth:`is_null_session` returns ``True``.

        :raises NotImplementedError: Always, in this base class;
            subclasses must override.
        """
        raise NotImplementedError()
#: Module-level serializer instance used as the default payload
#: serializer of :class:`SecureCookieSessionInterface`.
session_json_serializer = TaggedJSONSerializer()


class SecureCookieSessionInterface(SessionInterface):
    """The default session interface that stores sessions in signed cookies
    through the :mod:`itsdangerous` module.
    """

    #: the salt that should be applied on top of the secret key for the
    #: signing of cookie based sessions.
    salt = "cookie-session"
    #: the hash function to use for the signature.  The default is sha1
    digest_method = staticmethod(hashlib.sha1)
    #: the name of the itsdangerous supported key derivation.  The default
    #: is hmac.
    key_derivation = "hmac"
    #: A python serializer for the payload.  The default is a compact
    #: JSON derived serializer with support for some extra Python types
    #: such as datetime objects or tuples.
    serializer = session_json_serializer
    #: The class instantiated by :meth:`open_session` for every session.
    session_class = SecureCookieSession

    def get_signing_serializer(
        self, app: "Flask"
    ) -> t.Optional[URLSafeTimedSerializer]:
        """Build the serializer used to sign and verify the cookie value.

        :return: A configured ``URLSafeTimedSerializer``, or ``None``
            when the application has no ``secret_key``.
        """
        if not app.secret_key:
            return None

        signer_kwargs = {
            "key_derivation": self.key_derivation,
            "digest_method": self.digest_method,
        }
        return URLSafeTimedSerializer(
            app.secret_key,
            salt=self.salt,
            serializer=self.serializer,
            signer_kwargs=signer_kwargs,
        )

    def open_session(
        self, app: "Flask", request: "Request"
    ) -> t.Optional[SecureCookieSession]:
        """Load the session from the request's session cookie.

        :return: ``None`` when no signing serializer is available (no
            secret key); an empty session when the cookie is missing or
            fails verification; otherwise a session holding the decoded
            cookie data.
        """
        s = self.get_signing_serializer(app)

        if s is None:
            return None

        val = request.cookies.get(self.get_cookie_name(app))

        if not val:
            return self.session_class()

        # The cookie is rejected once it is older than the permanent
        # session lifetime.
        max_age = int(app.permanent_session_lifetime.total_seconds())

        try:
            data = s.loads(val, max_age=max_age)
        except BadSignature:
            # Fall back to a fresh, empty session instead of failing
            # the request.
            return self.session_class()

        return self.session_class(data)

    def save_session(
        self, app: "Flask", session: SessionMixin, response: "Response"
    ) -> None:
        """Write the session to the response as a signed cookie.

        A ``Vary: Cookie`` header is added whenever the session was
        accessed and whenever the cookie is set or deleted, so that a
        caching proxy never serves a response carrying one client's
        ``Set-Cookie`` header to another client.
        """
        name = self.get_cookie_name(app)
        domain = self.get_cookie_domain(app)
        path = self.get_cookie_path(app)
        secure = self.get_cookie_secure(app)
        samesite = self.get_cookie_samesite(app)
        httponly = self.get_cookie_httponly(app)

        # Add a "Vary: Cookie" header if the session was accessed at all.
        # This must happen before the empty-session early return below.
        if session.accessed:
            response.vary.add("Cookie")

        # If the session is modified to be empty, remove the cookie.
        # If the session is empty, return without setting the cookie.
        if not session:
            if session.modified:
                response.delete_cookie(
                    name,
                    domain=domain,
                    path=path,
                    secure=secure,
                    samesite=samesite,
                    httponly=httponly,
                )
                response.vary.add("Cookie")

            return

        if not self.should_set_cookie(app, session):
            return

        expires = self.get_expiration_time(app, session)
        val = self.get_signing_serializer(app).dumps(dict(session))  # type: ignore
        response.set_cookie(
            name,
            val,  # type: ignore
            expires=expires,
            httponly=httponly,
            domain=domain,
            path=path,
            secure=secure,
            samesite=samesite,
        )
        # A refreshed permanent session can reach this point without
        # having been flagged as accessed; the response still carries a
        # user-specific cookie and must vary on it.
        response.vary.add("Cookie")
|