Skip to content

Commit 80ec49b

Browse files
magichair, Nickcw6, jibsen-vh
authored
feat(circleci): Cherry Pick #7820 #7986 (#8341)
* feat(circleci-plugin): incremental data collection (#7986) * feat(api_collector_stateful): handle case where response has records from both before & after createdAfter of last collection * feat(circleci-plugin): incremental collection for pipelines * feat(api_collector_stateful): expose Input collector arg for StatefulFinalizableEntity to collect data based on previous collection * feat(circleci-plugin): incremental data collection for workflows * feat(circleci-plugin): incremental data collection for jobs * refactor(circleci-plugin): use common query param function * refactor(circleci-plugin): use BuildQueryParamsWithPageToken func when collecting unfinished job details * fix(circleci-plugin): pipeline collector time range (#7820) * fix(circleci-plugin): only collect pipelines from after data time range * fix(circleci-plugin): ignore 404 not found when collecting jobs or workflows * Cleanup from bad merge --------- Co-authored-by: Nick Williams <65220492+Nickcw6@users.noreply.github.com> Co-authored-by: John Ibsen <john@videa.ai>
1 parent 7d9d491 commit 80ec49b

File tree

5 files changed

+220
-61
lines changed

5 files changed

+220
-61
lines changed

backend/helpers/pluginhelper/api/api_collector_stateful.go

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,19 @@ func NewStatefulApiCollectorForFinalizableEntity(args FinalizableApiCollectorArg
139139
createdAfter := manager.CollectorStateManager.GetSince()
140140
isIncremental := manager.CollectorStateManager.IsIncremental()
141141

142+
var inputIterator Iterator
143+
if args.CollectNewRecordsByList.BuildInputIterator != nil {
144+
inputIterator, err = args.CollectNewRecordsByList.BuildInputIterator(isIncremental, createdAfter)
145+
if err != nil {
146+
return nil, err
147+
}
148+
}
149+
142150
// step 1: create a collector to collect newly added records
143151
err = manager.InitCollector(ApiCollectorArgs{
144152
ApiClient: args.ApiClient,
145153
// common
154+
Input: inputIterator,
146155
Incremental: isIncremental,
147156
UrlTemplate: args.CollectNewRecordsByList.UrlTemplate,
148157
Query: func(reqData *RequestData) (url.Values, errors.Error) {
@@ -169,21 +178,41 @@ func NewStatefulApiCollectorForFinalizableEntity(args FinalizableApiCollectorArg
169178

170179
// time filter or diff sync
171180
if createdAfter != nil && args.CollectNewRecordsByList.GetCreated != nil {
172-
// if the first record of the page was created before createdAfter, return emtpy set and stop
181+
// if the first record of the page was created before createdAfter and not a zero value, return empty set and stop
173182
firstCreated, err := args.CollectNewRecordsByList.GetCreated(items[0])
174183
if err != nil {
175184
return nil, err
176185
}
177-
if firstCreated.Before(*createdAfter) {
186+
if firstCreated.Before(*createdAfter) && !firstCreated.IsZero() {
178187
return nil, ErrFinishCollect
179188
}
180-
// if the last record was created before createdAfter, return records and stop
189+
190+
// If last record was created before CreatedAfter, including a zero value, check each record individually
181191
lastCreated, err := args.CollectNewRecordsByList.GetCreated(items[len(items)-1])
182192
if err != nil {
183193
return nil, err
184194
}
185195
if lastCreated.Before(*createdAfter) {
186-
return items, ErrFinishCollect
196+
var validItems []json.RawMessage
197+
// Only collect items that were created after the last successful collection to prevent duplicates
198+
for _, item := range items {
199+
itemCreatedAt, err := args.CollectNewRecordsByList.GetCreated(item)
200+
if err != nil {
201+
return nil, err
202+
}
203+
204+
if itemCreatedAt.IsZero() {
205+
// If zero then timestamp is null on the response - accept as valid for downstream processing
206+
validItems = append(validItems, item)
207+
continue
208+
}
209+
210+
if itemCreatedAt.Before(*createdAfter) {
211+
// Once we reach an item that was created before the last successful collection, stop & return
212+
return validItems, ErrFinishCollect
213+
}
214+
validItems = append(validItems, item)
215+
}
187216
}
188217
}
189218
return items, err
@@ -267,6 +296,7 @@ type FinalizableApiCollectorListArgs struct {
267296
Concurrency int // required for Undetermined Strategy, number of concurrent requests
268297
GetNextPageCustomData func(prevReqData *RequestData, prevPageResponse *http.Response) (interface{}, errors.Error) // required for Sequential Strategy, to extract the next page cursor from the given response
269298
GetTotalPages func(res *http.Response, args *ApiCollectorArgs) (int, errors.Error) // required for Determined Strategy, to extract the total number of pages from the given response
299+
BuildInputIterator func(isIncremental bool, createdAfter *time.Time) (Iterator, errors.Error)
270300
}
271301

272302
// FinalizableApiCollectorDetailArgs is the arguments for the detail collector

backend/plugins/circleci/tasks/job_collector.go

Lines changed: 64 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@ limitations under the License.
1818
package tasks
1919

2020
import (
21+
"encoding/json"
22+
"reflect"
23+
"time"
24+
2125
"github.com/apache/incubator-devlake/core/dal"
2226
"github.com/apache/incubator-devlake/core/errors"
2327
"github.com/apache/incubator-devlake/core/plugin"
2428
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
2529
"github.com/apache/incubator-devlake/plugins/circleci/models"
26-
"reflect"
2730
)
2831

2932
const RAW_JOB_TABLE = "circleci_api_jobs"
@@ -43,30 +46,68 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
4346
logger := taskCtx.GetLogger()
4447
logger.Info("collect jobs")
4548

46-
clauses := []dal.Clause{
47-
dal.Select("id, pipeline_id"),
48-
dal.From(&models.CircleciWorkflow{}),
49-
dal.Where("_tool_circleci_workflows.connection_id = ? and _tool_circleci_workflows.project_slug = ? ", data.Options.ConnectionId, data.Options.ProjectSlug),
50-
}
49+
collector, err := api.NewStatefulApiCollectorForFinalizableEntity(api.FinalizableApiCollectorArgs{
50+
RawDataSubTaskArgs: *rawDataSubTaskArgs,
51+
ApiClient: data.ApiClient,
52+
CollectNewRecordsByList: api.FinalizableApiCollectorListArgs{
53+
PageSize: int(data.Options.PageSize),
54+
GetNextPageCustomData: ExtractNextPageToken,
55+
BuildInputIterator: func(isIncremental bool, createdAfter *time.Time) (api.Iterator, errors.Error) {
56+
clauses := []dal.Clause{
57+
dal.Select("id, pipeline_id"), // pipeline_id not on individual job response but required for result
58+
dal.From(&models.CircleciWorkflow{}),
59+
dal.Where("connection_id = ? and project_slug = ?", data.Options.ConnectionId, data.Options.ProjectSlug),
60+
}
5161

52-
db := taskCtx.GetDal()
53-
cursor, err := db.Cursor(clauses...)
54-
if err != nil {
55-
return err
56-
}
57-
iterator, err := api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciWorkflow{}))
58-
if err != nil {
59-
return err
60-
}
62+
if isIncremental {
63+
clauses = append(clauses, dal.Where("created_date > ?", createdAfter))
64+
}
65+
66+
db := taskCtx.GetDal()
67+
cursor, err := db.Cursor(clauses...)
68+
if err != nil {
69+
return nil, err
70+
}
71+
return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciWorkflow{}))
72+
},
73+
FinalizableApiCollectorCommonArgs: api.FinalizableApiCollectorCommonArgs{
74+
UrlTemplate: "/v2/workflow/{{ .Input.Id }}/job",
75+
Query: BuildQueryParamsWithPageToken,
76+
ResponseParser: ParseCircleciPageTokenResp,
77+
AfterResponse: ignoreDeletedBuilds, // Ignore the 404 response if a workflow has been deleted
78+
},
79+
GetCreated: func(item json.RawMessage) (time.Time, errors.Error) {
80+
var job struct { // Individual job response lacks created_at field, so have to use started_at
81+
CreatedAt time.Time `json:"started_at"` // This will be null in some cases (e.g. queued, not_running, blocked)
82+
}
83+
if err := json.Unmarshal(item, &job); err != nil {
84+
return time.Time{}, errors.Default.Wrap(err, "failed to unmarshal job")
85+
}
86+
return job.CreatedAt, nil
87+
},
88+
},
89+
CollectUnfinishedDetails: &api.FinalizableApiCollectorDetailArgs{
90+
FinalizableApiCollectorCommonArgs: api.FinalizableApiCollectorCommonArgs{
91+
UrlTemplate: "/v2/workflow/{{ .Input.Id }}/job", // The individual job endpoint has different fields so need to recollect all jobs for a workflow
92+
Query: BuildQueryParamsWithPageToken,
93+
ResponseParser: ParseCircleciPageTokenResp,
94+
AfterResponse: ignoreDeletedBuilds,
95+
},
96+
BuildInputIterator: func() (api.Iterator, errors.Error) {
97+
clauses := []dal.Clause{
98+
dal.Select("DISTINCT workflow_id"), // Only need to recollect jobs for a workflow once
99+
dal.From(&models.CircleciJob{}),
100+
dal.Where("connection_id = ? AND project_slug = ? AND status IN ('running', 'not_running', 'queued', 'on_hold')", data.Options.ConnectionId, data.Options.ProjectSlug),
101+
}
61102

62-
collector, err := api.NewApiCollector(api.ApiCollectorArgs{
63-
RawDataSubTaskArgs: *rawDataSubTaskArgs,
64-
ApiClient: data.ApiClient,
65-
UrlTemplate: "/v2/workflow/{{ .Input.Id }}/job",
66-
Input: iterator,
67-
GetNextPageCustomData: ExtractNextPageToken,
68-
Query: BuildQueryParamsWithPageToken,
69-
ResponseParser: ParseCircleciPageTokenResp,
103+
db := taskCtx.GetDal()
104+
cursor, err := db.Cursor(clauses...)
105+
if err != nil {
106+
return nil, err
107+
}
108+
return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciJob{}))
109+
},
110+
},
70111
})
71112
if err != nil {
72113
logger.Error(err, "collect jobs error")

backend/plugins/circleci/tasks/pipeline_collector.go

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ limitations under the License.
1818
package tasks
1919

2020
import (
21+
"encoding/json"
22+
"net/http"
23+
2124
"github.com/apache/incubator-devlake/core/errors"
2225
"github.com/apache/incubator-devlake/core/plugin"
2326
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
@@ -38,15 +41,41 @@ var CollectPipelinesMeta = plugin.SubTaskMeta{
3841
func CollectPipelines(taskCtx plugin.SubTaskContext) errors.Error {
3942
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_PIPELINE_TABLE)
4043
logger := taskCtx.GetLogger()
44+
timeAfter := rawDataSubTaskArgs.Ctx.TaskContext().SyncPolicy().TimeAfter
4145
logger.Info("collect pipelines")
42-
collector, err := api.NewApiCollector(api.ApiCollectorArgs{
43-
RawDataSubTaskArgs: *rawDataSubTaskArgs,
44-
ApiClient: data.ApiClient,
45-
UrlTemplate: "/v2/project/{{ .Params.ProjectSlug }}/pipeline",
46-
PageSize: int(data.Options.PageSize),
47-
GetNextPageCustomData: ExtractNextPageToken,
48-
Query: BuildQueryParamsWithPageToken,
49-
ResponseParser: ParseCircleciPageTokenResp,
46+
collector, err := api.NewStatefulApiCollectorForFinalizableEntity(api.FinalizableApiCollectorArgs{
47+
RawDataSubTaskArgs: *rawDataSubTaskArgs,
48+
ApiClient: data.ApiClient,
49+
CollectNewRecordsByList: api.FinalizableApiCollectorListArgs{
50+
PageSize: int(data.Options.PageSize),
51+
GetNextPageCustomData: ExtractNextPageToken,
52+
FinalizableApiCollectorCommonArgs: api.FinalizableApiCollectorCommonArgs{
53+
UrlTemplate: "/v2/project/{{ .Params.ProjectSlug }}/pipeline",
54+
Query: BuildQueryParamsWithPageToken,
55+
ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
56+
data := CircleciPageTokenResp[[]json.RawMessage]{}
57+
err := api.UnmarshalResponse(res, &data)
58+
59+
if err != nil {
60+
return nil, err
61+
}
62+
filteredItems := []json.RawMessage{}
63+
for _, item := range data.Items {
64+
pipelineCreatedAt, err := extractCreatedAt(item)
65+
66+
if err != nil {
67+
return nil, err
68+
}
69+
if pipelineCreatedAt.Before(*timeAfter) {
70+
return filteredItems, api.ErrFinishCollect
71+
}
72+
filteredItems = append(filteredItems, item)
73+
}
74+
return filteredItems, nil
75+
},
76+
},
77+
GetCreated: extractCreatedAt,
78+
},
5079
})
5180
if err != nil {
5281
logger.Error(err, "collect pipelines error")

backend/plugins/circleci/tasks/shared.go

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,16 @@ package tasks
1919

2020
import (
2121
"encoding/json"
22+
"net/http"
23+
"net/url"
24+
"time"
25+
2226
"github.com/apache/incubator-devlake/core/dal"
2327
"github.com/apache/incubator-devlake/core/errors"
2428
"github.com/apache/incubator-devlake/core/models/domainlayer/didgen"
2529
"github.com/apache/incubator-devlake/core/plugin"
2630
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
2731
"github.com/apache/incubator-devlake/plugins/circleci/models"
28-
"net/http"
29-
"net/url"
3032
)
3133

3234
var accountIdGen *didgen.DomainIdGenerator
@@ -107,7 +109,7 @@ func ExtractNextPageToken(prevReqData *api.RequestData, prevPageResponse *http.R
107109
return res.NextPageToken, nil
108110
}
109111

110-
func BuildQueryParamsWithPageToken(reqData *api.RequestData) (url.Values, errors.Error) {
112+
func BuildQueryParamsWithPageToken(reqData *api.RequestData, _ *time.Time) (url.Values, errors.Error) {
111113
query := url.Values{}
112114
if pageToken, ok := reqData.CustomData.(string); ok && pageToken != "" {
113115
query.Set("page-token", pageToken)
@@ -120,3 +122,22 @@ func ParseCircleciPageTokenResp(res *http.Response) ([]json.RawMessage, errors.E
120122
err := api.UnmarshalResponse(res, &data)
121123
return data.Items, err
122124
}
125+
126+
// ignoreDeletedBuilds inspects the status code of res: it returns
// api.ErrIgnoreAndContinue for 404 Not Found and nil for every other status.
// In this file it is passed as the AfterResponse callback of the job and
// workflow collectors.
func ignoreDeletedBuilds(res *http.Response) errors.Error {
127+
// CircleCI API will return a 404 response for a workflow/job that has been deleted
128+
// due to their data retention policy. We should ignore these errors.
129+
if res.StatusCode == http.StatusNotFound {
130+
return api.ErrIgnoreAndContinue
131+
}
132+
return nil
133+
}
134+
135+
// extractCreatedAt decodes the "created_at" field of a raw JSON item and
// returns it as a time.Time. If the JSON cannot be decoded, it returns the
// zero time.Time together with the decode error wrapped as
// "failed to unmarshal item". When "created_at" is absent or null the field
// is left untouched by the decoder, so the zero time.Time is returned with a
// nil error; callers that compare the result should account for IsZero().
func extractCreatedAt(item json.RawMessage) (time.Time, errors.Error) {
136+
var entity struct {
137+
CreatedAt time.Time `json:"created_at"`
138+
}
139+
if err := json.Unmarshal(item, &entity); err != nil {
140+
return time.Time{}, errors.Default.Wrap(err, "failed to unmarshal item")
141+
}
142+
return entity.CreatedAt, nil
143+
}

backend/plugins/circleci/tasks/workflow_collector.go

Lines changed: 61 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,16 @@ limitations under the License.
1818
package tasks
1919

2020
import (
21+
"encoding/json"
22+
"net/http"
23+
"reflect"
24+
"time"
25+
2126
"github.com/apache/incubator-devlake/core/dal"
2227
"github.com/apache/incubator-devlake/core/errors"
2328
"github.com/apache/incubator-devlake/core/plugin"
2429
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
2530
"github.com/apache/incubator-devlake/plugins/circleci/models"
26-
"reflect"
2731
)
2832

2933
const RAW_WORKFLOW_TABLE = "circleci_api_workflows"
@@ -43,30 +47,64 @@ func CollectWorkflows(taskCtx plugin.SubTaskContext) errors.Error {
4347
logger := taskCtx.GetLogger()
4448
logger.Info("collect workflows")
4549

46-
clauses := []dal.Clause{
47-
dal.Select("id"),
48-
dal.From(&models.CircleciPipeline{}),
49-
dal.Where("_tool_circleci_pipelines.connection_id = ? and _tool_circleci_pipelines.project_slug = ? ", data.Options.ConnectionId, data.Options.ProjectSlug),
50-
}
50+
collector, err := api.NewStatefulApiCollectorForFinalizableEntity(api.FinalizableApiCollectorArgs{
51+
RawDataSubTaskArgs: *rawDataSubTaskArgs,
52+
ApiClient: data.ApiClient,
53+
CollectNewRecordsByList: api.FinalizableApiCollectorListArgs{
54+
PageSize: int(data.Options.PageSize),
55+
GetNextPageCustomData: ExtractNextPageToken,
56+
BuildInputIterator: func(isIncremental bool, createdAfter *time.Time) (api.Iterator, errors.Error) {
57+
clauses := []dal.Clause{
58+
dal.Select("id"),
59+
dal.From(&models.CircleciPipeline{}),
60+
dal.Where("connection_id = ? AND project_slug = ?", data.Options.ConnectionId, data.Options.ProjectSlug),
61+
}
5162

52-
db := taskCtx.GetDal()
53-
cursor, err := db.Cursor(clauses...)
54-
if err != nil {
55-
return err
56-
}
57-
iterator, err := api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciPipeline{}))
58-
if err != nil {
59-
return err
60-
}
63+
if isIncremental {
64+
clauses = append(clauses, dal.Where("created_date > ?", createdAfter))
65+
}
66+
67+
db := taskCtx.GetDal()
68+
cursor, err := db.Cursor(clauses...)
69+
if err != nil {
70+
return nil, err
71+
}
72+
return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciPipeline{}))
73+
},
74+
FinalizableApiCollectorCommonArgs: api.FinalizableApiCollectorCommonArgs{
75+
UrlTemplate: "/v2/pipeline/{{ .Input.Id }}/workflow",
76+
Query: BuildQueryParamsWithPageToken,
77+
ResponseParser: ParseCircleciPageTokenResp,
78+
AfterResponse: ignoreDeletedBuilds, // Ignore the 404 response if a workflow has been deleted
79+
},
80+
GetCreated: extractCreatedAt,
81+
},
82+
CollectUnfinishedDetails: &api.FinalizableApiCollectorDetailArgs{
83+
FinalizableApiCollectorCommonArgs: api.FinalizableApiCollectorCommonArgs{
84+
UrlTemplate: "/v2/workflow/{{ .Input.Id }}",
85+
Query: nil,
86+
ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
87+
var data json.RawMessage
88+
err := api.UnmarshalResponse(res, &data)
89+
return []json.RawMessage{data}, err
90+
},
91+
AfterResponse: ignoreDeletedBuilds,
92+
},
93+
BuildInputIterator: func() (api.Iterator, errors.Error) {
94+
clauses := []dal.Clause{
95+
dal.Select("id"),
96+
dal.From(&models.CircleciWorkflow{}),
97+
dal.Where("connection_id = ? AND project_slug = ? AND status IN ('running', 'on_hold', 'failing')", data.Options.ConnectionId, data.Options.ProjectSlug),
98+
}
6199

62-
collector, err := api.NewApiCollector(api.ApiCollectorArgs{
63-
RawDataSubTaskArgs: *rawDataSubTaskArgs,
64-
ApiClient: data.ApiClient,
65-
UrlTemplate: "/v2/pipeline/{{ .Input.Id }}/workflow",
66-
Input: iterator,
67-
GetNextPageCustomData: ExtractNextPageToken,
68-
Query: BuildQueryParamsWithPageToken,
69-
ResponseParser: ParseCircleciPageTokenResp,
100+
db := taskCtx.GetDal()
101+
cursor, err := db.Cursor(clauses...)
102+
if err != nil {
103+
return nil, err
104+
}
105+
return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciWorkflow{}))
106+
},
107+
},
70108
})
71109
if err != nil {
72110
logger.Error(err, "collect workflows error")

0 commit comments

Comments
 (0)