Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func init() {
rootCmd.AddCommand(cmd.Retry())
rootCmd.AddCommand(cmd.StartAll())
rootCmd.AddCommand(cmd.Migrate())
rootCmd.AddCommand(cmd.Mcp())
rootCmd.AddCommand(cmd.Cleanup())

config.Version = version
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ require (
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
github.com/yoheimuta/go-protoparser/v4 v4.14.2 // indirect
github.com/yosida95/uritemplate/v3 v3.0.2 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.augendre.info/arangolint v0.2.0 // indirect
go.augendre.info/fatcontext v0.8.1 // indirect
Expand Down Expand Up @@ -374,6 +375,7 @@ require (
require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/mattn/go-runewidth v0.0.16 // indirect
github.com/modelcontextprotocol/go-sdk v1.1.0
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/samber/lo v1.51.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,8 @@ github.com/moby/sys/userns v0.1.0 h1:tVLXkFOxVu9A64/yh59slHVv9ahO9UIev4JZusOLG/g
github.com/moby/sys/userns v0.1.0/go.mod h1:IHUYgu/kao6N8YZlp9Cf444ySSvCmDlmzUcYfDHOl28=
github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y=
github.com/modelcontextprotocol/go-sdk v1.1.0 h1:Qjayg53dnKC4UZ+792W21e4BpwEZBzwgRW6LrjLWSwA=
github.com/modelcontextprotocol/go-sdk v1.1.0/go.mod h1:6fM3LCm3yV7pAs8isnKLn07oKtB0MP9LHd3DfAcKw10=
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 h1:RWengNIwukTxcDr9M+97sNutRR1RKhG96O6jWumTTnw=
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826/go.mod h1:TaXosZuwdSHYgviHp1DAtfrULt5eUgsSMsZf+YrPgl8=
github.com/moricho/tparallel v0.3.2 h1:odr8aZVFA3NZrNybggMkYO3rgPRcqjeQUlBBFVxKHTI=
Expand Down Expand Up @@ -868,6 +870,8 @@ github.com/yoheimuta/go-protoparser/v4 v4.14.2 h1:/P/LlX1CF9NaTWEltGcIZVvNlPbhAB
github.com/yoheimuta/go-protoparser/v4 v4.14.2/go.mod h1:AHNNnSWnb0UoL4QgHPiOAg2BniQceFscPI5X/BZNHl8=
github.com/yoheimuta/protolint v0.55.6 h1:vEDWGz768e3LDxMoOUC8fDQDdXnH9JKV/k9rlHD+ZhY=
github.com/yoheimuta/protolint v0.55.6/go.mod h1:XrnOc0O5mckLR1GAOjqMPdb3R3ZEfLkMpLoq5RxxoG0=
github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4=
github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
6 changes: 6 additions & 0 deletions internal/cmd/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/dagu-org/dagu/internal/runtime"
"github.com/dagu-org/dagu/internal/service/coordinator"
"github.com/dagu-org/dagu/internal/service/frontend"
"github.com/dagu-org/dagu/internal/service/mcpserver"
"github.com/dagu-org/dagu/internal/service/resource"
"github.com/dagu-org/dagu/internal/service/scheduler"
"github.com/google/uuid"
Expand Down Expand Up @@ -263,6 +264,11 @@ func (c *Context) NewScheduler() (*scheduler.Scheduler, error) {
return scheduler.New(c.Config, m, c.DAGRunMgr, c.DAGRunStore, c.QueueStore, c.ProcStore, c.ServiceRegistry, coordinatorCli)
}

// NewMcpServer creates a new NewMcpServer instance
func (c *Context) NewMcpServer() (*mcpserver.MCPServer, error) {
return mcpserver.New(c.Config)
}

// StringParam retrieves a string parameter from the command line flags.
// It checks if the parameter is wrapped in quotes and removes them if necessary.
func (c *Context) StringParam(name string) (string, error) {
Expand Down
36 changes: 36 additions & 0 deletions internal/cmd/mcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package cmd

import (
"fmt"

"github.com/spf13/cobra"
)

func Mcp() *cobra.Command {
return NewCommand(
&cobra.Command{
Use: "mcp",
Short: "Start the MCP Server",
Long: "Start the MCP Server",
Args: cobra.ArbitraryArgs,
}, mcpFlags, runMcp,
)
}

// Command line flags for the mcp command
var mcpFlags = []commandLineFlag{}

// runMcp handles the execution of the mcp command
func runMcp(ctx *Context, _ []string) error {
mcpserver, err := ctx.NewMcpServer()
if err != nil {
return fmt.Errorf("failed to initialize mcp server: %w", err)
}

if err := mcpserver.Start(ctx); err != nil {
return fmt.Errorf("failed to start mcp server in directory %s: %w",
ctx.Config.Paths.DAGsDir, err)
}

return nil
}
237 changes: 237 additions & 0 deletions internal/service/mcpserver/mcpserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
package mcpserver

import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"path/filepath"
"sync/atomic"
"time"

"github.com/dagu-org/dagu/internal/common/config"
"github.com/dagu-org/dagu/internal/common/dirlock"
"github.com/dagu-org/dagu/internal/common/logger"
"github.com/google/jsonschema-go/jsonschema"
"github.com/modelcontextprotocol/go-sdk/mcp"
)

type MCPServer struct {
server *mcp.Server
logDir string
running atomic.Bool
config *config.Config
dirLock dirlock.DirLock
location *time.Location
}

func New(cfg *config.Config) (*MCPServer, error) {

timeLoc := cfg.Global.Location
if timeLoc == nil {
timeLoc = time.Local
}
lockOpts := &dirlock.LockOptions{
StaleThreshold: cfg.Scheduler.LockStaleThreshold,
RetryInterval: cfg.Scheduler.LockRetryInterval,
}
lockDir := filepath.Join(cfg.Paths.DataDir, "mcpserver", "locks")
dirLock := dirlock.New(lockDir, lockOpts)

s := mcp.NewServer(&mcp.Implementation{Name: "server", Version: "v0.0.1"}, nil)

s.AddTool(&mcp.Tool{
Name: "list_DAGs",
Description: "List all the Direct Acyclic Graphs (DAGs) in the server",
InputSchema: &jsonschema.Schema{
Type: "object",
},
}, listDags)

s.AddTool(&mcp.Tool{
Name: "list_DAG_runs",
Description: "List all the Direct Acyclic Graphs (DAGs) runs in the server",
InputSchema: &jsonschema.Schema{
Type: "object",
},
}, listDagRuns)

s.AddTool(&mcp.Tool{
Name: "get_DAG",
Description: `Get the details and description of a Direct Acyclic Graph (DAG) from its name.
It gives the workflows details, steps and configuration.`,
InputSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"name": {Type: "string", Description: "Name of the DAG"},
},
},
}, getDag)

/*
s.AddTool(&mcp.Tool{
Name: "execute_DAG",
Description: "Execute a specific Workflow",
InputSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"name": {Type: "string", MaxLength: jsonschema.Ptr(256)},
},
},
}, executeDag)


s.AddTool(&mcp.Tool{
Name: "create_DAG",
Description: "Create a new workflow",
InputSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"name": {Type: "string", MaxLength: jsonschema.Ptr(10)},
},
},
}, createDag)
*/
return &MCPServer{
server: s,
logDir: cfg.Paths.LogDir,
location: timeLoc,
dirLock: dirLock,
}, nil
}

func listDags(ctx context.Context, ctr *mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// host := os.Getenv("DAGU_HOST")
host := "localhost"
// port := os.Getenv("DAGU_PORT")
port := "8080"
//api_base_url := os.Getenv("DAGU_API_BASE_URL")
api_base_url := "api/v2"

resp, err := http.Get(fmt.Sprintf("http://%s:%s/%s/dags", host, port, api_base_url))
if err != nil {
return nil, err
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)

return &mcp.CallToolResult{
Content: []mcp.Content{
&mcp.TextContent{Text: string(body)},
},
IsError: err != nil,
}, nil
}

func listDagRuns(ctx context.Context, ctr *mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// host := os.Getenv("DAGU_HOST")
host := "localhost"
// port := os.Getenv("DAGU_PORT")
port := "8080"
//api_base_url := os.Getenv("DAGU_API_BASE_URL")
api_base_url := "api/v2"

resp, err := http.Get(fmt.Sprintf("http://%s:%s/%s/dag-runs", host, port, api_base_url))
if err != nil {
return nil, err
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)

return &mcp.CallToolResult{
Content: []mcp.Content{
&mcp.TextContent{Text: string(body)},
},
IsError: err != nil,
}, nil
}

func getDag(ctx context.Context, ctr *mcp.CallToolRequest) (*mcp.CallToolResult, error) {
var arg struct {
Name string `json:"name"`
}

// host := os.Getenv("DAGU_HOST")
host := "localhost"
// port := os.Getenv("DAGU_PORT")
port := "8080"
//api_base_url := os.Getenv("DAGU_API_BASE_URL")
api_base_url := "api/v2"

if err := json.Unmarshal(ctr.Params.Arguments, &arg); err != nil {
return nil, err
}

resp, err := http.Get(fmt.Sprintf("http://%s:%s/%s/dags/%s", host, port, api_base_url, arg.Name))
if err != nil {
return nil, err
}

defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)

return &mcp.CallToolResult{
Content: []mcp.Content{
&mcp.TextContent{Text: string(body)},
},
IsError: err != nil,
}, nil
}

