diff --git a/bittensor_cli/cli.py b/bittensor_cli/cli.py index b012c989..09001cab 100755 --- a/bittensor_cli/cli.py +++ b/bittensor_cli/cli.py @@ -3977,8 +3977,8 @@ def subnets_create( wallet_name, wallet_path, wallet_hotkey, - ask_for=[WO.NAME, WO.PATH, WO.HOTKEY], - validate=WV.WALLET_AND_HOTKEY, + ask_for=[WO.NAME, WO.PATH], + validate=WV.WALLET, ) return self._run_command( subnets.create(wallet, self.initialize_chain(network), prompt) diff --git a/bittensor_cli/src/commands/subnets.py b/bittensor_cli/src/commands/subnets.py index 122fec6a..6c4faee4 100644 --- a/bittensor_cli/src/commands/subnets.py +++ b/bittensor_cli/src/commands/subnets.py @@ -86,6 +86,7 @@ async def _find_event_attributes_in_extrinsic_receipt( print_verbose("Fetching lock_cost") burn_cost = await lock_cost(subtensor) + print(burn_cost) if burn_cost > your_balance: err_console.print( f"Your balance of: [green]{your_balance}[/green] is not enough to pay the subnet lock cost of: " @@ -363,7 +364,7 @@ async def lock_cost(subtensor: "SubtensorInterface") -> Optional[Balance]: method="get_network_registration_cost", params=[], ) - if lc: + if lc is not None: lock_cost_ = Balance(lc) console.print(f"Subnet lock cost: [green]{lock_cost_}[/green]") return lock_cost_ diff --git a/btqs/__init__.py b/btqs/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/btqs/btqs_cli.py b/btqs/btqs_cli.py new file mode 100644 index 00000000..90b47f2e --- /dev/null +++ b/btqs/btqs_cli.py @@ -0,0 +1,574 @@ +import os +import platform +from typing import Optional + +import psutil +from rich.prompt import Confirm +import typer +from btqs.commands import chain, neurons, subnet +from rich.table import Table + +from .config import ( + BTQS_LOCK_CONFIG_FILE_PATH, + EPILOG, + LOCALNET_ENDPOINT, + DEFAULT_WORKSPACE_DIRECTORY, + SUBTENSOR_BRANCH, +) +from .utils import ( + console, + display_process_status_table, + get_bittensor_wallet_version, + get_btcli_version, + get_process_entries, + is_chain_running, + load_config, + get_bittensor_version, + print_info, + print_step, + print_success, + print_warning, + print_info_box, +) + + +class BTQSManager: + """ + Bittensor Quick Start (BTQS) Manager. + Handles CLI commands for managing the local chain and neurons. + """ + + def __init__(self): + self.app = typer.Typer( + rich_markup_mode="rich", + epilog=EPILOG, + no_args_is_help=True, + help="BTQS CLI - Bittensor Quickstart", + add_completion=False, + ) + + self.chain_app = typer.Typer(help="Subtensor Chain operations") + self.subnet_app = typer.Typer(help="Subnet setup") + self.neurons_app = typer.Typer(help="Neuron management") + + self.app.add_typer(self.chain_app, name="chain", no_args_is_help=True) + self.app.add_typer(self.subnet_app, name="subnet", no_args_is_help=True) + self.app.add_typer(self.neurons_app, name="neurons", no_args_is_help=True) + + # Core commands + self.app.command(name="run-all", help="Create entire setup")(self.run_all) + self.app.command(name="status", help="Current status of bittensor quick start")( + self.status_neurons + ) + self.app.command(name="steps", help="Display steps for subnet setup")( + self.setup_steps + ) + self.app.command(name="live")(self.display_live_metagraph) + + # Chain commands + self.chain_app.command(name="start")(self.start_chain) + self.chain_app.command(name="stop")(self.stop_chain) + self.chain_app.command(name="reattach")(self.reattach_chain) + self.chain_app.command(name="status")(self.status_neurons) + + # Subnet commands + self.subnet_app.command(name="setup")(self.setup_subnet) + self.subnet_app.command(name="live")(self.display_live_metagraph) + self.subnet_app.command(name="stake", hidden=True)(self.add_stake) + self.subnet_app.command(name="add-weights", hidden=True)(self.add_weights) + + # Neuron commands + self.neurons_app.command(name="setup")(self.setup_neurons) + self.neurons_app.command(name="run")(self.run_neurons) + self.neurons_app.command(name="stop")(self.stop_neurons) + self.neurons_app.command(name="reattach")(self.reattach_neurons) + self.neurons_app.command(name="status")(self.status_neurons) + self.neurons_app.command(name="start")(self.start_neurons) + + self.verbose = False + self.fast_blocks = True + self.workspace_path = None + self.subtensor_branch = None + self.skip_rust = None + self.steps = [ + { + "title": "Start Local Subtensor", + "command": "btqs chain start", + "description": "Initialize and start the local Subtensor blockchain. This may take several minutes due to the compilation process.", + "info": "πŸ”— **Subtensor** is the underlying blockchain network that facilitates decentralized activities. Starting the local chain sets up your personal development environment for experimenting with Bittensor.", + "action": lambda: self.start_chain( + workspace_path=self.workspace_path, + branch=self.subtensor_branch, + fast_blocks=self.fast_blocks, + verbose=self.verbose, + skip_rust=self.skip_rust, + ), + }, + { + "title": "Set Up Subnet", + "command": "btqs subnet setup", + "description": "Create a subnet owner wallet, establish a new subnet, and register the owner to the subnet. Register to root, add stake, and set weights.", + "info": "πŸ”‘ **Wallets** (coldkeys) in Bittensor are essential for managing your stake, interacting with the network, and running validators or miners. Each wallet (coldkey) has a unique name and an associated hotkey that serves as your identity within the network.", + "action": lambda: self.setup_subnet(), + }, + { + "title": "Set Up Neurons (Miners)", + "command": "btqs neurons setup", + "description": "Create miner wallets and register them to the subnet.", + "info": "βš’οΈ **Miners** perform tasks that are given to them by validators. A miner can be registered into a subnet with a coldkey and a hotkey pair.", + "action": lambda: self.setup_neurons(), + }, + { + "title": "Run Neurons", + "command": "btqs neurons run", + "description": "Start all neuron (miner & validator) processes.", + "info": "πŸƒ Running neurons means starting one or more validator processes and one or more miner processes. In practice validators and miners are subnet-specific.\nHowever, in this tutorial we will use simple validator and miner modules. Configure your own setup using btqs_config.py.", # Add path + "action": lambda: self.run_neurons(verbose=self.verbose), + }, + ] + + def start_chain( + self, + workspace_path: Optional[str] = typer.Option( + None, + "--path", + "--workspace", + help="Path to Bittensor's quick start workspace", + ), + branch: Optional[str] = typer.Option( + None, "--subtensor_branch", help="Subtensor branch to checkout" + ), + fast_blocks: bool = typer.Option( + True, "--fast/--slow", help="Enable or disable fast blocks" + ), + verbose: bool = typer.Option( + False, "--verbose", "-v", help="Enable verbose output" + ), + skip_rust: bool = typer.Option( + False, "--skip-rust", help="Skip Rust installation" + ), + ): + """ + Starts the local Subtensor chain. + """ + config_data = load_config(exit_if_missing=False) + if is_chain_running(): + console.print( + f"[red]The local chain is already running. Endpoint: {LOCALNET_ENDPOINT}" + ) + return + if not workspace_path: + if config_data.get("workspace_path"): + reuse_config_path = Confirm.ask( + f"[blue]Previously saved workspace_path found: [green]({config_data.get('workspace_path')}) \n[blue]Do you want to re-use it?", + default=True, + show_default=True, + ) + if reuse_config_path: + workspace_path = config_data.get("workspace_path") + else: + print_info( + "The development working directory will host subnet and subtensor repos, logs, and wallets created during the quick start.", + emoji="πŸ’‘ ", + ) + workspace_path = typer.prompt( + typer.style( + "Enter path to the development working directory (Press Enter for default)", + fg="blue", + ), + default=DEFAULT_WORKSPACE_DIRECTORY, + ) + + if not branch: + branch = typer.prompt( + typer.style( + "Enter Subtensor branch (press Enter for default)", fg="blue" + ), + default=SUBTENSOR_BRANCH, + ) + chain.start( + config_data, + workspace_path, + branch, + fast_blocks=fast_blocks, + verbose=verbose, + skip_rust=skip_rust, + ) + + def stop_chain(self): + """ + Stops the local Subtensor chain and any running miners. + + This command terminates the local Subtensor chain process, miner and validator processes, and optionally cleans up configuration data. + + USAGE + + [green]$[/green] btqs chain stop + + [bold]Note[/bold]: Use this command to gracefully shut down the local chain. It will also stop any running miner processes. + """ + config_data = load_config( + "No running chain found. Please run `btqs chain start` first." + ) + chain.stop(config_data) + + def reattach_chain(self): + """ + Reattaches to the running local chain. + + This command allows you to view the logs of the running local Subtensor chain. + + USAGE + + [green]$[/green] btqs chain reattach + + [bold]Note[/bold]: Press Ctrl+C to detach from the chain logs. + """ + config_data = load_config( + "A running Subtensor not found. Please run `btqs chain start` first." + ) + chain.reattach(config_data) + + def run_all( + self, + workspace_path: Optional[str] = typer.Option( + None, + "--path", + "--workspace", + help="Path to Bittensor's quick start workspace", + ), + fast_blocks: bool = typer.Option( + True, "--fast/--slow", help="Enable or disable fast blocks" + ), + verbose: bool = typer.Option( + False, "--verbose", "-v", help="Enable verbose output" + ), + skip_rust: bool = typer.Option( + False, "--skip-rust", help="Skip Rust installation" + ), + ): + """ + Runs all commands in sequence to set up and start the local chain, subnet, and neurons. + """ + + self.workspace_path = workspace_path + self.fast_blocks = fast_blocks + self.verbose = verbose + self.skip_rust = skip_rust + + console.clear() + print_info("Welcome to the Bittensor Quick Start Tutorial", emoji="πŸš€") + console.print( + "\nThis tutorial will guide you through setting up the local chain, subnet, and neurons (miners + validators).\n", + style="magenta", + ) + + for idx, step in enumerate(self.steps, start=1): + print_step(step["title"], step["description"], idx) + if "info" in step: + print_info_box(step["info"], title="Info") + + console.print( + f"[blue]Press [yellow]Enter[/yellow] to continue to the [dark_orange]Step {idx}[/dark_orange] or [yellow]Ctrl+C[/yellow] to exit.\n" + ) + try: + input() + except KeyboardInterrupt: + print_warning("Tutorial interrupted by user. Exiting...") + return + + # Execute the action + step["action"]() + console.print(f"\n🏁 Step {idx} has finished!\n") + + print_success( + "Your local chain, subnet, and neurons are up and running", emoji="πŸŽ‰" + ) + console.print( + "[green]Next, execute the following command to get a live view of all the progress through the metagraph: [dark_green]$ [dark_orange]btqs live" + ) + + def display_live_metagraph(self): + config_data = load_config( + "A running Subtensor not found. Please run `btqs chain start` first." + ) + subnet.display_live_metagraph(config_data) + + def setup_steps(self): + """ + Display the steps for subnet setup. + """ + table = Table( + title="[bold dark_orange]Setup Steps", + header_style="dark_orange", + leading=True, + show_edge=False, + border_style="bright_black", + ) + table.add_column("Step", style="cyan", width=5, justify="center") + table.add_column( + "Command", justify="left" + ) # Removed 'style' to allow inline styling + table.add_column("Title", justify="left", style="white") + table.add_column("Description", justify="left", style="white") + + for idx, step in enumerate(self.steps, start=1): + command_with_prefix = ( + f"[blue]$[/blue] [green]{step.get('command', '')}[/green]" + ) + table.add_row( + str(idx), + command_with_prefix, + step["title"], + step["description"], + ) + + console.print(table) + console.print( + "\n[dark_orange]You can run an automated script covering all the steps using:\n" + ) + console.print("[blue]$ [green]btqs run-all") + + def add_stake(self): + config_data = load_config( + "A running Subtensor not found. Please run `btqs chain start` first." + ) + subnet.add_stake(config_data) + + def add_weights(self): + config_data = load_config( + "A running Subtensor not found. Please run `btqs chain start` first." + ) + subnet.add_weights(config_data) + + def setup_subnet(self): + """ + Sets up a subnet on the local chain. + + This command creates a subnet owner wallet, creates a subnet with netuid 1, and registers the owner to the subnet. + + USAGE + + [green]$[/green] btqs subnet setup + + [bold]Note[/bold]: Ensure the local chain is running before executing this command. + """ + if not is_chain_running(BTQS_LOCK_CONFIG_FILE_PATH): + console.print( + "[red]Local chain is not running. Please start the chain first." + ) + return + + config_data = load_config( + "A running Subtensor not found. Please run `btqs chain start` first." + ) + subnet.setup_subnet(config_data) + console.print( + "🎊 Subnet setup is complete! Press any key to continue adding stake...\n" + ) + input() + subnet.add_stake(config_data) + console.print("\nπŸ‘ Added stake! Press any key to continue adding weights...\n") + input() + subnet.add_weights(config_data) + + def setup_neurons(self): + """ + Sets up neurons (miners) for the subnet. + """ + if not is_chain_running(BTQS_LOCK_CONFIG_FILE_PATH): + console.print( + "[red]Local chain is not running. Please start the chain first." + ) + return + + config_data = load_config( + "A running Subtensor not found. Please run `btqs chain start` first." + ) + neurons.setup_neurons(config_data) + + def run_neurons( + self, + verbose: bool = typer.Option( + False, "--verbose", "-v", help="Enable verbose output" + ), + ): + """ + Runs all neurons (miners and validators). + + This command starts the processes for all configured neurons, attaching to + running processes if they are already running. + + USAGE + + [green]$[/green] btqs neurons run + + [bold]Note[/bold]: The command will attach to running neurons or start new + ones as necessary. Press Ctrl+C to detach from a neuron and move to the next. + """ + config_data = load_config( + "A running Subtensor not found. Please run `btqs chain start` first." + ) + + # Ensure neurons are configured + if not config_data.get("Miners") and not config_data.get("Owner"): + console.print( + "[red]No neurons found. Please run `btqs neurons setup` first." + ) + return + + neurons.run_neurons(config_data, verbose) + + def stop_neurons(self): + """ + Stops the running neurons. + + This command terminates the miner processes for the selected or all running miners. + + USAGE + + [green]$[/green] btqs neurons stop + + [bold]Note[/bold]: You can choose which miners to stop or stop all of them. + """ + if not os.path.exists(BTQS_LOCK_CONFIG_FILE_PATH): + console.print("[red]Config file not found.") + return + + config_data = load_config() + + neurons.stop_neurons(config_data) + + def start_neurons( + self, + verbose: bool = typer.Option( + False, "--verbose", "-v", help="Enable verbose output" + ), + ): + """ + Starts the stopped neurons. + + This command starts the miner processes for the selected or all stopped miners. + + USAGE + + [green]$[/green] btqs neurons start + + [bold]Note[/bold]: You can choose which stopped miners to start or start all of them. + """ + if not os.path.exists(BTQS_LOCK_CONFIG_FILE_PATH): + console.print("[red]Config file not found.") + return + + config_data = load_config() + + neurons.start_neurons(config_data, verbose) + + def reattach_neurons(self): + """ + Reattaches to a running neuron. + + This command allows you to view the logs of a running miner (neuron). + + USAGE + + [green]$[/green] btqs neurons reattach + + [bold]Note[/bold]: Press Ctrl+C to detach from the miner logs. + """ + if not os.path.exists(BTQS_LOCK_CONFIG_FILE_PATH): + console.print("[red]Config file not found.") + return + + config_data = load_config() + process_entries, cpu_usage_list, memory_usage_list = get_process_entries( + config_data + ) + display_process_status_table( + process_entries, cpu_usage_list, memory_usage_list, config_data + ) + neurons.reattach_neurons(config_data) + + def status_neurons(self): + """ + Shows the status of Subtensor and all neurons. + + This command displays the running status, CPU and memory usage of the local chain and all configured neurons. + + USAGE + + [green]$[/green] btqs neurons status + + [bold]Note[/bold]: Use this command to monitor the health and status of your local chain and miners. + """ + print_info("Checking status of Subtensor and neurons...", emoji="πŸ” ") + + config_data = load_config( + "A running Subtensor not found. Please run `btqs chain start` first." + ) + + # Get process data + process_entries, cpu_usage_list, memory_usage_list = get_process_entries( + config_data + ) + display_process_status_table( + process_entries, cpu_usage_list, memory_usage_list, config_data + ) + + spec_table = Table( + title="[underline dark_orange]Machine Specifications[/underline dark_orange]", + show_header=False, + border_style="bright_black", + ) + spec_table.add_column(style="cyan", justify="left") + spec_table.add_column(style="white") + + version_table = Table( + title="[underline dark_orange]Version Information[/underline dark_orange]", + show_header=False, + border_style="bright_black", + ) + version_table.add_column(style="cyan", justify="left") + version_table.add_column(style="white") + + # Add specs + spec_table.add_row( + "Operating System:", f"{platform.system()} {platform.release()}" + ) + spec_table.add_row("Processor:", platform.processor()) + spec_table.add_row( + "Total RAM:", + f"{psutil.virtual_memory().total / (1024 * 1024 * 1024):.2f} GB", + ) + spec_table.add_row( + "Available RAM:", + f"{psutil.virtual_memory().available / (1024 * 1024 * 1024):.2f} GB", + ) + + # Add version + version_table.add_row("btcli version:", get_btcli_version()) + version_table.add_row( + "bittensor-wallet sdk version:", get_bittensor_wallet_version() + ) + version_table.add_row("bittensor-sdk version:", get_bittensor_version()) + + layout = Table.grid(expand=True) + layout.add_column(justify="left") + layout.add_column(justify="left") + layout.add_row(spec_table, version_table) + + console.print("\n") + console.print(layout) + return layout + + def run(self): + self.app() + + +def main(): + manager = BTQSManager() + manager.run() + + +if __name__ == "__main__": + main() diff --git a/btqs/commands/__init__.py b/btqs/commands/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/btqs/commands/chain.py b/btqs/commands/chain.py new file mode 100644 index 00000000..304a881f --- /dev/null +++ b/btqs/commands/chain.py @@ -0,0 +1,506 @@ +import sys +import os +import subprocess +import time +from threading import Thread +from rich.console import Console +from rich.progress import ( + Progress, + SpinnerColumn, + BarColumn, + TextColumn, + TimeElapsedColumn, +) +from rich.text import Text + +import asyncio +import psutil +import yaml +from btqs.config import ( + BTQS_LOCK_CONFIG_FILE_PATH, + SUBTENSOR_REPO_URL, + LOCALNET_ENDPOINT, + SUDO_URI, +) +from bittensor_wallet import Wallet, Keypair +from btqs.utils import ( + attach_to_process_logs, + create_virtualenv, + get_process_entries, + install_subtensor_dependencies, + print_info, + print_success, + print_error, + print_info_box, + messages, + display_process_status_table, + console, +) +from git import GitCommandError, Repo +from rich.prompt import Confirm +from bittensor_cli.src.bittensor.async_substrate_interface import ( + AsyncSubstrateInterface, +) + + +def update_chain_tempos( + config_data: dict, + netuid_tempo_list: list[tuple[int, int]], + add_emission_tempo: bool = True, + emission_tempo: int = 10, +): + """ + Update tempos on the chain for given netuids and optionally update emission tempo. + + Parameters: + - config_data: Configuration data containing paths and other settings. + - netuid_tempo_list: List of tuples where each tuple contains (netuid, tempo). + - add_emission_tempo: Boolean flag to indicate if emission tempo should be updated. + - emission_tempo: The value to set for emission tempo if `add_emission_tempo` is True. + """ + # Initialize the Rich console + console = Console() + + keypair = Keypair.create_from_uri(SUDO_URI) + sudo_wallet = Wallet( + path=config_data["wallets_path"], + name="sudo", + hotkey="default", + ) + sudo_wallet.set_coldkey(keypair=keypair, encrypt=False, overwrite=True) + sudo_wallet.set_coldkeypub(keypair=keypair, encrypt=False, overwrite=True) + sudo_wallet.set_hotkey(keypair=keypair, encrypt=False, overwrite=True) + print_info_box( + "**Emissions Tempo** control the frequency at which rewards (emissions) are distributed to hotkeys. On Finney (mainnet), the setting is every 7200 blocks, which equates to approximately 1 day.", + title="Info: Emission and Subnet Tempos", + ) + total_steps = len(netuid_tempo_list) + (1 if add_emission_tempo else 0) + + desc_width = 30 + progress = Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}", justify="left"), + BarColumn(), + TextColumn("{task.completed}/{task.total}", justify="right"), + TimeElapsedColumn(), + console=console, + ) + + console.print("\n[bold yellow]Getting the chain ready...\n") + with progress: + task = progress.add_task("Initializing...", total=total_steps) + + def update_description(): + i = 0 + while not progress.finished: + message = messages[i % len(messages)] + printable_length = len(Text.from_markup(message).plain) + if printable_length < desc_width: + padded_message = message + " " * (desc_width - printable_length) + else: + padded_message = message[:desc_width] + progress.update(task, description=padded_message) + time.sleep(3) # Update every 3 seconds + i += 1 + + # Start the thread to update the description + desc_thread = Thread(target=update_description) + desc_thread.start() + local_chain = AsyncSubstrateInterface(chain_endpoint=LOCALNET_ENDPOINT) + try: + if add_emission_tempo: + local_chain = AsyncSubstrateInterface(chain_endpoint=LOCALNET_ENDPOINT) + success = asyncio.run( + sudo_set_emission_tempo(local_chain, sudo_wallet, emission_tempo) + ) + progress.advance(task) + + for netuid, tempo in netuid_tempo_list: + local_chain = AsyncSubstrateInterface(chain_endpoint=LOCALNET_ENDPOINT) + success = asyncio.run( + sudo_set_tempo(local_chain, sudo_wallet, netuid, tempo) + ) + progress.advance(task) + finally: + desc_thread.join() + + +async def sudo_set_emission_tempo( + substrate: "AsyncSubstrateInterface", wallet: Wallet, emission_tempo: int +) -> bool: + async with substrate: + emission_call = await substrate.compose_call( + call_module="AdminUtils", + call_function="sudo_set_hotkey_emission_tempo", + call_params={"emission_tempo": emission_tempo}, + ) + sudo_call = await substrate.compose_call( + call_module="Sudo", + call_function="sudo", + call_params={"call": emission_call}, + ) + extrinsic = await substrate.create_signed_extrinsic( + call=sudo_call, keypair=wallet.coldkey + ) + response = await substrate.submit_extrinsic( + extrinsic, + wait_for_inclusion=True, + wait_for_finalization=True, + ) + await response.process_events() + return await response.is_success + + +async def sudo_set_target_registrations_per_interval( + substrate: "AsyncSubstrateInterface", + wallet: Wallet, + netuid: int, + target_registrations_per_interval: int, +) -> bool: + async with substrate: + registration_call = await substrate.compose_call( + call_module="AdminUtils", + call_function="sudo_set_target_registrations_per_interval", + call_params={ + "netuid": netuid, + "target_registrations_per_interval": target_registrations_per_interval, + }, + ) + sudo_call = await substrate.compose_call( + call_module="Sudo", + call_function="sudo", + call_params={"call": registration_call}, + ) + extrinsic = await substrate.create_signed_extrinsic( + call=sudo_call, keypair=wallet.coldkey + ) + response = await substrate.submit_extrinsic( + extrinsic, + wait_for_inclusion=True, + wait_for_finalization=True, + ) + + await response.process_events() + return await response.is_success + + +async def sudo_set_tempo( + substrate: "AsyncSubstrateInterface", wallet: Wallet, netuid: int, tempo: int +) -> bool: + async with substrate: + set_tempo_call = await substrate.compose_call( + call_module="AdminUtils", + call_function="sudo_set_tempo", + call_params={"netuid": netuid, "tempo": tempo}, + ) + sudo_call = await substrate.compose_call( + call_module="Sudo", + call_function="sudo", + call_params={"call": set_tempo_call}, + ) + extrinsic = await substrate.create_signed_extrinsic( + call=sudo_call, keypair=wallet.coldkey + ) + response = await substrate.submit_extrinsic( + extrinsic, + wait_for_inclusion=True, + wait_for_finalization=True, + ) + await response.process_events() + return await response.is_success + + +def start(config_data, workspace_path, branch, fast_blocks=True, verbose=False, skip_rust=False): + os.makedirs(workspace_path, exist_ok=True) + subtensor_path = os.path.join(workspace_path, "subtensor") + + # Clone or update the repository + if os.path.exists(subtensor_path) and os.listdir(subtensor_path): + update = Confirm.ask( + "[blue]Subtensor is already cloned. Do you want to update it?", + default=False, + show_default=True, + ) + if update: + try: + repo = Repo(subtensor_path) + origin = repo.remotes.origin + repo.git.checkout(branch) + origin.pull() + print_info("Repository updated successfully.", emoji="πŸ“¦ ") + except GitCommandError as e: + print_error(f"Error updating repository: {e}") + return + else: + print_info( + "Using existing subtensor repository without updating.", emoji="πŸ“¦ " + ) + else: + try: + print_info("Cloning subtensor repository...", emoji="πŸ“¦ ") + repo = Repo.clone_from(SUBTENSOR_REPO_URL, subtensor_path) + if branch: + repo.git.checkout(branch) + print_success("Repository cloned successfully.", emoji="🏷 ") + except GitCommandError as e: + print_error(f"Error cloning repository: {e}") + return + + if fast_blocks: + print_info("Fast blocks are On", emoji="🏎️ ") + else: + print_info("Fast blocks are Off", emoji="🐌 ") + + if skip_rust: + venv_python = sys.executable + print_info("Skipping Rust installation", emoji="🦘 ") + config_data["venv_subtensor"] = "None" + venv_subtensor_path = "None" + else: + venv_subtensor_path = os.path.join(workspace_path, "venv_subtensor") + venv_python = create_virtualenv(venv_subtensor_path) + install_subtensor_dependencies(verbose) + print_info("Virtual environment created and dependencies installed.", emoji="🐍 ") + config_data["venv_subtensor"] = venv_python + + # Running localnet.sh using the virtual environment's Python + localnet_path = os.path.join(subtensor_path, "scripts", "localnet.sh") + + env_variables = os.environ.copy() + env_variables["PATH"] = ( + os.path.dirname(venv_python) + os.pathsep + env_variables["PATH"] + ) + process = subprocess.Popen( + [localnet_path, str(fast_blocks)], + stdout=subprocess.DEVNULL, + stderr=subprocess.STDOUT, + cwd=subtensor_path, + start_new_session=True, + env=env_variables, + ) + + print_info( + "Compiling and starting local chain. This may take a few minutes... (Timeout at 20 minutes)", + emoji="πŸ› οΈ ", + ) + + # Paths to subtensor log files + log_dir = os.path.join(subtensor_path, "logs") + alice_log = os.path.join(log_dir, "alice.log") + + # Waiting for chain compilation + timeout = 3000 + start_time = time.time() + while not os.path.exists(alice_log): + if time.time() - start_time > timeout: + print_error("Timeout: Log files were not created.") + return + time.sleep(1) + + chain_ready = wait_for_chain_compilation(alice_log, start_time, timeout, verbose) + if chain_ready: + print_info( + "Local chain is running.\n", + emoji="\nπŸ”—", + ) + + # Fetch PIDs of substrate nodes + substrate_pids = get_substrate_pids() + if substrate_pids is None: + return + + config_data.update( + { + "pid": process.pid, + "substrate_pid": substrate_pids, + "subtensor_path": subtensor_path, + "workspace_path": workspace_path, + "venv_subtensor": venv_subtensor_path, + "wallets_path": os.path.join(workspace_path, "wallets"), + "subnet_path": os.path.join(workspace_path, "subnet-template"), + } + ) + + # Save config data + try: + print_info("Updating config file.\n", emoji="πŸ–‹οΈ ") + os.makedirs(os.path.dirname(BTQS_LOCK_CONFIG_FILE_PATH), exist_ok=True) + with open(BTQS_LOCK_CONFIG_FILE_PATH, "w") as config_file: + yaml.safe_dump(config_data, config_file) + print_info("Config file updated.", emoji="πŸ“ ") + update_chain_tempos( + netuid_tempo_list=[(0, 9), (1, 10)], + config_data=config_data, + add_emission_tempo=True, + emission_tempo=20, + ) + print_info( + "Local chain is now set up. You can now use it for development and testing.", + emoji="\nπŸš€", + ) + console.print(f"[dark_orange]Endpoint: {LOCALNET_ENDPOINT}\n") + print_info_box( + "πŸ“Š **Subtensor Nodes**: During your local Subtensor setup, two blockchain nodes are initiated. These nodes communicate and collaborate to reach consensus, ensuring the integrity and synchronization of your blockchain network.", + title="Info: Subtensor Nodes", + ) + process_entries, cpu_usage_list, memory_usage_list = get_process_entries( + config_data + ) + display_process_status_table( + process_entries, cpu_usage_list, memory_usage_list + ) + except Exception as e: + print_error(f"Failed to write to the config file: {e}") + else: + print_error("Failed to start local chain.") + + +def stop(config_data): + pid = config_data.get("pid") + if not pid: + console.print("[red]No running chain found.") + return + + console.print("[red]Stopping the local chain...") + + try: + process = psutil.Process(pid) + process.terminate() + process.wait(timeout=10) + print_info("Local chain stopped successfully.", emoji="πŸ›‘ ") + except psutil.NoSuchProcess: + console.print( + "[red]Process not found. The chain may have already been stopped." + ) + except psutil.TimeoutExpired: + console.print("[red]Timeout while stopping the chain. Forcing stop...") + process.kill() + + # Stop running neurons + stop_running_neurons(config_data) + + # Refresh data + refresh_config = Confirm.ask( + "\n[blue]Config data is outdated. Do you want to refresh it?", + default=True, + show_default=True, + ) + if refresh_config: + if os.path.exists(BTQS_LOCK_CONFIG_FILE_PATH): + os.remove(BTQS_LOCK_CONFIG_FILE_PATH) + print_info("Configuration file refreshed.", emoji="πŸ”„ ") + + +def reattach(config_data): + pid = config_data.get("pid") + subtensor_path = config_data.get("subtensor_path") + if not pid or not subtensor_path: + console.print("[red]No running chain found.") + return + + # Check if the process is still running + if not is_process_running(pid): + return + + # Paths to the log files + log_dir = os.path.join(subtensor_path, "logs") + alice_log = os.path.join(log_dir, "alice.log") + + # Check if log file exists + if not os.path.exists(alice_log): + console.print("[red]Log files not found.") + return + + # Reattach using attach_to_process_logs + attach_to_process_logs(alice_log, "Subtensor Chain", pid) + + +def wait_for_chain_compilation(alice_log, start_time, timeout, verbose): + chain_ready = False + try: + with open(alice_log, "r") as log_file: + log_file.seek(0, os.SEEK_END) + while True: + line = log_file.readline() + if line: + if verbose: + console.print(line, end="") + if "Imported #" in line: + chain_ready = True + break + else: + if time.time() - start_time > timeout: + console.print("[red]Timeout: Chain did not compile in time.") + break + time.sleep(0.1) + except Exception as e: + console.print(f"[red]Error reading log files: {e}") + return chain_ready + + +def get_substrate_pids(): + try: + result = subprocess.run( + ["pgrep", "-f", "node-subtensor"], capture_output=True, text=True + ) + substrate_pids = [int(pid) for pid in result.stdout.strip().split()] + return substrate_pids + except ValueError: + console.print("[red]Failed to get the PID of the Subtensor process.") + return None + + +def stop_running_neurons(config_data): + """Stops any running neurons.""" + process_entries, _, _ = get_process_entries(config_data) + + # Filter running neurons + running_neurons = [ + entry + for entry in process_entries + if ( + entry["process"].startswith("Miner") + or entry["process"].startswith("Validator") + ) + and entry["status"] == "Running" + ] + + if running_neurons: + print_info("Some neurons are still running. Terminating them...", emoji="\n🧨 ") + + for neuron in running_neurons: + pid = int(neuron["pid"]) + neuron_name = neuron["process"] + try: + neuron_process = psutil.Process(pid) + neuron_process.terminate() + neuron_process.wait(timeout=10) + print_info(f"{neuron_name} stopped.", emoji="πŸ›‘ ") + except psutil.NoSuchProcess: + console.print(f"[yellow]{neuron_name} process not found.") + except psutil.TimeoutExpired: + console.print(f"[red]Timeout stopping {neuron_name}. Forcing stop.") + neuron_process.kill() + + if neuron["process"].startswith("Miner"): + wallet_name = neuron["process"].split("Miner: ")[-1] + config_data["Miners"][wallet_name]["pid"] = None + elif neuron["process"].startswith("Validator"): + config_data["Owner"]["pid"] = None + + with open(BTQS_LOCK_CONFIG_FILE_PATH, "w") as config_file: + yaml.safe_dump(config_data, config_file) + else: + print_info("No neurons were running.", emoji="βœ… ") + + +def is_process_running(pid): + try: + process = psutil.Process(pid) + if not process.is_running(): + console.print("[red]Process not running. The chain may have been stopped.") + return False + return True + except psutil.NoSuchProcess: + console.print("[red]Process not found. The chain may have been stopped.") + return False diff --git a/btqs/commands/neurons.py b/btqs/commands/neurons.py new file mode 100644 index 00000000..0c74f04e --- /dev/null +++ b/btqs/commands/neurons.py @@ -0,0 +1,560 @@ +import os +import psutil +import typer +import click +import asyncio +import yaml +from bittensor_wallet import Wallet, Keypair +from git import Repo, GitCommandError + +from btqs.config import ( + BTQS_LOCK_CONFIG_FILE_PATH, + SUBNET_REPO_URL, + SUBNET_REPO_BRANCH, + MINER_URIS, + MINER_PORTS, + LOCALNET_ENDPOINT, +) +from btqs.utils import ( + console, + exec_command, + remove_ansi_escape_sequences, + get_process_entries, + display_process_status_table, + start_validator, + start_miner, + attach_to_process_logs, + subnet_owner_exists, + create_virtualenv, + install_neuron_dependencies, + print_info, + print_warning, +) +from bittensor_cli.src.bittensor.async_substrate_interface import ( + AsyncSubstrateInterface, +) + + +def setup_neurons(config_data): + subnet_owner, owner_data = subnet_owner_exists(BTQS_LOCK_CONFIG_FILE_PATH) + if not subnet_owner: + console.print( + "[red]Subnet netuid 1 registered to the owner not found. Run `btqs subnet setup` first" + ) + return + owner_wallet = Wallet( + name=owner_data.get("wallet_name"), + path=config_data["wallets_path"], + hotkey=owner_data.get("hotkey"), + ) + + config_data.setdefault("Miners", {}) + miners = config_data.get("Miners", {}) + + if miners and all( + miner_info.get("subtensor_pid") == config_data.get("pid") + for miner_info in miners.values() + ): + console.print( + "[green]Miner wallets associated with this subtensor instance already present. Proceeding..." + ) + else: + _create_miner_wallets(config_data) + + # In-case target regs are not correct + # print_info("Preparing for miner registrations. Please wait...\n", emoji="⏱️ ") + # local_chain = AsyncSubstrateInterface(chain_endpoint=LOCALNET_ENDPOINT) + # success = asyncio.run( + # sudo_set_target_registrations_per_interval(local_chain, owner_wallet, 1, 1000) + # ) + # if success: + # print_info("Proceeding with the miner registrations.\n", emoji="πŸ›£οΈ ") + # else: + # print_warning("All neurons might not be able to register at once.") + + _register_miners(config_data) + + print_info("All registrations are complete.\n", emoji="πŸ“š ") + + print_info("Viewing Metagraph for Subnet 1.\n", emoji="πŸ“Š ") + subnets_list = exec_command( + command="subnets", + sub_command="metagraph", + extra_args=[ + "--netuid", + "1", + "--chain", + LOCALNET_ENDPOINT, + ], + ) + print(subnets_list.stdout, end="") + + +def run_neurons(config_data, verbose=False): + subnet_template_path = _add_subnet_template(config_data) + + chain_pid = config_data.get("pid") + config_data["subnet_path"] = subnet_template_path + + venv_neurons_path = os.path.join(config_data["workspace_path"], "venv_neurons") + venv_python = create_virtualenv(venv_neurons_path) + install_neuron_dependencies(venv_python, subnet_template_path, verbose) + + # Handle Validator + if config_data.get("Owner"): + config_data["Owner"]["venv"] = venv_python + _run_validator( + config_data, subnet_template_path, chain_pid, venv_python, verbose + ) + + # Handle Miners + for wallet_name, wallet_info in config_data.get("Miners", {}).items(): + config_data["Miners"][wallet_name]["venv"] = venv_python + + _run_miners(config_data, subnet_template_path, chain_pid, venv_python, verbose) + + with open(BTQS_LOCK_CONFIG_FILE_PATH, "w") as config_file: + yaml.safe_dump(config_data, config_file) + + +def stop_neurons(config_data): + # Get process entries + process_entries, _, _ = get_process_entries(config_data) + display_process_status_table(process_entries, [], []) + + # Filter running neurons + running_neurons = [ + entry + for entry in process_entries + if ( + entry["process"].startswith("Miner") + or entry["process"].startswith("Validator") + ) + and entry["status"] == "Running" + ] + + if not running_neurons: + console.print("[red]No running neurons to stop.") + return + + console.print("\nSelect neurons to stop:") + for idx, neuron in enumerate(running_neurons, start=1): + console.print(f"{idx}. {neuron['process']} (PID: {neuron['pid']})") + + selection = typer.prompt( + "Enter neuron numbers to stop (comma-separated), or 'all' to stop all", + default="all", + ) + + if selection.lower() == "all": + selected_neurons = running_neurons + else: + selected_indices = [ + int(i.strip()) for i in selection.split(",") if i.strip().isdigit() + ] + selected_neurons = [ + running_neurons[i - 1] + for i in selected_indices + if 1 <= i <= len(running_neurons) + ] + + if not selected_neurons: + console.print("[red]No valid neurons selected.") + return + + # Stop selected neurons + _stop_selected_neurons(config_data, selected_neurons) + + with open(BTQS_LOCK_CONFIG_FILE_PATH, "w") as config_file: + yaml.safe_dump(config_data, config_file) + + +def start_neurons(config_data, verbose=False): + # Get process entries + process_entries, _, _ = get_process_entries(config_data) + display_process_status_table(process_entries, [], []) + + # Filter stopped neurons + stopped_neurons = [ + entry + for entry in process_entries + if ( + entry["process"].startswith("Miner") + or entry["process"].startswith("Validator") + ) + and entry["status"] == "Not Running" + ] + + if not stopped_neurons: + console.print("[green]All neurons are already running.") + return + + console.print("\nSelect neurons to start:") + for idx, neuron in enumerate(stopped_neurons, start=1): + console.print(f"{idx}. {neuron['process']}") + + selection = typer.prompt( + "Enter neuron numbers to start (comma-separated), or 'all' to start all", + default="all", + ) + + if selection.lower() == "all": + selected_neurons = stopped_neurons + else: + selected_indices = [ + int(i.strip()) for i in selection.split(",") if i.strip().isdigit() + ] + selected_neurons = [ + stopped_neurons[i - 1] + for i in selected_indices + if 1 <= i <= len(stopped_neurons) + ] + + if not selected_neurons: + console.print("[red]No valid neurons selected.") + return + + # Start selected neurons + _start_selected_neurons(config_data, selected_neurons, verbose) + + with open(BTQS_LOCK_CONFIG_FILE_PATH, "w") as config_file: + yaml.safe_dump(config_data, config_file) + + +def reattach_neurons(config_data): + # Fetch all available neurons + all_neurons = { + **config_data.get("Miners", {}), + "Validator": config_data.get("Owner", {}), + } + + neuron_entries = [ + {"name": name, "info": info} + for name, info in all_neurons.items() + if info and psutil.pid_exists(info.get("pid", 0)) and info.get("log_file") + ] + + if not neuron_entries: + console.print("[red]No neurons found or none are running.") + return + + # Display a list of neurons for the user to choose from + console.print("\nSelect neuron to reattach to:") + for idx, neuron in enumerate(neuron_entries, start=1): + console.print(f"{idx}. {neuron['name']} (PID: {neuron['info']['pid']})") + + selection = typer.prompt( + "Enter neuron number to reattach to, or 'q' to quit", + default="1", + ) + + if selection.lower() == "q": + console.print("[yellow]Reattach aborted.") + return + + if ( + not selection.isdigit() + or int(selection) < 1 + or int(selection) > len(neuron_entries) + ): + console.print("[red]Invalid selection.") + return + + # Get the selected neuron based on user input + selected_neuron = neuron_entries[int(selection) - 1] + neuron_choice = selected_neuron["name"] + wallet_info = selected_neuron["info"] + pid = wallet_info.get("pid") + log_file_path = wallet_info.get("log_file") + + # Ensure the neuron process is running + if not pid or not psutil.pid_exists(pid): + console.print("[red]Neuron process not running.") + return + + if not log_file_path or not os.path.exists(log_file_path): + console.print("[red]Log file not found for this neuron.") + return + + console.print(f"[green]Reattaching to neuron {neuron_choice}.") + + # Attach to the process logs + attach_to_process_logs(log_file_path, neuron_choice, pid) + + +# Helper functions + + +def _create_miner_wallets(config_data): + max_miners = len(MINER_URIS) + + total_miners = typer.prompt( + f"How many miners do you want to run? (Choose between 1 and {max_miners})", + type=click.IntRange(1, max_miners), + default=3, + show_default=True, + ) + + # Ensure that the total number of miners doesn't exceed the available URIs + if total_miners > max_miners: + total_miners = max_miners + console.print(f"Limiting the number of miners to {max_miners}.") + + for i, uri in enumerate(MINER_URIS[:total_miners]): + console.print(f"Miner {i+1}:") + wallet_name = typer.prompt( + f"Enter wallet name for miner {i+1}", default=f"{uri.strip('//')}" + ) + hotkey_name = typer.prompt( + f"Enter hotkey name for miner {i+1}", default="default" + ) + + keypair = Keypair.create_from_uri(uri) + wallet = Wallet( + path=config_data["wallets_path"], name=wallet_name, hotkey=hotkey_name + ) + wallet.set_coldkey(keypair=keypair, encrypt=False, overwrite=True) + wallet.set_coldkeypub(keypair=keypair, encrypt=False, overwrite=True) + wallet.set_hotkey(keypair=keypair, encrypt=False, overwrite=True) + + config_data["Miners"][wallet_name] = { + "hotkey": hotkey_name, + "uri": uri, + "pid": None, + "subtensor_pid": config_data["pid"], + "port": MINER_PORTS[i], + } + + with open(BTQS_LOCK_CONFIG_FILE_PATH, "w") as config_file: + yaml.safe_dump(config_data, config_file) + + print_info("Miner wallets are created.\n", emoji="\nπŸ—‚οΈ ") + + +def _register_miners(config_data): + for wallet_name, wallet_info in config_data["Miners"].items(): + wallet = Wallet( + path=config_data["wallets_path"], + name=wallet_name, + hotkey=wallet_info["hotkey"], + ) + print_info(f"Registering Miner ({wallet_name}) to Netuid 1\n", emoji="βš’οΈ ") + + miner_registered = exec_command( + command="subnets", + sub_command="register", + extra_args=[ + "--wallet-path", + wallet.path, + "--wallet-name", + wallet.name, + "--hotkey", + wallet.hotkey_str, + "--netuid", + "1", + "--chain", + LOCALNET_ENDPOINT, + "--no-prompt", + ], + ) + clean_stdout = remove_ansi_escape_sequences(miner_registered.stdout) + + if "βœ… Registered" in clean_stdout: + print_info(f"Registered miner: ({wallet.name}) to Netuid 1\n", emoji="βœ… ") + else: + console.print( + f"[red]Failed to register miner ({wallet.name}). You can register the miner manually using:" + ) + command = ( + f"btcli subnets register --wallet-path {wallet.path} --wallet-name " + f"{wallet.name} --hotkey {wallet.hotkey_str} --netuid 1 --chain " + f"{LOCALNET_ENDPOINT} --no-prompt" + ) + console.print(f"[bold yellow]{command}\n") + + +def _add_subnet_template(config_data): + workspace_path = config_data.get("workspace_path") + if not workspace_path: + console.print("[red]Base path not found in the configuration file.") + return + + subnet_template_path = config_data["subnet_path"] + if not os.path.exists(subnet_template_path): + console.print("[green]Cloning subnet-template repository...") + try: + repo = Repo.clone_from( + SUBNET_REPO_URL, + subnet_template_path, + ) + repo.git.checkout(SUBNET_REPO_BRANCH) + print_info("Cloned subnet-template repository successfully.", emoji="πŸ“¦ ") + except GitCommandError as e: + console.print(f"[red]Error cloning subnet-template repository: {e}") + else: + print_info("Using existing subnet-template repository.", emoji="πŸ“¦ ") + repo = Repo(subnet_template_path) + current_branch = repo.active_branch.name + if current_branch != SUBNET_REPO_BRANCH: + try: + repo.git.checkout(SUBNET_REPO_BRANCH) + except GitCommandError as e: + console.print( + f"[red]Error switching to branch '{SUBNET_REPO_BRANCH}': {e}" + ) + + return subnet_template_path + + +def _run_validator( + config_data, subnet_template_path, chain_pid, venv_python, verbose=False +): + owner_info = config_data["Owner"] + validator_pid = owner_info.get("pid") + validator_subtensor_pid = owner_info.get("subtensor_pid") + if ( + validator_pid + and psutil.pid_exists(validator_pid) + and validator_subtensor_pid == chain_pid + ): + console.print( + "[green]Validator is already running. Attaching to the process..." + ) + log_file_path = owner_info.get("log_file") + if log_file_path and os.path.exists(log_file_path): + attach_to_process_logs(log_file_path, "Validator", validator_pid) + else: + console.print("[red]Log file not found for validator. Cannot attach.") + else: + # Validator is not running, start it + success = start_validator( + owner_info, subnet_template_path, config_data, venv_python, verbose + ) + if not success: + console.print("[red]Failed to start validator.") + + +def _run_miners( + config_data, subnet_template_path, chain_pid, venv_python, verbose=False +): + for wallet_name, wallet_info in config_data.get("Miners", {}).items(): + miner_pid = wallet_info.get("pid") + miner_subtensor_pid = wallet_info.get("subtensor_pid") + # Check if miner process is running and associated with the current chain + if ( + miner_pid + and psutil.pid_exists(miner_pid) + and miner_subtensor_pid == chain_pid + ): + console.print( + f"[green]Miner {wallet_name} is already running. Attaching to the process..." + ) + log_file_path = wallet_info.get("log_file") + if log_file_path and os.path.exists(log_file_path): + attach_to_process_logs(log_file_path, f"Miner {wallet_name}", miner_pid) + else: + console.print( + f"[red]Log file not found for miner {wallet_name}. Cannot attach." + ) + else: + # Miner is not running, start it + success = start_miner( + wallet_name, + wallet_info, + subnet_template_path, + config_data, + venv_python, + verbose, + ) + if not success: + console.print(f"[red]Failed to start miner {wallet_name}.") + + +def _stop_selected_neurons(config_data, selected_neurons): + for neuron in selected_neurons: + pid = int(neuron["pid"]) + neuron_name = neuron["process"] + try: + process = psutil.Process(pid) + process.terminate() + process.wait(timeout=10) + console.print(f"[green]{neuron_name} stopped.") + except psutil.NoSuchProcess: + console.print(f"[yellow]{neuron_name} process not found.") + except psutil.TimeoutExpired: + console.print(f"[red]Timeout stopping {neuron_name}. Forcing stop.") + process.kill() + + if neuron["process"].startswith("Miner"): + wallet_name = neuron["process"].split("Miner: ")[-1] + config_data["Miners"][wallet_name]["pid"] = None + elif neuron["process"].startswith("Validator"): + config_data["Owner"]["pid"] = None + + +def _start_selected_neurons(config_data, selected_neurons, verbose): + subnet_template_path = _add_subnet_template(config_data) + + for neuron in selected_neurons: + neuron_name = neuron["process"] + if neuron_name.startswith("Validator"): + success = start_validator( + config_data["Owner"], + subnet_template_path, + config_data, + config_data["Owner"]["venv"], + verbose, + ) + elif neuron_name.startswith("Miner"): + wallet_name = neuron_name.split("Miner: ")[-1] + wallet_info = config_data["Miners"][wallet_name] + success = start_miner( + wallet_name, + wallet_info, + subnet_template_path, + config_data, + config_data["Miners"][wallet_name]["venv"], + verbose, + ) + + if success: + console.print(f"[green]{neuron_name} started successfully.") + else: + console.print(f"[red]Failed to start {neuron_name}.") + + # Update the process entries after starting neurons + process_entries, _, _ = get_process_entries(config_data) + display_process_status_table(process_entries, [], []) + + +async def sudo_set_target_registrations_per_interval( + substrate: "AsyncSubstrateInterface", + wallet: Wallet, + netuid: int, + target_registrations_per_interval: int, +) -> bool: + async with substrate: + registration_call = await substrate.compose_call( + call_module="AdminUtils", + call_function="sudo_set_target_registrations_per_interval", + call_params={ + "netuid": netuid, + "target_registrations_per_interval": target_registrations_per_interval, + }, + ) + sudo_call = await substrate.compose_call( + call_module="Sudo", + call_function="sudo", + call_params={"call": registration_call}, + ) + extrinsic = await substrate.create_signed_extrinsic( + call=sudo_call, keypair=wallet.coldkey + ) + response = await substrate.submit_extrinsic( + extrinsic, + wait_for_inclusion=True, + wait_for_finalization=True, + ) + + await response.process_events() + return await response.is_success diff --git a/btqs/commands/subnet.py b/btqs/commands/subnet.py new file mode 100644 index 00000000..9de3ca86 --- /dev/null +++ b/btqs/commands/subnet.py @@ -0,0 +1,437 @@ +import os +import time +from tqdm import tqdm +import typer +import yaml +from rich.text import Text +from rich.table import Table +from bittensor_wallet import Wallet, Keypair +from btqs.config import BTQS_LOCK_CONFIG_FILE_PATH, VALIDATOR_URI, LOCALNET_ENDPOINT +from btqs.utils import ( + console, + exec_command, + remove_ansi_escape_sequences, + subnet_exists, + subnet_owner_exists, + get_process_entries, + display_process_status_table, + load_config, + print_info, + print_error, + print_info_box, +) + + +def add_stake(config_data): + print_info_box( + "πŸ’° You stake **TAO** to your hotkey to become a validator in a subnet. Your hotkey can potentially earn rewards based on your validating performance.", + title="Info: Staking to become a Validator", + ) + print("\n") + + subnet_owner, owner_data = subnet_owner_exists(BTQS_LOCK_CONFIG_FILE_PATH) + if subnet_owner: + owner_wallet = Wallet( + name=owner_data.get("wallet_name"), + path=config_data["wallets_path"], + hotkey=owner_data.get("hotkey"), + ) + print_info( + f"Validator is adding stake to its own hotkey\n{owner_wallet}\n", + emoji="πŸ”– ", + ) + + add_stake = exec_command( + command="stake", + sub_command="add", + extra_args=[ + "--amount", + 1000, + "--wallet-path", + config_data["wallets_path"], + "--chain", + LOCALNET_ENDPOINT, + "--wallet-name", + owner_wallet.name, + "--no-prompt", + "--wallet-hotkey", + owner_wallet.hotkey_str, + ], + ) + + clean_stdout = remove_ansi_escape_sequences(add_stake.stdout) + if "βœ… Finalized" in clean_stdout: + print_info("Stake added by Validator", emoji="πŸ“ˆ ") + print_info_box( + "Metagraph contains important information on a subnet. Displaying a metagraph is a useful way to see a snapshot of a subnet.", + title="Info: Metagraph", + ) + print_info("Viewing Metagraph for Subnet 1", emoji="\nπŸ”Ž ") + subnets_list = exec_command( + command="subnets", + sub_command="metagraph", + extra_args=[ + "--netuid", + "1", + "--chain", + LOCALNET_ENDPOINT, + ], + ) + print(subnets_list.stdout, end="") + + else: + print_error("\nFailed to add stake. Command output:\n") + print(add_stake.stdout, end="") + + print("\n") + print_info_box( + "The validator's hotkey must be registered in the root network (netuid 0) to set root weights.", + title="Root network registration", + ) + + print_info( + f"Validator is registering to root network (netuid 0) ({owner_wallet})\n", + emoji="\n🫚 ", + ) + + register_root = exec_command( + command="root", + sub_command="register", + extra_args=[ + "--wallet-path", + config_data["wallets_path"], + "--chain", + LOCALNET_ENDPOINT, + "--wallet-name", + owner_wallet.name, + "--no-prompt", + "--wallet-hotkey", + owner_wallet.hotkey_str, + ], + ) + clean_stdout = remove_ansi_escape_sequences(register_root.stdout) + if "βœ… Registered" in clean_stdout: + print_info("Successfully registered to the root network\n", emoji="βœ… ") + elif "βœ… Already registered on root network" in clean_stdout: + print_info("Validator is already registered to Root network\n", emoji="βœ… ") + else: + print_error("\nFailed to register to root. Command output:\n") + print(register_root.stdout, end="") + + print_info("Viewing Root list\n", emoji="πŸ”Ž ") + subnets_list = exec_command( + command="root", + sub_command="list", + extra_args=[ + "--chain", + LOCALNET_ENDPOINT, + ], + ) + print(subnets_list.stdout, end="") + + else: + print_error( + "Subnet netuid 1 registered to the owner not found. Run `btqs subnet setup` first" + ) + return + + +def add_weights(config_data): + print_info_box( + "πŸ‹οΈ Setting **Root weights** in Bittensor means assigning relative importance to different subnets within the network, which directly influences their share of network rewards and resources.", + title="Info: Setting Root weights", + ) + subnet_owner, owner_data = subnet_owner_exists(BTQS_LOCK_CONFIG_FILE_PATH) + if subnet_owner: + owner_wallet = Wallet( + name=owner_data.get("wallet_name"), + path=config_data["wallets_path"], + hotkey=owner_data.get("hotkey"), + ) + print_info( + "Validator is now setting weights of subnet 1 on the root network.\n Please wait... (Timeout: ~ 120 seconds)", + emoji="πŸ‹οΈ ", + ) + max_retries = 60 + attempt = 0 + retry_patterns = [ + "ancient birth block", + "Transaction has a bad signature", + "SettingWeightsTooFast", + ] + + while attempt < max_retries: + try: + set_weights = exec_command( + command="root", + sub_command="set-weights", + extra_args=[ + "--wallet-path", + config_data["wallets_path"], + "--chain", + LOCALNET_ENDPOINT, + "--wallet-name", + owner_wallet.name, + "--no-prompt", + "--wallet-hotkey", + owner_wallet.hotkey_str, + "--netuid", + 1, + "--weights", + 1, + ], + internal_command=True, + ) + clean_stdout = remove_ansi_escape_sequences(set_weights.stdout) + + if "βœ… Finalized" in clean_stdout: + text = Text( + "Successfully set weights to Netuid 1\n", + style="bold light_goldenrod2", + ) + sign = Text("🌟 ", style="bold yellow") + console.print(sign, text) + break + + elif any(pattern in clean_stdout for pattern in retry_patterns): + attempt += 1 + if attempt < max_retries: + time.sleep(1) + else: + console.print( + "\n[red]Failed to set weights after multiple attempts. Please try again later\n" + ) + else: + console.print("\n[red]Failed to set weights. Command output:\n") + print(set_weights.stdout, end="") + break + + except KeyboardInterrupt: + console.print("\n[yellow]Process interrupted by user. Exiting...") + return + + except Exception as e: + console.print(f"[red]An unexpected error occurred: {e}") + break + + else: + console.print( + "[red]All retry attempts exhausted. Unable to set weights on the root network." + ) + else: + console.print( + "[red]Subnet netuid 1 registered to the owner not found. Run `btqs subnet setup` first" + ) + return + + +def display_live_metagraph(config_data): + """ + Displays a live view of the metagraph. + """ + + def clear_screen(): + os.system("cls" if os.name == "nt" else "clear") + + def get_metagraph(): + result = exec_command( + command="subnets", + sub_command="metagraph", + extra_args=[ + "--netuid", + "1", + "--chain", + "ws://127.0.0.1:9945", + ], + internal_command=True, + ) + return result.stdout + + print("Starting live metagraph view. Press 'Ctrl + C' to exit.") + + try: + while True: + metagraph = get_metagraph() + process_entries, cpu_usage_list, memory_usage_list = get_process_entries( + config_data + ) + clear_screen() + print(metagraph) + display_process_status_table( + process_entries, cpu_usage_list, memory_usage_list + ) + + # Create a progress bar for 5 seconds + print("\n") + console.print("[green] Live view active: Press Ctrl + C to exit\n") + for _ in tqdm( + range(5), + desc="Refreshing", + bar_format="{desc}: {bar}", + ascii=" β––β–˜β–β–—β–šβ–ž", + ncols=20, + colour="green", + ): + time.sleep(1) + + except KeyboardInterrupt: + print("Exiting live view...") + + +def setup_subnet(config_data): + os.makedirs(config_data["wallets_path"], exist_ok=True) + subnet_owner, owner_data = subnet_owner_exists(BTQS_LOCK_CONFIG_FILE_PATH) + if subnet_owner: + owner_wallet = Wallet( + name=owner_data.get("wallet_name"), + path=config_data["wallets_path"], + hotkey=owner_data.get("hotkey"), + ) + input() + + warning_text = Text( + "A Subnet Owner associated with this setup already exists", + style="bold light_goldenrod2", + ) + wallet_info = Text(f"\t{owner_wallet}\n", style="medium_purple") + warning_sign = Text("⚠️ ", style="bold yellow") + + console.print(warning_sign, warning_text) + console.print(wallet_info) + else: + create_subnet_owner_wallet(config_data) + config_data = load_config( + "A running Subtensor not found. Please run `btqs chain start` first." + ) + + owner_data = config_data["Owner"] + owner_wallet = Wallet( + name=owner_data.get("wallet_name"), + path=config_data["wallets_path"], + hotkey=owner_data.get("hotkey"), + ) + + if subnet_exists(owner_wallet.coldkeypub.ss58_address, 1): + warning_text = Text( + "A Subnet with netuid 1 already exists and is registered with the owner's wallet:", + style="bold light_goldenrod2", + ) + wallet_info = Text(f"\t{owner_wallet}", style="medium_purple") + sudo_info = Text( + f"\tSUDO (Coldkey of Subnet 1 owner): {owner_wallet.coldkeypub.ss58_address}", + style="medium_purple", + ) + warning_sign = Text("⚠️ ", style="bold yellow") + + console.print(warning_sign, warning_text) + console.print(wallet_info) + console.print(sudo_info) + else: + create_subnet(owner_wallet, config_data) + + print_info("Listing all subnets\n", emoji="\nπŸ“‹ ") + print_info_box( + "In the below table, the netuid 0 shown is root network that is automatically created when the local blockchain starts", + title="Info: Root network (netuid 0)", + ) + print("\n") + subnets_list = exec_command( + command="subnets", + sub_command="list", + extra_args=[ + "--chain", + "ws://127.0.0.1:9945", + ], + ) + print(subnets_list.stdout, end="") + + +def create_subnet_owner_wallet(config_data): + print_info( + "Creating a wallet (coldkey) and a hotkey to create a subnet.\n", emoji="πŸ—‚οΈ " + ) + + owner_wallet_name = typer.prompt( + "Enter subnet owner wallet name", default="Alice", show_default=True + ) + owner_hotkey_name = typer.prompt( + "Enter subnet owner hotkey name", default="default", show_default=True + ) + + keypair = Keypair.create_from_uri(VALIDATOR_URI) + owner_wallet = Wallet( + path=config_data["wallets_path"], + name=owner_wallet_name, + hotkey=owner_hotkey_name, + ) + owner_wallet.set_coldkey(keypair=keypair, encrypt=False, overwrite=True) + owner_wallet.set_coldkeypub(keypair=keypair, encrypt=False, overwrite=True) + owner_wallet.set_hotkey(keypair=keypair, encrypt=False, overwrite=True) + + console.print( + "Executed command: [dark_orange] btcli wallet create --wallet-name", + f"[dark_orange]{owner_hotkey_name} --wallet-hotkey {owner_wallet_name} --wallet-path {config_data['wallets_path']}", + ) + + config_data["Owner"] = { + "wallet_name": owner_wallet_name, + "hotkey": owner_hotkey_name, + "subtensor_pid": config_data["pid"], + } + with open(BTQS_LOCK_CONFIG_FILE_PATH, "w") as config_file: + yaml.safe_dump(config_data, config_file) + + +def create_subnet(owner_wallet, config_data): + print_info("Creating a subnet.\n", emoji="\n🌎 ") + + create_subnet = exec_command( + command="subnets", + sub_command="create", + extra_args=[ + "--wallet-path", + config_data["wallets_path"], + "--chain", + "ws://127.0.0.1:9945", + "--wallet-name", + owner_wallet.name, + "--no-prompt", + "--wallet-hotkey", + owner_wallet.hotkey_str, + ], + ) + clean_stdout = remove_ansi_escape_sequences(create_subnet.stdout) + if "βœ… Registered subnetwork with netuid: 1" in clean_stdout: + print_info("Subnet created successfully with netuid 1\n", emoji="πŸ₯‡ ") + + print_info_box( + "πŸ”‘ Creating a subnet involves signing with your *coldkey*, which is the permanent keypair associated with your account. " + "It is typically used for long-term ownership and higher-value operations.\n\n" + "πŸ”₯ When registering to a subnet, your *hotkey* is used. The hotkey is a more frequently used keypair " + "designed for day-to-day interactions with the network." + ) + + print_info( + f"Registering the subnet owner to Netuid 1\n{owner_wallet}\n", emoji="\nπŸ“ " + ) + + register_subnet = exec_command( + command="subnets", + sub_command="register", + extra_args=[ + "--wallet-path", + config_data["wallets_path"], + "--wallet-name", + owner_wallet.name, + "--wallet-hotkey", + owner_wallet.hotkey_str, + "--netuid", + "1", + "--chain", + "ws://127.0.0.1:9945", + "--no-prompt", + ], + ) + clean_stdout = remove_ansi_escape_sequences(register_subnet.stdout) + if "βœ… Registered" in clean_stdout: + print_info("Registered the owner's hotkey to subnet 1", emoji="βœ… ") diff --git a/btqs/config.py b/btqs/config.py new file mode 100644 index 00000000..aad2abd4 --- /dev/null +++ b/btqs/config.py @@ -0,0 +1,70 @@ +import os + +BTQS_LOCK_CONFIG_FILE_PATH = os.path.expanduser("~/.bittensor/btqs/btqs-lock.yml") +DEFAULT_WORKSPACE_DIRECTORY = os.path.expanduser("~/Desktop/bittensor_quick_start") + +SUBNET_REPO_URL = "https://github.com/opentensor/bittensor-subnet-template.git" +SUBNET_REPO_BRANCH = "ench/abe/commented-info" + +# You can add commands with args. Eg: "./neurons/miner.py --model="openai" --safe-mode" +DEFAULT_MINER_COMMAND = "./neurons/miner.py" +DEFAULT_VALIDATOR_COMMAND = "./neurons/validator.py" + +SUBTENSOR_REPO_URL = "https://github.com/opentensor/subtensor.git" +SUBTENSOR_BRANCH = "junius/feat-localnet-improve" +RUST_INSTALLATION_VERSION = "nightly-2024-03-05" +RUST_CHECK_VERSION = "rustc 1.78.0-nightly" +RUST_TARGETS = [ + ["rustup", "target", "add", "wasm32-unknown-unknown", "--toolchain", "stable"], + ["rustup", "component", "add", "rust-src", "--toolchain", "stable"], +] +SUBTENSOR_MACOS_DEPS = ["protobuf"] +SUBTENSOR_LINUX_DEPS = [ + "clang", + "curl", + "libssl-dev", + "llvm", + "libudev-dev", + "protobuf-compiler", +] + +MINER_URIS = [ + "//Bob", + "//Charlie", + "//Dave", + "//Eve", + "//Ferdie", + "//Grace", + "//Tom", + "//Ivy", + "//Judy", + "//Jerry", + "//Harry", + "//Oscar", + "//Trent", + "//Victor", + "//Wendy", +] +VALIDATOR_URI = "//Alice" +SUDO_URI = "//Alice" +MINER_PORTS = [ + 8101, + 8102, + 8103, + 8104, + 8105, + 8106, + 8107, + 8108, + 8109, + 8110, + 8111, + 8112, + 8113, + 8114, + 8115, +] +VALIDATOR_PORT = 8100 +LOCALNET_ENDPOINT = "ws://127.0.0.1:9945" + +EPILOG = "Made with [bold red]:heart:[/bold red] by The OpenΟ„ensor FoundaΟ„ion" diff --git a/btqs/utils.py b/btqs/utils.py new file mode 100644 index 00000000..71f5b8cc --- /dev/null +++ b/btqs/utils.py @@ -0,0 +1,1065 @@ +import os +import re +import sys +import subprocess +import time +import platform +import shlex +from datetime import datetime +from typing import Any, Dict, Optional, Tuple + +import psutil +import typer +import yaml +from bittensor_cli.cli import CLIManager +from bittensor_wallet import Wallet +from rich.console import Console +from rich.table import Table +from rich.text import Text +from rich.markdown import Markdown +from rich.panel import Panel +from rich.align import Align +from typer.testing import CliRunner + +from .config import ( + BTQS_LOCK_CONFIG_FILE_PATH, + VALIDATOR_PORT, + LOCALNET_ENDPOINT, + DEFAULT_MINER_COMMAND, + DEFAULT_VALIDATOR_COMMAND, + RUST_CHECK_VERSION, + RUST_INSTALLATION_VERSION, + RUST_TARGETS, + SUBTENSOR_MACOS_DEPS, + SUBTENSOR_LINUX_DEPS, +) + +console = Console() + +messages = [ + "πŸ”§ [cyan]Polishing the blocks[/cyan]", + "πŸ›’οΈ [yellow]Oiling up the chain[/yellow]", + "βš™οΈ [green]Calibrating gears[/green]", + "πŸ”Œ [magenta]Plugging in nodes[/magenta]", + "🧰 [blue]Gathering tools[/blue]", + "πŸ“° [cyan]Mapping the network[/cyan]", + "πŸ”­ [yellow]Aligning dependencies[/yellow]", + "πŸ§ͺ [green]Mixing tempos[/green]", + "πŸ“‘ [magenta]Tuning frequencies[/magenta]", + "πŸš€ [red]Preparing for launch[/red]", + "πŸ™Œ [dark_orange]Bittensor on 3! 1, 2, 3!", +] + + +def print_info(message: str, emoji: str = "", style: str = "yellow"): + styled_text = Text(f"{emoji} {message}", style=style) + console.print(styled_text) + + +def print_success(message: str, emoji: str = "βœ…"): + styled_text = Text(f"{emoji} {message}", style="bold green") + console.print(styled_text) + + +def print_warning(message: str, emoji: str = "⚠️"): + styled_text = Text(f"{emoji} {message}", style="bold yellow") + console.print(styled_text) + + +def print_error(message: str, emoji: str = "❌"): + styled_text = Text(f"{emoji} {message}", style="bold red") + console.print(styled_text) + + +def print_step(title: str, description: str, step_number: int): + panel = Panel.fit( + Align.left(f"[bold]{step_number}. {title}[/bold]\n{description}"), + title=f"[dark_orange]Step {step_number}", + border_style="bright_black", + padding=(1, 2), + ) + console.print(panel) + + +def print_info_box(message: str, title: str = "Info", style: str = ""): + markdown_message = Markdown(message, style=style) + panel = Panel( + Align.left(markdown_message), + title=Text(title, style="bold yellow"), + border_style="bold yellow", + expand=False, + padding=(1, 1), + width=console.width - 20, + ) + console.print(panel) + + +def load_config( + error_message: Optional[str] = None, exit_if_missing: bool = True +) -> dict[str, Any]: + """ + Loads the configuration file. + + Args: + error_message (Optional[str]): Custom error message to display if config file is not found. + exit_if_missing (bool): If True, exits the program if the config file is missing. + + Returns: + Dict[str, Any]: Configuration data. + + Raises: + typer.Exit: If the config file is not found and exit_if_missing is True. + """ + if not os.path.exists(BTQS_LOCK_CONFIG_FILE_PATH): + if exit_if_missing: + if error_message: + print_error(f"{error_message}") + else: + print_error("Configuration file not found.") + raise typer.Exit() + else: + return {} + with open(BTQS_LOCK_CONFIG_FILE_PATH, "r") as config_file: + config_data = yaml.safe_load(config_file) or {} + return config_data + + +def is_package_installed(package_name: str) -> bool: + """ + Checks if a system package is installed. + + Args: + package_name (str): The name of the package to check. + + Returns: + bool: True if installed, False otherwise. + """ + try: + if platform.system() == "Linux": + result = subprocess.run( + ["dpkg", "-l", package_name], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + return package_name in result.stdout + elif platform.system() == "Darwin": + result = subprocess.run( + ["brew", "list", package_name], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + return package_name in result.stdout + else: + print_error(f"Unsupported operating system: {platform.system()}") + return False + except Exception as e: + print_error(f"Error checking package {package_name}: {e}") + return False + + +def is_rust_installed(required_version: str) -> bool: + """ + Checks if Rust is installed and matches the required version. + + Args: + required_version (str): The required Rust version. + + Returns: + bool: True if Rust is installed and matches the required version, False otherwise. + """ + try: + result = subprocess.run( + ["rustc", "--version"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + installed_version = result.stdout.strip().split()[1] + return installed_version == required_version + except Exception: + return False + + +def create_virtualenv(venv_path: str) -> str: + """ + Creates a virtual environment at the specified path. + + Args: + venv_path (str): The path where the virtual environment should be created. + + Returns: + str: The path to the Python executable within the virtual environment. + """ + if not os.path.exists(venv_path): + print_info(f"Creating virtual environment at {venv_path}...") + subprocess.run([sys.executable, "-m", "venv", venv_path], check=True) + print_success("Virtual environment created.") + + activate_command = f"source {os.path.join(venv_path, 'bin', 'activate')}" + console.print( + f"[yellow]To activate the virtual environment manually, run:\n[bold cyan]{activate_command}\n" + ) + else: + print_info(f"Using existing virtual environment at {venv_path}.", emoji="πŸ–₯️ ") + + # Get the path to the Python executable in the virtual environment + venv_python = os.path.join(venv_path, "bin", "python") + return venv_python + + +def install_subtensor_dependencies(verbose) -> None: + """ + Installs subtensor dependencies, including system-level dependencies and Rust. + """ + print_info("Installing subtensor system dependencies...", emoji="βš™οΈ ") + + stdout = None if verbose else subprocess.DEVNULL + + # Install required system dependencies + missing_packages = [] + if platform.system() == "Linux": + for package in SUBTENSOR_LINUX_DEPS: + if not is_package_installed(package): + missing_packages.append(package) + + if missing_packages: + console.print( + f"[yellow]Installing missing system packages: {', '.join(missing_packages)}" + ) + subprocess.run( + ["sudo", "apt-get", "update"], + check=True, + stdout=stdout, + stderr=stdout, + ) + subprocess.run( + ["sudo", "apt-get", "install", "-y"] + missing_packages, + check=True, + stdout=stdout, + stderr=stdout, + ) + else: + print_info("All required system packages are already installed.") + + elif platform.system() == "Darwin": + missing_packages = [] + for package in SUBTENSOR_MACOS_DEPS: + if not is_package_installed(package): + missing_packages.append(package) + + if missing_packages: + console.print( + f"[yellow]Installing missing macOS system packages: {', '.join(missing_packages)}" + ) + subprocess.run( + ["brew", "update"], + check=True, + stdout=stdout, + stderr=stdout, + ) + subprocess.run( + ["brew", "install"] + missing_packages, + check=True, + stdout=stdout, + stderr=stdout, + ) + else: + print_info( + "All required macOS system packages are already installed.", emoji="πŸ””" + ) + + else: + print_error( + "[Unsupported operating system for automatic system dependency installation." + ) + return + + # Install Rust + print_info("Checking Rust installation...", emoji="πŸ” ") + + if not is_rust_installed(RUST_CHECK_VERSION): + print_info(f"Installing Rust {RUST_INSTALLATION_VERSION}...", emoji="πŸ§ͺ ") + subprocess.run( + [ + "curl", + "--proto", + "=https", + "--tlsv1.2", + "-sSf", + "https://sh.rustup.rs", + "-o", + "rustup.sh", + ], + check=True, + stdout=stdout, + stderr=stdout, + ) + subprocess.run( + ["sh", "rustup.sh", "-y", "--default-toolchain", RUST_INSTALLATION_VERSION], + check=True, + stdout=stdout, + stderr=stdout, + ) + else: + console.print( + f"[green]Required Rust version {RUST_CHECK_VERSION} is already installed." + ) + + # Add necessary Rust targets + print_info("Configuring Rust toolchain...", emoji="πŸ› οΈ ") + for target in RUST_TARGETS: + subprocess.run( + target, + check=True, + stdout=stdout, + stderr=stdout, + ) + + print_info("Subtensor dependencies installed.\n", emoji="🧰 ") + + +def install_neuron_dependencies(venv_python: str, cwd: str, verbose: bool) -> None: + """ + Installs neuron dependencies into the virtual environment. + + Args: + venv_python (str): Path to the Python executable in the virtual environment. + cwd (str): Current working directory where the setup should run. + """ + stdout = None if verbose else subprocess.DEVNULL + print_info("Installing neuron dependencies...", emoji="βš™οΈ ", style="bold green") + subprocess.run( + [venv_python, "-m", "pip", "install", "--upgrade", "pip"], + cwd=cwd, + check=True, + stdout=stdout, + stderr=stdout, + ) + subprocess.run( + [venv_python, "-m", "pip", "install", "-e", "."], + cwd=cwd, + check=True, + stdout=stdout, + stderr=stdout, + ) + print_info("Neuron dependencies installed.", emoji="πŸ–₯️ ", style="bold green") + + +def remove_ansi_escape_sequences(text: str) -> str: + """ + Removes ANSI escape sequences from the given text. + + Args: + text (str): The text from which to remove ANSI sequences. + + Returns: + str: The cleaned text. + """ + ansi_escape = re.compile( + r""" + \x1B # ESC character + (?: # Non-capturing group + [@-Z\\-_] # 7-bit C1 control codes + | # or + \[ # ESC[ + [0-?]* # Parameter bytes + [ -/]* # Intermediate bytes + [@-~] # Final byte + ) + """, + re.VERBOSE, + ) + return ansi_escape.sub("", text) + + +def exec_command( + command: str, + sub_command: str, + extra_args: Optional[list[str]] = None, + inputs: Optional[list[str]] = None, + internal_command: bool = False, +) -> typer.testing.Result: + """ + Executes a command using the CLIManager and returns the result. + + Args: + command (str): The main command to execute. + sub_command (str): The sub-command to execute. + extra_args (List[str], optional): Additional arguments for the command. + inputs (List[str], optional): Inputs for interactive prompts. + + Returns: + typer.testing.Result: The result of the command execution. + """ + extra_args = extra_args or [] + cli_manager = CLIManager() + runner = CliRunner() + + args = [command, sub_command] + extra_args + command_for_printing = ["btcli"] + [str(arg) for arg in args] + + if not internal_command: + console.print( + f"Executing command: [dark_orange]{' '.join(command_for_printing)}\n" + ) + + input_text = "\n".join(inputs) + "\n" if inputs else None + result = runner.invoke( + cli_manager.app, + args, + input=input_text, + env={"COLUMNS": "700"}, + catch_exceptions=False, + color=False, + ) + return result + + +def is_chain_running(config_file_path: str = BTQS_LOCK_CONFIG_FILE_PATH) -> bool: + """ + Checks if the local chain is running by verifying the PID in the config file. + + Args: + config_file_path (str): Path to the configuration file. + + Returns: + bool: True if the chain is running, False otherwise. + """ + if not os.path.exists(config_file_path): + return False + with open(config_file_path, "r") as config_file: + config_data = yaml.safe_load(config_file) or {} + pid = config_data.get("pid") + if not pid: + return False + try: + process = psutil.Process(pid) + return process.is_running() + except psutil.NoSuchProcess: + return False + + +def subnet_owner_exists(config_file_path: str) -> Tuple[bool, dict]: + """ + Checks if a subnet owner exists in the config file. + + Args: + config_file_path (str): Path to the configuration file. + + Returns: + Tuple[bool, dict]: (True, owner data) if exists, else (False, {}). + """ + if not os.path.exists(config_file_path): + return False, {} + with open(config_file_path, "r") as config_file: + config_data = yaml.safe_load(config_file) or {} + + owner_data = config_data.get("Owner") + pid = config_data.get("pid") + + if owner_data and pid: + if owner_data.get("subtensor_pid") == pid: + return True, owner_data + + return False, {} + + +def subnet_exists(ss58_address: str, netuid: int) -> bool: + """ + Checks if a subnet exists by verifying the subnet list output. + """ + subnets_list = exec_command( + command="subnets", + sub_command="list", + extra_args=[ + "--chain", + LOCALNET_ENDPOINT, + ], + internal_command=True, + ) + exists = verify_subnet_entry( + remove_ansi_escape_sequences(subnets_list.stdout), netuid, ss58_address + ) + return exists + + +def verify_subnet_entry(output_text: str, netuid: int, ss58_address: str) -> bool: + """ + Verifies the presence of a specific subnet entry in the subnets list output. + + Args: + output_text (str): Output of execution command. + netuid (str): The netuid to look for. + ss58_address (str): The SS58 address of the subnet owner. + + Returns: + bool: True if the entry is found, False otherwise. + """ + output_text = remove_ansi_escape_sequences(output_text) + lines = output_text.split("\n") + + data_started = False + + for line in lines: + line = line.strip() + if not line: + continue + + if "NETUID" in line: + data_started = True + continue + + if not data_started: + continue + + if set(line) <= {"━", "╇", "β”Ό", "─", "β•ˆ", "═", "β•‘", "╬", "β•£", "β• "}: + continue + + columns = re.split(r"β”‚|\|", line) + columns = [col.strip() for col in columns] + + if len(columns) < 8: + continue + + netuid_col = columns[0] + ss58_address_col = columns[-1] + + if netuid_col == str(netuid) and ss58_address_col == ss58_address: + return True + + return False + + +def get_btcli_version() -> str: + """ + Gets the version of btcli. + """ + try: + result = subprocess.run(["btcli", "--version"], capture_output=True, text=True) + return result.stdout.strip() + except Exception: + return "Not installed or not found" + + +def get_bittensor_wallet_version() -> str: + """ + Gets the version of bittensor-wallet. + """ + try: + result = subprocess.run( + ["pip", "show", "bittensor-wallet"], capture_output=True, text=True + ) + for line in result.stdout.split("\n"): + if line.startswith("Version:"): + return line.split(":")[1].strip() + except Exception: + return "Not installed or not found" + + +def get_bittensor_version() -> str: + """ + Gets the version of bittensor-wallet. + """ + try: + result = subprocess.run( + ["pip", "show", "bittensor"], capture_output=True, text=True + ) + for line in result.stdout.split("\n"): + if line.startswith("Version:"): + return line.split(":")[1].strip() + except Exception: + return "Not installed or not found" + + +def get_python_version() -> str: + """ + Gets the Python version. + """ + return sys.version.split()[0] + + +def get_python_path() -> str: + """ + Gets the Python executable path. + """ + return sys.executable + + +def get_process_info(pid: int) -> Tuple[str, str, str, str, float, float]: + """ + Retrieves process information. + """ + if pid and psutil.pid_exists(pid): + try: + process = psutil.Process(pid) + status = "Running" + cpu_percent = process.cpu_percent(interval=0.1) + memory_percent = process.memory_percent() + cpu_usage = f"{cpu_percent:.1f}%" + memory_usage = f"{memory_percent:.1f}%" + create_time = datetime.fromtimestamp(process.create_time()) + uptime = datetime.now() - create_time + uptime_str = str(uptime).split(".")[0] + return ( + status, + cpu_usage, + memory_usage, + uptime_str, + cpu_percent, + memory_percent, + ) + except Exception as e: + console.print(f"[red]Error retrieving process info: {e}") + status = "Not Running" + cpu_usage = "N/A" + memory_usage = "N/A" + uptime_str = "N/A" + cpu_percent = 0.0 + memory_percent = 0.0 + pid = "N/A" + return status, cpu_usage, memory_usage, uptime_str, cpu_percent, memory_percent + + +def get_process_entries( + config_data: Dict[str, Any], +) -> Tuple[list[Dict[str, str]], list[float], list[float]]: + """ + Gets process entries for display. + """ + cpu_usage_list = [] + memory_usage_list = [] + process_entries = [] + + # Check Subtensor status + pid = config_data.get("pid") + subtensor_path = config_data.get("subtensor_path", "N/A") + status, cpu_usage, memory_usage, uptime_str, cpu_percent, memory_percent = ( + get_process_info(pid) + ) + + status_style = "green" if status == "Running" else "red" + + if status == "Running": + cpu_usage_list.append(cpu_percent) + memory_usage_list.append(memory_percent) + + process_entries.append( + { + "process": "Subtensor", + "status": status, + "status_style": status_style, + "pid": str(pid), + "cpu_usage": cpu_usage, + "memory_usage": memory_usage, + "uptime_str": uptime_str, + "location": subtensor_path, + "venv_path": config_data.get("venv_subtensor"), + } + ) + + # Check status of Subtensor nodes (substrate_pids) + substrate_pids = config_data.get("substrate_pid", []) + for index, sub_pid in enumerate(substrate_pids, start=1): + status, cpu_usage, memory_usage, uptime_str, cpu_percent, memory_percent = ( + get_process_info(sub_pid) + ) + status_style = "green" if status == "Running" else "red" + + if status == "Running": + cpu_usage_list.append(cpu_percent) + memory_usage_list.append(memory_percent) + + process_entries.append( + { + "process": f"Subtensor Node {index}", + "status": status, + "status_style": status_style, + "pid": str(sub_pid), + "cpu_usage": cpu_usage, + "memory_usage": memory_usage, + "uptime_str": uptime_str, + "location": subtensor_path, + "venv_path": "~", + } + ) + + # Check status of Miners + miners = config_data.get("Miners", {}) + for wallet_name, wallet_info in miners.items(): + pid = wallet_info.get("pid") + status, cpu_usage, memory_usage, uptime_str, cpu_percent, memory_percent = ( + get_process_info(pid) + ) + + status_style = "green" if status == "Running" else "red" + + if status == "Running": + cpu_usage_list.append(cpu_percent) + memory_usage_list.append(memory_percent) + + process_entries.append( + { + "process": f"Miner: {wallet_name}", + "status": status, + "status_style": status_style, + "pid": str(pid), + "cpu_usage": cpu_usage, + "memory_usage": memory_usage, + "uptime_str": uptime_str, + "location": config_data.get("subnet_path"), + "venv_path": wallet_info.get("venv"), + } + ) + + # Check status of Validator + owner_data = config_data.get("Owner") + if owner_data: + pid = owner_data.get("pid") + status, cpu_usage, memory_usage, uptime_str, cpu_percent, memory_percent = ( + get_process_info(pid) + ) + status_style = "green" if status == "Running" else "red" + + if status == "Running": + cpu_usage_list.append(cpu_percent) + memory_usage_list.append(memory_percent) + + process_entries.append( + { + "process": f"Validator: {owner_data.get('wallet_name')}", + "status": status, + "status_style": status_style, + "pid": str(pid), + "cpu_usage": cpu_usage, + "memory_usage": memory_usage, + "uptime_str": uptime_str, + "location": config_data.get("subnet_path"), + "venv_path": owner_data.get("venv"), + } + ) + + return process_entries, cpu_usage_list, memory_usage_list + + +def display_process_status_table( + process_entries: list[Dict[str, str]], + cpu_usage_list: list[float], + memory_usage_list: list[float], + config_data=None, +) -> None: + """ + Displays the process status table. + """ + table = Table( + title="\n[underline dark_orange]BTQS Process Manager[/underline dark_orange]\n", + show_footer=True, + show_edge=False, + header_style="bold white", + border_style="bright_black", + style="bold", + title_justify="center", + show_lines=False, + pad_edge=True, + ) + + table.add_column( + "[bold white]Process", + style="white", + no_wrap=True, + footer_style="bold white", + ) + + table.add_column( + "[bold white]Status", + style="bright_cyan", + justify="center", + footer_style="bold white", + ) + + table.add_column( + "[bold white]PID", + style="bright_magenta", + justify="right", + footer_style="bold white", + ) + + table.add_column( + "[bold white]CPU %", + style="light_goldenrod2", + justify="right", + footer_style="bold white", + ) + + table.add_column( + "[bold white]Memory %", + style="light_goldenrod2", + justify="right", + footer_style="bold white", + ) + + table.add_column( + "[bold white]Uptime", + style="dark_sea_green", + justify="right", + footer_style="bold white", + ) + subtensor_venv, neurons_venv = None, None + for entry in process_entries: + if entry["process"] == "Subtensor": + subtensor_venv = entry["venv_path"] + if entry["process"].startswith("Subtensor"): + process_style = "cyan" + elif entry["process"].startswith("Miner"): + process_style = "magenta" + neurons_venv = entry["venv_path"] + elif entry["process"].startswith("Validator"): + process_style = "yellow" + else: + process_style = "white" + + table.add_row( + f"[{process_style}]{entry['process']}[/{process_style}]", + f"[{entry['status_style']}]{entry['status']}[/{entry['status_style']}]", + entry["pid"], + entry["cpu_usage"], + entry["memory_usage"], + entry["uptime_str"], + ) + + # Compute total CPU and Memory usage + total_cpu = sum(cpu_usage_list) + total_memory = sum(memory_usage_list) + + # Set footers for columns + table.columns[0].footer = f"[white]{len(process_entries)} Processes[/white]" + table.columns[3].footer = f"[white]{total_cpu:.1f}%[/white]" + table.columns[4].footer = f"[white]{total_memory:.1f}%[/white]" + + # Display the table + console.print(table) + + if config_data: + print("\n") + wallet_path = config_data.get("wallets_path", "") + if wallet_path: + console.print("[dark_orange]Wallet Path", wallet_path) + subnet_path = config_data.get("subnet_path", "") + if subnet_path: + console.print("[dark_orange]Subnet Path", subnet_path) + workspace_path = config_data.get("workspace_path", "") + if workspace_path: + console.print("[dark_orange]Workspace Path", workspace_path) + if subtensor_venv: + console.print("[dark_orange]Subtensor virtual environment", subtensor_venv) + if neurons_venv: + console.print("[dark_orange]Neurons virtual environment", neurons_venv) + + +def start_miner( + wallet_name: str, + wallet_info: Dict[str, Any], + subnet_template_path: str, + config_data: Dict[str, Any], + venv_python: str, + verbose=False, +) -> bool: + """ + Starts a single miner and displays logs until user presses Ctrl+C. + """ + wallet = Wallet( + path=config_data["wallets_path"], + name=wallet_name, + hotkey=wallet_info["hotkey"], + ) + + miner_command = get_miner_command(wallet_name, config_data) + base_args = [ + "--wallet.name", + wallet.name, + "--wallet.hotkey", + wallet.hotkey_str, + "--wallet.path", + config_data["wallets_path"], + "--subtensor.chain_endpoint", + LOCALNET_ENDPOINT, + "--logging.trace", + ] + cmd = [venv_python, "-u"] + miner_command + base_args + + env_variables = os.environ.copy() + env_variables["BT_AXON_PORT"] = str(wallet_info["port"]) + env_variables["PYTHONUNBUFFERED"] = "1" + + # Setup log files for miner + logs_dir = os.path.join(config_data["workspace_path"], "logs") + os.makedirs(logs_dir, exist_ok=True) + log_file_path = os.path.join( + logs_dir, f"miner_{wallet_name}_{config_data['pid']}.log" + ) + + with open(log_file_path, "a") as log_file: + try: + if verbose: + console.print( + "🏁 [bold yellow]Starting a miner...Press any key to start a miner [bold dark_orange]and wait for 3 to 5 seconds and press Ctrl + C to start the next neuron.\n" + ) + input() + process = subprocess.Popen( + cmd, + cwd=subnet_template_path, + stdout=log_file, + stderr=subprocess.STDOUT, + env=env_variables, + start_new_session=True, + ) + wallet_info["pid"] = process.pid + wallet_info["log_file"] = log_file_path + + # Update config_data + config_data["Miners"][wallet_name] = wallet_info + if verbose: + attach_to_process_logs( + log_file_path, f"Miner {wallet_name}", process.pid + ) + else: + console.print("[green]🎬 Started miner process!\n") + return True + except Exception as e: + console.print(f"[red]Error starting miner {wallet_name}: {e}") + return False + + +def start_validator( + owner_info: Dict[str, Any], + subnet_template_path: str, + config_data: Dict[str, Any], + venv_python: str, + verbose=False, +) -> bool: + """ + Starts the validator process and displays logs until user presses Ctrl+C. + """ + wallet = Wallet( + path=config_data["wallets_path"], + name=owner_info["wallet_name"], + hotkey=owner_info["hotkey"], + ) + + validator_command = get_validator_command(config_data) + base_args = [ + "--wallet.name", + wallet.name, + "--wallet.hotkey", + wallet.hotkey_str, + "--wallet.path", + config_data["wallets_path"], + "--subtensor.chain_endpoint", + LOCALNET_ENDPOINT, + "--logging.trace", + ] + cmd = [venv_python, "-u"] + validator_command + base_args + + env_variables = os.environ.copy() + env_variables["PYTHONUNBUFFERED"] = "1" + env_variables["BT_AXON_PORT"] = str(VALIDATOR_PORT) + + # Setup log files for validator + logs_dir = os.path.join(config_data["workspace_path"], "logs") + os.makedirs(logs_dir, exist_ok=True) + log_file_path = os.path.join(logs_dir, "validator.log") + + if verbose: + print_info( + "Starting a validator...\nAfter the validator starts press Ctrl + C to start the next neuron.\nPress any key to continue...", + emoji="🏁 ", + ) + input() + with open(log_file_path, "a") as log_file: + try: + process = subprocess.Popen( + cmd, + cwd=subnet_template_path, + stdout=log_file, + stderr=subprocess.STDOUT, + env=env_variables, + start_new_session=True, + ) + owner_info["pid"] = process.pid + owner_info["log_file"] = log_file_path + + # Update config_data + config_data["Owner"] = owner_info + if verbose: + attach_to_process_logs(log_file_path, "Validator", process.pid) + else: + console.print("[green]πŸš“ Started validator process!\n") + return True + except Exception as e: + console.print(f"[red]Error starting validator: {e}") + return False + + +def attach_to_process_logs(log_file_path: str, process_name: str, pid: int = None): + """ + Attaches to the log file of a process and prints logs until user presses Ctrl+C or the process terminates. + """ + try: + with open(log_file_path, "r") as log_file: + # Move to the end of the file + log_file.seek(0, os.SEEK_END) + print_info( + f"Attached to {process_name}. Press Ctrl+C to move on.", emoji="πŸ“Ž " + ) + while True: + line = log_file.readline() + if not line: + # Check if the process is still running + if pid and not psutil.pid_exists(pid): + console.print(f"\n[red]{process_name} process has terminated.") + break + time.sleep(0.1) + continue + print(line, end="") + except KeyboardInterrupt: + print_info(f"Detached from {process_name}.", emoji="\nπŸ”Œ ") + except Exception as e: + console.print(f"[red]Error attaching to {process_name}: {e}") + + +def get_miner_command(wallet_name: str, config_data: dict) -> list[str]: + """ + Retrieves the command and arguments to start a miner. + + Args: + wallet_name (str): The name of the miner wallet. + config_data (dict): The loaded configuration data. + + Returns: + List[str]: The command split into a list suitable for subprocess. + """ + miner_info = config_data.get("Miners", {}).get(wallet_name, {}) + custom_command = miner_info.get("miner_command") + if custom_command: + return shlex.split(custom_command) + else: + return shlex.split(DEFAULT_MINER_COMMAND) + + +def get_validator_command(config_data: dict) -> list[str]: + """ + Retrieves the command and arguments to start the validator. + + Args: + config_data (dict): The loaded configuration data. + + Returns: + List[str]: The command split into a list suitable for subprocess. + """ + owner_info = config_data.get("Owner", {}) + custom_command = owner_info.get("validator_command") + if custom_command: + return shlex.split(custom_command) + else: + return shlex.split(DEFAULT_VALIDATOR_COMMAND) diff --git a/btqs_requirements.txt b/btqs_requirements.txt new file mode 100644 index 00000000..14cb04c2 --- /dev/null +++ b/btqs_requirements.txt @@ -0,0 +1,3 @@ +psutil +GitPython +tdqm diff --git a/setup.py b/setup.py index f9c92cda..ca49c9e3 100644 --- a/setup.py +++ b/setup.py @@ -40,6 +40,7 @@ def read_requirements(path): requirements = read_requirements("requirements.txt") cuda_requirements = read_requirements("cuda_requirements.txt") +btqs_requirements = read_requirements("btqs_requirements.txt") here = path.abspath(path.dirname(__file__)) @@ -76,10 +77,12 @@ def read_requirements(path): install_requires=requirements, extras_require={ "cuda": cuda_requirements, + "btqs": btqs_requirements, }, entry_points={ "console_scripts": [ "btcli=bittensor_cli.cli:main", + "btqs=btqs.btqs_cli:main", ], }, classifiers=[