Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/svcinit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ go_library(

go_binary(
name = "svcinit",
embed = [":svcinit_lib"],
data = ["//cmd/get_assigned_port"],
embed = [":svcinit_lib"],
visibility = ["//visibility:public"],
x_defs = {
"getAssignedPortRlocationPath": "$(rlocationpath //cmd/get_assigned_port)",
},
visibility = ["//visibility:public"],
)
4 changes: 2 additions & 2 deletions cmd/svcinit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func main() {

// Make sure we grab the svcctl port before we assign test ports,
// otherwise we might steal an assigned port by accident.
listener, err := net.Listen("tcp", "127.0.0.1:0")
listener, err := net.Listen("tcp", "127.0.0.1:50549")
must(err)

ports, err := assignPorts(unversionedSpecs)
Expand Down Expand Up @@ -216,7 +216,7 @@ func main() {
fmt.Println(err)
} else {
timeoutVal -= int(math.Ceil(testStartTime.Sub(start).Seconds()))
testCmd.Env = append(testCmd.Env, "TEST_TIMEOUT=" + strconv.Itoa(timeoutVal))
testCmd.Env = append(testCmd.Env, "TEST_TIMEOUT="+strconv.Itoa(timeoutVal))
}
}

Expand Down
30 changes: 27 additions & 3 deletions runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package runner
import (
"context"
"fmt"
"io"
"log"
"os"
"os/exec"
Expand Down Expand Up @@ -51,6 +52,16 @@ func colorize(s svclib.VersionedServiceSpec) string {
return s.Colorize(s.Label)
}

func (r *Runner) ServiceLabels() []string {
var labels []string
for label, service := range r.serviceInstances {
if service.Type == "service" {
labels = append(labels, label)
}
}
return labels
}

func (r *Runner) StartAll(serviceErrCh chan error) ([]topological.Task, error) {
tasks := allTasks(r.serviceInstances, func(ctx context.Context, service *ServiceInstance) error {
if service.Type == "group" {
Expand Down Expand Up @@ -233,11 +244,18 @@ func prepareServiceInstance(ctx context.Context, s svclib.VersionedServiceSpec)
}, nil
}

logPath := "/tmp/child.log"
log, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return nil, err
}

instance := &ServiceInstance{
VersionedServiceSpec: s,
log: log,
}

err := initializeServiceCmd(ctx, instance)
err = initializeServiceCmd(ctx, instance)
if err != nil {
return nil, err
}
Expand All @@ -256,8 +274,14 @@ func initializeServiceCmd(ctx context.Context, instance *ServiceInstance) error
for k, v := range s.Env {
cmd.Env = append(cmd.Env, k+"="+v)
}
cmd.Stdout = logger.New(s.Label+"> ", s.Color, os.Stdout)
cmd.Stderr = logger.New(s.Label+"> ", s.Color, os.Stderr)
cmd.Stdout = io.MultiWriter(
logger.New(s.Label+"> ", s.Color, os.Stdout),
instance.log,
)
cmd.Stderr = io.MultiWriter(
logger.New(s.Label+"> ", s.Color, os.Stderr),
instance.log,
)

if shouldUseProcessGroups {
setPgid(cmd)
Expand Down
7 changes: 6 additions & 1 deletion runner/service_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
type ServiceInstance struct {
svclib.VersionedServiceSpec
stdin io.WriteCloser
log *os.File
cmd *exec.Cmd

startTime time.Time
Expand Down Expand Up @@ -217,7 +218,11 @@ func (s *ServiceInstance) Stop(sig syscall.Signal) error {
time.Sleep(5 * time.Millisecond)
}

return nil
return s.log.Close()
}

func (s *ServiceInstance) LogPath() string {
return s.log.Name()
}

func (s *ServiceInstance) Wait() error {
Expand Down
5 changes: 4 additions & 1 deletion svcctl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ load("@rules_go//go:def.bzl", "go_library")

go_library(
name = "svcctl",
srcs = ["svcctl.go"],
srcs = [
"svcctl.go",
"ui.go",
],
importpath = "rules_itest/svcctl",
visibility = ["//visibility:public"],
deps = [
Expand Down
2 changes: 2 additions & 0 deletions svcctl/svcctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ func (p portHandler) handle(ctx context.Context, r *runner.Runner, _ chan error,

func Serve(ctx context.Context, listener net.Listener, r *runner.Runner, ports svclib.Ports, servicesErrCh chan error) error {
mux := http.NewServeMux()
handle(ctx, mux, r, servicesErrCh, "GET /", handleUI)
handle(ctx, mux, r, servicesErrCh, "GET /v0/log", handleLog)
handle(ctx, mux, r, servicesErrCh, "GET /v0/healthcheck", handleHealthCheck)
handle(ctx, mux, r, servicesErrCh, "GET /v0/start", handleStart)
handle(ctx, mux, r, servicesErrCh, "GET /v0/kill", handleKill)
Expand Down
211 changes: 211 additions & 0 deletions svcctl/ui.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
//go:build go1.22

package svcctl

import (
"context"
"fmt"
"html/template"
"io"
"net/http"
"os"
"strconv"
"time"

"rules_itest/runner"
)

const pageTemplate = `
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<title>Service Logs</title>
<style>
body {
display: flex;
height: 100vh;
margin: 0;
font-family: sans-serif;
}

#sidebar {
width: 200px;
background-color: #f0f0f0;
border-right: 1px solid #ccc;
padding: 10px;
box-sizing: border-box;
}

#main {
flex: 1;
display: flex;
flex-direction: column;
}

#log {
flex: 1;
padding: 10px;
overflow-y: scroll;
white-space: pre-wrap;
background-color: #000;
color: #0f0;
font-family: monospace;
font-size: 14px;
border-top: 1px solid #ccc;
}

.service {
cursor: pointer;
padding: 5px;
border-radius: 3px;
margin-bottom: 5px;
}

.service:hover {
background-color: #ddd;
}

.service.active {
background-color: #bbb;
font-weight: bold;
}
</style>
</head>
<body>
<div id="sidebar">
{{ range $index, $svc := .Services }}
<div class="service {{ if eq $index 0 }}active{{ end }}" data-service="{{ $svc }}">{{ $svc }}</div>
{{ end }}
</div>

<div id="main">
<div id="log">Connecting...</div>
</div>

<script>
const logDiv = document.getElementById('log');
let offset = 0;
let controller = null;
let currentService = document.querySelector('.service.active').getAttribute('data-service');

async function fetchLogs() {
controller = new AbortController();
const signal = controller.signal;

try {
const url = new URL('/v0/log', window.location);
url.searchParams.set('service', currentService);
url.searchParams.set('offset', offset);
const response = await fetch(url, { signal });
const decoder = new TextDecoder("utf-8");

for await (const chunk of response.body) {
const text = decoder.decode(chunk, { stream: true });
const isAtBottom = logDiv.scrollTop + logDiv.clientHeight >= logDiv.scrollHeight - 5;

logDiv.textContent += text;
offset += chunk.length;

if (isAtBottom) {
logDiv.scrollTop = logDiv.scrollHeight;
}
}
} catch (err) {
if (err.name === 'AbortError') {
console.log('Fetch aborted');
} else {
console.error('Fetch logs failed:', err);
logDiv.textContent += '\n--- connection error ---\n';
}
}
}

function start() {
if (controller) {
controller.abort();
}
logDiv.textContent = '';
offset = 0;
fetchLogs();
}

// Sidebar click
document.querySelectorAll('.service').forEach(el => {
el.addEventListener('click', () => {
document.querySelectorAll('.service').forEach(s => s.classList.remove('active'));
el.classList.add('active');

currentService = el.getAttribute('data-service');
start();
});
});

start();
</script>
</body>
</html>
`

var tmpl = template.Must(template.New("page").Parse(pageTemplate))

func handleUI(ctx context.Context, r *runner.Runner, _ chan error, w http.ResponseWriter, req *http.Request) {
err := tmpl.Execute(w, struct{ Services []string }{Services: r.ServiceLabels()})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}

func handleLog(ctx context.Context, r *runner.Runner, _ chan error, w http.ResponseWriter, req *http.Request) {
instance := r.GetInstance(req.URL.Query().Get("service"))

offsetStr := req.URL.Query().Get("offset")
var offset int64
if offsetStr != "" {
offset, _ = strconv.ParseInt(offsetStr, 10, 64)
}

f, err := os.Open(instance.LogPath())
if err != nil {
http.Error(w, "failed to open log file", http.StatusInternalServerError)
return
}
defer f.Close()

_, err = f.Seek(offset, io.SeekStart)
if err != nil {
http.Error(w, "failed to seek log file", http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "text/plain")
w.Header().Set("Transfer-Encoding", "chunked")

buf := make([]byte, 4096)
fmt.Println("ZZZZ starting transfer")
for {
n, err := f.Read(buf)
fmt.Println("ZZZ finished read", instance.LogPath())
if n > 0 {
if _, writeErr := w.Write(buf[:n]); writeErr != nil {
// client disconnected
fmt.Printf("Client disconnected: %v\n", writeErr)
return
}
fmt.Println("writing bytes", n)
if flusher, ok := w.(http.Flusher); ok {
fmt.Println("flushing", n)
flusher.Flush()
}
}

if err == io.EOF {
time.Sleep(100 * time.Millisecond)
continue
} else if err != nil {
// Some other error
fmt.Printf("Error reading log file: %v\n", err)
return
}
}
}
8 changes: 8 additions & 0 deletions tests/go_service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,15 @@ func main() {
w.Write([]byte(strconv.Itoa(fibSink)))
})

go func() {
for {
time.Sleep(100 * time.Millisecond)
log.Print("HIII STILL ALIVE")
}
}()

serve(*port, *soReuseport)

}

func fib(n int) int {
Expand Down
Loading