Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 50 additions & 34 deletions framework/docker/container/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ type Lifecycle struct {
containerName string
id string
preStartListeners port.Listeners
hostNetwork bool
}

// SetHostNetwork configures the container to use the host's network stack
// instead of a bridge network.
func (c *Lifecycle) SetHostNetwork(enabled bool) {
c.hostNetwork = enabled
}

func NewLifecycle(log *zap.Logger, c types.TastoraDockerClient, containerName string) *Lifecycle {
Expand Down Expand Up @@ -73,54 +80,63 @@ func (c *Lifecycle) CreateContainer(
pS[k] = struct{}{}
}

pb, listeners, err := port.GenerateBindings(ports)
if err != nil {
return fmt.Errorf("failed to generate port bindings: %w", err)
containerCfg := &container.Config{
Image: imageRef,
Entrypoint: entrypoint,
Cmd: cmd,
Env: env,
Hostname: hostName,
Labels: map[string]string{consts.CleanupLabel: c.client.CleanupLabel()},
}

c.preStartListeners = listeners
hostCfg := &container.HostConfig{
Binds: volumeBinds,
AutoRemove: false,
DNS: []string{},
Mounts: mounts,
}

var endpointSettings network.EndpointSettings
if ipAddr == "" {
endpointSettings = network.EndpointSettings{}
var netCfg *network.NetworkingConfig

if c.hostNetwork {
hostCfg.NetworkMode = "host"
} else {
endpointSettings = network.EndpointSettings{
IPAMConfig: &network.EndpointIPAMConfig{
IPv4Address: ipAddr,
containerCfg.ExposedPorts = pS

pb, listeners, err := port.GenerateBindings(ports)
if err != nil {
return fmt.Errorf("failed to generate port bindings: %w", err)
}
c.preStartListeners = listeners

hostCfg.PortBindings = pb
hostCfg.PublishAllPorts = true

var endpointSettings network.EndpointSettings
if ipAddr != "" {
endpointSettings = network.EndpointSettings{
IPAMConfig: &network.EndpointIPAMConfig{
IPv4Address: ipAddr,
},
}
}
netCfg = &network.NetworkingConfig{
EndpointsConfig: map[string]*network.EndpointSettings{
networkID: &endpointSettings,
},
}
}

cc, err := c.client.ContainerCreate(
ctx,
&container.Config{
Image: imageRef,

Entrypoint: entrypoint,
Cmd: cmd,
Env: env,
Hostname: hostName,
Labels: map[string]string{consts.CleanupLabel: c.client.CleanupLabel()},
ExposedPorts: pS,
},
&container.HostConfig{
Binds: volumeBinds,
PortBindings: pb,
PublishAllPorts: true,
AutoRemove: false,
DNS: []string{},
Mounts: mounts,
},
&network.NetworkingConfig{
EndpointsConfig: map[string]*network.EndpointSettings{
networkID: &endpointSettings,
},
},
containerCfg,
hostCfg,
netCfg,
nil,
c.containerName,
)
if err != nil {
listeners.CloseAll()
c.preStartListeners.CloseAll()
c.preStartListeners = port.Listeners{}
return err
}
Expand Down
27 changes: 18 additions & 9 deletions framework/docker/evstack/spamoor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ type Builder struct {
logger *zap.Logger
image container.Image

rpcHosts []string
privKey string
nameSuffix string
rpcHosts []string
privKey string
nameSuffix string
additionalStartArgs []string
hostNetwork bool
}

func NewNodeBuilder(testName string) *Builder {
Expand All @@ -34,15 +36,22 @@ func (b *Builder) WithImage(img container.Image) *Builder { b.image = img; retur
func (b *Builder) WithRPCHosts(hosts ...string) *Builder { b.rpcHosts = hosts; return b }
func (b *Builder) WithPrivateKey(pk string) *Builder { b.privKey = pk; return b }
func (b *Builder) WithNameSuffix(s string) *Builder { b.nameSuffix = s; return b }
func (b *Builder) WithAdditionalStartArgs(args ...string) *Builder {
b.additionalStartArgs = args
return b
}
func (b *Builder) WithHostNetwork() *Builder { b.hostNetwork = true; return b }

func (b *Builder) Build(ctx context.Context) (*Node, error) {
cfg := Config{
DockerClient: b.dockerClient,
DockerNetworkID: b.dockerNetwork,
Logger: b.logger,
Image: b.image,
RPCHosts: b.rpcHosts,
PrivateKey: b.privKey,
DockerClient: b.dockerClient,
DockerNetworkID: b.dockerNetwork,
Logger: b.logger,
Image: b.image,
RPCHosts: b.rpcHosts,
PrivateKey: b.privKey,
AdditionalStartArgs: b.additionalStartArgs,
HostNetwork: b.hostNetwork,
}
return newNode(ctx, cfg, b.testName, 0, b.nameSuffix)
}
Expand Down
27 changes: 20 additions & 7 deletions framework/docker/evstack/spamoor/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ type Config struct {
Logger *zap.Logger
Image container.Image

RPCHosts []string
PrivateKey string
RPCHosts []string
PrivateKey string
AdditionalStartArgs []string
HostNetwork bool
}

type Node struct {
Expand All @@ -50,7 +52,11 @@ func newNode(ctx context.Context, cfg Config, testName string, index int, name s
log := cfg.Logger.With(zap.String("component", "spamoor-daemon"), zap.Int("i", index))
n := &Node{cfg: cfg, logger: log, name: name}
n.Node = container.NewNode(cfg.DockerNetworkID, cfg.DockerClient, testName, cfg.Image, "/home/spamoor", index, nodeType(0), log)
n.SetContainerLifecycle(container.NewLifecycle(cfg.Logger, cfg.DockerClient, n.Name()))
lc := container.NewLifecycle(cfg.Logger, cfg.DockerClient, n.Name())
if cfg.HostNetwork {
lc.SetHostNetwork(true)
}
n.SetContainerLifecycle(lc)
if err := n.CreateAndSetupVolume(ctx, n.Name()); err != nil {
return nil, err
}
Expand Down Expand Up @@ -89,11 +95,17 @@ func (n *Node) Start(ctx context.Context) error {
if err := n.ContainerLifecycle.StartContainer(ctx); err != nil {
return err
}
hostPorts, err := n.ContainerLifecycle.GetHostPorts(ctx, defaultInternalPorts().Web+"/tcp")
if err != nil {
return err

var mapped string
if n.cfg.HostNetwork {
mapped = defaultInternalPorts().Web
} else {
hostPorts, err := n.ContainerLifecycle.GetHostPorts(ctx, defaultInternalPorts().Web+"/tcp")
if err != nil {
return err
}
mapped = internal.MustExtractPort(hostPorts[0])
}
mapped := internal.MustExtractPort(hostPorts[0])
n.external = types.Ports{HTTP: mapped}
n.started = true
// readiness wait for /metrics endpoint (best-effort)
Expand Down Expand Up @@ -123,6 +135,7 @@ func (n *Node) createNodeContainer(ctx context.Context) error {
cmd = append(cmd, "--rpchost", s)
}
}
cmd = append(cmd, n.cfg.AdditionalStartArgs...)

port := nat.Port(p.Web + "/tcp")
ports := nat.PortMap{
Expand Down
142 changes: 142 additions & 0 deletions framework/docker/victoriatraces/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package victoriatraces

import (
"context"
"fmt"
"sync"

"github.com/celestiaorg/tastora/framework/docker/container"
"github.com/celestiaorg/tastora/framework/docker/internal"
"github.com/celestiaorg/tastora/framework/types"
"github.com/docker/go-connections/nat"
"go.uber.org/zap"
)

type nodeType int

func (nodeType) String() string { return "victoriatraces" }

// default port for VictoriaTraces — serves both OTLP HTTP ingest and query API.
const defaultHTTPPort = "10428"

type Config struct {
Logger *zap.Logger
DockerClient types.TastoraDockerClient
DockerNetworkID string
Image container.Image
}

type Node struct {
*container.Node

cfg Config
logger *zap.Logger
started bool
mu sync.Mutex

internalHTTPPort string
externalHTTPPort string

Internal scope
External scope
}

func New(ctx context.Context, cfg Config, testName string, index int) (*Node, error) {
img := cfg.Image
if img.Repository == "" {
img = container.NewImage("victoriametrics/victoria-traces", "latest", "")
}
log := cfg.Logger.With(zap.String("component", "victoriatraces"), zap.Int("i", index))
home := "/home/victoriatraces"
n := &Node{cfg: cfg, logger: log}
n.Node = container.NewNode(cfg.DockerNetworkID, cfg.DockerClient, testName, img, home, index, nodeType(0), log)
n.SetContainerLifecycle(container.NewLifecycle(cfg.Logger, cfg.DockerClient, n.Name()))
if err := n.CreateAndSetupVolume(ctx, n.Name()); err != nil {
return nil, err
}
n.Internal = scope{hostname: func() string { return n.Name() }, port: &n.internalHTTPPort}
n.External = scope{hostname: func() string { return "0.0.0.0" }, port: &n.externalHTTPPort}
return n, nil
}

func (n *Node) Name() string {
return fmt.Sprintf("victoriatraces-%d-%s", n.Index, internal.SanitizeDockerResourceName(n.TestName))
}

func (n *Node) HostName() string {
return internal.CondenseHostName(n.Name())
}

func (n *Node) Start(ctx context.Context) error {
n.mu.Lock()
defer n.mu.Unlock()
if n.started {
return n.StartContainer(ctx)
}
if err := n.createContainer(ctx); err != nil {
return err
}
if err := n.ContainerLifecycle.StartContainer(ctx); err != nil {
return err
}
hostPorts, err := n.ContainerLifecycle.GetHostPorts(ctx, n.internalHTTPPort+"/tcp")
if err != nil {
return err
}
n.externalHTTPPort = internal.MustExtractPort(hostPorts[0])
n.started = true
return nil
}

func (n *Node) createContainer(ctx context.Context) error {
if n.internalHTTPPort == "" {
n.internalHTTPPort = defaultHTTPPort
}

ports := nat.PortMap{
nat.Port(n.internalHTTPPort + "/tcp"): {},
}
cmd := []string{"-storageDataPath", n.HomeDir() + "/data"}
return n.CreateContainer(ctx, n.TestName, n.NetworkID, n.Image, ports, "", n.Bind(), nil, n.HostName(), cmd, nil, nil)
}

func (n *Node) GetNetworkInfo(ctx context.Context) (types.NetworkInfo, error) {
internalIP, err := internal.GetContainerInternalIP(ctx, n.DockerClient, n.ContainerLifecycle.ContainerID())
if err != nil {
return types.NetworkInfo{}, err
}
return types.NetworkInfo{
Internal: types.Network{
Hostname: n.HostName(),
IP: internalIP,
Ports: types.Ports{HTTP: n.internalHTTPPort},
},
External: types.Network{
Hostname: "0.0.0.0",
Ports: types.Ports{HTTP: n.externalHTTPPort},
},
}, nil
}

// scope provides scoped (internal/external) access to VictoriaTraces endpoints.
type scope struct {
hostname func() string
port *string
}

// IngestHTTPEndpoint returns the full OTLP HTTP ingest URL including the /v1/traces path.
// Use this for exporters (like Go's otlptracehttp with WithEndpointURL) that send to the
// URL as-is without appending any path.
func (s scope) IngestHTTPEndpoint() string {
return fmt.Sprintf("http://%s:%s/insert/opentelemetry/v1/traces", s.hostname(), *s.port)
}

// OTLPBaseEndpoint returns the OTLP base URL without the /v1/traces suffix.
// Use this for exporters (like Rust's opentelemetry-otlp) that auto-append /v1/traces.
func (s scope) OTLPBaseEndpoint() string {
return fmt.Sprintf("http://%s:%s/insert/opentelemetry", s.hostname(), *s.port)
}

func (s scope) QueryURL() string {
return fmt.Sprintf("http://%s:%s", s.hostname(), *s.port)
}
Loading