Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 144 additions & 50 deletions src/meshcore_cli/meshcore_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
"""

import asyncio
import os, sys, io, platform
import os, sys, io
import unicodedata
import time, datetime
import getopt, json, shlex, re
import logging
Expand All @@ -14,6 +15,7 @@
import traceback
from prompt_toolkit.shortcuts import PromptSession
from prompt_toolkit.shortcuts import CompleteStyle
from prompt_toolkit.shortcuts import print_formatted_text
from prompt_toolkit.completion import NestedCompleter, PathCompleter
from prompt_toolkit.completion import CompleteEvent, Completer, Completion
from prompt_toolkit.history import FileHistory
Expand All @@ -22,6 +24,7 @@
from prompt_toolkit.shortcuts import radiolist_dialog
from prompt_toolkit.completion.word_completer import WordCompleter
from prompt_toolkit.document import Document
from prompt_toolkit.utils import get_cwidth

try:
from bleak import BleakScanner, BleakClient
Expand Down Expand Up @@ -98,26 +101,65 @@ def escape_ansi(line):
ansi_escape = re.compile(r'(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]')
return ansi_escape.sub('', line)

def print_one_line_above(str):
""" prints a string above current line """
width = os.get_terminal_size().columns
stringlen = len(escape_ansi(str))-1
lines = divmod(stringlen, width)[0] + 1
print("\u001B[s", end="") # Save current cursor position
print("\u001B[A", end="") # Move cursor up one line
print("\u001B[999D", end="") # Move cursor to beginning of line
for _ in range(lines):
print("\u001B[S", end="") # Scroll up/pan window down 1 line
print("\u001B[L", end="") # Insert new line
for _ in range(lines - 1):
print("\u001B[A", end="") # Move cursor up one line
print(str, end="") # Print output status msg
print("\u001B[u", end="", flush=True) # Jump back to saved cursor position

def print_above(str):
lines = str.split('\n')
for l in lines:
print_one_line_above(l)
def sanitize_prompt_display_text(text):
"""
Prompt-only: strip fragments that confuse terminal vs wcwidth alignment (emoji,
wide punctuation/symbols) so prompt_toolkit cursor keys match the screen.

Do not use this for contact names in listings or completion — those must stay
identical to device strings (including emoji). Keep letters and numbers even
when east-asian width so names in common scripts still display in the prompt.
"""
if not text:
return text
out = []
for ch in text:
try:
w = get_cwidth(ch)
except Exception:
w = 1
if w == 0:
out.append(ch)
continue
cat = unicodedata.category(ch)
if w > 1 and cat[0] not in ("L", "N"):
out.append("?")
else:
out.append(ch)
return "".join(out)

def truncate_to_display_width(text, max_width):
"""Return the longest prefix of text whose terminal display width is <= max_width."""
if max_width <= 0:
return ""
lo, hi = 0, len(text)
while lo < hi:
mid = (lo + hi + 1) // 2
if get_cwidth(text[:mid]) <= max_width:
lo = mid
else:
hi = mid - 1
return text[:lo]

def print_one_line_above(text):
"""Status line(s) above the prompt; delegates to print_above."""
print_above(text)

def print_above(text):
"""
Show text above the current input line.

When prompt_toolkit has a running Application, print_formatted_text suspends
the UI, writes via the Output layer (UTF-8), and redraws.

