Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 5 additions & 10 deletions catkit2/bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -684,15 +684,8 @@ PYBIND11_MODULE(catkit_bindings, m)
.def_property_readonly("config", &TestbedProxy::GetConfig)
.def_property_readonly("host", &TestbedProxy::GetHost)
.def_property_readonly("port", &TestbedProxy::GetPort)
.def_property_readonly("logging_egress_port", &TestbedProxy::GetLoggingEgressPort)
.def_property_readonly("active_services", &TestbedProxy::GetActiveServices)
.def_property_readonly("inactive_services", &TestbedProxy::GetInactiveServices)
.def_property_readonly("logging_ingress_port", &TestbedProxy::GetLoggingIngressPort)
.def_property_readonly("logging_egress_port", &TestbedProxy::GetLoggingEgressPort)
.def_property_readonly("data_logging_ingress_port", &TestbedProxy::GetDataLoggingIngressPort)
.def_property_readonly("data_logging_egress_port", &TestbedProxy::GetDataLoggingEgressPort)
.def_property_readonly("tracing_ingress_port", &TestbedProxy::GetTracingIngressPort)
.def_property_readonly("tracing_egress_port", &TestbedProxy::GetTracingEgressPort)
.def_property_readonly("base_data_path", &TestbedProxy::GetBaseDataPath)
.def_property_readonly("support_data_path", &TestbedProxy::GetSupportDataPath)
.def_property_readonly("long_term_monitoring_path", &TestbedProxy::GetLongTermMonitoringPath)
Expand Down Expand Up @@ -853,8 +846,8 @@ PYBIND11_MODULE(catkit_bindings, m)
.def(py::init<>())
.def("connect", &LogForwarder::Connect);

