123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420 |
- """
- WSGI Protocol Linter
- ====================
- This module provides a middleware that performs sanity checks on the
- behavior of the WSGI server and application. It checks that the
- :pep:`3333` WSGI spec is properly implemented. It also warns on some
- common HTTP errors such as non-empty responses for 304 status codes.
- .. autoclass:: LintMiddleware
- :copyright: 2007 Pallets
- :license: BSD-3-Clause
- """
- import typing as t
- from types import TracebackType
- from urllib.parse import urlparse
- from warnings import warn
- from ..datastructures import Headers
- from ..http import is_entity_header
- from ..wsgi import FileWrapper
- if t.TYPE_CHECKING:
- from _typeshed.wsgi import StartResponse
- from _typeshed.wsgi import WSGIApplication
- from _typeshed.wsgi import WSGIEnvironment
- class WSGIWarning(Warning):
- """Warning class for WSGI warnings."""
- class HTTPWarning(Warning):
- """Warning class for HTTP warnings."""
- def check_type(context: str, obj: object, need: t.Type = str) -> None:
- if type(obj) is not need:
- warn(
- f"{context!r} requires {need.__name__!r}, got {type(obj).__name__!r}.",
- WSGIWarning,
- stacklevel=3,
- )
- class InputStream:
- def __init__(self, stream: t.IO[bytes]) -> None:
- self._stream = stream
- def read(self, *args: t.Any) -> bytes:
- if len(args) == 0:
- warn(
- "WSGI does not guarantee an EOF marker on the input stream, thus making"
- " calls to 'wsgi.input.read()' unsafe. Conforming servers may never"
- " return from this call.",
- WSGIWarning,
- stacklevel=2,
- )
- elif len(args) != 1:
- warn(
- "Too many parameters passed to 'wsgi.input.read()'.",
- WSGIWarning,
- stacklevel=2,
- )
- return self._stream.read(*args)
- def readline(self, *args: t.Any) -> bytes:
- if len(args) == 0:
- warn(
- "Calls to 'wsgi.input.readline()' without arguments are unsafe. Use"
- " 'wsgi.input.read()' instead.",
- WSGIWarning,
- stacklevel=2,
- )
- elif len(args) == 1:
- warn(
- "'wsgi.input.readline()' was called with a size hint. WSGI does not"
- " support this, although it's available on all major servers.",
- WSGIWarning,
- stacklevel=2,
- )
- else:
- raise TypeError("Too many arguments passed to 'wsgi.input.readline()'.")
- return self._stream.readline(*args)
- def __iter__(self) -> t.Iterator[bytes]:
- try:
- return iter(self._stream)
- except TypeError:
- warn("'wsgi.input' is not iterable.", WSGIWarning, stacklevel=2)
- return iter(())
- def close(self) -> None:
- warn("The application closed the input stream!", WSGIWarning, stacklevel=2)
- self._stream.close()
- class ErrorStream:
- def __init__(self, stream: t.IO[str]) -> None:
- self._stream = stream
- def write(self, s: str) -> None:
- check_type("wsgi.error.write()", s, str)
- self._stream.write(s)
- def flush(self) -> None:
- self._stream.flush()
- def writelines(self, seq: t.Iterable[str]) -> None:
- for line in seq:
- self.write(line)
- def close(self) -> None:
- warn("The application closed the error stream!", WSGIWarning, stacklevel=2)
- self._stream.close()
- class GuardedWrite:
- def __init__(self, write: t.Callable[[bytes], object], chunks: t.List[int]) -> None:
- self._write = write
- self._chunks = chunks
- def __call__(self, s: bytes) -> None:
- check_type("write()", s, bytes)
- self._write(s)
- self._chunks.append(len(s))
- class GuardedIterator:
- def __init__(
- self,
- iterator: t.Iterable[bytes],
- headers_set: t.Tuple[int, Headers],
- chunks: t.List[int],
- ) -> None:
- self._iterator = iterator
- self._next = iter(iterator).__next__
- self.closed = False
- self.headers_set = headers_set
- self.chunks = chunks
- def __iter__(self) -> "GuardedIterator":
- return self
- def __next__(self) -> bytes:
- if self.closed:
- warn("Iterated over closed 'app_iter'.", WSGIWarning, stacklevel=2)
- rv = self._next()
- if not self.headers_set:
- warn(
- "The application returned before it started the response.",
- WSGIWarning,
- stacklevel=2,
- )
- check_type("application iterator items", rv, bytes)
- self.chunks.append(len(rv))
- return rv
- def close(self) -> None:
- self.closed = True
- if hasattr(self._iterator, "close"):
- self._iterator.close() # type: ignore
- if self.headers_set:
- status_code, headers = self.headers_set
- bytes_sent = sum(self.chunks)
- content_length = headers.get("content-length", type=int)
- if status_code == 304:
- for key, _value in headers:
- key = key.lower()
- if key not in ("expires", "content-location") and is_entity_header(
- key
- ):
- warn(
- f"Entity header {key!r} found in 304 response.", HTTPWarning
- )
- if bytes_sent:
- warn("304 responses must not have a body.", HTTPWarning)
- elif 100 <= status_code < 200 or status_code == 204:
- if content_length != 0:
- warn(
- f"{status_code} responses must have an empty content length.",
- HTTPWarning,
- )
- if bytes_sent:
- warn(f"{status_code} responses must not have a body.", HTTPWarning)
- elif content_length is not None and content_length != bytes_sent:
- warn(
- "Content-Length and the number of bytes sent to the"
- " client do not match.",
- WSGIWarning,
- )
- def __del__(self) -> None:
- if not self.closed:
- try:
- warn(
- "Iterator was garbage collected before it was closed.", WSGIWarning
- )
- except Exception:
- pass
- class LintMiddleware:
- """Warns about common errors in the WSGI and HTTP behavior of the
- server and wrapped application. Some of the issues it checks are:
- - invalid status codes
- - non-bytes sent to the WSGI server
- - strings returned from the WSGI application
- - non-empty conditional responses
- - unquoted etags
- - relative URLs in the Location header
- - unsafe calls to wsgi.input
- - unclosed iterators
- Error information is emitted using the :mod:`warnings` module.
- :param app: The WSGI application to wrap.
- .. code-block:: python
- from werkzeug.middleware.lint import LintMiddleware
- app = LintMiddleware(app)
- """
- def __init__(self, app: "WSGIApplication") -> None:
- self.app = app
- def check_environ(self, environ: "WSGIEnvironment") -> None:
- if type(environ) is not dict:
- warn(
- "WSGI environment is not a standard Python dict.",
- WSGIWarning,
- stacklevel=4,
- )
- for key in (
- "REQUEST_METHOD",
- "SERVER_NAME",
- "SERVER_PORT",
- "wsgi.version",
- "wsgi.input",
- "wsgi.errors",
- "wsgi.multithread",
- "wsgi.multiprocess",
- "wsgi.run_once",
- ):
- if key not in environ:
- warn(
- f"Required environment key {key!r} not found",
- WSGIWarning,
- stacklevel=3,
- )
- if environ["wsgi.version"] != (1, 0):
- warn("Environ is not a WSGI 1.0 environ.", WSGIWarning, stacklevel=3)
- script_name = environ.get("SCRIPT_NAME", "")
- path_info = environ.get("PATH_INFO", "")
- if script_name and script_name[0] != "/":
- warn(
- f"'SCRIPT_NAME' does not start with a slash: {script_name!r}",
- WSGIWarning,
- stacklevel=3,
- )
- if path_info and path_info[0] != "/":
- warn(
- f"'PATH_INFO' does not start with a slash: {path_info!r}",
- WSGIWarning,
- stacklevel=3,
- )
- def check_start_response(
- self,
- status: str,
- headers: t.List[t.Tuple[str, str]],
- exc_info: t.Optional[
- t.Tuple[t.Type[BaseException], BaseException, TracebackType]
- ],
- ) -> t.Tuple[int, Headers]:
- check_type("status", status, str)
- status_code_str = status.split(None, 1)[0]
- if len(status_code_str) != 3 or not status_code_str.isdecimal():
- warn("Status code must be three digits.", WSGIWarning, stacklevel=3)
- if len(status) < 4 or status[3] != " ":
- warn(
- f"Invalid value for status {status!r}. Valid status strings are three"
- " digits, a space and a status explanation.",
- WSGIWarning,
- stacklevel=3,
- )
- status_code = int(status_code_str)
- if status_code < 100:
- warn("Status code < 100 detected.", WSGIWarning, stacklevel=3)
- if type(headers) is not list:
- warn("Header list is not a list.", WSGIWarning, stacklevel=3)
- for item in headers:
- if type(item) is not tuple or len(item) != 2:
- warn("Header items must be 2-item tuples.", WSGIWarning, stacklevel=3)
- name, value = item
- if type(name) is not str or type(value) is not str:
- warn(
- "Header keys and values must be strings.", WSGIWarning, stacklevel=3
- )
- if name.lower() == "status":
- warn(
- "The status header is not supported due to"
- " conflicts with the CGI spec.",
- WSGIWarning,
- stacklevel=3,
- )
- if exc_info is not None and not isinstance(exc_info, tuple):
- warn("Invalid value for exc_info.", WSGIWarning, stacklevel=3)
- headers = Headers(headers)
- self.check_headers(headers)
- return status_code, headers
- def check_headers(self, headers: Headers) -> None:
- etag = headers.get("etag")
- if etag is not None:
- if etag.startswith(("W/", "w/")):
- if etag.startswith("w/"):
- warn(
- "Weak etag indicator should be upper case.",
- HTTPWarning,
- stacklevel=4,
- )
- etag = etag[2:]
- if not (etag[:1] == etag[-1:] == '"'):
- warn("Unquoted etag emitted.", HTTPWarning, stacklevel=4)
- location = headers.get("location")
- if location is not None:
- if not urlparse(location).netloc:
- warn(
- "Absolute URLs required for location header.",
- HTTPWarning,
- stacklevel=4,
- )
- def check_iterator(self, app_iter: t.Iterable[bytes]) -> None:
- if isinstance(app_iter, bytes):
- warn(
- "The application returned a bytestring. The response will send one"
- " character at a time to the client, which will kill performance."
- " Return a list or iterable instead.",
- WSGIWarning,
- stacklevel=3,
- )
- def __call__(self, *args: t.Any, **kwargs: t.Any) -> t.Iterable[bytes]:
- if len(args) != 2:
- warn("A WSGI app takes two arguments.", WSGIWarning, stacklevel=2)
- if kwargs:
- warn(
- "A WSGI app does not take keyword arguments.", WSGIWarning, stacklevel=2
- )
- environ: "WSGIEnvironment" = args[0]
- start_response: "StartResponse" = args[1]
- self.check_environ(environ)
- environ["wsgi.input"] = InputStream(environ["wsgi.input"])
- environ["wsgi.errors"] = ErrorStream(environ["wsgi.errors"])
- # Hook our own file wrapper in so that applications will always
- # iterate to the end and we can check the content length.
- environ["wsgi.file_wrapper"] = FileWrapper
- headers_set: t.List[t.Any] = []
- chunks: t.List[int] = []
- def checking_start_response(
- *args: t.Any, **kwargs: t.Any
- ) -> t.Callable[[bytes], None]:
- if len(args) not in {2, 3}:
- warn(
- f"Invalid number of arguments: {len(args)}, expected 2 or 3.",
- WSGIWarning,
- stacklevel=2,
- )
- if kwargs:
- warn("'start_response' does not take keyword arguments.", WSGIWarning)
- status: str = args[0]
- headers: t.List[t.Tuple[str, str]] = args[1]
- exc_info: t.Optional[
- t.Tuple[t.Type[BaseException], BaseException, TracebackType]
- ] = (args[2] if len(args) == 3 else None)
- headers_set[:] = self.check_start_response(status, headers, exc_info)
- return GuardedWrite(start_response(status, headers, exc_info), chunks)
- app_iter = self.app(environ, t.cast("StartResponse", checking_start_response))
- self.check_iterator(app_iter)
- return GuardedIterator(
- app_iter, t.cast(t.Tuple[int, Headers], headers_set), chunks
- )
|