/*
func executeDag(ctx context.Context, ctr *mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// host := os.Getenv("DAGU_HOST")
host := "localhost"
// port := os.Getenv("DAGU_PORT")
port := "8080"
// api_base_url := os.Getenv("DAGU_API_BASE_URL")
api_base_url := "api/v2"

resp, err := http.Get(fmt.Sprintf("http://%s:%s/%s/dags", host, port, api_base_url))

return &mcp.CallToolResult{
StructuredContent: resp.Body,
IsError: err != nil,
}, nil
}

func createDag(ctx context.Context, ctr *mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// host := os.Getenv("DAGU_HOST")
host := "localhost"
// port := os.Getenv("DAGU_PORT")
port := "8080"
// api_base_url := os.Getenv("DAGU_API_BASE_URL")
api_base_url := "api/v2"

resp, err := http.Post(fmt.Sprintf("http://%s:%s/%s/dags", host, port, api_base_url))

return &mcp.CallToolResult{
StructuredContent: resp.Body,
IsError: err != nil,
}, nil
}
*/
func (s *MCPServer) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Acquire directory lock first to prevent multiple mcp server instances
logger.Info(ctx, "Waiting to acquire mcp server lock")
if err := s.dirLock.Lock(ctx); err != nil {
return fmt.Errorf("failed to acquire mcp server lock: %w", err)
}

logger.Info(ctx, "Acquired mcp server lock")
defer s.dirLock.Unlock()

if err := s.server.Run(ctx, &mcp.StdioTransport{}); err != nil {
logger.Error(ctx, "MCP server failed", slog.String("error", err.Error()))
return fmt.Errorf("mcp server failed: %w", err)
}

return nil
}