m.def("trace_connect", [](std::string process_name, std::string host, int port) {
tracing_proxy.Connect(process_name, host, port);
m.def("trace_connect", [](std::string process_name, std::shared_ptr<LocalMessageBroker> broker) {
tracing_proxy.Connect(process_name, broker);
});
m.def("trace_disconnect", []() {
tracing_proxy.Disconnect();
Expand Down Expand Up @@ -1087,7 +1080,9 @@ PYBIND11_MODULE(catkit_bindings, m)
return py::none();
});

py::class_<LocalMessageBroker, std::shared_ptr<LocalMessageBroker>>(m, "LocalMessageBroker")
py::class_<MessageBroker, std::shared_ptr<MessageBroker>>(m, "MessageBroker");

py::class_<LocalMessageBroker, Shareable, MessageBroker, std::shared_ptr<LocalMessageBroker>>(m, "LocalMessageBroker")
.def_static("create", [](std::shared_ptr<Memory> header, std::vector<std::shared_ptr<Memory>> memory_blocks)
{
auto stream = StructStream(header);
Expand Down
2 changes: 0 additions & 2 deletions catkit2/testbed/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,12 @@
'trace_interval',
'trace_instant',
'trace_counter',
'ZmqDistributor',
]

from .testbed import *
from .experiment import *
from .service import *
from .logging import *
from .tracing import *
from .distributor import *
from .testbed_proxy import *
from .service_proxy import *
86 changes: 0 additions & 86 deletions catkit2/testbed/distributor.py

This file was deleted.

6 changes: 3 additions & 3 deletions catkit2/testbed/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ def run(self):

# Set up log forwarder.
log_forwarder = LogForwarder()
log_forwarder.connect('experiment', f'tcp://{self.testbed.host}:{self.testbed.logging_ingress_port}')
log_forwarder.connect('experiment', self.testbed.message_broker)

# Set up log writer.
self._log_writer = LogWriter(self.testbed.host, self.testbed.logging_egress_port)
self._log_writer = LogWriter(self.testbed.message_broker)
self._log_writer.start()

# Set up logging to terminal.
self._log_terminal = LogTerminal(self.testbed.host, self.testbed.logging_egress_port)
self._log_terminal = LogTerminal(self.testbed.message_broker)
self._log_terminal.start()

log_writer = Experiment._running_experiments[0]._log_writer
Expand Down
64 changes: 35 additions & 29 deletions catkit2/testbed/logging.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import logging
import threading
import zmq
import json
import contextlib
from colorama import Fore, Back, Style

from ..catkit_bindings import submit_log_entry, Severity
from ..catkit_bindings import submit_log_entry, Severity, MessageSubscriptionMode

class CatkitLogHandler(logging.StreamHandler):
'''A log handler to pipe Python log messages into the catkit2 logging system.
Expand All @@ -27,17 +26,18 @@ def emit(self, record):
submit_log_entry(filename, line, function, severity, message)

class LogObserver:
def __init__(self, host, port):
self.context = zmq.Context()
self.host = host
self.port = port
def __init__(self, broker):
self.broker = broker
self.subscription = None

self.shutdown_flag = threading.Event()
self.thread = None

def start(self):
'''Start the proxy thread.
'''
# Subscribe to all log topics
self.subscription = self.broker.subscribe('logs', mode=MessageSubscriptionMode.Sequential)
self.thread = threading.Thread(target=self.loop)
self.thread.start()

Expand All @@ -52,36 +52,28 @@ def stop(self):
self.thread.join()

def loop(self):
# Set up sockets.
socket = self.context.socket(zmq.SUB)
socket.connect(f'tcp://{self.host}:{self.port}')
socket.subscribe('')
socket.RCVTIMEO = 50

# Main loop.
while not self.shutdown_flag.is_set():
# Receive new log message.
try:
log_message = socket.recv_multipart()
except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
# Timed out.
message = self.subscription.get_next_message(timeout_in_sec=0.1)
if message is None:
continue
else:
raise RuntimeError('Error during receive.') from e
except Exception:
continue

# Decode log message.
log_message = log_message[0].decode('ascii')
log_message = json.loads(log_message)
# Decode log message from payload.
payload = message.payload.tobytes().decode('utf-8')
log_message = json.loads(payload)

self.handle_message(log_message)

def handle_message(self, log_message):
pass

class LogWriter(LogObserver):
def __init__(self, host, port, log_format=None):
super().__init__(host, port)
def __init__(self, broker, log_format=None):
super().__init__(broker)

if log_format is None:
log_format = '{time} - {service_id} - {severity} - {message}'
Expand Down Expand Up @@ -128,8 +120,15 @@ def handle_message(self, log_message):
if severity.value < self.level.value:
return

# Add service_id to log_message for formatting (extract from source).
log_message_for_format = log_message.copy()
log_message_for_format['service_id'] = log_message['source']['service_id']
log_message_for_format['filename'] = log_message['source']['file']
log_message_for_format['line'] = log_message['source']['line']
log_message_for_format['function'] = log_message['source']['function']

# Format output message.
message = self.log_format.format(**log_message)
message = self.log_format.format(**log_message_for_format)

# Write log message to file.
with self.file_lock:
Expand All @@ -138,8 +137,8 @@ def handle_message(self, log_message):
self._output_file.flush()

class LogTerminal(LogObserver):
def __init__(self, host, port):
super().__init__(host, port)
def __init__(self, broker):
super().__init__(broker)

self.level = Severity.WARNING
self.colors = {
Expand All @@ -152,13 +151,20 @@ def __init__(self, host, port):

def handle_message(self, log_message):
severity = getattr(Severity, log_message['severity'].upper())
service_id = log_message['source']['service_id']

if log_message['service_id'] != 'experiment':
if service_id != 'experiment':
if severity.value < self.level.value:
return

header = '{time} - {severity: <8} - {service_id} - {filename}:{line}'.format(**log_message)
formatted_message = '{message}'.format(**log_message)
header = '{time} - {severity: <8} - {service_id} - {file}:{line}'.format(
time=log_message['time'],
severity=log_message['severity'],
service_id=service_id,
file=log_message['source']['file'],
line=log_message['source']['line']
)
formatted_message = '{message}'.format(message=log_message['message'])

print(header)
print(self.colors[severity] + formatted_message + Style.RESET_ALL)
Loading
Loading