Skip to content

Commit

Permalink
Remove debug line for invocations
Browse files Browse the repository at this point in the history
This showed the URL which is no longer required

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <[email protected]>
  • Loading branch information
alexellis committed Jul 14, 2021
1 parent dca4fa1 commit 124ae95
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 23 deletions.
17 changes: 15 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ const topic = "cron-function"
func main() {
config, err := getControllerConfig()
if err != nil {
panic(err)
fmt.Fprintf(os.Stderr, "Error: %s\n", err.Error())
os.Exit(1)
}

sha, ver := version.GetReleaseInfo()
Expand All @@ -38,14 +39,26 @@ func main() {
config.ContentType,
config.PrintResponse)

go func() {
for {
r := <-invoker.Responses
if r.Error != nil {
log.Printf("Error with: %s, %s", r.Function, err.Error())
} else {
log.Printf("Response: %s [%d]", r.Function, r.Status)
}
}
}()

cronScheduler := cfunction.NewScheduler()
interval := time.Second * 10

cronScheduler.Start()
err = startFunctionProbe(interval, topic, config, cronScheduler, invoker)

if err != nil {
panic(err)
fmt.Fprintf(os.Stderr, "Error: %s\n", err.Error())
os.Exit(1)
}
}

Expand Down
25 changes: 13 additions & 12 deletions types/cron_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,32 +50,32 @@ func (c *CronFunctions) Contains(cf *CronFunction) bool {
// and returns error if it is not possible
func ToCronFunction(f ptypes.FunctionStatus, namespace string, topic string) (CronFunction, error) {
if f.Annotations == nil {
return CronFunction{}, errors.New(fmt.Sprint(f.Name, " has no annotations."))
return CronFunction{}, fmt.Errorf("%s has no annotations", f.Name)
}

fTopic := (*f.Annotations)["topic"]
fSchedule := (*f.Annotations)["schedule"]

if fTopic != topic {
return CronFunction{}, errors.New(fmt.Sprint(f.Name, " has wrong topic: ", fTopic))
return CronFunction{}, fmt.Errorf("%s has wrong topic: %s", fTopic, f.Name)
}

if !CheckSchedule(fSchedule) {
return CronFunction{}, errors.New(fmt.Sprint(f.Name, " has wrong cron schedule: ", fSchedule))
return CronFunction{}, fmt.Errorf("%s has wrong cron schedule: %s", f.Name, fSchedule)
}

var c CronFunction
c.FuncData = f
c.Name = f.Name
c.Namespace = namespace
c.Schedule = fSchedule
return c, nil
return CronFunction{
FuncData: f,
Name: f.Name,
Namespace: namespace,
Schedule: fSchedule,
}, nil
}

// InvokeFunction Invokes the cron function
func (c CronFunction) InvokeFunction(i *types.Invoker) (*[]byte, error) {

gwURL := fmt.Sprintf("%s/%s", i.GatewayURL, c.String())
log.Printf("HTTP POST: %s", gwURL)

req, err := http.NewRequest(http.MethodPost, gwURL, nil)
if err != nil {
Expand All @@ -91,7 +91,7 @@ func (c CronFunction) InvokeFunction(i *types.Invoker) (*[]byte, error) {

if err != nil {
i.Responses <- types.InvokerResponse{
Error: errors.Wrap(err, fmt.Sprint("unable to invoke ", c.Name, " in ", c.Namespace)),
Error: errors.Wrap(err, fmt.Sprintf("unable to invoke %s", c.String())),
}
return nil, err
}
Expand All @@ -103,8 +103,9 @@ func (c CronFunction) InvokeFunction(i *types.Invoker) (*[]byte, error) {
if err != nil {
log.Printf("Error reading body")
i.Responses <- types.InvokerResponse{
Error: errors.Wrap(err, fmt.Sprint("unable to invoke ", c.Name, " in ", c.Namespace)),
Error: errors.Wrap(err, fmt.Sprintf("unable to invoke %s", c.String())),
}

return nil, err
}

Expand Down
18 changes: 9 additions & 9 deletions types/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,6 @@ type ScheduledFunction struct {
// ScheduledFunctions is an array of ScheduledFunction
type ScheduledFunctions []ScheduledFunction

// AddCronFunction adds a function to cron
func (s *Scheduler) AddCronFunction(c CronFunction, invoker *types.Invoker) (ScheduledFunction, error) {
eID, err := s.main.AddFunc(c.Schedule, func() {
log.Printf("Executing function: %s", c.String())
c.InvokeFunction(invoker)
})
return ScheduledFunction{c, EntryID(eID)}, err
}

// NewScheduler returns a scheduler
func NewScheduler() *Scheduler {
return &Scheduler{
Expand All @@ -56,6 +47,15 @@ func (s *Scheduler) Start() {
s.main.Start()
}

// AddCronFunction adds a function to cron
func (s *Scheduler) AddCronFunction(c CronFunction, invoker *types.Invoker) (ScheduledFunction, error) {
eID, err := s.main.AddFunc(c.Schedule, func() {
log.Printf("Executing function: %s", c.String())
c.InvokeFunction(invoker)
})
return ScheduledFunction{c, EntryID(eID)}, err
}

// Remove removes the function from scheduler
func (s *Scheduler) Remove(function ScheduledFunction) {
s.main.Remove(cron.EntryID(function.ID))
Expand Down

0 comments on commit 124ae95

Please sign in to comment.