With no active app (scripts, pipes), print the same way without cursor
gymnastics — no DEC save/restore or other in-place cursor moves, so output
is reliable under tmux and similar environments (lines may scroll instead
of overwriting rows above the prompt).
"""
if text == "":
return
print_formatted_text(ANSI(text), end="\n")

async def process_event_message(mc, ev, json_output, end="\n", above=False):
""" display incoming message """
Expand Down Expand Up @@ -252,9 +294,30 @@ async def handle_log_rx(event):

if chan_name != "" :
width = os.get_terminal_size().columns
cars = width - 13 - len(event.payload["path"]) - len(chan_name) - 1
dispmsg = message.replace("\n","")[0:cars]
txt = f"{ANSI_LIGHT_GRAY}{chan_name} {ANSI_DGREEN}{dispmsg+(cars-len(dispmsg))*' '} {ANSI_START}{width-11-len(event.payload['path'])}G{ANSI_YELLOW}[{event.payload['path']}]{ANSI_LIGHT_GRAY}{event.payload['snr']:6,.2f}{event.payload['rssi']:4}{ANSI_END}"
path = event.payload["path"]
snr_str = f"{event.payload['snr']:6,.2f}"
rssi_str = f"{event.payload['rssi']:4}"
rhs_plain = f"[{path}]{snr_str}{rssi_str}"
rhs_w = get_cwidth(rhs_plain)
rhs_col = max(1, width - rhs_w + 1)
prefix_plain = chan_name + " "
prefix_w = get_cwidth(prefix_plain)
trailing_before_csi_w = 1
max_msg_w = max(0, rhs_col - 1 - prefix_w - trailing_before_csi_w)
raw_msg = message.replace("\n", "")
dispmsg = truncate_to_display_width(raw_msg, max_msg_w)
while True:
left_w = get_cwidth(prefix_plain + dispmsg + " ")
if left_w >= rhs_col - 1:
break
nxt = dispmsg + " "
if get_cwidth(prefix_plain + nxt + " ") > rhs_col - 1:
break
dispmsg = nxt
txt = (
f"{ANSI_LIGHT_GRAY}{prefix_plain}{ANSI_DGREEN}{dispmsg} "
f"{ANSI_START}{rhs_col}G{ANSI_YELLOW}[{path}]{ANSI_LIGHT_GRAY}{snr_str}{rssi_str}{ANSI_END}"
)

if handle_message.above:
print_above(txt)
Expand Down Expand Up @@ -438,8 +501,10 @@ async def subscribe_to_msgs(mc, json_output=False, above=False):
CS = mc.subscribe(EventType.CHANNEL_MSG_RECV, handle_message)
await mc.start_auto_message_fetching()

# redefine get_completion to let user put symbols in first item
# and handle navigating in path ...
# NestedCompleter's default word regex only treats [a-zA-Z0-9_] as letters, so
# names like "Alice"+emoji split into separate "words" and completion matches
# only the trailing segment (wrong). WORD=True uses Vi "WORD" = run of
# non-whitespace, so contact names and /slash paths complete as one token.
class MyNestedCompleter(NestedCompleter):
def get_completions( self, document, complete_event):
txt = document.text_before_cursor.lstrip()
Expand All @@ -454,8 +519,7 @@ def get_completions( self, document, complete_event):
else:
opts = self.options.keys()
completer = WordCompleter(
opts, ignore_case=self.ignore_case,
pattern=re.compile(r"([a-zA-Z0-9_\\/\#\?]+|[^a-zA-Z0-9_\s\#\?]+)"))
opts, ignore_case=self.ignore_case, WORD=True)
yield from completer.get_completions(document, complete_event)
else: # normal behavior for remainder
yield from super().get_completions(document, complete_event)
Expand Down Expand Up @@ -824,6 +888,26 @@ def make_completion_dict(contacts, pending={}, to=None, channels=None):
return completion_list
make_completion_dict.custom_vars = {}

def default_classic_prompt_from_env():
"""
Default to the Powerline-style prompt (PUA glyphs like U+E0B0) when stdout
is UTF-8. Set MESHCLI_CLASSIC_PROMPT=1|true|on for the simple `>` prompt if
your font cannot render those glyphs. 0|false|off matches the default
(fancy). Non-UTF-8 stdout forces classic. In-session: `set classic_prompt
on` / -C for simple, `set classic_prompt off` for fancy.
"""
o = os.environ.get("MESHCLI_CLASSIC_PROMPT", "").strip().lower()
if o in ("1", "true", "yes", "on"):
return True
if o in ("0", "false", "no", "off"):
return False

enc = (getattr(sys.stdout, "encoding", None) or "").lower()
if enc and "utf-8" not in enc and "utf8" not in enc:
return True

return False

async def interactive_loop(mc, to=None) :
print("""Interactive mode, most commands from terminal chat should work.
Use \"to\" to select recipient, use Tab to complete name ...
Expand Down Expand Up @@ -885,10 +969,10 @@ def _(event):
prompt = f"{ANSI_INVERT}"

