Source code for manhole

from __future__ import print_function

import atexit
import code
import errno
import os
import signal
import socket
import struct
import sys
import traceback
from contextlib import closing

__version__ = '1.5.0'

try:
    import signalfd
except ImportError:
    signalfd = None
try:
    string = basestring
except NameError:  # python 3
    string = str
try:
    InterruptedError = InterruptedError
except NameError:  # python <= 3.2
    InterruptedError = OSError
if hasattr(sys, 'setswitchinterval'):
    setinterval = sys.setswitchinterval
    getinterval = sys.getswitchinterval
else:
    setinterval = sys.setcheckinterval
    getinterval = sys.getcheckinterval

try:
    from eventlet.patcher import original as _original

    def _get_original(mod, name):
        return getattr(_original(mod), name)
except ImportError:
    try:
        from gevent.monkey import get_original as _get_original
    except ImportError:
        def _get_original(mod, name):
            return getattr(__import__(mod), name)

_ORIGINAL_SOCKET = _get_original('socket', 'socket')
_ORIGINAL_FROMFD = _get_original('socket', 'fromfd')
_ORIGINAL_FDOPEN = _get_original('os', 'fdopen')
_ORIGINAL_DUP = _get_original('os', 'dup')
_ORIGINAL_DUP2 = _get_original('os', 'dup2')
try:
    _ORIGINAL_ALLOCATE_LOCK = _get_original('thread', 'allocate_lock')
except ImportError:  # python 3
    _ORIGINAL_ALLOCATE_LOCK = _get_original('_thread', 'allocate_lock')
_ORIGINAL_THREAD = _get_original('threading', 'Thread')
_ORIGINAL_EVENT = _get_original('threading', 'Event')
_ORIGINAL__ACTIVE = _get_original('threading', '_active')
_ORIGINAL_SLEEP = _get_original('time', 'sleep')

PY3 = sys.version_info[0] == 3
PY26 = sys.version_info[:2] == (2, 6)

try:
    import ctypes
    import ctypes.util

    libpthread_path = ctypes.util.find_library("pthread")
    if not libpthread_path:
        raise ImportError
    libpthread = ctypes.CDLL(libpthread_path)
    if not hasattr(libpthread, "pthread_setname_np"):
        raise ImportError
    _pthread_setname_np = libpthread.pthread_setname_np
    _pthread_setname_np.argtypes = [ctypes.c_void_p, ctypes.c_char_p]
    _pthread_setname_np.restype = ctypes.c_int

    def pthread_setname_np(ident, name):
        _pthread_setname_np(ident, name[:15].encode('utf8'))
except ImportError:
    def pthread_setname_np(ident, name):
        pass

if sys.platform == 'darwin' or sys.platform.startswith("freebsd"):
    _PEERCRED_LEVEL = getattr(socket, 'SOL_LOCAL', 0)
    _PEERCRED_OPTION = getattr(socket, 'LOCAL_PEERCRED', 1)
else:
    _PEERCRED_LEVEL = socket.SOL_SOCKET
    # TODO: Is this missing on some platforms?
    _PEERCRED_OPTION = getattr(socket, 'SO_PEERCRED', 17)

_ALL_SIGNALS = tuple(getattr(signal, sig) for sig in dir(signal)
                     if sig.startswith('SIG') and '_' not in sig)

# These (_LOG and _MANHOLE) will hold instances after install
_MANHOLE = None
_LOCK = _ORIGINAL_ALLOCATE_LOCK()


def force_original_socket(sock):
    with closing(sock):
        if hasattr(sock, 'detach'):
            return _ORIGINAL_SOCKET(sock.family, sock.type, sock.proto, sock.detach())
        else:
            assert hasattr(_ORIGINAL_SOCKET, '_sock')
            return _ORIGINAL_SOCKET(_sock=sock._sock)


