123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438 |
- import contextvars
- import sys
- import typing as t
- from functools import update_wrapper
- from types import TracebackType
- from werkzeug.exceptions import HTTPException
- from . import typing as ft
- from .globals import _cv_app
- from .globals import _cv_request
- from .signals import appcontext_popped
- from .signals import appcontext_pushed
- if t.TYPE_CHECKING: # pragma: no cover
- from .app import Flask
- from .sessions import SessionMixin
- from .wrappers import Request
# A unique singleton used as a parameter default, so that "argument not
# supplied" can be told apart from an explicitly passed ``None``.
_sentinel = object()
class _AppCtxGlobals:
    """Plain namespace object that holds data for the duration of an
    application context.

    An instance is created automatically together with every app
    context and is exposed through the :data:`g` proxy.

    .. describe:: 'key' in g

        Check whether an attribute is present.

        .. versionadded:: 0.10

    .. describe:: iter(g)

        Return an iterator over the attribute names.

        .. versionadded:: 0.10
    """

    # The attribute hooks are spelled out explicitly so that mypy treats
    # this class as a namespace accepting arbitrary attribute names.

    def __getattr__(self, name: str) -> t.Any:
        try:
            return vars(self)[name]
        except KeyError:
            # Report a missing name as an attribute error, without
            # chaining the internal dictionary lookup failure.
            raise AttributeError(name) from None

    def __setattr__(self, name: str, value: t.Any) -> None:
        vars(self)[name] = value

    def __delattr__(self, name: str) -> None:
        try:
            del vars(self)[name]
        except KeyError:
            raise AttributeError(name) from None

    def get(self, name: str, default: t.Optional[t.Any] = None) -> t.Any:
        """Look up an attribute by name, falling back to a default.
        Behaves like :meth:`dict.get`.

        :param name: Name of attribute to get.
        :param default: Value to return if the attribute is not present.

        .. versionadded:: 0.10
        """
        namespace = vars(self)
        return namespace.get(name, default)

    def pop(self, name: str, default: t.Any = _sentinel) -> t.Any:
        """Remove an attribute by name and return its value. Behaves
        like :meth:`dict.pop`.

        :param name: Name of attribute to pop.
        :param default: Value to return if the attribute is not present,
            instead of raising a ``KeyError``.

        .. versionadded:: 0.11
        """
        namespace = vars(self)

        if default is not _sentinel:
            return namespace.pop(name, default)

        # No default supplied: a missing name raises ``KeyError``.
        return namespace.pop(name)

    def setdefault(self, name: str, default: t.Any = None) -> t.Any:
        """Return the value of an attribute if it is present; otherwise
        store ``default`` under that name and return it. Behaves like
        :meth:`dict.setdefault`.

        :param name: Name of attribute to get.
        :param default: Value to set and return if the attribute is not
            present.

        .. versionadded:: 0.11
        """
        namespace = vars(self)
        return namespace.setdefault(name, default)

    def __contains__(self, item: str) -> bool:
        return item in vars(self)

    def __iter__(self) -> t.Iterator[str]:
        return iter(vars(self))

    def __repr__(self) -> str:
        app_ctx = _cv_app.get(None)

        # Without an active app context there is no app name to show,
        # so fall back to the default object representation.
        if app_ctx is None:
            return object.__repr__(self)

        return f"<flask.g of '{app_ctx.app.name}'>"
def after_this_request(f: ft.AfterRequestCallable) -> ft.AfterRequestCallable:
    """Register a function to run after the current request. The
    function receives the response object and must return either that
    same object or a new one, which makes it handy for modifying
    responses.

    Example::

        @app.route('/')
        def index():
            @after_this_request
            def add_header(response):
                response.headers['X-Foo'] = 'Parachute'
                return response
            return 'Hello World!'

    This is most useful when code other than the view function wants to
    modify the response, for example a decorator that adds headers
    without converting the view's return value into a response object.

    :param f: The callable to register on the active request context.
    :return: ``f`` itself, unchanged, so this can be used as a decorator.
    :raises RuntimeError: If no request context is active.

    .. versionadded:: 0.9
    """
    request_ctx = _cv_request.get(None)

    if request_ctx is not None:
        request_ctx._after_request_functions.append(f)
        return f

    raise RuntimeError(
        "'after_this_request' can only be used when a request"
        " context is active, such as in a view function."
    )
