Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/appolly/discover/docker_decorator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func ddlog() *slog.Logger {
type dockerAPIClient interface {
IsEnabled(context.Context) bool
ContainerInfo(context.Context, app.PID) (docker.ContainerMeta, bool)
InvalidatePID(app.PID)
}

func DockerDiscoveryDecoratorProvider(
Expand Down Expand Up @@ -69,6 +70,7 @@ func (dd *dockerDecorator) decorate(ctx context.Context) {
}
case EventDeleted:
delete(dd.containerByPID, ev.Obj.pid)
dd.docker.InvalidatePID(ev.Obj.pid)
}
}
dd.out.SendCtx(ctx, instrumentables)
Expand Down
219 changes: 210 additions & 9 deletions pkg/docker/docker_api_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@ package docker // import "go.opentelemetry.io/obi/pkg/docker"

import (
"context"
"errors"
"io"
"log/slog"
"maps"
"strings"
"sync"
"time"

"github.com/moby/moby/api/types/events"
"github.com/moby/moby/client"

"go.opentelemetry.io/obi/pkg/appolly/app"
Expand All @@ -18,21 +22,42 @@ import (
"go.opentelemetry.io/obi/pkg/internal/helpers/container"
)

const composeServiceLabelKey = "com.docker.compose.service"
const (
composeServiceLabelKey = "com.docker.compose.service"
// abbreviationLength defines the length for the short ID form
abbreviationLength = 12
)

func cmlog() *slog.Logger {
return slog.With("component", "docker.ContainerStore")
}

var osInfoForPID = container.InfoForPID

// Full length ID as provided by the docker API
type ContainerID string

type ContainerMeta struct {
// TODO: add other fields https://opentelemetry.io/docs/specs/semconv/resource/container/
ID string
ID string // short form ID limited to abbreviationLength
FullID ContainerID
Name string
ComposeService string
}

// containerEntry groups container metadata with the PIDs known to belong to it.
// This allows a single map lookup to both retrieve metadata and support PID-based invalidation.
type containerEntry struct {
meta ContainerMeta
pids []app.PID
}

// dockerClient defines the Docker API methods needed by ContainerStore.
type dockerClient interface {
ContainerInspect(ctx context.Context, container string, options client.ContainerInspectOptions) (client.ContainerInspectResult, error)
Events(ctx context.Context, options client.EventsListOptions) client.EventsResult
}

// ContainerStore caches access to the Docker container API.
// The behavior can be overridden via environment variables:
// - DOCKER_HOST to set the URL to the docker server.
Expand All @@ -43,14 +68,21 @@ type ContainerMeta struct {
// - DOCKER_TLS_VERIFY to enable or disable TLS verification
// (off by default).
type ContainerStore struct {
initMutex sync.Mutex
docker client.ContainerAPIClient
log *slog.Logger
initMutex sync.Mutex
docker dockerClient
log *slog.Logger
watcherStarted sync.Once

cacheMu sync.RWMutex
byPID map[app.PID]ContainerMeta
byContainerID map[ContainerID]containerEntry // metadata + PIDs keyed by full container ID
}

func NewStore() *ContainerStore {
return &ContainerStore{
log: cmlog(),
log: cmlog(),
byPID: make(map[app.PID]ContainerMeta),
byContainerID: make(map[ContainerID]containerEntry),
}
}

Expand Down Expand Up @@ -93,11 +125,49 @@ func (s *ContainerStore) initialize(ctx context.Context) {
// ContainerInfo returns the ContainerMeta that is associated to the provided PID.
// It also returns true if the ContainerMeta was found for the provided PID. False otherwise
func (s *ContainerStore) ContainerInfo(ctx context.Context, pid app.PID) (ContainerMeta, bool) {
s.cacheMu.RLock()
if ci, ok := s.byPID[pid]; ok {
Comment thread
MrAlias marked this conversation as resolved.
s.cacheMu.RUnlock()
return ci, true
}
s.cacheMu.RUnlock()

osCntInfo, err := osInfoForPID(pid)
if err != nil {
s.log.Debug("failed to get OS container info for pid", "pid", pid, "error", err)
return ContainerMeta{}, false
}

// Reuse metadata if another PID from the same container is already cached.
// We acquire the write lock directly to avoid a TOCTOU race: if the container
// is invalidated between the read check and the write, we must not cache stale metadata.
fullContainerID := ContainerID(osCntInfo.ContainerID)
s.cacheMu.Lock()
if entry, ok := s.byContainerID[fullContainerID]; ok {
// Re-validate that the PID still belongs to this container while holding the lock.
currentInfo, err := osInfoForPID(pid)
if err != nil || ContainerID(currentInfo.ContainerID) != fullContainerID {
s.cacheMu.Unlock()
return ContainerMeta{}, false
}
meta := entry.meta
seen := false
for _, cachedPID := range entry.pids {
if cachedPID == pid {
seen = true
break
}
}
if !seen {
entry.pids = append(entry.pids, pid)
s.byContainerID[fullContainerID] = entry
}
s.byPID[pid] = meta
s.cacheMu.Unlock()
return meta, true
}
s.cacheMu.Unlock()

inspectResult, err := s.docker.ContainerInspect(ctx, osCntInfo.ContainerID, client.ContainerInspectOptions{})
if err != nil {
s.log.Debug("failed to inspect docker container",
Expand All @@ -108,7 +178,6 @@ func (s *ContainerStore) ContainerInfo(ctx context.Context, pid app.PID) (Contai
}

inspectInfo := inspectResult.Container
const abbreviationLength = 12
containerID := inspectInfo.ID
if len(containerID) > abbreviationLength {
containerID = containerID[:abbreviationLength]
Expand All @@ -119,12 +188,45 @@ func (s *ContainerStore) ContainerInfo(ctx context.Context, pid app.PID) (Contai
composeSvcName = inspectInfo.Config.Labels[composeServiceLabelKey]
}

return ContainerMeta{
meta := ContainerMeta{
// some containers start with '/'. Removing it
Name: strings.Trim(inspectInfo.Name, "/"),
ID: containerID,
FullID: ContainerID(inspectInfo.ID),
ComposeService: composeSvcName,
}, true
}

s.cacheMu.Lock()
// Re-validate that the PID still belongs to the inspected container: the process
// may have exited while ContainerInspect was in flight, causing InvalidatePID to
// be a no-op (byPID entry didn't exist yet), and we would cache stale metadata.
currentInfo, err := osInfoForPID(pid)
if err != nil || ContainerID(currentInfo.ContainerID) != meta.FullID {
s.cacheMu.Unlock()
return ContainerMeta{}, false
}
if entry, ok := s.byContainerID[meta.FullID]; ok {
meta = entry.meta
seen := false
for _, cachedPID := range entry.pids {
if cachedPID == pid {
seen = true
break
}
}
if !seen {
entry.pids = append(entry.pids, pid)
}
s.byPID[pid] = meta
s.byContainerID[meta.FullID] = entry
s.cacheMu.Unlock()
return meta, true
}
s.byPID[pid] = meta
s.byContainerID[meta.FullID] = containerEntry{meta: meta, pids: []app.PID{pid}}
s.cacheMu.Unlock()
Comment thread
florianl marked this conversation as resolved.
Comment thread
florianl marked this conversation as resolved.

return meta, true
}

func (ci *ContainerMeta) DecorateService(s *svc.Attrs) {
Expand Down Expand Up @@ -167,3 +269,102 @@ func ContainerMetadata[T ~string](dst map[T]string, ci *ContainerMeta, stringer
out[stringer(attr.ContainerID)] = ci.ID

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for data consistency, I'd remove ID field and replace it here by attr.ContainerId[:abbreviationLength]

return out
}

// Start begins the event watcher goroutine to invalidate and remove
// metadata of destroyed containers.
func (s *ContainerStore) Start(ctx context.Context) {
s.watcherStarted.Do(func() {
s.initMutex.Lock()
s.initialize(ctx)
s.initMutex.Unlock()
go s.watchContainerEvents(ctx)
})
}

func (s *ContainerStore) watchContainerEvents(ctx context.Context) {
for {
s.initMutex.Lock()
s.initialize(ctx)
docker := s.docker
s.initMutex.Unlock()

if docker == nil {
select {
case <-time.After(time.Second):
case <-ctx.Done():
return
}
continue
}

fltrs := make(client.Filters).
Add("type", string(events.ContainerEventType)).
Add("event", string(events.ActionDie), string(events.ActionDestroy))

if err := s.eventsLoop(ctx, fltrs); err != nil && !errors.Is(err, context.Canceled) {
s.log.Debug("docker event stream error", "error", err)
}

select {
case <-time.After(time.Second):
case <-ctx.Done():
return
}
}
Comment thread
florianl marked this conversation as resolved.
}

func (s *ContainerStore) eventsLoop(ctx context.Context, fltrs client.Filters) error {
result := s.docker.Events(ctx, client.EventsListOptions{Filters: fltrs})
for {
select {
case msg, ok := <-result.Messages:
if !ok {
return nil
}
if msg.Actor.ID != "" {
s.invalidateContainer(msg.Actor.ID)
}
case err, ok := <-result.Err:
if !ok || errors.Is(err, io.EOF) {
return nil
}
return err
case <-ctx.Done():
return context.Canceled
}
}
}

func (s *ContainerStore) InvalidatePID(pid app.PID) {
s.cacheMu.Lock()
defer s.cacheMu.Unlock()

meta, ok := s.byPID[pid]
if !ok {
return
}
delete(s.byPID, pid)

entry := s.byContainerID[meta.FullID]
newPIDs := entry.pids[:0]
for _, cachedPID := range entry.pids {
if cachedPID != pid {
newPIDs = append(newPIDs, cachedPID)
}
}

if len(newPIDs) == 0 {
delete(s.byContainerID, meta.FullID)
return
}
s.byContainerID[meta.FullID] = containerEntry{meta: entry.meta, pids: newPIDs}
}

func (s *ContainerStore) invalidateContainer(containerID string) {
s.cacheMu.Lock()
defer s.cacheMu.Unlock()
for _, pid := range s.byContainerID[ContainerID(containerID)].pids {
delete(s.byPID, pid)
}
delete(s.byContainerID, ContainerID(containerID))
}
Loading