Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cli/cmd/ds-load/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

func main() {
pluginEnum := ""

pluginFinder, err := plugin.NewHomeDirFinder(true)
if err != nil {
os.Stderr.WriteString(err.Error())
Expand All @@ -26,9 +27,11 @@ func main() {
os.Stderr.WriteString(err.Error())
os.Exit(1)
}

for _, p := range plugins {
pluginEnum += (p.Name + "|")
}

pluginEnum = strings.TrimSuffix(pluginEnum, "|")

yamlLoader := kongyaml.NewYAMLResolver("")
Expand All @@ -52,9 +55,12 @@ func main() {
}

kongCtx := kong.Parse(&cli, options...)

ctx := cc.NewCommonContext(cli.Verbosity, string(cli.Config))

if err := kongCtx.Run(ctx); err != nil {
kongCtx.FatalIfErrorf(err)
}

os.Exit(common.GetExitCode())
}
4 changes: 3 additions & 1 deletion cli/pkg/app/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (listPlugins *ListPluginsCmd) Run(c *cc.CommonCtx) error {
if err != nil {
return err
}

plugins, err := find.Find()
if err != nil {
return err
Expand All @@ -52,8 +53,8 @@ func (listPlugins *ListPluginsCmd) Run(c *cc.CommonCtx) error {
for _, p := range plugins {
os.Stdout.WriteString(p.Name + " " + p.Path + "\n")
}
return nil

return nil
}

type VersionCmd struct{}
Expand All @@ -63,5 +64,6 @@ func (cmd *VersionCmd) Run(c *cc.CommonCtx) error {
constants.AppName,
version.GetInfo().String(),
)

return nil
}
27 changes: 23 additions & 4 deletions cli/pkg/app/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ type ExecCmd struct {

func (e *ExecCmd) Run(c *cc.CommonCtx) error {
defaultPrintCmd := []string{"fetch", "version", "export-transform"}
var err error
var find *plugin.Finder

var (
err error
find *plugin.Finder
)

if e.PluginFolder != "" {
find = plugin.NewFinder(true, e.PluginFolder)
} else {
Expand All @@ -42,24 +46,29 @@ func (e *ExecCmd) Run(c *cc.CommonCtx) error {
return err
}
}

pl := e.CommandArgs[0]

plugins, err := find.Find()
if err != nil {
return err
}

for _, p := range plugins {
if pl == p.Name {
e.execPlugin = p
break
}
}

if e.execPlugin == nil {
return errors.Errorf("plugin [%s] not found", pl)
}

e.pluginArgs = e.CommandArgs[1:]

var pluginSubCommand string

if len(e.CommandArgs) > 1 {
pluginSubCommand = e.CommandArgs[1]
}
Expand All @@ -73,6 +82,7 @@ func (e *ExecCmd) Run(c *cc.CommonCtx) error {
if err != nil {
return errors.Wrap(err, "Could not connect to the directory")
}

e.publisher = publish.NewDirectoryPublisher(c, dirClient)
}

Expand All @@ -83,9 +93,13 @@ func (e *ExecCmd) LaunchPlugin(c *cc.CommonCtx) error {
if (!slices.Contains(e.pluginArgs, "-c") || !slices.Contains(e.pluginArgs, "--config")) && c.ConfigPath != "" {
e.pluginArgs = append(e.pluginArgs, "-c", c.ConfigPath)
}

pluginCmd := exec.Command(e.execPlugin.Path, e.pluginArgs...) //nolint:gosec
var pStdout io.ReadCloser
var wg sync.WaitGroup

var (
pStdout io.ReadCloser
wg sync.WaitGroup
)

pStderr, err := pluginCmd.StderrPipe()
if err != nil {
Expand All @@ -94,6 +108,7 @@ func (e *ExecCmd) LaunchPlugin(c *cc.CommonCtx) error {
defer pStderr.Close()

wg.Add(1)

go listenOnStderr(c, &wg, pStderr)

if e.Print {
Expand All @@ -110,6 +125,7 @@ func (e *ExecCmd) LaunchPlugin(c *cc.CommonCtx) error {
if err != nil {
return err
}

if (fi.Mode() & os.ModeCharDevice) == 0 {
pluginCmd.Stdin = os.Stdin
}
Expand All @@ -122,12 +138,14 @@ func (e *ExecCmd) LaunchPlugin(c *cc.CommonCtx) error {
if !e.Print {
err = e.publisher.Publish(c.Context, pStdout)
}

if err != nil {
wg.Wait()
return err
}

wg.Wait()

return pluginCmd.Wait()
}

Expand All @@ -154,5 +172,6 @@ func listenOnStderr(c *cc.CommonCtx, wg *sync.WaitGroup, stderr io.ReadCloser) {
c.Log.Fatal().Err(err)
}
}

wg.Done()
}
1 change: 1 addition & 0 deletions cli/pkg/app/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func (l *PublishCmd) Run(commonCtx *cc.CommonCtx) error {
if err != nil {
return errors.Wrap(err, "Could not connect to the directory")
}

publisher = publish.NewDirectoryPublisher(commonCtx, dirClient)

return l.processMessagesFromStdIn(commonCtx, publisher)
Expand Down
3 changes: 3 additions & 0 deletions cli/pkg/clients/directory_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,14 @@ func validate(cfg *Config) error {
opts := []grpc.DialOption{
grpc.WithUserAgent("ds-load " + version.GetInfo().Version),
}

if cfg.Insecure {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

if _, err := grpcurl.BlockingDial(ctx, "tcp", cfg.Host, creds, opts...); err != nil {
return err
}

return nil
}
3 changes: 3 additions & 0 deletions cli/pkg/plugin/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ func NewHomeDirFinder(env bool) (*Finder, error) {
func (f Finder) Find() ([]*Plugin, error) {
addedPlugins := []string{}
dirs := f.dirs

if f.env {
pathEnv := os.Getenv("PATH")
dirs = append(dirs, strings.Split(pathEnv, string(os.PathListSeparator))...)

pwd, err := os.Getwd()
if err != nil {
fmt.Println(err)
Expand All @@ -55,6 +57,7 @@ func (f Finder) Find() ([]*Plugin, error) {
if err != nil {
return nil, err
}

if len(files) > 0 {
for _, f := range files {
p := NewPlugin(f)
Expand Down
2 changes: 2 additions & 0 deletions cli/pkg/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ func NewPlugin(path string) *Plugin {
func pluginName(path string) string {
file := filepath.Base(path)
name := strings.TrimPrefix(file, constants.PluginPrefix)

if runtime.GOOS == "windows" {
name = strings.TrimSuffix(name, ".exe")
}

return name
}
15 changes: 15 additions & 0 deletions cli/pkg/publish/publisher_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,16 @@ func (p *DirectoryPublisher) Publish(ctx context.Context, reader io.Reader) erro

for {
var message msg.Transform

err := jsonReader.ReadProtoMessage(&message)
if err == io.EOF {
break
}

if err != nil {
return err
}

err = p.publishMessages(ctx, &message)
if err != nil {
return err
Expand All @@ -63,6 +66,7 @@ func (p *DirectoryPublisher) Publish(ctx context.Context, reader io.Reader) erro
if p.objCounter != nil {
printCounter(os.Stdout, p.objCounter)
}

if p.relCounter != nil {
printCounter(os.Stdout, p.relCounter)
}
Expand All @@ -77,11 +81,14 @@ func (p *DirectoryPublisher) Publish(ctx context.Context, reader io.Reader) erro

func (p *DirectoryPublisher) publishMessages(ctx context.Context, message *msg.Transform) error {
errGroup, iCtx := errgroup.WithContext(ctx)

stream, err := p.importerClient.Import(iCtx)
if err != nil {
return err
}

errGroup.Go(p.receiver(stream))

errGroup.Go(p.doneHandler(stream.Context()))

opCode := message.OpCode
Expand All @@ -95,16 +102,19 @@ func (p *DirectoryPublisher) publishMessages(ctx context.Context, message *msg.T
fmt.Fprintf(os.Stderr, "validation failed, object: [%s] type [%s]\n", object.Id, object.Type)
continue
}

if (opCode == dsi3.Opcode_OPCODE_DELETE || opCode == dsi3.Opcode_OPCODE_DELETE_WITH_RELATIONS) && object.Type == "group" {
continue
}

fmt.Fprintf(os.Stdout, "object: [%s] type [%s]\n", object.Id, object.Type)
sErr := stream.Send(&dsi3.ImportRequest{
Msg: &dsi3.ImportRequest_Object{
Object: object,
},
OpCode: opCode,
})

p.handleStreamError(sErr)
}

Expand All @@ -114,13 +124,15 @@ func (p *DirectoryPublisher) publishMessages(ctx context.Context, message *msg.T
fmt.Fprintf(os.Stderr, "validation failed, relation: [%s] obj: [%s] subj [%s]\n", relation.Relation, relation.ObjectId, relation.SubjectId)
continue
}

fmt.Fprintf(os.Stdout, "relation: [%s] obj: [%s] subj [%s]\n", relation.Relation, relation.ObjectId, relation.SubjectId)
sErr := stream.Send(&dsi3.ImportRequest{
Msg: &dsi3.ImportRequest_Relation{
Relation: relation,
},
OpCode: opCode,
})

p.handleStreamError(sErr)
}

Expand Down Expand Up @@ -162,6 +174,7 @@ func (p *DirectoryPublisher) receiver(stream dsi3.Importer_ImportClient) func()
switch m := result.Msg.(type) {
case *dsi3.ImportResponse_Status:
p.errs = true

printStatus(os.Stderr, m.Status)
case *dsi3.ImportResponse_Counter:
switch m.Counter.Type {
Expand All @@ -179,11 +192,13 @@ func (p *DirectoryPublisher) receiver(stream dsi3.Importer_ImportClient) func()
func (p *DirectoryPublisher) doneHandler(ctx context.Context) func() error {
return func() error {
<-ctx.Done()

err := ctx.Err()
if err != nil && !errors.Is(err, context.Canceled) {
p.Log.Trace().Err(err).Msg("subscriber-doneHandler")
return err
}

return nil
}
}
Expand Down
4 changes: 2 additions & 2 deletions plugins/auth0/pkg/app/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ type CLI struct {
Verify VerifyCmd `cmd:"verify" help:"verify fetcher configuration and credentials"`
}

type VersionCmd struct {
}
type VersionCmd struct{}

func (cmd *VersionCmd) Run() error {
fmt.Printf("%s - %s\n",
AppName,
version.GetInfo().String(),
)

return nil
}
3 changes: 3 additions & 0 deletions plugins/auth0/pkg/app/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@ func (cmd *ExecCmd) Run(ctx *cc.CommonCtx) error {
if err != nil {
return err
}

fetcher = fetcher.WithUserPID(cmd.UserPID).WithEmail(cmd.UserEmail).WithRoles(cmd.Roles)

templateContent, err := cmd.getTemplateContent()
if err != nil {
return err
}

transformer := transform.NewGoTemplateTransform(templateContent)

return exec.Execute(ctx.Context, ctx.Log, transformer, fetcher)
}
4 changes: 2 additions & 2 deletions plugins/auth0/pkg/app/export_transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import (
"github.com/aserto-dev/ds-load/sdk/transform"
)

type ExportTransformCmd struct {
}
type ExportTransformCmd struct{}

func (t *ExportTransformCmd) Run(ctx *cc.CommonCtx) error {
templateContent, err := Assets().ReadFile("assets/transform_template.tmpl")
if err != nil {
return err
}

transformer := transform.NewGoTemplateTransform(templateContent)

return transformer.ExportTransform(os.Stdout)
Expand Down
1 change: 1 addition & 0 deletions plugins/auth0/pkg/app/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func (f *FetchCmd) Run(ctx *cc.CommonCtx) error {
if err != nil {
return err
}

fetcher = fetcher.WithUserPID(f.UserPID).WithEmail(f.UserEmail).WithRoles(f.Roles).WithOrgs(f.Orgs).WithSAML(f.SAML)

return fetcher.Fetch(ctx.Context, os.Stdout, os.Stderr)
Expand Down
2 changes: 2 additions & 0 deletions plugins/auth0/pkg/app/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func (t *TransformCmd) Run(ctx *cc.CommonCtx) error {
}

goTemplateTransformer := transform.NewGoTemplateTransform(templateContent)

return t.transform(ctx.Context, goTemplateTransformer)
}

Expand All @@ -36,6 +37,7 @@ func (t *TransformCmd) getTemplateContent() ([]byte, error) {
}

templateLoader := template.NewTemplateLoader(templateContent)

templateContent, err = templateLoader.Load(t.Template)
if err != nil {
return nil, err
Expand Down
Loading
Loading