[docs]def get_peercred(sock): """Gets the (pid, uid, gid) for the client on the given *connected* socket.""" buf = sock.getsockopt(_PEERCRED_LEVEL, _PEERCRED_OPTION, struct.calcsize('3i')) return struct.unpack('3i', buf)
class AlreadyInstalled(Exception): pass class NotInstalled(Exception): pass class ConfigurationConflict(Exception): pass class SuspiciousClient(Exception): pass
[docs]class ManholeThread(_ORIGINAL_THREAD): """ Thread that runs the infamous "Manhole". This thread is a `daemon` thread - it will exit if the main thread exits. On connect, a different, non-daemon thread will be started - so that the process won't exit while there's a connection to the manhole. Args: sigmask (list of signal numbers): Signals to block in this thread. start_timeout (float): Seconds to wait for the thread to start. Emits a message if the thread is not running when calling ``start()``. bind_delay (float): Seconds to delay socket binding. Default: `no delay`. daemon_connection (bool): The connection thread is daemonic (dies on app exit). Default: ``False``. """ def __init__(self, get_socket, sigmask, start_timeout, connection_handler, bind_delay=None, daemon_connection=False): super(ManholeThread, self).__init__() self.daemon = True self.daemon_connection = daemon_connection self.name = "Manhole" self.sigmask = sigmask self.serious = _ORIGINAL_EVENT() # time to wait for the manhole to get serious (to have a complete start) # see: http://emptysqua.re/blog/dawn-of-the-thread/ self.start_timeout = start_timeout self.bind_delay = bind_delay self.connection_handler = connection_handler self.get_socket = get_socket self.should_run = False def stop(self): self.should_run = False
[docs] def clone(self, **kwargs): """ Make a fresh thread with the same options. This is usually used on dead threads. """ return ManholeThread( self.get_socket, self.sigmask, self.start_timeout, connection_handler=self.connection_handler, daemon_connection=self.daemon_connection, **kwargs )
def start(self): self.should_run = True super(ManholeThread, self).start() if not self.serious.wait(self.start_timeout) and not PY26: _LOG("WARNING: Waited %s seconds but Manhole thread didn't start yet :(" % self.start_timeout)
[docs] def run(self): """ Runs the manhole loop. Only accepts one connection at a time because: * This thread is a daemon thread (exits when main thread exists). * The connection need exclusive access to stdin, stderr and stdout so it can redirect inputs and outputs. """ self.serious.set() if signalfd and self.sigmask: signalfd.sigprocmask(signalfd.SIG_BLOCK, self.sigmask) pthread_setname_np(self.ident, self.name) if self.bind_delay: _LOG("Delaying UDS binding %s seconds ..." % self.bind_delay) _ORIGINAL_SLEEP(self.bind_delay) sock = self.get_socket() while self.should_run: _LOG("Waiting for new connection (in pid:%s) ..." % os.getpid()) try: client = ManholeConnectionThread(sock.accept()[0], self.connection_handler, self.daemon_connection) client.start() client.join() except (InterruptedError, socket.error) as e: if e.errno != errno.EINTR: raise continue finally: client = None
[docs]class ManholeConnectionThread(_ORIGINAL_THREAD): """ Manhole thread that handles the connection. This thread is a normal thread (non-daemon) - it won't exit if the main thread exits. """ def __init__(self, client, connection_handler, daemon=False): super(ManholeConnectionThread, self).__init__() self.daemon = daemon self.client = force_original_socket(client) self.connection_handler = connection_handler self.name = "ManholeConnectionThread" def run(self): _LOG('Started ManholeConnectionThread thread. Checking credentials ...') pthread_setname_np(self.ident, "Manhole -------") pid, _, _ = check_credentials(self.client) pthread_setname_np(self.ident, "Manhole < PID:%s" % pid) try: self.connection_handler(self.client) except BaseException as exc: _LOG("ManholeConnectionThread failure: %r" % exc)
[docs]def check_credentials(client): """ Checks credentials for given socket. """ pid, uid, gid = get_peercred(client) euid = os.geteuid() client_name = "PID:%s UID:%s GID:%s" % (pid, uid, gid) if uid not in (0, euid): raise SuspiciousClient("Can't accept client with %s. It doesn't match the current EUID:%s or ROOT." % ( client_name, euid )) _LOG("Accepted connection on fd:%s from %s" % (client.fileno(), client_name)) return pid, uid, gid
[docs]def handle_connection_exec(client): """ Alternate connection handler. No output redirection. """ class ExitExecLoop(Exception): pass def exit(): raise ExitExecLoop() client.settimeout(None) fh = os.fdopen(client.detach() if hasattr(client, 'detach') else client.fileno()) with closing(client): with closing(fh): try: payload = fh.readline() while payload: _LOG("Running: %r." % payload) eval(compile(payload, '<manhole>', 'exec'), {'exit': exit}, _MANHOLE.locals) payload = fh.readline() except ExitExecLoop: _LOG("Exiting exec loop.")
[docs]def handle_connection_repl(client): """ Handles connection. """ client.settimeout(None) # # disable this till we have evidence that it's needed # client.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 0) # # Note: setting SO_RCVBUF on UDS has no effect, see: http://man7.org/linux/man-pages/man7/unix.7.html backup = [] old_interval = getinterval() patches = [('r', ('stdin', '__stdin__')), ('w', ('stdout', '__stdout__'))] if _MANHOLE.redirect_stderr: patches.append(('w', ('stderr', '__stderr__'))) try: client_fd = client.fileno() for mode, names in patches: for name in names: backup.append((name, getattr(sys, name))) setattr(sys, name, _ORIGINAL_FDOPEN(client_fd, mode, 1 if PY3 else 0)) try: handle_repl(_MANHOLE.locals) except Exception as exc: _LOG("REPL failed with %r." % exc) _LOG("DONE.") finally: try: # Change the switch/check interval to something ridiculous. We don't want to have other thread try # to write to the redirected sys.__std*/sys.std* - it would fail horribly. setinterval(2147483647) try: client.close() # close before it's too late. it may already be dead except IOError: pass junk = [] # keep the old file objects alive for a bit for name, fh in backup: junk.append(getattr(sys, name)) setattr(sys, name, fh) del backup for fh in junk: try: if hasattr(fh, 'detach'): fh.detach() else: fh.close() except IOError: pass del fh del junk finally: setinterval(old_interval) _LOG("Cleaned up.")
_CONNECTION_HANDLER_ALIASES = { 'repl': handle_connection_repl, 'exec': handle_connection_exec } class ManholeConsole(code.InteractiveConsole): def __init__(self, *args, **kw): code.InteractiveConsole.__init__(self, *args, **kw) if _MANHOLE.redirect_stderr: self.file = sys.stderr else: self.file = sys.stdout def write(self, data): self.file.write(data)
[docs]def handle_repl(locals): """ Dumps stacktraces and runs an interactive prompt (REPL). """ dump_stacktraces() namespace = { 'dump_stacktraces': dump_stacktraces, 'sys': sys, 'os': os, 'socket': socket, 'traceback': traceback, } if locals: namespace.update(locals) ManholeConsole(namespace).interact()
[docs]class Logger(object): """ Internal object used for logging. Initially this is not configured. Until you call ``manhole.install()``, this logger object won't work (will raise ``NotInstalled``). """ time = _get_original('time', 'time') enabled = True destination = None def configure(self, enabled, destination): self.enabled = enabled self.destination = destination def release(self): self.enabled = True self.destination = None def __call__(self, message): """ Fail-ignorant logging function. """ if self.enabled: if self.destination is None: raise NotInstalled("Manhole is not installed!") try: full_message = "Manhole[%s:%.4f]: %s\n" % (os.getpid(), self.time(), message) if isinstance(self.destination, int): os.write(self.destination, full_message.encode('ascii', 'ignore')) else: self.destination.write(full_message) except Exception: pass
_LOG = Logger() class Manhole(object): # Manhole core configuration # These are initialized when manhole is installed. daemon_connection = False locals = None original_os_fork = None original_os_forkpty = None redirect_stderr = True reinstall_delay = 0.5 should_restart = None sigmask = _ALL_SIGNALS socket_path = None start_timeout = 0.5 connection_handler = None previous_signal_handlers = None _thread = None def configure(self, patch_fork=True, activate_on=None, sigmask=_ALL_SIGNALS, oneshot_on=None, thread=True, start_timeout=0.5, socket_path=None, reinstall_delay=0.5, locals=None, daemon_connection=False, redirect_stderr=True, connection_handler=handle_connection_repl): self.socket_path = socket_path self.reinstall_delay = reinstall_delay self.redirect_stderr = redirect_stderr self.locals = locals self.sigmask = sigmask self.daemon_connection = daemon_connection self.start_timeout = start_timeout self.previous_signal_handlers = {} self.connection_handler = _CONNECTION_HANDLER_ALIASES.get(connection_handler, connection_handler) if oneshot_on is None and activate_on is None and thread: self.thread.start() self.should_restart = True if oneshot_on is not None: oneshot_on = getattr(signal, 'SIG' + oneshot_on) if isinstance(oneshot_on, string) else oneshot_on self.previous_signal_handlers.setdefault(oneshot_on, signal.signal(oneshot_on, self.handle_oneshot)) if activate_on is not None: activate_on = getattr(signal, 'SIG' + activate_on) if isinstance(activate_on, string) else activate_on if activate_on == oneshot_on: raise ConfigurationConflict('You cannot do activation of the Manhole thread on the same signal ' 'that you want to do oneshot activation !') self.previous_signal_handlers.setdefault(activate_on, signal.signal(activate_on, self.activate_on_signal)) atexit.register(self.remove_manhole_uds) if patch_fork: if activate_on is None and oneshot_on is None and socket_path is None: self.patch_os_fork_functions() else: if activate_on: _LOG("Not patching os.fork and os.forkpty. Activation is done by signal %s" % activate_on) elif oneshot_on: _LOG("Not patching os.fork and os.forkpty. Oneshot activation is done by signal %s" % oneshot_on) elif socket_path: _LOG("Not patching os.fork and os.forkpty. Using user socket path %s" % socket_path) def release(self): if self._thread: self._thread.stop() self._thread = None self.remove_manhole_uds() self.restore_os_fork_functions() for sig, handler in self.previous_signal_handlers.items(): signal.signal(sig, handler) self.previous_signal_handlers.clear() @property def thread(self): if self._thread is None: self._thread = ManholeThread( self.get_socket, self.sigmask, self.start_timeout, self.connection_handler, daemon_connection=self.daemon_connection ) return self._thread @thread.setter def thread(self, value): self._thread = value def get_socket(self): sock = _ORIGINAL_SOCKET(socket.AF_UNIX, socket.SOCK_STREAM) name = self.remove_manhole_uds() sock.bind(name) sock.listen(5) _LOG("Manhole UDS path: " + name) return sock def reinstall(self): """ Reinstalls the manhole. Checks if the thread is running. If not, it starts it again. """ with _LOCK: if not (self.thread.is_alive() and self.thread in _ORIGINAL__ACTIVE): self.thread = self.thread.clone(bind_delay=self.reinstall_delay) if self.should_restart: self.thread.start() def handle_oneshot(self, _signum=None, _frame=None): try: try: sock = self.get_socket() _LOG("Waiting for new connection (in pid:%s) ..." % os.getpid()) client = force_original_socket(sock.accept()[0]) check_credentials(client) self.connection_handler(client) finally: self.remove_manhole_uds() except BaseException as exc: # pylint: disable=W0702 # we don't want to let any exception out, it might make the application misbehave _LOG("Oneshot failure: %r" % exc) def remove_manhole_uds(self): name = self.uds_name if os.path.exists(name): os.unlink(name) return name @property def uds_name(self): if self.socket_path is None: return "/tmp/manhole-%s" % os.getpid() return self.socket_path def patched_fork(self): """Fork a child process.""" pid = self.original_os_fork() if not pid: _LOG('Fork detected. Reinstalling Manhole.') self.reinstall() return pid def patched_forkpty(self): """Fork a new process with a new pseudo-terminal as controlling tty.""" pid, master_fd = self.original_os_forkpty() if not pid: _LOG('Fork detected. Reinstalling Manhole.') self.reinstall() return pid, master_fd def patch_os_fork_functions(self): self.original_os_fork, os.fork = os.fork, self.patched_fork self.original_os_forkpty, os.forkpty = os.forkpty, self.patched_forkpty _LOG("Patched %s and %s." % (self.original_os_fork, self.original_os_fork)) def restore_os_fork_functions(self): if self.original_os_fork: os.fork = self.original_os_fork if self.original_os_forkpty: os.forkpty = self.original_os_forkpty def activate_on_signal(self, _signum, _frame): self.thread.start()
[docs]def install(verbose=True, verbose_destination=sys.__stderr__.fileno() if hasattr(sys.__stderr__, 'fileno') else sys.__stderr__, strict=True, **kwargs): """ Installs the manhole. Args: verbose (bool): Set it to ``False`` to squelch the logging. verbose_destination (file descriptor or handle): Destination for verbose messages. Default is unbuffered stderr (stderr ``2`` file descriptor). patch_fork (bool): Set it to ``False`` if you don't want your ``os.fork`` and ``os.forkpy`` monkeypatched activate_on (int or signal name): set to ``"USR1"``, ``"USR2"`` or some other signal name, or a number if you want the Manhole thread to start when this signal is sent. This is desireable in case you don't want the thread active all the time. oneshot_on (int or signal name): Set to ``"USR1"``, ``"USR2"`` or some other signal name, or a number if you want the Manhole to listen for connection in the signal handler. This is desireable in case you don't want threads at all. thread (bool): Start the always-on ManholeThread. Default: ``True``. Automatically switched to ``False`` if ``oneshort_on`` or ``activate_on`` are used. sigmask (list of ints or signal names): Will set the signal mask to the given list (using ``signalfd.sigprocmask``). No action is done if ``signalfd`` is not importable. **NOTE**: This is done so that the Manhole thread doesn't *steal* any signals; Normally that is fine because Python will force all the signal handling to be run in the main thread but signalfd doesn't. socket_path (str): Use a specific path for the unix domain socket (instead of ``/tmp/manhole-<pid>``). This disables ``patch_fork`` as children cannot reuse the same path. reinstall_delay (float): Delay the unix domain socket creation *reinstall_delay* seconds. This alleviates cleanup failures when using fork+exec patterns. locals (dict): Names to add to manhole interactive shell locals. daemon_connection (bool): The connection thread is daemonic (dies on app exit). Default: ``False``. redirect_stderr (bool): Redirect output from stderr to manhole console. Default: ``True``. connection_handler (function): Connection handler to use. Use ``"exec"`` for simple implementation without output redirection or your own function. (warning: this is for advanced users). Default: ``"repl"``. """ # pylint: disable=W0603 global _MANHOLE with _LOCK: if _MANHOLE is None: _MANHOLE = Manhole() else: if strict: raise AlreadyInstalled("Manhole already installed!") else: _LOG.release() _MANHOLE.release() # Threads might be started here _LOG.configure(verbose, verbose_destination) _MANHOLE.configure(**kwargs) # Threads might be started here return _MANHOLE
[docs]def dump_stacktraces(): """ Dumps thread ids and tracebacks to stdout. """ lines = [] for thread_id, stack in sys._current_frames().items(): # pylint: disable=W0212 lines.append("\n######### ProcessID=%s, ThreadID=%s #########" % ( os.getpid(), thread_id )) for filename, lineno, name, line in traceback.extract_stack(stack): lines.append('File: "%s", line %d, in %s' % (filename, lineno, name)) if line: lines.append(" %s" % (line.strip())) lines.append("#############################################\n\n") print('\n'.join(lines), file=sys.stderr if _MANHOLE.redirect_stderr else sys.stdout)