Skip to content

Commit 382a08e

Browse files
likyhabeizn
andcommitted
fix: move extract existing graphql data to general position (#4921)
Co-authored-by: abeizn <zikuan.an@merico.dev>
1 parent efd6506 commit 382a08e

6 files changed

Lines changed: 29 additions & 16 deletions

File tree

backend/helpers/pluginhelper/api/api_collector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,7 @@ func (collector *ApiCollector) fetchAsync(reqData *RequestData, handler func(int
425425
items, err := collector.args.ResponseParser(res)
426426
if err != nil {
427427
if errors.Is(err, ErrFinishCollect) {
428-
logger.Info("a fetch stop by parser, reqInput: #%d", reqData.Params)
428+
logger.Info("a fetch stop by parser, reqInput: #%s", reqData.Params)
429429
handler = nil
430430
} else {
431431
return errors.Default.Wrap(err, fmt.Sprintf("error parsing response from %s", apiUrl))

backend/helpers/pluginhelper/api/graphql_collector.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -134,16 +134,23 @@ func (collector *GraphqlCollector) Execute() errors.Error {
134134
if err != nil {
135135
return errors.Default.Wrap(err, "error running auto-migrate")
136136
}
137+
138+
divider := NewBatchSaveDivider(collector.args.Ctx, collector.args.BatchSize, collector.table, collector.params)
139+
137140
// flush data if not incremental collection
138-
if !collector.args.Incremental {
141+
if collector.args.Incremental {
142+
// re extract data for new transformation rules
143+
err = collector.ExtractExistRawData(divider)
144+
if err != nil {
145+
collector.checkError(err)
146+
}
147+
} else {
139148
err = db.Delete(&RawData{}, dal.From(collector.table), dal.Where("params = ?", collector.params))
140149
if err != nil {
141150
return errors.Default.Wrap(err, "error deleting data from collector")
142151
}
143152
}
144153

145-
divider := NewBatchSaveDivider(collector.args.Ctx, collector.args.BatchSize, collector.table, collector.params)
146-
147154
collector.args.Ctx.SetProgress(0, -1)
148155
if collector.args.Input != nil {
149156
iterator := collector.args.Input
@@ -208,11 +215,6 @@ func (collector *GraphqlCollector) exec(divider *BatchSaveDivider, input interfa
208215
SkipCursor: nil,
209216
Size: collector.args.PageSize,
210217
}
211-
err = collector.ExtractExistRawData(divider, reqData)
212-
if err != nil {
213-
collector.checkError(err)
214-
return
215-
}
216218
if collector.args.GetPageInfo != nil {
217219
collector.fetchOneByOne(divider, reqData)
218220
} else {
@@ -280,7 +282,7 @@ func (collector *GraphqlCollector) BatchSaveWithOrigin(divider *BatchSaveDivider
280282
}
281283

282284
// ExtractExistRawData will extract data from existing data from raw layer if increment
283-
func (collector *GraphqlCollector) ExtractExistRawData(divider *BatchSaveDivider, reqData *GraphqlRequestData) errors.Error {
285+
func (collector *GraphqlCollector) ExtractExistRawData(divider *BatchSaveDivider) errors.Error {
284286
// load data from database
285287
db := collector.args.Ctx.GetDal()
286288
logger := collector.args.Ctx.GetLogger()
@@ -304,7 +306,7 @@ func (collector *GraphqlCollector) ExtractExistRawData(divider *BatchSaveDivider
304306
row := &RawData{}
305307

306308
// get the type of query and variables
307-
query, variables, _ := collector.args.BuildQuery(reqData)
309+
query, variables, _ := collector.args.BuildQuery(nil)
308310

309311
// prgress
310312
collector.args.Ctx.SetProgress(0, -1)

backend/plugins/github_graphql/tasks/account_collector.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,11 @@ func CollectAccount(taskCtx plugin.SubTaskContext) errors.Error {
100100
InputStep: 100,
101101
GraphqlClient: data.GraphqlClient,
102102
BuildQuery: func(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
103-
accounts := reqData.Input.([]interface{})
104103
query := &GraphqlQueryAccountWrapper{}
104+
if reqData == nil {
105+
return query, map[string]interface{}{}, nil
106+
}
107+
accounts := reqData.Input.([]interface{})
105108
users := []map[string]interface{}{}
106109
for _, iAccount := range accounts {
107110
account := iAccount.(*SimpleAccount)

backend/plugins/github_graphql/tasks/issue_collector.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,13 +116,16 @@ func CollectIssue(taskCtx plugin.SubTaskContext) errors.Error {
116116
PageSize: 100,
117117
Incremental: incremental,
118118
BuildQuery: func(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
119+
query := &GraphqlQueryIssueWrapper{}
120+
if reqData == nil {
121+
return query, map[string]interface{}{}, nil
122+
}
119123
since := helper.DateTime{}
120124
if incremental {
121125
since = helper.DateTime{Time: *collectorWithState.LatestState.LatestSuccessStart}
122126
} else if collectorWithState.TimeAfter != nil {
123127
since = helper.DateTime{Time: *collectorWithState.TimeAfter}
124128
}
125-
query := &GraphqlQueryIssueWrapper{}
126129
ownerName := strings.Split(data.Options.Name, "/")
127130
variables := map[string]interface{}{
128131
"since": since,

backend/plugins/github_graphql/tasks/job_collector.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,11 @@ func CollectGraphqlJobs(taskCtx plugin.SubTaskContext) errors.Error {
144144
Incremental: incremental,
145145
GraphqlClient: data.GraphqlClient,
146146
BuildQuery: func(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
147-
workflowRuns := reqData.Input.([]interface{})
148147
query := &GraphqlQueryCheckRunWrapper{}
148+
if reqData == nil {
149+
return query, map[string]interface{}{}, nil
150+
}
151+
workflowRuns := reqData.Input.([]interface{})
149152
checkSuiteIds := []map[string]interface{}{}
150153
for _, iWorkflowRuns := range workflowRuns {
151154
workflowRun := iWorkflowRuns.(*SimpleWorkflowRun)

backend/plugins/github_graphql/tasks/pr_collector.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,9 @@ func CollectPr(taskCtx plugin.SubTaskContext) errors.Error {
171171
*/
172172
BuildQuery: func(reqData *api.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
173173
query := &GraphqlQueryPrWrapper{}
174+
if reqData == nil {
175+
return query, map[string]interface{}{}, nil
176+
}
174177
ownerName := strings.Split(data.Options.Name, "/")
175178
variables := map[string]interface{}{
176179
"pageSize": graphql.Int(reqData.Pager.Size),
@@ -191,12 +194,11 @@ func CollectPr(taskCtx plugin.SubTaskContext) errors.Error {
191194
results := make([]interface{}, 0, 1)
192195
isFinish := false
193196
for _, rawL := range prs {
194-
// collect all data even though in increment mode because of existing data extracting
197+
// collect data even though in increment mode because of updating existing data
195198
if collectorWithState.TimeAfter != nil && !collectorWithState.TimeAfter.Before(rawL.UpdatedAt) {
196199
isFinish = true
197200
break
198201
}
199-
//If this is a pr, ignore
200202
githubPr, err := convertGithubPullRequest(rawL, data.Options.ConnectionId, data.Options.GithubId)
201203
if err != nil {
202204
return nil, err

0 commit comments

Comments
 (0)