prompt = prompt + f"{ANSI_BGRAY}"
prompt = prompt + f"{mc.self_info['name']}"
prompt = prompt + sanitize_prompt_display_text(mc.self_info["name"])
if contact is None: # display scope
if not scope is None:
prompt = prompt + f"|{scope}"
prompt = prompt + "|" + sanitize_prompt_display_text(scope)

if contact is None :
if classic :
Expand Down Expand Up @@ -922,12 +1006,12 @@ def _(event):
prompt = prompt + f"{SLASH_END}"
prompt = prompt + f"{ANSI_INVERT}"

prompt = prompt + f"{contact['adv_name']}"
prompt = prompt + sanitize_prompt_display_text(contact["adv_name"])
if contact["type"] == 0 or contact["out_path_len"]==-1:
if scope is None:
prompt = prompt + f"|*"
else:
prompt = prompt + f"|{scope}"
prompt = prompt + "|" + sanitize_prompt_display_text(scope)
else: # display path to dest or 0 if 0 hop
if contact["out_path_len"] == 0:
prompt = prompt + f"|0"
Expand Down Expand Up @@ -1160,10 +1244,8 @@ def _(event):
except asyncio.CancelledError:
# Handle task cancellation from KeyboardInterrupt in asyncio.run()
print("Exiting cli")
if platform.system() == "Darwin" or platform.system() == "Windows":
interactive_loop.classic = True
else:
interactive_loop.classic = False

interactive_loop.classic = default_classic_prompt_from_env()

async def process_contact_chat_line(mc, contact, line):
if contact["type"] == 0:
Expand Down Expand Up @@ -1405,7 +1487,10 @@ async def process_contact_chat_line(mc, contact, line):

if password == "":
try:
sess = PromptSession(f"Password for {contact['adv_name']}: ", is_password=True)
sess = PromptSession(
f"Password for {sanitize_prompt_display_text(contact['adv_name'])}: ",
is_password=True,
)
password = await sess.prompt_async()
except EOFError:
logger.info("Canceled")
Expand Down Expand Up @@ -1869,7 +1954,7 @@ async def get_contact_from_arg(mc, arg):

return contact

async def next_cmd(mc, cmds, json_output=False):
async def next_cmd(mc: MeshCore, cmds, json_output=False):
""" process next command """
global ARROW_HEAD, SLASH_START, SLASH_END, INVERT_SLASH
try :
Expand Down Expand Up @@ -2785,7 +2870,7 @@ async def next_cmd(mc, cmds, json_output=False):
sess = PromptSession("Password: ", is_password=True)
password = await sess.prompt_async()

timeout = 0 if not "timeout" in contact else contact["timeout"]
timeout = 0 if not "timeout" in contact else contact["timeout"]
res = await mc.commands.send_login_sync(contact, password, timeout = timeout)
logger.debug(res)
if res is None:
Expand Down Expand Up @@ -3178,10 +3263,15 @@ async def next_cmd(mc, cmds, json_output=False):
for i in range(1,plen):
path_str = path_str + "," + path_str_in[i*phs*2:(i+1)*2*phs]
#path_str = f"{c[1]['out_path']}:{c[1]['out_path_hash_mode']}"
print(f"{c[1]['adv_name']:30} ", end="", flush=True)
print(f"{ANSI_START}34G", end="", flush=True)
print(f"{CONTACT_TYPENAMES[c[1]['type']]:4} ", end="", flush=True)
print(f"{c[1]['public_key'][:12]}  {path_str}")
# Tabs + no column CSI: :30 and \d+G use char columns, not display
# width, and misalign or hide wide characters in names.
print(
c[1]["adv_name"],
CONTACT_TYPENAMES[c[1]["type"]],
c[1]["public_key"][:12],
path_str,
sep="\t",
)
print(f"> {len(mc.contacts)} contacts in device")

