Skip to content

Commit 63124a2

Browse files
Merge pull request #95 from colonyos/generator_timeout_support
support triggering generator based on a timeout value
2 parents d065ba7 + 6c5fbf6 commit 63124a2

10 files changed

Lines changed: 230 additions & 32 deletions

File tree

internal/cli/generator.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ func init() {
3636
addGeneratorCmd.MarkFlagRequired("name")
3737
addGeneratorCmd.Flags().IntVarP(&GeneratorTrigger, "trigger", "", -1, "Trigger")
3838
addGeneratorCmd.MarkFlagRequired("trigger")
39+
addGeneratorCmd.Flags().IntVarP(&GeneratorTimeout, "timeout", "", -1, "Timeout")
3940

4041
packGeneratorCmd.Flags().StringVarP(&ExecutorID, "executorid", "", "", "Executor Id")
4142
packGeneratorCmd.Flags().StringVarP(&ExecutorPrvKey, "executorprvkey", "", "", "Executor private key")
@@ -127,19 +128,15 @@ var addGeneratorCmd = &cobra.Command{
127128
CheckError(errors.New("Generator name not specified"))
128129
}
129130

130-
if GeneratorTimeout == -1 {
131-
CheckError(errors.New("Generator timeout not specified"))
132-
}
133-
134131
if GeneratorTrigger == -1 {
135132
CheckError(errors.New("Generator trigger not specified"))
136133
}
137134

138-
generator := core.CreateGenerator(ColonyID, GeneratorName, workflowSpecJSON, GeneratorTrigger)
135+
generator := core.CreateGenerator(ColonyID, GeneratorName, workflowSpecJSON, GeneratorTrigger, GeneratorTimeout)
139136
addedGenerator, err := client.AddGenerator(generator, ExecutorPrvKey)
140137
CheckError(err)
141138

142-
log.WithFields(log.Fields{"GeneratorID": addedGenerator.ID}).Info("Generator added")
139+
log.WithFields(log.Fields{"GeneratorID": addedGenerator.ID, "GeneratorName": GeneratorName, "Trigger": GeneratorTrigger, "Timeout": GeneratorTimeout}).Info("Generator added")
143140
},
144141
}
145142

