Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 59 additions & 135 deletions axiom/cli/Console.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/

#include "axiom/cli/Console.h"
#include <sys/resource.h>
#include <iostream>
#include "axiom/cli/linenoise/linenoise.h"

Expand Down Expand Up @@ -47,64 +46,7 @@ using namespace facebook::velox;

namespace axiom::sql {

void Console::initialize() {
gflags::SetUsageMessage(
"Axiom local SQL command line. "
"Run 'axiom_sql --help' for available options.\n");

// Disable logging to stderr if not in debug mode.
FLAGS_logtostderr = FLAGS_debug;
}

void Console::run() {
if (!FLAGS_query.empty()) {
runNoThrow(FLAGS_query, false);
} else {
std::cout << "Axiom SQL. Type statement and end with ;.\n"
"flag name = value; sets a gflag.\n"
"help; prints help text."
<< std::endl;
readCommands("SQL> ");
}
}

namespace {
struct Timing {
uint64_t micros{0};
uint64_t userNanos{0};
uint64_t systemNanos{0};

std::string toString() const {
double pct = 0;
if (micros > 0) {
pct = 100 * (userNanos + systemNanos) / (micros * 1000);
}

std::stringstream out;
out << succinctNanos(micros * 1000) << " / " << succinctNanos(userNanos)
<< " user / " << succinctNanos(systemNanos) << " system (" << pct
<< "%)";
return out.str();
}
};

template <typename T>
T time(const std::function<T()>& func, Timing& timing) {
struct rusage start{};
getrusage(RUSAGE_SELF, &start);
SCOPE_EXIT {
struct rusage end{};
getrusage(RUSAGE_SELF, &end);
auto tvNanos = [](struct timeval tv) {
return tv.tv_sec * 1'000'000'000 + tv.tv_usec * 1'000;
};
timing.userNanos = tvNanos(end.ru_utime) - tvNanos(start.ru_utime);
timing.systemNanos = tvNanos(end.ru_stime) - tvNanos(start.ru_stime);
};

MicrosecondTimer timer(&timing.micros);
return func();
}
namespace util::print {

int64_t countResults(const std::vector<RowVectorPtr>& results) {
int64_t numRows = 0;
Expand Down Expand Up @@ -213,52 +155,24 @@ int32_t printResults(const std::vector<RowVectorPtr>& results) {

return numRows;
}
} // namespace

void Console::runNoThrow(std::string_view sql, bool isInteractive) {
const SqlQueryRunner::RunOptions options{
.numWorkers = FLAGS_num_workers,
.numDrivers = FLAGS_num_drivers,
.splitTargetBytes = FLAGS_split_target_bytes,
.optimizerTraceFlags = FLAGS_optimizer_trace,
.debugMode = FLAGS_debug,
};

decltype(runner_.parseMultiple(sql, options)) statements;
try {
// Parse all statements upfront.
statements = runner_.parseMultiple(sql, options);
} catch (std::exception& e) {
std::cerr << "Parse failed: " << e.what() << std::endl;
return;
}

// Execute each statement with timing.
for (const auto& statement : statements) {
try {
Timing statementTiming;
auto result = time<SqlQueryRunner::SqlResult>(
[&]() { return runner_.run(*statement, options); }, statementTiming);

if (result.message.has_value()) {
std::cout << result.message.value() << std::endl;
} else {
printResults(result.results);
}

if (isInteractive) {
// In interactive mode, show per-statement timing.
std::cout << statementTiming.toString() << std::endl;
}
} catch (std::exception& e) {
std::cerr << "Query failed: " << e.what() << std::endl;
// Stop executing remaining statements in this block.
return;
void printOutcomes(const std::vector<StatementOutcome>& outcomes) {
for (const auto& outcome : outcomes) {
if (outcome.message.has_value()) {
std::cout << outcome.message.value() << std::endl;
}
if (!outcome.data.empty()) {
printResults(outcome.data);
}
if (outcome.timing.has_value()) {
std::cout << outcome.timing->toString() << std::endl;
}
}
}

namespace {
} // namespace util::print

namespace util::read {

// Reads multi-line command from 'in' until encounters ';' followed by
// zero or
Expand Down Expand Up @@ -315,7 +229,42 @@ std::string readCommand(const std::string& prompt, bool& atEnd) {
atEnd = true;
return "";
}
} // namespace
} // namespace util::read

void Console::initialize() {
gflags::SetUsageMessage(
"Axiom local SQL command line. "
"Run 'axiom_sql --help' for available options.\n");

// Disable logging to stderr if not in debug mode.
FLAGS_logtostderr = FLAGS_debug;

const SqlQueryRunner::RunOptions options{
.numWorkers = FLAGS_num_workers,
.numDrivers = FLAGS_num_drivers,
.splitTargetBytes = FLAGS_split_target_bytes,
.optimizerTraceFlags = FLAGS_optimizer_trace,
.debugMode = FLAGS_debug,
// Interactive mode prints time elapsed for each statement.
.measureTiming = FLAGS_query.empty(),
.historyPath = FLAGS_data_path + "/.history",
};

runner_.registerCommands(options);
}

void Console::run() {
if (!FLAGS_query.empty()) {
auto result = runner_.handleCommand(FLAGS_query);
util::print::printOutcomes(result.outcomes);
} else {
std::cout << "Axiom SQL. Type statement and end with ;.\n"
"flag name = value; sets a gflag.\n"
"help; prints help text."
<< std::endl;
readCommands("SQL> ");
}
}

void Console::readCommands(const std::string& prompt) {
linenoiseSetMultiLine(1);
Expand All @@ -325,7 +274,7 @@ void Console::readCommands(const std::string& prompt) {

for (;;) {
bool atEnd;
std::string command = readCommand(prompt, atEnd);
std::string command = util::read::readCommand(prompt, atEnd);
if (atEnd) {
break;
}
Expand All @@ -334,21 +283,14 @@ void Console::readCommands(const std::string& prompt) {
continue;
}

if (command.starts_with("exit") || command.starts_with("quit")) {
break;
}

if (command.starts_with("help")) {
static const char* helpText =
"Axiom Interactive SQL\n\n"
"Type SQL and end with ';'.\n"
"To set a flag, type 'flag <gflag_name> = <value>;' Leave a space on either side of '='.\n\n"
"Useful flags:\n\n"
"num_workers - Make a distributed plan for this many workers. Runs it in-process with remote exchanges with serialization and passing data in memory. If num_workers is 1, makes single node plans without remote exchanges.\n\n"
"num_drivers - Specifies the parallelism for workers. This many threads per pipeline per worker.\n\n";
// Try registered commands first, then execute SQL.
auto result = runner_.handleCommand(command);

std::cout << helpText;
continue;
if (result.handled) {
util::print::printOutcomes(result.outcomes);
}
if (result.shouldExit) {
break;
}

char* flag = nullptr;
Expand Down Expand Up @@ -398,25 +340,7 @@ void Console::readCommands(const std::string& prompt) {
}
continue;
}

if (sscanf(command.c_str(), "session %ms = %ms", &flag, &value) == 2) {
std::cout << "Session '" << flag << "' set to '" << value << "'"
<< std::endl;
runner_.sessionConfig()[std::string(flag)] = std::string(value);
continue;
}

if (command.starts_with("savehistory")) {
runner_.saveHistory(FLAGS_data_path + "/.history");
continue;
}

if (command.starts_with("clearhistory")) {
runner_.clearHistory();
continue;
}

runNoThrow(command);
}
}

} // namespace axiom::sql
7 changes: 0 additions & 7 deletions axiom/cli/Console.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,6 @@ class Console {
void run();

private:
// Executes SQL and prints results, catching any exceptions.
// @param sql The SQL text to execute, which may contain multiple
// semicolon-separated statements.
// @param isInteractive If true, shows timing after each statement for
// multi-statement queries.
void runNoThrow(std::string_view sql, bool isInteractive = true);

// Reads and executes commands from standard input in interactive mode.
void readCommands(const std::string& prompt);

Expand Down
Loading