def copy_current_request_context(f: t.Callable) -> t.Callable:
    """Decorate a function so that it keeps the current request context.
    This is useful when working with greenlets. A copy of the request
    context is made at decoration time, and that copy is pushed each
    time the decorated function is called. The current session is part
    of the copied request context.

    Example::

        import gevent
        from flask import copy_current_request_context

        @app.route('/')
        def index():
            @copy_current_request_context
            def do_some_work():
                # do some work here, it can access flask.request or
                # flask.session like you would otherwise in the view function.
                ...
            gevent.spawn(do_some_work)
            return 'Regular response'

    :param f: The function to wrap.
    :return: A wrapper carrying the metadata of ``f``.
    :raises RuntimeError: If no request context is active when the
        decorator is applied.

    .. versionadded:: 0.10
    """
    active_ctx = _cv_request.get(None)

    if active_ctx is None:
        raise RuntimeError(
            "'copy_current_request_context' can only be used when a"
            " request context is active, such as in a view function."
        )

    # Copy now, at decoration time; the wrapper closes over the copy.
    copied_ctx = active_ctx.copy()

    def wrapper(*args, **kwargs):
        with copied_ctx:
            sync_func = copied_ctx.app.ensure_sync(f)
            return sync_func(*args, **kwargs)

    return update_wrapper(wrapper, f)
def has_request_context() -> bool:
    """Tell whether a request context is currently active. Use this in
    code that wants to take advantage of request information when the
    request object is available, but fail silently when it is not.

    ::

        class User(db.Model):

            def __init__(self, username, remote_addr=None):
                self.username = username
                if remote_addr is None and has_request_context():
                    remote_addr = request.remote_addr
                self.remote_addr = remote_addr

    Alternatively, any of the context bound objects (such as
    :class:`request` or :class:`g`) can be tested for truthiness::

        class User(db.Model):

            def __init__(self, username, remote_addr=None):
                self.username = username
                if remote_addr is None and request:
                    remote_addr = request.remote_addr
                self.remote_addr = remote_addr

    :return: ``True`` if a request context is active, else ``False``.

    .. versionadded:: 0.7
    """
    request_ctx = _cv_request.get(None)
    return request_ctx is not None
def has_app_context() -> bool:
    """Counterpart of :func:`has_request_context` for the application
    context. A boolean check on the :data:`current_app` object can be
    used instead.

    :return: ``True`` if an application context is active, else
        ``False``.

    .. versionadded:: 0.9
    """
    app_ctx = _cv_app.get(None)
    return app_ctx is not None