@@ -255,6 +252,7 @@ var getGeneratorCmd = &cobra.Command{
255252
[]string{"Id", generator.ID},
256253
[]string{"Name", generator.Name},
257254
[]string{"Trigger", strconv.Itoa(generator.Trigger)},
255+
[]string{"Timeout", strconv.Itoa(generator.Timeout)},
258256
[]string{"Lastrun", generator.LastRun.Format(TimeLayout)},
259257
[]string{"CheckerPeriod", strconv.Itoa(generator.CheckerPeriod)},
260258
[]string{"QueueSize", strconv.Itoa(generator.QueueSize)},

pkg/core/generator.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,21 @@ type Generator struct {
1111
Name string `json:"name"`
1212
WorkflowSpec string `json:"workflowspec"`
1313
Trigger int `json:"trigger"`
14+
Timeout int `json:"timeout"`
15+
FirstPack time.Time `json:"firstpack"`
1416
LastRun time.Time `json:"lastrun"`
1517
QueueSize int `json:"queuesize"`
1618
CheckerPeriod int `json:"checkerperiod"`
1719
}
1820

19-
func CreateGenerator(colonyID string, name string, workflowSpec string, trigger int) *Generator {
21+
func CreateGenerator(colonyID string, name string, workflowSpec string, trigger int, timeout int) *Generator {
2022
generator := &Generator{
2123
ColonyID: colonyID,
2224
Name: name,
2325
WorkflowSpec: workflowSpec,
2426
Trigger: trigger,
27+
Timeout: timeout,
28+
FirstPack: time.Time{},
2529
}
2630

2731
return generator
@@ -88,6 +92,7 @@ func (generator *Generator) Equals(generator2 *Generator) bool {
8892
generator.Name != generator2.Name ||
8993
generator.WorkflowSpec != generator2.WorkflowSpec ||
9094
generator.Trigger != generator2.Trigger ||
95+
generator.Timeout != generator2.Timeout ||
9196
generator.CheckerPeriod != generator2.CheckerPeriod ||
9297
generator.QueueSize != generator2.QueueSize {
9398
same = false

pkg/core/generator_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ func TestCreateGenerator(t *testing.T) {
1717
workflowSpec.AddFunctionSpec(funcSpec2)
1818
jsonStr, err := workflowSpec.ToJSON()
1919
assert.Nil(t, err)
20-
generator := CreateGenerator(GenerateRandomID(), "test_genname", jsonStr, 10)
20+
generator := CreateGenerator(GenerateRandomID(), "test_genname", jsonStr, 10, 10)
2121
generator.ID = GenerateRandomID()
2222
generator.QueueSize = 100
2323
generator.CheckerPeriod = 200
@@ -45,11 +45,11 @@ func TestCreateGeneratorSpecArray(t *testing.T) {
4545
workflowSpec1.AddFunctionSpec(funcSpec2)
4646
jsonStr, err := workflowSpec1.ToJSON()
4747
assert.Nil(t, err)
48-
generator1 := CreateGenerator(GenerateRandomID(), "test_genname1", jsonStr, 10)
48+
generator1 := CreateGenerator(GenerateRandomID(), "test_genname1", jsonStr, 10, 10)
4949
generator1.ID = GenerateRandomID()
5050
arr = append(arr, generator1)
5151

52-
generator2 := CreateGenerator(GenerateRandomID(), "test_genname2", jsonStr, 10)
52+
generator2 := CreateGenerator(GenerateRandomID(), "test_genname2", jsonStr, 10, 10)
5353
generator2.ID = GenerateRandomID()
5454
assert.Nil(t, err)
5555
arr = append(arr, generator2)

pkg/database/database.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ type Database interface {
137137
// Generator functions
138138
AddGenerator(generator *core.Generator) error
139139
SetGeneratorLastRun(generatorID string) error
140+
SetGeneratorFirstPack(generatorID string) error
140141
GetGeneratorByID(generatorID string) (*core.Generator, error)
141142
GetGeneratorByName(name string) (*core.Generator, error)
142143
FindGeneratorsByColonyID(colonyID string, count int) ([]*core.Generator, error)

pkg/database/postgresql/database.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ func (db *PQDatabase) Initialize() error {
144144
return err
145145
}
146146

147-
sqlStatement = `CREATE TABLE ` + db.dbPrefix + `GENERATORS (GENERATOR_ID TEXT PRIMARY KEY NOT NULL, COLONY_ID TEXT NOT NULL, NAME TEXT NOT NULL UNIQUE, WORKFLOW_SPEC TEXT NOT NULL, TRIGGER INTEGER, LASTRUN TIMESTAMPTZ)`
147+
sqlStatement = `CREATE TABLE ` + db.dbPrefix + `GENERATORS (GENERATOR_ID TEXT PRIMARY KEY NOT NULL, COLONY_ID TEXT NOT NULL, NAME TEXT NOT NULL UNIQUE, WORKFLOW_SPEC TEXT NOT NULL, TRIGGER INTEGER, TIMEOUT INTEGER, LASTRUN TIMESTAMPTZ, FIRSTPACK TIMESTAMPTZ)`
148148
_, err = db.postgresql.Exec(sqlStatement)
149149
if err != nil {
150150
return err
@@ -210,5 +210,23 @@ func (db *PQDatabase) Initialize() error {
210210
return err
211211
}
212212

213+
sqlStatement = `CREATE INDEX ` + db.dbPrefix + `RETENTION_INDEX1 ON ` + db.dbPrefix + `ATTRIBUTES (ADDED, STATE)`
214+
_, err = db.postgresql.Exec(sqlStatement)
215+
if err != nil {
216+
return err
217+
}
218+
219+
sqlStatement = `CREATE INDEX ` + db.dbPrefix + `RETENTION_INDEX2 ON ` + db.dbPrefix + `PROCESSES (SUBMISSION_TIME, STATE)`
220+
_, err = db.postgresql.Exec(sqlStatement)
221+
if err != nil {
222+
return err
223+
}
224+
225+
sqlStatement = `CREATE INDEX ` + db.dbPrefix + `RETENTION_INDEX3 ON ` + db.dbPrefix + `PROCESSGRAPHS (SUBMISSION_TIME, STATE)`
226+
_, err = db.postgresql.Exec(sqlStatement)
227+
if err != nil {
228+
return err
229+
}
230+
213231
return nil
214232
}

pkg/database/postgresql/generators.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ import (
99
)
1010

1111
func (db *PQDatabase) AddGenerator(generator *core.Generator) error {
12-
sqlStatement := `INSERT INTO ` + db.dbPrefix + `GENERATORS (GENERATOR_ID, COLONY_ID, NAME, WORKFLOW_SPEC, TRIGGER, LASTRUN) VALUES ($1, $2, $3, $4, $5, $6)`
13-
_, err := db.postgresql.Exec(sqlStatement, generator.ID, generator.ColonyID, generator.Name, generator.WorkflowSpec, generator.Trigger, time.Time{})
12+
sqlStatement := `INSERT INTO ` + db.dbPrefix + `GENERATORS (GENERATOR_ID, COLONY_ID, NAME, WORKFLOW_SPEC, TRIGGER, TIMEOUT, LASTRUN, FIRSTPACK) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`
13+
_, err := db.postgresql.Exec(sqlStatement, generator.ID, generator.ColonyID, generator.Name, generator.WorkflowSpec, generator.Trigger, generator.Timeout, time.Time{}, time.Time{})
1414
if err != nil {
1515
return err
1616
}
@@ -27,12 +27,14 @@ func (db *PQDatabase) parseGenerators(rows *sql.Rows) ([]*core.Generator, error)
2727
var name string
2828
var workflowSpec string
2929
var trigger int
30+
var timeout int
3031
var lastRun time.Time
31-
if err := rows.Scan(&generatorID, &colonyID, &name, &workflowSpec, &trigger, &lastRun); err != nil {
32+
var firstPack time.Time
33+
if err := rows.Scan(&generatorID, &colonyID, &name, &workflowSpec, &trigger, &timeout, &lastRun, &firstPack); err != nil {
3234
return nil, err
3335
}
3436

35-
generator := &core.Generator{ID: generatorID, ColonyID: colonyID, Name: name, WorkflowSpec: workflowSpec, Trigger: trigger, LastRun: lastRun}
37+
generator := &core.Generator{ID: generatorID, ColonyID: colonyID, Name: name, WorkflowSpec: workflowSpec, Trigger: trigger, Timeout: timeout, LastRun: lastRun, FirstPack: firstPack}
3638

3739
generators = append(generators, generator)
3840
}
@@ -103,6 +105,21 @@ func (db *PQDatabase) SetGeneratorLastRun(generatorID string) error {
103105
return nil
104106
}
105107

108+
func (db *PQDatabase) SetGeneratorFirstPack(generatorID string) error {
109+
generator, err := db.GetGeneratorByID(generatorID)
110+
if err != nil {
111+
return err
112+
}
113+
114+
sqlStatement := `UPDATE ` + db.dbPrefix + `GENERATORS SET FIRSTPACK=$1 WHERE GENERATOR_ID=$2`
115+
_, err = db.postgresql.Exec(sqlStatement, time.Now(), generator.ID)
116+
if err != nil {
117+
return err
118+
}
119+
120+
return nil
121+
}
122+
106123
func (db *PQDatabase) FindGeneratorsByColonyID(colonyID string, count int) ([]*core.Generator, error) {
107124
sqlStatement := `SELECT * FROM ` + db.dbPrefix + `GENERATORS WHERE COLONY_ID=$1 LIMIT $2`
108125
rows, err := db.postgresql.Query(sqlStatement, colonyID, count)

pkg/database/postgresql/generators_test.go

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,18 @@ import (
1111
func TestAddGenerator(t *testing.T) {
1212
db, err := PrepareTests()
1313
assert.Nil(t, err)
14+
defer db.Close()
1415

1516
generator := utils.FakeGenerator(t, core.GenerateRandomID())
1617
generator.ID = core.GenerateRandomID()
1718
err = db.AddGenerator(generator)
1819
assert.Nil(t, err)
19-
20-
defer db.Close()
2120
}
2221

2322
func TestGetGenerator(t *testing.T) {
2423
db, err := PrepareTests()
2524
assert.Nil(t, err)
25+
defer db.Close()
2626

2727
generator := utils.FakeGenerator(t, core.GenerateRandomID())
2828
generator.ID = core.GenerateRandomID()
@@ -32,13 +32,12 @@ func TestGetGenerator(t *testing.T) {
3232
generatorFromDB, err := db.GetGeneratorByID(generator.ID)
3333
assert.Nil(t, err)
3434
assert.True(t, generator.Equals(generatorFromDB))
35-
36-
defer db.Close()
3735
}
3836

3937
func TestSetGeneratorLastRun(t *testing.T) {
4038
db, err := PrepareTests()
4139
assert.Nil(t, err)
40+
defer db.Close()
4241

4342
generator := utils.FakeGenerator(t, core.GenerateRandomID())
4443
generator.ID = core.GenerateRandomID()
@@ -58,13 +57,35 @@ func TestSetGeneratorLastRun(t *testing.T) {
5857
assert.Nil(t, err)
5958

6059
assert.Greater(t, generatorFromDB.LastRun.Unix(), lastRun)
60+
}
6161

62+
func TestSetGeneratorFirstPack(t *testing.T) {
63+
db, err := PrepareTests()
64+
assert.Nil(t, err)
6265
defer db.Close()
66+
67+
generator := utils.FakeGenerator(t, core.GenerateRandomID())
68+
generator.ID = core.GenerateRandomID()
69+
err = db.AddGenerator(generator)
70+
assert.Nil(t, err)
71+
72+
generatorFromDB, err := db.GetGeneratorByID(generator.ID)
73+
assert.Nil(t, err)
74+
assert.True(t, generator.Equals(generatorFromDB))
75+
76+
err = db.SetGeneratorFirstPack(generator.ID)
77+
assert.Nil(t, err)
78+
79+
generatorFromDB, err = db.GetGeneratorByID(generator.ID)
80+
assert.Nil(t, err)
81+
82+
assert.True(t, generatorFromDB.FirstPack.Unix() > 0)
6383
}
6484

6585
func TestFindGeneratorsByColonyID(t *testing.T) {
6686
db, err := PrepareTests()
6787
assert.Nil(t, err)
88+
defer db.Close()
6889

6990
colonyID := core.GenerateRandomID()
7091
generator1 := utils.FakeGenerator(t, colonyID)
@@ -91,13 +112,12 @@ func TestFindGeneratorsByColonyID(t *testing.T) {
91112
}
92113
}
93114
assert.True(t, count == 2)
94-
95-
defer db.Close()
96115
}
97116

98117
func TestFindAllGenerators(t *testing.T) {
99118
db, err := PrepareTests()
100119
assert.Nil(t, err)
120+
defer db.Close()
101121

102122
colonyID1 := core.GenerateRandomID()
103123
generator1 := utils.FakeGenerator(t, colonyID1)
@@ -114,13 +134,12 @@ func TestFindAllGenerators(t *testing.T) {
114134
generatorsFromDB, err := db.FindAllGenerators()
115135
assert.Nil(t, err)
116136
assert.Len(t, generatorsFromDB, 2)
117-
118-
defer db.Close()
119137
}
120138

121139
func TestDeleteGeneratorByID(t *testing.T) {
122140
db, err := PrepareTests()
123141
assert.Nil(t, err)
142+
defer db.Close()
124143

125144
colonyID := core.GenerateRandomID()
126145
generator1 := utils.FakeGenerator(t, colonyID)
@@ -159,13 +178,12 @@ func TestDeleteGeneratorByID(t *testing.T) {
159178
count, err = db.CountGeneratorArgs(generator1.ID)
160179
assert.Nil(t, err)
161180
assert.Equal(t, count, 0)
162-
163-
defer db.Close()
164181
}
165182

166183
func TestDeleteAllGeneratorsByColonyID(t *testing.T) {
167184
db, err := PrepareTests()
168185
assert.Nil(t, err)
186+
defer db.Close()
169187

170188
colonyID1 := core.GenerateRandomID()
171189
generator1 := utils.FakeGenerator(t, colonyID1)
@@ -227,6 +245,4 @@ func TestDeleteAllGeneratorsByColonyID(t *testing.T) {
227245
count, err = db.CountGeneratorArgs(generator3.ID)
228246
assert.Nil(t, err)
229247
assert.Equal(t, count, 1)
230-
231-
defer db.Close()
232248
}

pkg/server/generator_controller.go

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,34 @@ func (controller *coloniesController) triggerGenerators() {
4646
log.WithFields(log.Fields{"Error": err}).Error("Failed count generator args from db")
4747
continue
4848
}
49+
now := time.Now()
50+
timeout := false
51+
if generator.LastRun.Unix() <= 0 { // Generator has never run
52+
if generator.FirstPack.Unix() <= 0 { // Generator has never been packed
53+
timeout = false
54+
} else { // Generator has been packed, calulcate deadline based first pack
55+
timeoutDeadline := generator.FirstPack.Add(time.Duration(generator.Timeout) * time.Second)
56+
timeout = now.Unix() > timeoutDeadline.Unix()
57+
}
58+
} else { // Generator has run before
59+
timeoutDeadline := generator.LastRun.Add(time.Duration(generator.Timeout) * time.Second)
60+
timeout = now.Unix() > timeoutDeadline.Unix()
61+
}
4962
if counter >= generator.Trigger {
5063
timesToSubmit := counter / generator.Trigger
5164
for i := 0; i < timesToSubmit; i++ {
5265
log.WithFields(log.Fields{
5366
"GeneratorId": generator.ID,
5467
"Counter": counter}).
5568
Debug("Generator threshold reached, submitting workflow")
56-
controller.submitWorkflow(generator)
69+
controller.submitWorkflow(generator, generator.Trigger)
5770
}
71+
} else if counter >= 1 && generator.Timeout > 0 && timeout {
72+
log.WithFields(log.Fields{
73+
"GeneratorId": generator.ID,
74+
"Counter": counter}).
75+
Debug("Generator threshold reached, submitting workflow")
76+
controller.submitWorkflow(generator, counter)
5877
}
5978
}
6079
}}
@@ -136,6 +155,21 @@ func (controller *coloniesController) packGenerator(generatorID string, colonyID
136155
}
137156
count, err := controller.db.CountGeneratorArgs(generatorID)
138157
log.WithFields(log.Fields{"Arg": arg, "Count": count, "GeneratorId": generatorID}).Debug("Added args to generator")
158+
159+
generator, err := controller.db.GetGeneratorByID(generatorID)
160+
if err != nil {
161+
log.WithFields(log.Fields{"Error": err, "GeneratorId": generatorID}).Error("Failed to get generator")
162+
cmd.errorChan <- err
163+
}
164+
165+
if generator.FirstPack.Unix() < 0 {
166+
err = controller.db.SetGeneratorFirstPack(generatorID)
167+
if err != nil {
168+
log.WithFields(log.Fields{"Error": err, "GeneratorId": generatorID}).Error("Failed to set generator first pack")
169+
cmd.errorChan <- err
170+
}
171+
}
172+
139173
cmd.errorChan <- nil
140174
}}
141175

@@ -163,14 +197,14 @@ func (controller *coloniesController) generatorTriggerLoop() {
163197
}
164198
}
165199

166-
func (controller *coloniesController) submitWorkflow(generator *core.Generator) {
200+
func (controller *coloniesController) submitWorkflow(generator *core.Generator, counter int) {
167201
workflowSpec, err := core.ConvertJSONToWorkflowSpec(generator.WorkflowSpec)
168202
if err != nil {
169203
log.WithFields(log.Fields{"Error": err}).Error("Failed to parse workflow spec")
170204
return
171205
}
172206

173-
generatorArgs, err := controller.db.GetGeneratorArgs(generator.ID, generator.Trigger)
207+
generatorArgs, err := controller.db.GetGeneratorArgs(generator.ID, counter)
174208
var args []string
175209
for _, generatorArg := range generatorArgs {
176210
args = append(args, generatorArg.Arg)
@@ -179,6 +213,7 @@ func (controller *coloniesController) submitWorkflow(generator *core.Generator)
179213
log.WithFields(log.Fields{
180214
"GeneratorId": generator.ID,
181215
"Trigger": generator.Trigger,
216+
"Counter": counter,
182217
"Args": args}).
183218
Debug("Generator submitting workflow")
184219

0 commit comments

Comments
 (0)