console.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. import code
  2. import sys
  3. import typing as t
  4. from contextvars import ContextVar
  5. from types import CodeType
  6. from markupsafe import escape
  7. from .repr import debug_repr
  8. from .repr import dump
  9. from .repr import helper
  10. if t.TYPE_CHECKING:
  11. import codeop # noqa: F401
  12. _stream: ContextVar["HTMLStringO"] = ContextVar("werkzeug.debug.console.stream")
  13. _ipy: ContextVar = ContextVar("werkzeug.debug.console.ipy")
  14. class HTMLStringO:
  15. """A StringO version that HTML escapes on write."""
  16. def __init__(self) -> None:
  17. self._buffer: t.List[str] = []
  18. def isatty(self) -> bool:
  19. return False
  20. def close(self) -> None:
  21. pass
  22. def flush(self) -> None:
  23. pass
  24. def seek(self, n: int, mode: int = 0) -> None:
  25. pass
  26. def readline(self) -> str:
  27. if len(self._buffer) == 0:
  28. return ""
  29. ret = self._buffer[0]
  30. del self._buffer[0]
  31. return ret
  32. def reset(self) -> str:
  33. val = "".join(self._buffer)
  34. del self._buffer[:]
  35. return val
  36. def _write(self, x: str) -> None:
  37. if isinstance(x, bytes):
  38. x = x.decode("utf-8", "replace")
  39. self._buffer.append(x)
  40. def write(self, x: str) -> None:
  41. self._write(escape(x))
  42. def writelines(self, x: t.Iterable[str]) -> None:
  43. self._write(escape("".join(x)))
  44. class ThreadedStream:
  45. """Thread-local wrapper for sys.stdout for the interactive console."""
  46. @staticmethod
  47. def push() -> None:
  48. if not isinstance(sys.stdout, ThreadedStream):
  49. sys.stdout = t.cast(t.TextIO, ThreadedStream())
  50. _stream.set(HTMLStringO())
  51. @staticmethod
  52. def fetch() -> str:
  53. try:
  54. stream = _stream.get()
  55. except LookupError:
  56. return ""
  57. return stream.reset()
  58. @staticmethod
  59. def displayhook(obj: object) -> None:
  60. try:
  61. stream = _stream.get()
  62. except LookupError:
  63. return _displayhook(obj) # type: ignore
  64. # stream._write bypasses escaping as debug_repr is
  65. # already generating HTML for us.
  66. if obj is not None:
  67. _ipy.get().locals["_"] = obj
  68. stream._write(debug_repr(obj))
  69. def __setattr__(self, name: str, value: t.Any) -> None:
  70. raise AttributeError(f"read only attribute {name}")
  71. def __dir__(self) -> t.List[str]:
  72. return dir(sys.__stdout__)
  73. def __getattribute__(self, name: str) -> t.Any:
  74. try:
  75. stream = _stream.get()
  76. except LookupError:
  77. stream = sys.__stdout__ # type: ignore[assignment]
  78. return getattr(stream, name)
  79. def __repr__(self) -> str:
  80. return repr(sys.__stdout__)
  81. # add the threaded stream as display hook
  82. _displayhook = sys.displayhook
  83. sys.displayhook = ThreadedStream.displayhook
  84. class _ConsoleLoader:
  85. def __init__(self) -> None:
  86. self._storage: t.Dict[int, str] = {}
  87. def register(self, code: CodeType, source: str) -> None:
  88. self._storage[id(code)] = source
  89. # register code objects of wrapped functions too.
  90. for var in code.co_consts:
  91. if isinstance(var, CodeType):
  92. self._storage[id(var)] = source
  93. def get_source_by_code(self, code: CodeType) -> t.Optional[str]:
  94. try:
  95. return self._storage[id(code)]
  96. except KeyError:
  97. return None
  98. class _InteractiveConsole(code.InteractiveInterpreter):
  99. locals: t.Dict[str, t.Any]
  100. def __init__(self, globals: t.Dict[str, t.Any], locals: t.Dict[str, t.Any]) -> None:
  101. self.loader = _ConsoleLoader()
  102. locals = {
  103. **globals,
  104. **locals,
  105. "dump": dump,
  106. "help": helper,
  107. "__loader__": self.loader,
  108. }
  109. super().__init__(locals)
  110. original_compile = self.compile
  111. def compile(source: str, filename: str, symbol: str) -> t.Optional[CodeType]:
  112. code = original_compile(source, filename, symbol)
  113. if code is not None:
  114. self.loader.register(code, source)
  115. return code
  116. self.compile = compile # type: ignore[assignment]
  117. self.more = False
  118. self.buffer: t.List[str] = []
  119. def runsource(self, source: str, **kwargs: t.Any) -> str: # type: ignore
  120. source = f"{source.rstrip()}\n"
  121. ThreadedStream.push()
  122. prompt = "... " if self.more else ">>> "
  123. try:
  124. source_to_eval = "".join(self.buffer + [source])
  125. if super().runsource(source_to_eval, "<debugger>", "single"):
  126. self.more = True
  127. self.buffer.append(source)
  128. else:
  129. self.more = False
  130. del self.buffer[:]
  131. finally:
  132. output = ThreadedStream.fetch()
  133. return f"{prompt}{escape(source)}{output}"
  134. def runcode(self, code: CodeType) -> None:
  135. try:
  136. exec(code, self.locals)
  137. except Exception:
  138. self.showtraceback()
  139. def showtraceback(self) -> None:
  140. from .tbtools import DebugTraceback
  141. exc = t.cast(BaseException, sys.exc_info()[1])
  142. te = DebugTraceback(exc, skip=1)
  143. sys.stdout._write(te.render_traceback_html()) # type: ignore
  144. def showsyntaxerror(self, filename: t.Optional[str] = None) -> None:
  145. from .tbtools import DebugTraceback
  146. exc = t.cast(BaseException, sys.exc_info()[1])
  147. te = DebugTraceback(exc, skip=4)
  148. sys.stdout._write(te.render_traceback_html()) # type: ignore
  149. def write(self, data: str) -> None:
  150. sys.stdout.write(data)
  151. class Console:
  152. """An interactive console."""
  153. def __init__(
  154. self,
  155. globals: t.Optional[t.Dict[str, t.Any]] = None,
  156. locals: t.Optional[t.Dict[str, t.Any]] = None,
  157. ) -> None:
  158. if locals is None:
  159. locals = {}
  160. if globals is None:
  161. globals = {}
  162. self._ipy = _InteractiveConsole(globals, locals)
  163. def eval(self, code: str) -> str:
  164. _ipy.set(self._ipy)
  165. old_sys_stdout = sys.stdout
  166. try:
  167. return self._ipy.runsource(code)
  168. finally:
  169. sys.stdout = old_sys_stdout