class AppContext:
    """Holds application-specific information. An app context is
    created and pushed at the start of every request unless one is
    already active, and one is also pushed while CLI commands run.
    """

    def __init__(self, app: "Flask") -> None:
        self.app = app
        self.url_adapter = app.create_url_adapter(None)
        self.g: _AppCtxGlobals = app.app_ctx_globals_class()
        # One context-var token is recorded per push() call.
        self._cv_tokens: t.List[contextvars.Token] = []

    def push(self) -> None:
        """Binds the app context to the current context."""
        tokens = self._cv_tokens
        tokens.append(_cv_app.set(self))
        appcontext_pushed.send(self.app)

    def pop(self, exc: t.Optional[BaseException] = _sentinel) -> None:  # type: ignore
        """Pops the app context.

        :param exc: Exception handed to the app's teardown handling.
            When omitted, the exception currently being handled (if
            any) is used instead.
        :raises AssertionError: If the context that was active is not
            this one.
        """
        try:
            # Teardown only runs when the last remaining push is undone.
            if len(self._cv_tokens) == 1:
                teardown_exc = sys.exc_info()[1] if exc is _sentinel else exc
                self.app.do_teardown_appcontext(teardown_exc)
        finally:
            # Unbind even if teardown raised.
            active_ctx = _cv_app.get()
            _cv_app.reset(self._cv_tokens.pop())

        if active_ctx is self:
            appcontext_popped.send(self.app)
            return

        raise AssertionError(
            f"Popped wrong app context. ({active_ctx!r} instead of {self!r})"
        )

    def __enter__(self) -> "AppContext":
        self.push()
        return self

    def __exit__(
        self,
        exc_type: t.Optional[type],
        exc_value: t.Optional[BaseException],
        tb: t.Optional[TracebackType],
    ) -> None:
        # Only the exception instance is forwarded to pop().
        self.pop(exc_value)
