This document describes the technical architecture and development practices for Osmedeus. It's intended for developers who want to understand, modify, or extend the codebase.
- Project Structure
- Architecture Overview
- Core Components
- Workflow Engine
- Agent Step Type
- Agent-ACP Step Type
- Execution Pipeline
- Runner System
- Authentication Middleware
- Template Engine
- Function Registry
- Scheduler System
- Workflow Linter
- Database Layer
- SARIF Integration
- Workflow Hooks
- Queue System
- Nmap Integration
- Tmux Session Management
- SSH & Distributed Sync
- Webhook Triggers
- Testing
- Canary Testing
- Adding New Features
- CLI Shortcuts and Tips
osmedeus/
├── cmd/osmedeus/ # Application entry point
├── internal/ # Private packages
│ ├── client/ # Remote API client
│ ├── config/ # Configuration management
│ ├── console/ # Console output capture
│ ├── core/ # Core types (Workflow, Step, Trigger, etc.)
│ ├── database/ # SQLite/PostgreSQL via Bun ORM
│ ├── cloud/ # Cloud infrastructure provisioning (DO, AWS, GCP, Linode, Azure)
│ ├── distributed/ # Distributed execution (master/worker, worker ID: wosm-<uuid8>)
│ ├── executor/ # Workflow execution engine
│ ├── fileio/ # High-performance file I/O (mmap)
│ ├── functions/ # Utility functions (Goja JS runtime)
│ ├── heuristics/ # Target type detection
│ ├── installer/ # Binary installation (direct/Nix)
│ ├── linter/ # Workflow linting and validation
│ ├── logger/ # Structured logging (Zap)
│ ├── parser/ # YAML parsing and caching
│ ├── runner/ # Execution environments (host/docker/ssh)
│ ├── scheduler/ # Trigger scheduling (cron/event/watch)
│ ├── snapshot/ # Workspace export/import
│ ├── state/ # Run state export
│ ├── template/ # {{Variable}} interpolation engine
│ ├── terminal/ # Terminal UI (colors, tables, spinners)
│ ├── updater/ # Self-update via GitHub releases
│ └── workspace/ # Workspace management
├── lib/ # Shared library utilities
├── pkg/ # Public packages
│ ├── cli/ # Cobra CLI commands
│ └── server/ # Fiber REST API server
│ ├── handlers/ # Request handlers
│ └── middleware/ # Auth middleware (JWT, API Key)
├── public/ # Public assets (examples, presets, UI)
├── test/ # Test suites
│ ├── e2e/ # E2E CLI tests
│ ├── integration/ # Integration tests
│ └── testdata/ # Test workflow fixtures
├── docs/ # API documentation
└── build/ # Build artifacts and Docker files
Osmedeus follows a layered architecture:
┌─────────────────────────────────────────────────────────────┐
│ CLI / API │
│ (pkg/cli, pkg/server) │
├─────────────────────────────────────────────────────────────┤
│ Executor Layer │
│ ┌─────────────┐ ┌──────────────┐ ┌────────────────────┐ │
│ │ Executor │ │ Dispatcher │ │ Step Executors │ │
│ │ │ │ │ │ (bash, function, │ │
│ │ │ │ │ │ foreach, etc.) │ │
│ └─────────────┘ └──────────────┘ └────────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Runner Layer │
│ ┌──────────────┐ ┌───────────────┐ ┌─────────────────┐ │
│ │ Host Runner │ │ Docker Runner │ │ SSH Runner │ │
│ └──────────────┘ └───────────────┘ └─────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Support Systems │
│ ┌──────────────┐ ┌───────────────┐ ┌─────────────────┐ │
│ │ Template │ │ Functions │ │ Scheduler │ │
│ │ Engine │ │ Registry │ │ (triggers) │ │
│ └──────────────┘ └───────────────┘ └─────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Data Layer │
│ ┌──────────────┐ ┌───────────────┐ ┌─────────────────┐ │
│ │ Parser/ │ │ Database │ │ Workspace │ │
│ │ Loader │ │ (SQLite/PG) │ │ Manager │ │
│ └──────────────┘ └───────────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────────┘
// internal/core/workflow.go
type Workflow struct {
Kind WorkflowKind // "module" or "flow"
Name string
Description string
Params []Param
Triggers []Trigger
Runner RunnerType
RunnerConfig *RunnerConfig
Steps []Step // For modules
Modules []ModuleRef // For flows
}

- Module: a single execution unit with sequential steps
- Flow: orchestrates multiple modules with dependency management
// internal/core/step.go
type Step struct {
Name string
Type StepType // bash, function, foreach, parallel-steps, remote-bash, http, llm, agent, agent-acp
PreCondition string // Skip condition
Command string // For bash/remote-bash
Commands []string // Multiple commands
Function string // For function type
Input string // For foreach
Variable string // Foreach variable name
Threads int // Foreach parallelism
Step *Step // Nested step for foreach
ParallelSteps []Step // For parallel-steps type
StepRunner RunnerType // For remote-bash: docker or ssh
StepRunnerConfig *StepRunnerConfig // Runner config for remote-bash
// Agent step fields
Query string // Task prompt for the agent
Queries []string // Multiple queries (multi-goal mode)
SystemPrompt string // System prompt for the agent
AgentTools []AgentToolDef // Preset or custom tools
MaxIterations int // Max tool-calling loop iterations
Models []string // Preferred models (tried in order)
SubAgents []SubAgentDef // Inline sub-agents spawnable via spawn_agent
MaxAgentDepth int // Max nesting depth for sub-agents (default: 3)
Memory *AgentMemoryConfig // Sliding window, summarization, persistence
OutputSchema string // JSON schema for structured output
StopCondition string // JS expression evaluated after each iteration
PlanPrompt string // Planning stage prompt
OnToolStart string // JS hook before each tool call
OnToolEnd string // JS hook after each tool call
// Agent-ACP step fields
Agent string // Built-in ACP agent name (claude-code, codex, etc.)
Cwd string // Working directory for ACP session
AllowedPaths []string // Restrict file access to these directories
ACPConfig *ACPStepConfig // Custom agent command, env, write permissions
Exports map[string]string
OnSuccess []Action
OnError []Action
Decision *DecisionConfig // Conditional branching (switch/case)
}

The remote-bash step type allows per-step Docker or SSH execution, independent of the module-level runner:
steps:
  - name: docker-scan
    type: remote-bash
    step_runner: docker
    step_runner_config:
      image: alpine:latest
      volumes:
        - /data:/data
    command: nmap -sV {{target}}
  - name: ssh-scan
    type: remote-bash
    step_runner: ssh
    step_runner_config:
      host: "{{ssh_host}}"
      port: 22
      user: "{{ssh_user}}"
      key_file: ~/.ssh/id_rsa
    command: whoami && hostname

Steps can include decision routing to jump to different steps based on switch/case matching:
steps:
  - name: detect-type
    type: bash
    command: echo "{{target_type}}"
    exports:
      detected_type: "output"
    decision:
      switch: "{{detected_type}}"
      cases:
        "domain":
          goto: subdomain-enum
        "ip":
          goto: port-scan
        "cidr":
          goto: network-scan
      default:
        goto: generic-recon
  - name: subdomain-enum
    type: bash
    command: subfinder -d {{target}}
    decision:
      switch: "always"
      cases:
        "always":
          goto: _end # Special value to end workflow

The _end special value terminates workflow execution from the current step.
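The switch/case routing above can be sketched as a small resolver. This is an illustrative sketch, not the actual DecisionConfig implementation; the function name `nextStep` and the map shape are assumptions:

```go
package main

import "fmt"

// nextStep sketches decision routing: match the rendered switch value
// against the cases, fall back to the default target. A returned "_end"
// tells the caller to terminate the workflow.
func nextStep(switchValue string, cases map[string]string, def string) string {
	if target, ok := cases[switchValue]; ok {
		return target
	}
	return def
}

func main() {
	cases := map[string]string{
		"domain": "subdomain-enum",
		"ip":     "port-scan",
		"cidr":   "network-scan",
	}
	fmt.Println(nextStep("ip", cases, "generic-recon"))  // matched case
	fmt.Println(nextStep("asn", cases, "generic-recon")) // falls back to default
}
```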
The agent step type implements an agentic LLM execution loop. It sends a query to the LLM with available tools, executes tool calls returned by the LLM, feeds results back, and repeats until the LLM responds without tool calls or max_iterations is reached.
1. Planning stage (optional) ──▶ LLM generates a plan from plan_prompt
2. Initialize conversation ──▶ system_prompt + query + plan (if any)
3. Main agent loop:
a. Send conversation to LLM (with tools)
b. If no tool_calls → done
c. Execute tool calls (parallel or sequential)
d. Append tool results to conversation
e. Evaluate stop_condition (if defined)
f. Apply memory window (if configured)
g. Repeat until max_iterations
4. Structured output (optional) ──▶ Final LLM call with output_schema
5. Persist conversation (if memory.persist_path set)
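The main loop (step 3 above) can be sketched as follows. The types and function names here are simplified stand-ins for the real LLM client, not the actual executor code:

```go
package main

import "fmt"

// toolCall and llmReply are simplified stand-ins for the real LLM types.
type toolCall struct{ name, args string }
type llmReply struct {
	content   string
	toolCalls []toolCall
}

// runAgentLoop sketches the core loop: call the model, execute any tool
// calls it returns, append the results, and stop when the model answers
// without tool calls or maxIterations is reached.
func runAgentLoop(callLLM func(history []string) llmReply,
	execTool func(tc toolCall) string, maxIterations int) (string, int) {
	history := []string{}
	for i := 1; i <= maxIterations; i++ {
		reply := callLLM(history)
		if len(reply.toolCalls) == 0 {
			return reply.content, i // model is done
		}
		for _, tc := range reply.toolCalls {
			history = append(history, execTool(tc))
		}
	}
	return "", maxIterations
}

func main() {
	// A fake model that asks for one tool call, then finishes.
	callLLM := func(history []string) llmReply {
		if len(history) == 0 {
			return llmReply{toolCalls: []toolCall{{name: "bash", args: "whoami"}}}
		}
		return llmReply{content: "DONE"}
	}
	execTool := func(tc toolCall) string { return "result of " + tc.name }
	out, iters := runAgentLoop(callLLM, execTool, 10)
	fmt.Println(out, iters)
}
```

The real loop additionally evaluates stop_condition, applies the memory window, and runs JS hooks between iterations.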
steps:
  - name: my-agent
    type: agent
    query: "Analyze {{Target}} and report findings."
    system_prompt: "You are a security analyst."
    max_iterations: 10
    agent_tools:
      - preset: bash
      - preset: read_file
      - preset: http_get
      - name: custom_tool
        description: "My custom tool"
        parameters:
          type: object
          properties:
            input:
              type: string
          required: [input]
        handler: 'process(args.input)'
    models:
      - gpt-4o
      - claude-sonnet-4-20250514
    memory:
      max_messages: 30
      summarize_on_truncate: true
      persist_path: "{{Output}}/agent/conversation.json"
      resume_path: "{{Output}}/agent/conversation.json"
    stop_condition: 'contains(agent_content, "DONE")'
    output_schema: '{"type":"object","properties":{"summary":{"type":"string"}}}'
    plan_prompt: "Create a plan for analyzing the target."
    on_tool_start: 'log_info("Tool: " + tool_name)'
    on_tool_end: 'log_info("Result: " + result)'
    parallel_tool_calls: true
    exports:
      findings: "{{agent_content}}"

All preset tools are defined in PresetToolRegistry (internal/core/agent_tool_presets.go):
| Preset | Description | Parameters |
|---|---|---|
| `bash` | Execute a shell command | `command` |
| `read_file` | Read file contents | `path` |
| `read_lines` | Read file as array of lines | `path` |
| `file_exists` | Check if a file exists | `path` |
| `file_length` | Count non-empty lines in a file | `path` |
| `append_file` | Append content from source to dest | `dest`, `content` |
| `save_content` | Write string content to a file | `content`, `path` |
| `glob` | Find files matching a glob pattern | `pattern` |
| `grep_string` | Search file for lines containing a string | `source`, `str` |
| `grep_regex` | Search file for lines matching a regex | `source`, `pattern` |
| `http_get` | Make an HTTP GET request | `url` |
| `http_request` | Make an HTTP request with method/headers/body | `url`, `method`, `body?`, `headers?` |
| `jq` | Query JSON data using jq syntax | `json_data`, `expression` |
| `exec_python` | Run inline Python code | `code` |
| `exec_python_file` | Run a Python file | `path` |
| `run_module` | Run an osmedeus module | `module`, `target`, `params?` |
| `run_flow` | Run an osmedeus flow | `flow`, `target`, `params?` |
Custom tools use a JS handler expression. The parsed arguments are available as the args object:
agent_tools:
  - name: check_domain
    description: "Validate if a string is a valid domain"
    parameters:
      type: object
      properties:
        domain:
          type: string
      required: [domain]
    handler: 'contains(args.domain, ".")'

Agents can delegate to sub-agents via the spawn_agent tool (automatically added when sub_agents is defined):
steps:
  - name: orchestrator
    type: agent
    query: "Analyze {{Target}} by coordinating specialists"
    system_prompt: "You are an orchestrator. Delegate tasks to sub-agents."
    max_iterations: 10
    max_agent_depth: 3
    agent_tools:
      - preset: bash
    sub_agents:
      - name: recon_agent
        description: "Specialized agent for reconnaissance"
        system_prompt: "You are a recon specialist"
        max_iterations: 5
        agent_tools:
          - preset: bash
          - preset: http_get
      - name: vuln_scanner
        description: "Specialized agent for vulnerability scanning"
        max_iterations: 5
        agent_tools:
          - preset: bash
          - preset: read_file

Sub-agents are implemented via SubAgentToolExecutor in internal/executor/tool_executor.go. Child token counts are merged into the parent via agentState.MergeTokens().
- Sliding window: `max_messages` limits conversation history (the system message is always kept)
- Summarization: `summarize_on_truncate: true` uses the LLM to summarize dropped messages
- Persistence: `persist_path` saves the conversation JSON after completion
- Resume: `resume_path` loads a prior conversation on start
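The sliding-window behaviour can be sketched as below; `applyMemoryWindow` is an illustrative helper, not the actual function name in the codebase:

```go
package main

import "fmt"

// applyMemoryWindow sketches the max_messages sliding window: the system
// message (index 0) is always kept, and only the most recent messages are
// retained once the history exceeds the limit.
func applyMemoryWindow(messages []string, maxMessages int) []string {
	if maxMessages <= 0 || len(messages) <= maxMessages {
		return messages
	}
	kept := []string{messages[0]} // system message always kept
	tail := messages[len(messages)-(maxMessages-1):]
	return append(kept, tail...)
}

func main() {
	msgs := []string{"system", "m1", "m2", "m3", "m4", "m5"}
	fmt.Println(applyMemoryWindow(msgs, 4)) // keeps system + 3 newest
}
```

With `summarize_on_truncate: true`, the dropped middle messages would be replaced by an LLM-generated summary instead of being discarded outright.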
| Export | Description |
|---|---|
| `agent_content` | Final text output from the agent |
| `agent_history` | Full conversation history as JSON |
| `agent_iterations` | Number of iterations completed |
| `agent_total_tokens` | Total tokens used (including sub-agents) |
| `agent_prompt_tokens` | Prompt tokens used |
| `agent_completion_tokens` | Completion tokens used |
| `agent_tool_results` | All tool call results as JSON |
| `agent_plan` | Plan generated by the planning stage (if used) |
| `agent_goal_results` | Results per query in multi-goal mode (JSON) |
Hook expressions receive these variables:
- `tool_name` - Name of the tool being called
- `tool_args` - JSON string of tool arguments
- `result` - Tool result (empty in `on_tool_start`)
- `duration` - Execution time in ms (0 in `on_tool_start`)
- `iteration` - Current agent iteration number
- `error` - Error string (empty if no error)
The agent-acp step type spawns an external AI coding agent as a subprocess and communicates via the Agent Communication Protocol (ACP). Unlike the agent step type (which uses the internal LLM loop), agent-acp delegates to real agent binaries.
| Agent Name | Command | Args |
|---|---|---|
| `claude-code` | `npx` | `-y @zed-industries/claude-code-acp@latest` |
| `codex` | `npx` | `-y @zed-industries/codex-acp` |
| `opencode` | `opencode` | `acp` |
| `gemini` | `gemini` | `--experimental-acp` |
Defined in builtinACPAgents map in internal/executor/acp_executor.go.
┌───────────────────────────────────────────────────────────────┐
│ ACPExecutor.Execute() │
│ 1. Resolve agent name → command + args │
│ 2. Build prompt from step.Messages │
│ 3. Call RunAgentACP() standalone function │
│ a. Spawn subprocess with stdin/stdout pipes │
│ b. Create ACP client (acpClient) for callbacks │
│ c. ACP Initialize → NewSession → Prompt │
│ d. Collect agent output via SessionUpdate callbacks │
│ 4. Return output as StepResult with exports │
└───────────────────────────────────────────────────────────────┘
steps:
  - name: acp-agent
    type: agent-acp
    agent: claude-code # Built-in agent name
    cwd: "{{Output}}" # Working directory
    allowed_paths:
      - "{{Output}}"
    acp_config:
      env:
        CUSTOM_VAR: "value"
      write_enabled: true # Allow file writes (default: false)
    messages:
      - role: system
        content: "You are a security analyst."
      - role: user
        content: "Analyze the scan results."
    exports:
      analysis: "{{acp_output}}"

The acpClient (internal/executor/acp_client.go) implements the acp.Client interface:
| Method | Behavior |
|---|---|
| `SessionUpdate` | Accumulates agent text output, logs tool calls and thoughts |
| `RequestPermission` | Auto-approves by selecting `allow_once` → `allow_always` → first option |
| `ReadTextFile` | Reads files scoped to `allowedPaths` |
| `WriteTextFile` | Writes files if `writeEnabled` is true |
| `CreateTerminal` / `KillTerminalCommand` / etc. | No-op stubs |
| Export | Description |
|---|---|
| `acp_output` | Collected agent text output |
| `acp_stderr` | Agent process stderr |
| `acp_agent` | Agent name used |
RunAgentACP(ctx, prompt, agentName, cfg) can be called independently from workflow execution (used by the CLI agent command and the API endpoint):
output, stderr, err := executor.RunAgentACP(ctx, "your prompt", "claude-code", &executor.RunAgentACPConfig{
Cwd: "/workspace",
AllowedPaths: []string{"/workspace"},
WriteEnabled: false,
})

osmedeus agent "your message"                # Default agent (claude-code)
osmedeus agent --agent codex "your message" # Specific agent
osmedeus agent --list # List available agents
osmedeus agent --cwd /path "message" # Set working directory
osmedeus agent --timeout 1h "message" # Custom timeout
echo "message" | osmedeus agent --stdin      # Read from stdin

POST /osm/api/agent/chat/completions provides an OpenAI-compatible interface:
curl -X POST http://localhost:8000/osm/api/agent/chat/completions \
-H "Content-Type: application/json" \
-d '{"model":"claude-code","messages":[{"role":"user","content":"Hello"}]}'

The model field maps to a built-in agent name. Only one ACP agent can run at a time (returns 409 Conflict if busy).
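The one-agent-at-a-time constraint can be sketched as a compare-and-swap guard. This is an illustrative sketch of the busy/409 behaviour, not the actual server code; `tryRunAgent` is a hypothetical name:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// busy models the single-flight guard on the ACP chat endpoint: the first
// caller acquires it, and concurrent callers get a 409-style rejection.
var busy atomic.Bool

func tryRunAgent(run func()) int {
	if !busy.CompareAndSwap(false, true) {
		return 409 // another ACP agent is already running
	}
	defer busy.Store(false)
	run()
	return 200
}

func main() {
	// A nested request while the agent is busy is rejected with 409.
	fmt.Println(tryRunAgent(func() {
		fmt.Println(tryRunAgent(func() {}))
	}))
}
```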
// internal/core/context.go
type ExecutionContext struct {
WorkflowName string
WorkflowKind WorkflowKind
RunID string
Target string
Variables map[string]interface{}
Params map[string]string
Exports map[string]interface{}
StepIndex int
Logger *zap.Logger
}

The context is passed through the execution pipeline and accumulates state:
- Variables are set by the executor (built-in variables)
- Params are user-provided
- Exports are step outputs that propagate to subsequent steps
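The accumulation described above can be sketched as follows. The `resolve` helper and its precedence order (exports over params over built-ins) are assumptions for illustration, not the actual lookup logic:

```go
package main

import "fmt"

// executionContext mirrors ExecutionContext's accumulating state: built-in
// variables, user-provided params, and step exports.
type executionContext struct {
	Variables map[string]interface{}
	Params    map[string]string
	Exports   map[string]interface{}
}

// resolve looks a name up across the three maps; the precedence shown here
// is an assumed ordering for illustration.
func (c *executionContext) resolve(name string) interface{} {
	if v, ok := c.Exports[name]; ok {
		return v
	}
	if v, ok := c.Params[name]; ok {
		return v
	}
	return c.Variables[name]
}

func main() {
	ctx := &executionContext{
		Variables: map[string]interface{}{"Target": "example.com"},
		Params:    map[string]string{"threads": "10"},
		Exports:   map[string]interface{}{},
	}
	// A step exports a value; subsequent steps can resolve it.
	ctx.Exports["subdomains_file"] = "/ws/subdomains.txt"
	fmt.Println(ctx.resolve("Target"), ctx.resolve("threads"), ctx.resolve("subdomains_file"))
}
```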
The parser (internal/parser/parser.go) handles YAML parsing:
type Parser struct{}
func (p *Parser) Parse(path string) (*core.Workflow, error)
func (p *Parser) Validate(workflow *core.Workflow) error

The loader (internal/parser/loader.go) provides caching and lookup:
type Loader struct {
workflowsDir string
modulesDir string
cache map[string]*core.Workflow
}
func (l *Loader) LoadWorkflow(name string) (*core.Workflow, error)
func (l *Loader) ListFlows() ([]string, error)
func (l *Loader) ListModules() ([]string, error)

Lookup order:

- Check cache
- Try `workflows/<name>.yaml`
- Try `workflows/<name>-flow.yaml`
- Try `workflows/modules/<name>.yaml`
- Try `workflows/modules/<name>-module.yaml`
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ CLI/API │────▶│ Executor │────▶│ Dispatcher │
└──────────────┘ └──────────────┘ └──────────────┘
│
┌────────────────────────────┼────────────────────────────┐
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ BashExecutor │ │FunctionExec │ │ForeachExec │
└──────────────┘ └──────────────┘ └──────────────┘
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ HTTPExecutor │ │ LLMExecutor │ │AgentExecutor │
└──────────────┘ └──────────────┘ └──────────────┘
┌──────────────┐
│ ACPExecutor │
└──────────────┘
│ │ │
└────────────────────────────┼────────────────────────────┘
▼
┌──────────────┐
│ Runner │
└──────────────┘
// internal/executor/executor.go
type Executor struct {
templateEngine *template.Engine
functionRegistry *functions.Registry
stepDispatcher *StepDispatcher
}
func (e *Executor) ExecuteModule(ctx context.Context, module *core.Workflow,
params map[string]string, cfg *config.Config) (*core.WorkflowResult, error)
func (e *Executor) ExecuteFlow(ctx context.Context, flow *core.Workflow,
params map[string]string, cfg *config.Config) (*core.WorkflowResult, error)

Key responsibilities:
- Initialize execution context with built-in variables
- Create and setup the appropriate runner
- Iterate through steps, dispatching to appropriate handler
- Handle pre-conditions, exports, and decision routing
- Process on_success/on_error actions
The dispatcher uses a plugin registry pattern for extensible step type handling:
// internal/executor/dispatcher.go
type StepDispatcher struct {
registry *PluginRegistry // Extensible executor registry
templateEngine *template.Engine
functionRegistry *functions.Registry
bashExecutor *BashExecutor // Registered as plugin
llmExecutor *LLMExecutor // Registered as plugin
runner runner.Runner
}
// PluginRegistry manages step type executors
type PluginRegistry struct {
executors map[core.StepType]StepExecutor
}
// StepExecutor interface for all step type handlers
type StepExecutor interface {
CanHandle(stepType core.StepType) bool
Execute(ctx context.Context, step *core.Step, execCtx *core.ExecutionContext, runner runner.Runner) (*core.StepResult, error)
}
func (d *StepDispatcher) Dispatch(ctx context.Context, step *core.Step,
execCtx *core.ExecutionContext) (*core.StepResult, error)

Built-in executors registered at startup:

- `BashExecutor` - handles `bash` steps
- `FunctionExecutor` - handles `function` steps
- `ForeachExecutor` - handles `foreach` steps
- `ParallelExecutor` - handles `parallel-steps` steps
- `RemoteBashExecutor` - handles `remote-bash` steps
- `HTTPExecutor` - handles `http` steps
- `LLMExecutor` - handles `llm` steps
- `AgentExecutor` - handles `agent` steps (agentic LLM loop with tool calling)
- `ACPExecutor` - handles `agent-acp` steps (ACP subprocess agents)
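The registry pattern can be sketched with simplified stand-ins for `core.StepType` and the `StepExecutor` interface; the toy `echo` step type here is hypothetical:

```go
package main

import "fmt"

// stepType and stepExecutor are simplified stand-ins for core.StepType and
// the real StepExecutor interface.
type stepType string

type stepExecutor interface {
	CanHandle(t stepType) bool
	Execute(command string) string
}

type pluginRegistry struct {
	executors map[stepType]stepExecutor
}

func (r *pluginRegistry) Register(t stepType, e stepExecutor) { r.executors[t] = e }

// Dispatch routes a step to its registered executor, mirroring how the
// StepDispatcher resolves handlers by step type.
func (r *pluginRegistry) Dispatch(t stepType, command string) (string, error) {
	e, ok := r.executors[t]
	if !ok || !e.CanHandle(t) {
		return "", fmt.Errorf("no executor for step type %q", t)
	}
	return e.Execute(command), nil
}

// echoExecutor is a toy executor for a hypothetical "echo" step type.
type echoExecutor struct{}

func (echoExecutor) CanHandle(t stepType) bool { return t == "echo" }
func (echoExecutor) Execute(cmd string) string { return "echo: " + cmd }

func main() {
	reg := &pluginRegistry{executors: map[stepType]stepExecutor{}}
	reg.Register("echo", echoExecutor{})
	out, _ := reg.Dispatch("echo", "hello")
	fmt.Println(out)
}
```

Adding a new step type is then a matter of implementing the interface and registering it at startup.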
The run control plane tracks active workflow executions for cancellation support:
// internal/executor/run_control_plane.go
type RunControlPlane struct {
mu sync.RWMutex
runs map[string]*ActiveRun
}
type ActiveRun struct {
RunUUID string
Cancel context.CancelFunc
PIDs *sync.Map // Currently running process IDs
StartedAt time.Time
}
// Key operations
func (r *RunControlPlane) Register(runUUID string, cancel context.CancelFunc) *ActiveRun
func (r *RunControlPlane) Cancel(runUUID string) ([]int, error) // Returns killed PIDs
func (r *RunControlPlane) AddPID(runUUID string, pid int)
func (r *RunControlPlane) RemovePID(runUUID string, pid int)

The control plane is accessed via the GetRunControlPlane() singleton. When a run is cancelled:
- Context is cancelled to stop new operations
- All tracked PIDs are killed via SIGKILL (including process groups)
// internal/runner/runner.go
type Runner interface {
Execute(ctx context.Context, command string) (*CommandResult, error)
Setup(ctx context.Context) error
Cleanup(ctx context.Context) error
Type() core.RunnerType
IsRemote() bool
}
type CommandResult struct {
Output string
ExitCode int
Error error
}

Simple local execution using os/exec:
func (r *HostRunner) Execute(ctx context.Context, command string) (*CommandResult, error) {
cmd := exec.CommandContext(ctx, "sh", "-c", command)
// ... execute and capture output
}

Supports both ephemeral (docker run --rm) and persistent (docker exec) modes:
type DockerRunner struct {
config *core.RunnerConfig
containerID string // For persistent mode
}
func (r *DockerRunner) Execute(ctx context.Context, command string) (*CommandResult, error) {
if r.config.Persistent && r.containerID != "" {
return r.execInContainer(ctx, command)
}
return r.runEphemeral(ctx, command)
}

Uses golang.org/x/crypto/ssh for remote execution:
type SSHRunner struct {
config *core.RunnerConfig
client *ssh.Client
}
func (r *SSHRunner) Setup(ctx context.Context) error {
// Build auth methods (key or password)
// Establish SSH connection
// Optionally copy binary to remote
}

The server supports two authentication methods:
| Method | Header | Description |
|---|---|---|
| API Key | `x-osm-api-key` | Simple token-based auth |
| JWT | `Authorization: Bearer <token>` | Token from `/osm/api/login` |
// pkg/server/server.go - setupRoutes()
if s.config.Server.EnabledAuthAPI {
api.Use(middleware.APIKeyAuth(s.config))
} else if !s.options.NoAuth {
api.Use(middleware.JWTAuth(s.config))
}

Priority order:

- API Key Auth - if `EnabledAuthAPI` is true
- JWT Auth - if API key auth is disabled and `NoAuth` is false
- No Auth - if the `NoAuth` option is true
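The selection logic from setupRoutes reduces to a small decision function; `chooseAuth` and its string return values are illustrative, not actual server code:

```go
package main

import "fmt"

// chooseAuth sketches the middleware priority: API key auth wins when
// enabled, JWT is the default otherwise, and NoAuth disables both.
func chooseAuth(enabledAuthAPI, noAuth bool) string {
	switch {
	case enabledAuthAPI:
		return "api-key"
	case !noAuth:
		return "jwt"
	default:
		return "none"
	}
}

func main() {
	fmt.Println(chooseAuth(true, false))  // api-key
	fmt.Println(chooseAuth(false, false)) // jwt
	fmt.Println(chooseAuth(false, true))  // none
}
```

Note that, mirroring the `if/else if` in setupRoutes, API key auth takes precedence even when NoAuth is set.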
// pkg/server/middleware/auth.go
func APIKeyAuth(cfg *config.Config) fiber.Handler {
return func(c *fiber.Ctx) error {
apiKey := c.Get("x-osm-api-key")
if !isValidAPIKey(apiKey, cfg.Server.AuthAPIKey) {
return c.Status(fiber.StatusUnauthorized).JSON(fiber.Map{
"error": true,
"message": "Invalid or missing API key",
})
}
return c.Next()
}
}

Security features:
- Case-sensitive exact matching
- Rejects empty/whitespace-only keys
- Rejects placeholder values ("null", "undefined", "nil")
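A sketch of an isValidAPIKey implementing the checks above (this is an assumed implementation consistent with the listed behaviour, not the actual middleware code):

```go
package main

import (
	"fmt"
	"strings"
)

// isValidAPIKey sketches the checks described above: rejection of empty,
// whitespace-only, and placeholder values, then a case-sensitive exact match.
func isValidAPIKey(provided, expected string) bool {
	trimmed := strings.TrimSpace(provided)
	if trimmed == "" {
		return false
	}
	switch strings.ToLower(trimmed) {
	case "null", "undefined", "nil":
		return false
	}
	return provided == expected // case-sensitive exact match
}

func main() {
	fmt.Println(isValidAPIKey("secret-key", "secret-key")) // true
	fmt.Println(isValidAPIKey("Secret-Key", "secret-key")) // false: case-sensitive
	fmt.Println(isValidAPIKey("   ", "secret-key"))        // false: whitespace-only
	fmt.Println(isValidAPIKey("null", "secret-key"))       // false: placeholder
}
```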
The template engine (internal/template/engine.go) handles {{variable}} interpolation:
type Engine struct{}
func (e *Engine) Render(template string, ctx map[string]interface{}) (string, error)

Resolution order:
- Check context variables
- Check environment variables (optional)
- Return empty string if not found
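A minimal regex-based sketch of this resolution order (context first, then environment, then empty string); the actual engine's parsing may differ:

```go
package main

import (
	"fmt"
	"os"
	"regexp"
)

var varPattern = regexp.MustCompile(`\{\{(\w+)\}\}`)

// render sketches {{variable}} interpolation with the resolution order
// described above: context variables, then environment variables, then
// an empty string for unknown names.
func render(tmpl string, ctx map[string]string) string {
	return varPattern.ReplaceAllStringFunc(tmpl, func(m string) string {
		name := varPattern.FindStringSubmatch(m)[1]
		if v, ok := ctx[name]; ok {
			return v
		}
		if v, ok := os.LookupEnv(name); ok {
			return v
		}
		return ""
	})
}

func main() {
	ctx := map[string]string{"Target": "example.com"}
	fmt.Println(render("nmap -sV {{Target}}", ctx))
}
```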
// internal/executor/executor.go
func (e *Executor) injectBuiltinVariables(cfg *config.Config, params map[string]string,
execCtx *core.ExecutionContext) {
execCtx.SetVariable("BaseFolder", cfg.BaseFolder)
execCtx.SetVariable("Target", params["target"])
execCtx.SetVariable("Output", filepath.Join(workspacesPath, targetSpace))
execCtx.SetVariable("threads", threads)
execCtx.SetVariable("RunUUID", execCtx.RunUUID)
// ... more variables
}

Foreach uses [[variable]] syntax (double brackets) to avoid conflicts with template variables:
- name: process-items
  type: foreach
  input: "/path/to/items.txt"
  variable: item
  step:
    command: echo [[item]] # Replaced during foreach iteration

Functions are implemented in Go and exposed to a Goja JavaScript VM with pooling for performance:
// internal/functions/goja_runtime.go
type GojaRuntime struct {
pool *GojaPool // Pool of pre-warmed VMs
}
func NewGojaRuntime() *GojaRuntime {
pool := NewGojaPool(4) // Pool size
return &GojaRuntime{pool: pool}
}
// Pool provides thread-safe VM reuse with compiled program caching
type GojaPool struct {
vms []*gojaVM
compiledProgs sync.Map // Cached compiled JS programs
}

- Add the Go implementation in the appropriate file:
// internal/functions/file_functions.go
func (vf *vmFunc) myNewFunction(call goja.FunctionCall) goja.Value {
arg := call.Argument(0).String()
// ... implementation
return vf.vm.ToValue(output)
}

- Register it in goja_runtime.go:
_ = vm.Set("my_new_function", vf.myNewFunction)

- Add a constant in constants.go:
const FnMyNewFunction = "my_new_function"

Notable utility functions include exec_python(code) and exec_python_file(path) for running Python code, exec_ts(code) and exec_ts_file(path) for running TypeScript via bun, and run_module(module, target, params) / run_flow(flow, target, params) for launching osmedeus workflows as subprocesses.
The skip(message?) function aborts remaining steps in the current module. In a flow, execution continues to the next module. It raises ErrSkipModule (defined in internal/functions/constants.go).
These functions provide output and execution control within workflows:
// internal/functions/util_functions.go
// printf prints a message to stdout
func (vf *vmFunc) printf(call goja.FunctionCall) goja.Value
// catFile prints file content to stdout
func (vf *vmFunc) catFile(call goja.FunctionCall) goja.Value
// exit exits the scan with the given code (0 = success, non-zero = error)
func (vf *vmFunc) exit(call goja.FunctionCall) goja.Value

Usage in workflows:
steps:
  - name: print-status
    type: function
    function: printf("Scan completed for {{Target}}")
  - name: show-results
    type: function
    function: cat_file("{{Output}}/results.txt")

These functions enable event-driven workflows by generating and emitting events:
// internal/functions/event_functions.go
// generate_event emits a single structured event
// Usage: generate_event(workspace, topic, source, data_type, data)
func (vf *vmFunc) generateEvent(call goja.FunctionCall) goja.Value
// generate_event_from_file emits an event for each line in a file
// Usage: generate_event_from_file(workspace, topic, source, data_type, filePath)
func (vf *vmFunc) generateEventFromFile(call goja.FunctionCall) goja.Value

Usage in workflows:
steps:
  - name: emit-single-event
    type: function
    function: |
      generate_event("{{Workspace}}", "assets.new", "scanner", "subdomain", "api.example.com")
  - name: emit-from-file
    type: function
    function: |
      generate_event_from_file("{{Workspace}}", "assets.new", "recon", "subdomain", "{{Output}}/subdomains.txt")

Event delivery uses a fallback chain:
- Server API - POST to `/osm/api/events/emit` if a server is configured
- Redis Pub/Sub - publish to `osm:events:{topic}` in distributed mode
- Database Queue - store in the `event_logs` table with `processed=false`
- Webhooks - send to configured webhook endpoints
Functions for parsing SARIF (Static Analysis Results Interchange Format) output from SAST tools:
// internal/functions/sarif_functions.go
// db_import_sarif imports vulnerabilities from a SARIF file into the database
// Supports output from: Semgrep, Trivy, Kingfisher, Bearer
// Usage: db_import_sarif(workspace, file_path) -> {new, updated, unchanged, errors, total}
func (vf *vmFunc) dbImportSARIF(call goja.FunctionCall) goja.Value
// convert_sarif_to_markdown converts a SARIF file to a markdown table
// Usage: convert_sarif_to_markdown(input_path, output_path) -> bool
func (vf *vmFunc) convertSARIFToMarkdown(call goja.FunctionCall) goja.Value

Usage in workflows:
steps:
  - name: import-sarif
    type: function
    function: |
      db_import_sarif("{{Workspace}}", "{{Output}}/semgrep.sarif")
  - name: sarif-report
    type: function
    function: |
      convert_sarif_to_markdown("{{Output}}/trivy.sarif", "{{Output}}/trivy-report.md")

SARIF severity mapping: error → high, warning → medium, note → low, none → info.
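The severity mapping is a direct translation of SARIF result levels; a sketch (the function name here is illustrative):

```go
package main

import "fmt"

// sarifLevelToSeverity applies the mapping described above for SARIF
// result levels.
func sarifLevelToSeverity(level string) string {
	switch level {
	case "error":
		return "high"
	case "warning":
		return "medium"
	case "note":
		return "low"
	default: // "none" or a missing level
		return "info"
	}
}

func main() {
	for _, l := range []string{"error", "warning", "note", "none"} {
		fmt.Println(l, "->", sarifLevelToSeverity(l))
	}
}
```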
// internal/functions/type_functions.go
// detect_language detects the dominant programming language in a directory
// Supports 26+ languages via file extension and shebang analysis
// Skips non-source dirs: node_modules, vendor, .git, __pycache__, etc.
// Usage: detect_language(path) -> string ("golang", "python", "javascript", etc.)
func (vf *vmFunc) detectLanguage(call goja.FunctionCall) goja.Value

// internal/functions/file_functions.go
// extract_to auto-detects archive format and extracts to destination
// Supports: .zip, .tar.gz, .tgz, .tar.bz2, .tar.xz
// Removes destination directory first (idempotent)
// Usage: extract_to(source, dest) -> bool
func (vf *vmFunc) extractTo(call goja.FunctionCall) goja.Value

Usage in workflows:
steps:
  - name: detect-lang
    type: function
    function: |
      detect_language("{{Output}}/repo")
    exports:
      lang: "output"
  - name: extract-repo
    type: function
    function: |
      extract_to("/tmp/repo.tar.gz", "{{Output}}/repo")

// internal/functions/registry.go
func (r *Registry) Execute(expr string, ctx map[string]interface{}) (interface{}, error) {
return r.runtime.Execute(expr, ctx)
}
func (r *Registry) EvaluateCondition(condition string, ctx map[string]interface{}) (bool, error) {
return r.runtime.EvaluateCondition(condition, ctx)
}

// internal/core/trigger.go
type TriggerType string
const (
TriggerManual TriggerType = "manual"
TriggerCron TriggerType = "cron"
TriggerEvent TriggerType = "event"
TriggerWatch TriggerType = "watch"
)

The scheduler manages workflow triggers using gocron for cron jobs and fsnotify for file watching:
// internal/scheduler/scheduler.go
type Scheduler struct {
scheduler gocron.Scheduler
triggers map[string]*RegisteredTrigger
handlers map[string]TriggerHandler
events chan *core.Event
// File watcher (fsnotify-based)
watcher *fsnotify.Watcher
watchPaths map[string][]*RegisteredTrigger // path → triggers mapping
}
func (s *Scheduler) RegisterTrigger(workflow *core.Workflow, trigger *core.Trigger) error
func (s *Scheduler) EmitEvent(event *core.Event) error
func (s *Scheduler) Start() error // Starts cron scheduler, file watcher, and event listener
func (s *Scheduler) Stop() error // Stops all and closes watcher

File watching uses fsnotify for instant inotify-based notifications (sub-millisecond latency) instead of polling.
Event triggers support two syntaxes for extracting variables from events:
New exports-style syntax (recommended for multiple variables):
triggers:
  - name: on-new-asset
    on: event
    event:
      topic: assets.new
      filters:
        - "event.data_type == 'subdomain'"
      filter_functions:
        - "contains(event_data.url, '/api/')" # Utility functions available
      input:
        target: event_data.url
        description: trim(event_data.desc)
        source: event.source

Legacy syntax (single input):
input:
  type: event_data
  field: url
  name: target

The filter_functions field allows using utility functions (like contains(), starts_with(), etc.) in filters, while filters uses plain JavaScript expressions.
The full event context is available in triggered workflows via the event object:
- `event.topic` - Event topic
- `event.source` - Event source
- `event.data_type` - Data type
- `event_data.*` - Event data fields (shorthand for `event.data.*`)
Events are matched using JavaScript expressions with Goja runtime:
func (s *Scheduler) evaluateFilters(filters []string, event *core.Event) bool {
vm := goja.New()
vm.Set("event", eventObj)
vm.Set("event_data", event.Data) // Shorthand access
for _, filter := range filters {
result, _ := vm.RunString(filter)
if !result.ToBoolean() {
return false
}
}
return true
}

The workflow linter (internal/linter/) provides static analysis of workflow YAML files to catch common issues before execution.
# Lint a single workflow
osmedeus workflow lint my-workflow.yaml
# Lint by workflow name (searches in workflows path)
osmedeus workflow lint my-workflow
# Lint all workflows in a directory
osmedeus workflow lint /path/to/workflows/
# Output formats
osmedeus workflow lint my-workflow.yaml --format pretty # Default, colored output
osmedeus workflow lint my-workflow.yaml --format json # Machine-readable JSON
osmedeus workflow lint my-workflow.yaml --format github # GitHub Actions annotations
# Filter by severity
osmedeus workflow lint my-workflow.yaml --severity warning # Show warnings and above
osmedeus workflow lint my-workflow.yaml --severity error # Show only errors
# Disable specific rules
osmedeus workflow lint my-workflow.yaml --disable unused-variable,empty-step
# CI mode (exit with error code if issues found)
osmedeus workflow lint my-workflow.yaml --check

| Severity | Description | Exit Code |
|---|---|---|
| info | Best practice suggestions (e.g., unused exports) | 0 |
| warning | Potential issues that may cause problems | 0 |
| error | Critical issues that will likely cause failures | 1 (with --check) |
| Rule | Severity | Description |
|---|---|---|
| missing-required-field | warning | Detects missing required fields (name, kind, type) |
| duplicate-step-name | warning | Detects multiple steps with the same name |
| empty-step | warning | Detects steps with no executable content |
| unused-variable | info | Detects exports that are never referenced |
| invalid-goto | warning | Detects decision goto references to non-existent steps |
| invalid-depends-on | warning | Detects depends_on references to non-existent steps |
| circular-dependency | warning | Detects circular references in step dependencies |
Note: The undefined-variable rule is available but not enabled by default as it can produce false positives for dynamically-injected variables.
The linter recognizes all runtime-injected variables to avoid false positives. These include:
Path Variables: BaseFolder, Binaries, Data, ExternalData, ExternalConfigs, Workflows, Workspaces, etc.
Target Variables: Target, target, TargetFile, TargetSpace
Output Variables: Output, output, Workspace, workspace
Metadata Variables: Version, RunUUID, TaskDate, TimeStamp, Today, RandomString
Heuristic Variables: TargetType, TargetRootDomain, TargetTLD, Org, TargetHost, TargetPort, etc.
Chunk Variables: ChunkIndex, ChunkSize, TotalChunks, ChunkStart, ChunkEnd
// internal/linter/linter.go
type Linter struct {
rules []LinterRule
options LinterOptions
}
// LinterRule interface for all lint rules
type LinterRule interface {
Name() string
Description() string
Severity() Severity
Check(ast *WorkflowAST) []LintIssue
}
func (l *Linter) Lint(path string) (*LintResult, error)
func (l *Linter) LintContent(content []byte, filename string) (*LintResult, error)

- Create the rule in internal/linter/rules.go:
type MyNewRule struct{}
func (r *MyNewRule) Name() string { return "my-new-rule" }
func (r *MyNewRule) Description() string { return "Detects my issue" }
func (r *MyNewRule) Severity() Severity { return SeverityWarning }
func (r *MyNewRule) Check(wast *WorkflowAST) []LintIssue {
var issues []LintIssue
// ... implementation
return issues
}

- Register in GetDefaultRules():
func GetDefaultRules() []LinterRule {
return []LinterRule{
// ... existing rules
&MyNewRule{},
}
}

The write coordinator batches database operations to reduce I/O by ~70%:
// internal/database/write_coordinator.go
type WriteCoordinator struct {
runID int64
stepResults []*StepResult // Buffered step results
progressDelta int // Accumulated progress updates
artifacts []*Artifact // Buffered artifacts
flushThreshold int // Flush after N step results (default: 10)
flushInterval time.Duration // Flush every interval (default: 5s)
}
// Usage
wc := NewWriteCoordinator(runID, runUUID, nil) // nil uses defaults
defer wc.Close() // Final flush
wc.AddStepResult(stepName, stepType, status, command, output, ...)
wc.IncrementProgress(1)
wc.AddArtifact(path, artifactType)

Platform detection functions for environment-aware workflows:
// internal/executor/platform.go
func DetectDocker() bool // Checks /.dockerenv and /proc/1/cgroup
func DetectKubernetes() bool // Checks service account directory
func DetectCloudProvider() string  // Returns: aws, gcp, azure, or local

These are exposed as template variables:

- {{PlatformOS}} - runtime.GOOS
- {{PlatformArch}} - runtime.GOARCH
- {{PlatformInDocker}} - "true" or "false"
- {{PlatformInKubernetes}} - "true" or "false"
- {{PlatformCloudProvider}} - aws/gcp/azure/local
// internal/database/database.go
func Connect(cfg *config.Config) (*bun.DB, error) {
switch {
case cfg.IsPostgres():
return connectPostgres(cfg)
case cfg.IsSQLite():
return connectSQLite(cfg)
default:
return nil, fmt.Errorf("unsupported database engine")
}
}

// internal/database/models.go
type Run struct {
ID string
RunID string
WorkflowName string
WorkflowKind string // "flow" or "module"
Target string
Params map[string]string
Status string // "pending", "running", "completed", "failed"
Workspace string // Logical workspace name (same as TargetSpace)
StartedAt time.Time
CompletedAt time.Time
ErrorMessage string
ScheduleID string
TriggerType string // "manual", "cron", "event", "api"
TriggerName string
TotalSteps int
CompletedSteps int
RunPriority string // "low", "normal", "high", "critical"
RunMode string // "local", "distributed", "cloud"
HooksEnabled bool // true if workflow has hooks
IsQueued bool // true if queued for delayed execution
WebhookUUID string // UUID for webhook trigger
WebhookAuthKey string // Optional auth key for webhook
CreatedAt time.Time
UpdatedAt time.Time
}
type Asset struct {
ID int64
Workspace string
AssetValue string // Primary identifier (hostname)
URL string
Input string
Scheme string // "http", "https"
Method string
Path string
StatusCode int
ContentType string
ContentLength int64
Title string
Words int
Lines int
HostIP string
A []string // DNS A records (JSON)
TLS string
AssetType string
Tech []string // Technologies (JSON)
Time string // Response time
Remarks string // Labels
Source string // Discovery source
IsCDN bool // Behind CDN (from httpx cdn/cdn_name fields)
IsCloud bool // CDN name matches cloud provider
IsWAF bool // cdn_type == "waf" in httpx data
CreatedAt time.Time
UpdatedAt time.Time
}
type Workspace struct {
ID int64
Name string
LocalPath string
TotalAssets int
TotalSubdomains int
TotalURLs int
TotalVulns int
VulnCritical int
VulnHigh int
VulnMedium int
VulnLow int
VulnPotential int
RiskScore float64
Tags []string // JSON array
LastRun time.Time
RunWorkflow string
CreatedAt time.Time
UpdatedAt time.Time
}
type EventLog struct {
ID int64
Topic string // "run.started", "run.completed", "asset.discovered", etc.
EventID string
Name string
Source string // "executor", "scheduler", "api"
DataType string
Data string // JSON payload
Workspace string
RunID string
WorkflowName string
Processed bool
ProcessedAt time.Time
Error string
CreatedAt time.Time
}
type Schedule struct {
ID string
Name string
WorkflowName string
WorkflowPath string
TriggerName string
TriggerType string // "cron", "event", "watch"
Schedule string // Cron expression
EventTopic string
WatchPath string
Target string // Default target for scheduled runs
Workspace string // Default workspace
Params map[string]string // Additional parameters (JSON)
InputConfig map[string]string // JSON params (deprecated, use Params)
IsEnabled bool
LastRun time.Time
NextRun time.Time
RunCount int
CreatedAt time.Time
UpdatedAt time.Time
}

// internal/database/repository/asset_repo.go
type AssetRepository struct {
db *bun.DB
}
func (r *AssetRepository) Create(ctx context.Context, asset *database.Asset) error
func (r *AssetRepository) Search(ctx context.Context, query AssetQuery) ([]*database.Asset, int, error)
func (r *AssetRepository) Upsert(ctx context.Context, asset *database.Asset) error

// internal/database/seed.go
func ListSchedules(ctx context.Context, offset, limit int) (*ScheduleResult, error)
func GetScheduleByID(ctx context.Context, id string) (*Schedule, error)
func CreateSchedule(ctx context.Context, input CreateScheduleInput) (*Schedule, error)
func UpdateSchedule(ctx context.Context, id string, input UpdateScheduleInput) (*Schedule, error)
func DeleteSchedule(ctx context.Context, id string) error
func UpdateScheduleLastRun(ctx context.Context, id string) error

// internal/database/jsonl.go
type JSONLImporter struct {
db *bun.DB
batchSize int
}
func (i *JSONLImporter) ImportAssets(ctx context.Context, filePath, workspace, source string) (*ImportResult, error)

The db_reset_event_logs utility function enables event reprocessing:
// internal/functions/db_functions.go
// db_reset_event_logs(workspace?, topic_pattern?) -> {reset: int, total: int}
// Resets processed event logs back to unprocessed state
// Examples:
db_reset_event_logs() // Reset all processed events
db_reset_event_logs("example.com") // Reset events for workspace
db_reset_event_logs("", "db.*") // Reset events matching topic pattern (glob)
db_reset_event_logs("example.com", "assets.*")  // Both filters

Topic patterns use glob syntax (* matches any characters, ? matches a single character).
Osmedeus supports importing and analyzing results from SAST (Static Application Security Testing) tools that produce SARIF output.
| Tool | Type | SARIF Output |
|---|---|---|
| Semgrep | Code analysis | semgrep --sarif -o results.sarif |
| Trivy | Container/FS scanning | trivy fs --format sarif -o results.sarif |
| Kingfisher | Dependency checks | Native SARIF output |
| Bearer | API key detection | Native SARIF output |
SARIF File → Parse runs/results/rules → Map severity → Upsert into database
↓
{new, updated, unchanged, errors, total}
The import function:
- Parses the SARIF JSON structure (runs → results → rules/locations)
- Maps SARIF severity levels to osmedeus severity (error→high, warning→medium, note→low)
- Upserts findings into the database with deduplication
- Marks assets with asset_type='repo' for code-level analysis
- Returns stats: {new, updated, unchanged, errors, total}
convert_sarif_to_markdown() generates severity-sorted tables with:
- Severity counts summary
- Location (file:line)
- Rule ID, title, and description
Workflows support pre/post execution hooks that run before and after the main steps:
hooks:
pre_scan_steps:
- name: setup-env
type: bash
command: mkdir -p {{Output}}/results
post_scan_steps:
- name: notify
type: function
function: |
generate_event("{{Workspace}}", "scan.completed", "workflow", "status", "done")

// internal/core/workflow.go
type WorkflowHooks struct {
PreScanSteps []Step `yaml:"pre_scan_steps,omitempty"`
PostScanSteps []Step `yaml:"post_scan_steps,omitempty"`
}

Pre-scan steps execute before the main workflow steps. Post-scan steps execute after all main steps complete. Both use the same Step type as regular workflow steps and support all step types (bash, function, etc.).
The Hooks field is tracked on Run records via HooksEnabled for metadata purposes.
The queue system enables delayed task execution with dual-source polling from database and Redis:
1. Queue task: osmedeus worker queue new -f <flow> -t <target>
└── Creates Run record with is_queued=true, status="queued"
└── Optionally pushes to Redis queue
2. Poll & Execute: osmedeus worker queue run
├── DB poller: Checks every 5s for is_queued=true runs
├── Redis poller: BRPOP on task queue (optional)
├── Dedup: Track seen runUUIDs to avoid duplicates
├── Executor: Run workflow, update status
└── Concurrency: Configurable parallel workers
// pkg/cli/worker_queue.go
type QueuePoller struct {
config QueuePollerConfig
taskChan chan *QueuedTask
seen sync.Map // Deduplication
}
type QueuedTask struct {
RunUUID string
WorkflowName string
Target string
Params map[string]string
InputIsFile bool
InputFilePath string
}

osmedeus worker queue list # List queued tasks
osmedeus worker queue new -f <flow> -t <target> # Queue task for later
osmedeus worker queue new -m <module> -T targets.txt -p key=value
osmedeus worker queue run --concurrency 5 # Process queued tasks

Functions for nmap port scanning and result processing:
// internal/functions/nmap_functions.go
// nmap_to_jsonl converts nmap XML or gnmap output to JSONL format
// Supports .xml, .gnmap, .nmap (auto-detects format)
// Output: {asset_value, host_ip, asset_type, open_ports, ports}
func (vf *vmFunc) nmapToJSONL(call goja.FunctionCall) goja.Value
// run_nmap executes nmap and auto-converts results to JSONL
// Default flags: "-sV -T4"
func (vf *vmFunc) runNmap(call goja.FunctionCall) goja.Value

Usage in workflows:
steps:
- name: port-scan
type: function
function: |
run_nmap("{{Target}}", "-sV -T4 --top-ports 1000", "{{Output}}/nmap-scan")
- name: import-ports
type: function
function: |
db_import_port_assets("{{Workspace}}", "{{Output}}/nmap-scan.jsonl")

The db_import_port_assets(workspace, file_path, source?) function imports JSONL output from nmap_to_jsonl into the database with asset_type=ip.
Functions for managing long-running background processes via tmux:
// internal/functions/tmux_functions.go
tmux_run(command, session_name?) // Create detached session (auto-name: bosm-<random8>)
tmux_capture(session_name) // Capture pane output ("all" for all sessions)
tmux_send(session_name, command) // Send keystrokes + Enter
tmux_kill(session_name) // Destroy session
tmux_list() // List active session names

Usage in workflows:
steps:
- name: start-background-scan
type: function
function: |
tmux_run("nmap -sV {{Target}}", "scan-session")
- name: check-output
type: function
function: |
tmux_capture("scan-session")

Functions for remote execution and file synchronization across distributed workers:
// internal/functions/ssh_functions.go
ssh_exec(host, command, user?, key_path?, password?, port?) // Remote command (pooled connection)
ssh_rsync(host, src, dest, user?, key_path?, password?, port?) // Copy via rsync+SSH
sync_from_master(src, dest) // Pull from master (local cp fallback)
sync_from_worker(identifier, ip, src, dest) // Pull from specific worker
rsync_to_worker(identifier, ip, src, dest) // Push to specific worker

Distributed coordination uses a hooks pattern to avoid circular imports:
// internal/functions/execute_hooks.go
type ExecuteHooks struct {
SendExecuteRequest func(ctx, action, expr, ...) error
ShouldUseRedis func() bool
ResolveWorkerSSH func(ctx, identifier) (*WorkerSSHInfo, error)
}
RegisterExecuteHooks(hooks *ExecuteHooks) // Register at startup
UnregisterExecuteHooks() // Cleanup

The distributed package registers hooks at startup, allowing SSH/sync functions to coordinate across workers without importing the distributed package directly.
API endpoints for triggering workflow runs via webhooks:
// pkg/server/handlers/webhook_runs.go
GET /osm/api/webhook-runs // List webhook-enabled runs (authenticated)
GET /osm/api/webhook-runs/{uuid}/trigger // Trigger via GET (unauthenticated)
POST /osm/api/webhook-runs/{uuid}/trigger // Trigger with overrides (unauthenticated)

Runs with webhook_uuid set serve as templates. The trigger endpoint is unauthenticated by default, with optional ?key=<auth_key> protection. POST body can override target, flow, or module.
New database fields on Run model:
- webhook_uuid - UUID v4 identifier for the webhook
- webhook_auth_key - Optional authentication key
internal/functions/registry_test.go # Function unit tests
internal/parser/loader_test.go # Parser/loader unit tests
internal/runner/runner_test.go # Runner unit tests
internal/executor/executor_test.go # Executor unit tests
internal/scheduler/scheduler_test.go # Scheduler unit tests
pkg/server/handlers/handlers_test.go # API handler unit tests
test/integration/workflow_test.go # Workflow integration tests
test/e2e/ # E2E CLI tests
├── e2e_test.go # Common test helpers
├── version_test.go # Version command tests
├── health_test.go # Health command tests
├── workflow_test.go # Workflow command tests
├── function_test.go # Function command tests
├── scan_test.go # Scan command tests
├── server_test.go # Server command tests
├── worker_test.go # Worker command tests
├── distributed_test.go # Distributed scan e2e tests
├── ssh_test.go # SSH runner e2e tests (module & step level)
├── api_test.go # API endpoint e2e tests (all routes)
├── agent_test.go # Agent step e2e tests
├── agent_acp_test.go # Agent-ACP step e2e tests
├── canary_test.go # Canary tests (real-world scans in Docker)
├── cloud_test.go # Cloud CLI e2e tests
├── db_clean_test.go # Database cleanup e2e tests
├── hooks_test.go # Workflow hooks e2e tests
└── worker_test.go # Worker management e2e tests
# All unit tests (fast, no external dependencies)
make test-unit
# Integration tests (requires Docker)
make test-integration
# E2E CLI tests (requires binary build)
make test-e2e
# SSH E2E tests - full workflow tests with SSH runner
# Tests both module-level (runner: ssh) and step-level (step_runner: ssh)
# Uses linuxserver/openssh-server Docker container
make test-e2e-ssh
# API E2E tests - tests all API endpoints
# Starts Redis, seeds database, starts server, tests all routes
make test-e2e-api
# Distributed scan e2e tests (requires Docker for Redis)
make test-distributed
# Docker runner tests
make test-docker
# SSH runner unit tests (using linuxserver/openssh-server)
make test-ssh
# Canary tests (real-world scans in Docker, 20-60 min each)
make test-canary-all # All canary scenarios (60-90min)
make test-canary-repo # SAST on juice-shop (~25min)
make test-canary-domain # Domain recon on hackerone.com (~20min)
make test-canary-ip # CIDR scanning (~25min)
make test-canary-general # Domain-list-recon on hackerone.com subdomains (~40min)
make canary-up # Build & start canary container (shared setup)
make canary-down # Teardown canary container
# All tests with coverage
make test-coverage

Use testify for assertions:
func TestMyFeature(t *testing.T) {
// Arrange
tmpDir := t.TempDir()
// Act
result, err := myFunction(tmpDir)
// Assert
require.NoError(t, err)
assert.Equal(t, expected, result)
}For integration tests, use build tags:
func TestDockerRunner_Integration(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
// ...
}

Canary tests are real-world integration tests that run actual security scans inside a Docker container. They verify the full pipeline from workflow execution through database persistence and API reporting.
1. Build canary Docker image (multi-stage: Go 1.25 builder → Ubuntu 24.04 runtime)
2. Compile osmedeus from current source (not released binaries)
3. Layer onto toolbox image with pre-installed SAST tools (Trivy, Semgrep, Kingfisher)
4. Start API server in background on :8002
5. Run scan workflows against real targets
6. Verify: filesystem artifacts + API responses + database records
| Test | Target | Duration | What It Tests |
|---|---|---|---|
| TestCanary_Repo | juice-shop | ~25min | SAST scanning, SARIF import, vulnerability DB |
| TestCanary_Domain | hackerone.com | ~20min | DNS enumeration, subdomain discovery |
| TestCanary_CIDR | Public IPs | ~25min | Network range scanning |
| TestCanary_General | hackerone.com subdomains | ~40min | Domain-list-recon (probing, fingerprinting, scanning) |
| TestCanary_FullSuite | All above | ~90min | Complete lifecycle with container management |
# Full suite (builds container → runs all scenarios → cleans up)
make test-canary-all
# Individual scenarios (each handles container lifecycle)
make test-canary-repo
make test-canary-domain
make test-canary-ip
# Manual container management for development
make canary-up # Build & start container
make canary-down # Stop & cleanup

Canary tests assert across three layers:
- Filesystem: SARIF files, markdown reports, text outputs exist
- API: Runs, assets, vulnerabilities accessible via REST endpoints
- Database: Workspace records, total counts, vulnerability severity breakdown
Osmedeus supports installing base folders and workflows from curated preset repositories for reproducible deployments.
# Install base folder from preset repository
osmedeus install base --preset
# Install base and restore previous osm-settings.yaml (API keys, Redis config, etc.)
osmedeus install base --preset --keep-setting
# Install workflows from preset repository
osmedeus install workflow --preset
# Validate and install ready-to-use base
osmedeus install validate --preset

When install base runs, the entire base folder (including osm-settings.yaml) is deleted and replaced. To prevent losing custom settings:
- Automatic backup: osm-settings.yaml is always backed up to ~/osmedeus-base/backup-osm-settings.yaml before removal
- --keep-setting flag: restores the previous osm-settings.yaml over the newly installed one after installation
This is handled by the Installer.KeepSetting field in internal/installer/installer.go.
| Variable | Default | Description |
|---|---|---|
| OSM_PRESET_URL | Default base repo | Override base preset source |
| OSM_WORKFLOW_URL | Default workflow repo | Override workflow preset source |
| OSM_IGNORE_REGISTRY | (unset) | Skip auto binary installation and binary health check |
Preset installation is useful for Docker images and CI/CD pipelines where reproducible, stable deployments from tested configurations are preferred over manual source specification.
- Define the type in internal/core/types.go:

const StepTypeMyNew StepType = "mynew"

- Create executor in internal/executor/mynew_executor.go:
type MyNewExecutor struct {
templateEngine *template.Engine
}
func (e *MyNewExecutor) Execute(ctx context.Context, step *core.Step,
execCtx *core.ExecutionContext) (*core.StepResult, error) {
// Implementation
}

- Register in dispatcher (internal/executor/dispatcher.go):
func (d *StepDispatcher) Dispatch(...) (*core.StepResult, error) {
switch step.Type {
// ...
case core.StepTypeMyNew:
return d.myNewExecutor.Execute(ctx, step, execCtx)
}
}

- Create runner in internal/runner/myrunner.go:
type MyRunner struct {
config *core.RunnerConfig
}
func (r *MyRunner) Execute(ctx context.Context, command string) (*CommandResult, error)
func (r *MyRunner) Setup(ctx context.Context) error
func (r *MyRunner) Cleanup(ctx context.Context) error
func (r *MyRunner) Type() core.RunnerType
func (r *MyRunner) IsRemote() bool

- Add type in internal/core/types.go:
const RunnerTypeMy RunnerType = "myrunner"

- Register in factory (internal/runner/runner.go):
func NewRunnerFromType(runnerType core.RunnerType, ...) (Runner, error) {
switch runnerType {
case core.RunnerTypeMy:
return NewMyRunner(config, binaryPath)
}
}

- Create installer in internal/installer/mymode.go:
func InstallBinaryViaMyMode(name, pkg, binariesFolder string) error {
// Implementation
}

- Add flag in pkg/cli/install.go:
installBinaryCmd.Flags().BoolVar(&myModeInstall, "my-mode-install", false, "use MyMode to install")

- Register in the runInstallBinary() switch statement.
See internal/installer/nix.go for a complete example.
- Add handler in pkg/server/handlers/handlers.go:
func MyHandler(cfg *config.Config) fiber.Handler {
return func(c *fiber.Ctx) error {
// Implementation
return c.JSON(fiber.Map{"data": result})
}
}

- Register route in pkg/server/server.go:
func (s *Server) setupRoutes() {
// ...
api.Get("/my-endpoint", handlers.MyHandler(s.config))
}

- Create command file in pkg/cli/mycommand.go:
var myCmd = &cobra.Command{
Use: "mycommand",
Short: "Description",
RunE: func(cmd *cobra.Command, args []string) error {
// Implementation
},
}
func init() {
myCmd.Flags().StringVarP(&myFlag, "flag", "f", "", "description")
}

- Register in pkg/cli/root.go:
func init() {
rootCmd.AddCommand(myCmd)
}

- osmedeus func - alias for osmedeus function
- osmedeus func e - alias for osmedeus function eval
- osmedeus db ls - alias for osmedeus db list
Query and manage database tables directly from the CLI:
# List all tables with row counts
osmedeus db list
# Query specific table (default columns shown)
osmedeus db list --table event_logs
# List available columns for a table
osmedeus db list --table event_logs --list-columns
# Filter by specific columns
osmedeus db list --table event_logs --columns topic,source,data_type,data
# Show all columns including hidden ones (id, timestamps)
osmedeus db list --table event_logs --all
# Filter by field value
osmedeus db list --table event_logs --where topic=assets.new
osmedeus db list --table event_logs --where processed=false
# Search across all columns
osmedeus db list --table event_logs --search "nuclei"
# Output as JSON for scripting
osmedeus db list --table event_logs --json
# Pagination
osmedeus db list --table event_logs --offset 50 --limit 100

Default columns per table:
- runs: run_id, job_id, workflow_name, target, status, started_at
- event_logs: topic, source, processed, data_type, workspace, data
- assets: asset_value, host_ip, title, status_code, last_seen_at, technologies
- schedules: name, workflow_name, trigger_type, schedule, is_enabled, run_count
Evaluate utility functions from the command line with bulk processing support:
# Single expression evaluation
osmedeus func eval 'log_info("hello")'
osmedeus func eval -e 'fileLength("/path/to/file.txt")'
# With target variable
osmedeus func eval -e 'httpGet("https://" + target)' -t example.com
# Bulk processing from file (target variable available in script)
osmedeus func eval -e 'log_info("Processing: " + target)' -T targets.txt
# Bulk processing with concurrency
osmedeus func eval -e 'httpGet("https://" + target)' -T targets.txt -c 10
# Using function files for reusable logic
osmedeus func eval --function-file check-host.js -T targets.txt -c 5
# Additional parameters
osmedeus func eval -e 'log_info(target + " in " + ws)' -T targets.txt --params ws=production
# Function name with arguments
osmedeus func eval log_info "hello world"
osmedeus func eval -f httpGet "https://example.com"
# Read script from stdin
echo 'log_info("hello")' | osmedeus func eval --stdin
# List available functions
osmedeus func list
osmedeus func list event # Filter by category

- -c, --concurrency - Number of targets to scan concurrently
- --timeout - Scan timeout (e.g., 2h, 3h, 1d)
- --repeat - Repeat scan after completion
- --repeat-wait-time - Wait time between repeats (e.g., 30m, 1h, 1d)
- -m can be specified multiple times to run modules in sequence
- -x, --exclude <module> - Exclude module(s) from flow execution (exact match, repeatable)
- -X, --fuzzy-exclude <substr> - Exclude modules whose name contains a substring (repeatable)
# Worker status and management
osmedeus worker status # Show registered workers
osmedeus worker status --columns id,alias,ip,status # Custom columns
osmedeus worker status -s "query" # Search/filter workers
osmedeus worker eval -e '<expr>' # Evaluate with distributed hooks
osmedeus worker set <id-or-alias> <field> <value> # Update worker metadata
# Queue system for delayed execution
osmedeus worker queue list # List queued tasks
osmedeus worker queue new -f <flow> -t <target> # Queue a task
osmedeus worker queue new -m <module> -T targets.txt -p key=value
osmedeus worker queue run --concurrency 5 # Process queued tasks

# List assets (paginated table output)
osmedeus assets
osmedeus assets -w example.com # Filter by workspace
osmedeus assets --source httpx # Filter by source
osmedeus assets --type web # Filter by asset type
osmedeus assets "api.example" # Search by keyword
# Customize output columns
osmedeus assets --columns url,title,status_code
osmedeus assets --exclude-columns raw_json_data,raw_response
osmedeus assets --all # Show all columns including hidden ones
# Pagination
osmedeus assets --limit 100 --offset 50
# Asset statistics (unique technologies, sources, remarks, types)
osmedeus assets --stats
osmedeus assets --stats -w example.com # Stats for specific workspace
# JSON output (for scripting)
osmedeus assets --json
osmedeus assets --stats --json

- Use osmedeus --usage-example to see comprehensive examples for all commands
- Use --verbose or --debug for detailed logging
- Use --dry-run to preview scan execution without running commands
- Use --log-file-tmp to create timestamped log files for debugging
- Use go fmt and golangci-lint
- Follow Go naming conventions
- Use structured logging with zap
- Return errors, don't panic
- Use context for cancellation
- Write tests for new features
# Build
make build
# Test
make test-unit
# Format
make fmt
# Lint
make lint
# Tidy dependencies
make tidy
# Generate (if needed)
make generate
# Generate Swagger docs
make swagger
# Update embedded UI from dashboard build
make update-ui
# Install to $GOBIN
make install
# Docker Toolbox (all tools pre-installed)
make docker-toolbox # Build toolbox image
make docker-toolbox-run # Start toolbox container
make docker-toolbox-shell # Enter container shell
# Canary tests (real-world scans in Docker)
make test-canary-all # All scenarios (60-90min)
make canary-up # Start canary container
make canary-down # Cleanup canary container