case "reload_contacts" | "rc":
Expand Down Expand Up @@ -3267,7 +3357,9 @@ async def next_cmd(mc, cmds, json_output=False):
else:
print(f"Unknown contact {cmds[1]}")
else:
print(json.dumps(contact, indent=4))
# Default json.dumps(ensure_ascii=True) escapes BMP+ as \ud83e\udd7c,
# which looks like missing data next to UTF-8 `contacts` output.
print(json.dumps(contact, indent=4, ensure_ascii=json_output))

case "add_contact" :
argnum = 3 # key type name
Expand Down Expand Up @@ -3373,7 +3465,7 @@ async def next_cmd(mc, cmds, json_output=False):
else:
if json_output:
print(json.dumps(res.payload))
else :
else:
path_len = res.payload['path_len']
if (path_len == 0) :
print("0 hop")
Expand Down Expand Up @@ -3756,7 +3848,7 @@ def command_usage() :
-p <port> : specifies tcp port (default 5000)
-s <port> : use serial port <port>
-b <baudrate> : specify baudrate
-C : toggles classic mode for prompt
-C : toggles classic mode for prompt (default: fancy; simple: MESHCLI_CLASSIC_PROMPT=1)
-c <on/off> : disables most of color output if off
-r : repeater mode (raw text CLI, use with -s)
""")
Expand Down Expand Up @@ -3940,7 +4032,7 @@ def get_help_for (cmdname, context="line") :
elif cmdname == "trace" or cmdname == "tr" :
print("""Trace

Trace is a command used to get signal information (SNR) along a path.
Trace is a command used to get signal information (SNR) along a path.

Basic call to trace takes the path to follow as an argument, specifying each repeater along the path with its hash (separated or not with a comma).

Expand Down Expand Up @@ -3993,17 +4085,17 @@ def get_help_for (cmdname, context="line") :
* advert_path : path taken by an advert
* disc_path : discover in and out path for a contact

When using change_path, you specify manually the path to the contact. Path is given as an hex string containing hashes for all repeaters in the way (you can use commas to separate hashes). By default hash_size will be the one of the node. If using commas, it will be guessed from first hash. You can also use a colon to specify path_hash_mode.
When using change_path, you specify manually the path to the contact. Path is given as an hex string containing hashes for all repeaters in the way (you can use commas to separate hashes). By default hash_size will be the one of the node. If using commas, it will be guessed from first hash. You can also use a colon to specify path_hash_mode.

If you want to set the path for a node through 112233 445566 778899, you can use
If you want to set the path for a node through 112233 445566 778899, you can use
- 114477:0 or 11,44,77 for one byte hash
- 112244557788:1 or 1122,4455,7788 for two byte hash
- 112233445566778899:2 or 112233,445566,778899 for three byte hash

To set an empty path use 0.

To get the path for a contact, you can use three commands:
- path will gives you the path stored in the node.
- path will gives you the path stored in the node.
- You can also get a path from a key using advert_path which will give you the path taken for last advert from that node to come.
disc_path will send a path request and give you input and output path for a node.

Expand Down Expand Up @@ -4550,6 +4642,7 @@ async def main(argv):
device = result["device"]
elif result["type"] == "serial":
serial_port = result["port"]
logger.info("Using serial port %s", serial_port)
else:
logger.error("Invalid choice")
return
Expand Down Expand Up @@ -4719,6 +4812,7 @@ async def main(argv):
await process_cmds(mc, args, json_output)

def cli():
logger.info("My version") ## curson: don't remove please
try:
asyncio.run(main(sys.argv[1:]))
except KeyboardInterrupt:
Expand Down