class RequestContext:
    """The request context contains per-request information. The Flask
    app creates and pushes it at the beginning of the request, then pops
    it at the end of the request. It will create the URL adapter and
    request object for the WSGI environment provided.

    Do not attempt to use this class directly, instead use
    :meth:`~flask.Flask.test_request_context` and
    :meth:`~flask.Flask.request_context` to create this object.

    When the request context is popped, it will evaluate all the
    functions registered on the application for teardown execution
    (:meth:`~flask.Flask.teardown_request`).

    The request context is automatically popped at the end of the
    request. When using the interactive debugger, the context will be
    restored so ``request`` is still accessible. Similarly, the test
    client can preserve the context after the request ends. However,
    teardown functions may already have closed some resources such as
    database connections.
    """

    def __init__(
        self,
        app: "Flask",
        environ: dict,
        request: t.Optional["Request"] = None,
        session: t.Optional["SessionMixin"] = None,
    ) -> None:
        """Create the context for a single request.

        :param app: The application this context belongs to.
        :param environ: The WSGI environment. It is only used to build a
            request object when ``request`` is not given.
        :param request: An existing request object to use instead of
            creating one from ``environ``.
        :param session: An existing session to use. When ``None``, the
            session is opened when the context is pushed.
        """
        self.app = app
        if request is None:
            # Only a request created here gets the app's ``json`` object
            # attached; a caller-supplied request is used as given.
            request = app.request_class(environ)
            request.json_module = app.json  # type: ignore[misc]
        self.request: Request = request
        self.url_adapter = None
        try:
            self.url_adapter = app.create_url_adapter(self.request)
        except HTTPException as e:
            # The error is not propagated from here; it is stored on the
            # request, and ``url_adapter`` stays ``None``.
            self.request.routing_exception = e
        self.flashes: t.Optional[t.List[t.Tuple[str, str]]] = None
        self.session: t.Optional["SessionMixin"] = session
        # Functions that should be executed after the request on the response
        # object. These will be called before the regular "after_request"
        # functions.
        self._after_request_functions: t.List[ft.AfterRequestCallable] = []

        # One entry per push(): the token returned by ``_cv_request.set``
        # and the app context that push created, or ``None`` if it reused
        # an already active one.
        self._cv_tokens: t.List[t.Tuple[contextvars.Token, t.Optional[AppContext]]] = []

    def copy(self) -> "RequestContext":
        """Creates a copy of this request context with the same request object.
        This can be used to move a request context to a different greenlet.
        Because the actual request object is the same this cannot be used to
        move a request context to a different thread unless access to the
        request object is locked.

        .. versionadded:: 0.10

        .. versionchanged:: 1.1
           The current session object is used instead of reloading the original
           data. This prevents `flask.session` pointing to an out-of-date object.
        """
        return self.__class__(
            self.app,
            environ=self.request.environ,
            request=self.request,
            session=self.session,
        )

    def match_request(self) -> None:
        """Can be overridden by a subclass to hook into the matching
        of the request.

        On success the matched rule and view arguments are stored on the
        request. An :exc:`~werkzeug.exceptions.HTTPException` raised by
        the match is stored as ``request.routing_exception`` instead of
        being propagated.
        """
        try:
            result = self.url_adapter.match(return_rule=True)  # type: ignore
            self.request.url_rule, self.request.view_args = result  # type: ignore
        except HTTPException as e:
            self.request.routing_exception = e

    def push(self) -> None:
        """Bind this request context to the current context.

        An app context for ``self.app`` is pushed first unless one is
        already active. The session is then opened if this context does
        not have one yet, and finally the request URL is matched if a
        URL adapter is available.
        """
        # Before we push the request context we have to ensure that there
        # is an application context.
        app_ctx = _cv_app.get(None)

        if app_ctx is None or app_ctx.app is not self.app:
            app_ctx = self.app.app_context()
            app_ctx.push()
        else:
            # A matching app context was already active. Record ``None``
            # so that pop() does not pop a context this push did not create.
            app_ctx = None

        self._cv_tokens.append((_cv_request.set(self), app_ctx))

        # Open the session at the moment that the request context is available.
        # This allows a custom open_session method to use the request context.
        # Only open a new session if this is the first time the request was
        # pushed, otherwise stream_with_context loses the session.
        if self.session is None:
            session_interface = self.app.session_interface
            self.session = session_interface.open_session(self.app, self.request)

            # open_session returned nothing usable, so use a null session.
            if self.session is None:
                self.session = session_interface.make_null_session(self.app)

        # Match the request URL after loading the session, so that the
        # session is available in custom URL converters.
        if self.url_adapter is not None:
            self.match_request()

    def pop(self, exc: t.Optional[BaseException] = _sentinel) -> None:  # type: ignore
        """Pops the request context and unbinds it by doing that. This will
        also trigger the execution of functions registered by the
        :meth:`~flask.Flask.teardown_request` decorator.

        Teardown and closing the request only happen when the last
        remaining push of this context is being undone.

        :param exc: Exception passed to the teardown handling. When
            omitted, the exception currently being handled (if any) is
            used instead.
        :raises AssertionError: If the request context that was active
            is not this one.

        .. versionchanged:: 0.9
           Added the `exc` argument.
        """
        # True when exactly one push is outstanding. Computed before the
        # token is removed below.
        clear_request = len(self._cv_tokens) == 1

        try:
            if clear_request:
                if exc is _sentinel:
                    exc = sys.exc_info()[1]
                self.app.do_teardown_request(exc)

                # The request object may or may not provide ``close``.
                request_close = getattr(self.request, "close", None)
                if request_close is not None:
                    request_close()
        finally:
            # Unbinding happens even if teardown or close raised.
            ctx = _cv_request.get()
            token, app_ctx = self._cv_tokens.pop()
            _cv_request.reset(token)

            # get rid of circular dependencies at the end of the request
            # so that we don't require the GC to be active.
            if clear_request:
                ctx.request.environ["werkzeug.request"] = None

            # Pop the app context only if the matching push() created it.
            if app_ctx is not None:
                app_ctx.pop(exc)

            if ctx is not self:
                raise AssertionError(
                    f"Popped wrong request context. ({ctx!r} instead of {self!r})"
                )

    def __enter__(self) -> "RequestContext":
        """Push the context and return it for use in a ``with`` block."""
        self.push()
        return self

    def __exit__(
        self,
        exc_type: t.Optional[type],
        exc_value: t.Optional[BaseException],
        tb: t.Optional[TracebackType],
    ) -> None:
        """Pop the context, forwarding only the exception instance."""
        self.pop(exc_value)

    def __repr__(self) -> str:
        """Show the class name, request URL, request method and app name."""
        return (
            f"<{type(self).__name__} {self.request.url!r}"
            f" [{self.request.method}] of {self.app.name}>"
        )
|