###############################################################################
#
# Crossbar.io Shell
# Copyright (c) typedef int GmbH. Licensed under EUPLv1.2.
#
###############################################################################
import os
import shlex
import sys
from collections import defaultdict
import click
import click._bashcomplete
import click.parser
import six
from autobahn.wamp.exception import ApplicationError
from prompt_toolkit import prompt
from prompt_toolkit.completion import Completer, Completion
from prompt_toolkit.history import InMemoryHistory
from prompt_toolkit.styles import style_from_pygments_dict
from pygments.token import Token
from crossbar.shell.util import style_error
[docs]
_style = style_from_pygments_dict(
{
Token.Toolbar: "#ffffff bg:#333333",
}
)
[docs]
class InternalCommandException(Exception):
pass
[docs]
class ExitReplException(InternalCommandException):
pass
[docs]
_internal_commands = dict()
[docs]
def _register_internal_command(names, target, description=None):
if not hasattr(target, "__call__"):
raise ValueError("Internal command must be a callable")
if isinstance(names, six.string_types):
names = [names]
elif not isinstance(names, (list, tuple)):
raise ValueError('"names" must be a string or a list / tuple')
for name in names:
_internal_commands[name] = (target, description)
[docs]
def _get_registered_target(name, default=None):
target_info = _internal_commands.get(name)
if target_info:
return target_info[0]
return default
[docs]
def _exit_internal():
raise ExitReplException()
[docs]
def _help_internal():
formatter = click.HelpFormatter()
formatter.write_heading("REPL help")
formatter.indent()
with formatter.section("External Commands"):
formatter.write_text('prefix external commands with "!"')
with formatter.section("Internal Commands"):
formatter.write_text('prefix internal commands with ":"')
info_table = defaultdict(list) # type: ignore
for mnemonic, target_info in six.iteritems(_internal_commands):
info_table[target_info[1]].append(mnemonic)
formatter.write_dl(
(", ".join((":{0}".format(mnemonic) for mnemonic in sorted(mnemonics))), description)
for description, mnemonics in six.iteritems(info_table)
)
return formatter.getvalue()
_register_internal_command(["q", "quit", "exit"], _exit_internal, "exits the repl")
_register_internal_command(["?", "h", "help"], _help_internal, "displays general help information")
[docs]
class ClickCompleter(Completer):
def __init__(self, cli):
[docs]
def get_completions(self, document, complete_event=None):
# Code analogous to click._bashcomplete.do_complete
try:
args = shlex.split(document.text_before_cursor)
except ValueError:
# Invalid command, perhaps caused by missing closing quotation.
return
cursor_within_command = document.text_before_cursor.rstrip() == document.text_before_cursor
if args and cursor_within_command:
# We've entered some text and no space, give completions for the
# current word.
incomplete = args.pop()
else:
# We've not entered anything, either at all or for the current
# command, so give all relevant completions for this context.
incomplete = ""
# FIXME
_bc = click._bashcomplete # type: ignore
ctx = _bc.resolve_ctx(self.cli, "", args)
if ctx is None:
return
cmds = []
c = ctx
while c:
cmds.append(c.command.name)
c = c.parent
cmds.reverse()
# print(cmds)
# if ctx.parent:
# print('COMMAND: ', ctx.parent.command.name)
# pprint(dir(ctx.command))
# print(document.get_word_before_cursor())
# print(document.get_word_before_cursor(WORD=True))
choices = []
for param in ctx.command.params:
if not isinstance(param, click.Option):
continue
for options in (param.opts, param.secondary_opts):
for o in options:
choices.append(Completion(o, -len(incomplete), display_meta=param.help))
if isinstance(ctx.command, click.MultiCommand):
for name in ctx.command.list_commands(ctx):
command = ctx.command.get_command(ctx, name) # type: ignore
choices.append(Completion(name, -len(incomplete), display_meta=getattr(command, "short_help")))
for item in choices:
if item.text.startswith(incomplete):
yield item
[docs]
def continuation_tokens(cli, width):
"The continuation: display dots before all the following lines."
# (make sure that the width of the continuation does not exceed the given
# width. -- It is the prompt that decides the width of the left margin.)
return [(Token, "." * (width - 1) + " ")]
[docs]
async def repl(
old_ctx,
prompt_kwargs=None,
allow_system_commands=True,
allow_internal_commands=True,
once=False,
get_bottom_toolbar_tokens=_get_bottom_toolbar_tokens,
get_prompt_tokens=None,
style=_style,
):
"""
Start an interactive shell. All subcommands are available in it.
:param old_ctx: The current Click context.
:param prompt_kwargs: Parameters passed to
:py:func:`prompt_toolkit.shortcuts.prompt`.
If stdin is not a TTY, no prompt will be printed, but only commands read
from stdin.
"""
# parent should be available, but we're not going to bother if not
group_ctx = old_ctx.parent or old_ctx
group = group_ctx.command
isatty = sys.stdin.isatty()
# Delete the REPL command from those available, as we don't want to allow
# nesting REPLs (note: pass `None` to `pop` as we don't want to error if
# REPL command already not present for some reason).
repl_command_name = old_ctx.command.name
available_commands = group_ctx.command.commands
available_commands.pop(repl_command_name, None)
if isatty:
prompt_kwargs = prompt_kwargs or {}
if not get_prompt_tokens:
prompt_kwargs.setdefault("message", ">> ")
history = prompt_kwargs.pop("history", None) or InMemoryHistory()
completer = prompt_kwargs.pop("completer", None) or ClickCompleter(group)
def get_command():
return prompt(
completer=completer,
history=history,
# patch_stdout=False,
# https://github.com/jonathanslenders/python-prompt-toolkit/blob/master/examples/get-multiline-input.py
# multiline=True,
# get_continuation_tokens=continuation_tokens,
# get_bottom_toolbar_tokens=get_bottom_toolbar_tokens,
# get_prompt_tokens=get_prompt_tokens,
style=style,
async_=True,
**prompt_kwargs,
)
else:
get_command = sys.stdin.readline
stopped = False
while not stopped:
try:
command = await get_command()
except KeyboardInterrupt:
continue
except EOFError:
break
finally:
if once:
stopped = True
if not command:
if isatty:
continue
else:
break
if allow_system_commands and dispatch_repl_commands(command):
continue
if allow_internal_commands:
try:
result = handle_internal_commands(command)
if isinstance(result, six.string_types):
click.echo(result)
continue
except ExitReplException:
break
args = shlex.split(command)
try:
with group.make_context(None, args, parent=group_ctx) as ctx:
f = group.invoke(ctx)
if f:
await f
ctx.exit()
except ApplicationError as e:
click.echo(style_error("[{}] {}".format(e.error, e.args[0])))
except click.ClickException as e:
e.show()
except SystemExit:
pass
[docs]
def register_repl(group, name="repl"):
"""Register :func:`repl()` as sub-command *name* of *group*."""
group.command(name=name)(click.pass_context(repl))
[docs]
def dispatch_repl_commands(command):
"""Execute system commands entered in the repl.
System commands are all commands starting with "!".
"""
if command.startswith("!"):
os.system(command[1:]) # nosec
return True
return False
[docs]
def handle_internal_commands(command):
"""Run repl-internal commands.
Repl-internal commands are all commands starting with ":".
"""
if command.startswith(":"):
target = _get_registered_target(command[1:], default=None)
if target:
return target()