Skip to content

Commit 2f5504d

Browse files
congxitravilyu
congxi
authored andcommitted
[feature] support flame graph
**Phenomenon and reproduction steps** **Root cause and solution** **Impactions** **Test method** **Affected branch(es)** * main **Checklist** - [ ] Dependencies update required - [ ] Common bug (similar problem in other repo)
1 parent b085d75 commit 2f5504d

File tree

1 file changed

+142
-10
lines changed

1 file changed

+142
-10
lines changed

deepflow-querier-datasource/pkg/plugin/datasource.go

Lines changed: 142 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -198,13 +198,14 @@ func (d *Datasource) query(ctx context.Context, pCtx backend.PluginContext, quer
198198
returnMetrics := qj["returnMetrics"].([]interface{})
199199
// 获取returnTags
200200
returnTags := qj["returnTags"].([]interface{})
201-
202201
//基本值判断,刚进入时为空,直接返回空
203202
if sql, ok := qj["sql"]; ok {
204203
if sql == "" {
205204
return response, nil
206205
}
207206
}
207+
// 获取profile_event_type
208+
profile_event_type := qj["profile_event_type"].(string)
208209

209210
//app类型
210211
appType := queryText["appType"].(string)
@@ -219,6 +220,126 @@ func (d *Datasource) query(ctx context.Context, pCtx backend.PluginContext, quer
219220
debug = qj["debug"].(bool)
220221
}
221222

223+
if appType == "profiling" {
224+
// 请求数据
225+
tracingsqlRes, err := d.querier(ctx, appType, debug, "", sql, "", token, profile_event_type, requestUrl, fromTimeInt64, toTimeInt64)
226+
227+
if err != nil {
228+
return response, err
229+
}
230+
231+
// 获取列
232+
var columns []interface{}
233+
234+
if _, ok := tracingsqlRes.Result["columns"]; !ok {
235+
// 缺失columns
236+
return response, fmt.Errorf("the columns field is missing in the returned data grid")
237+
}
238+
columns = tracingsqlRes.Result["columns"].([]interface{})
239+
240+
// 获取值
241+
if _, ok := tracingsqlRes.Result["values"]; !ok {
242+
//缺失values
243+
return response, fmt.Errorf("the values field is missing in the returned data grid")
244+
}
245+
246+
if tracingsqlRes.Result["values"] == nil {
247+
// return response, fmt.Errorf("the values is null")
248+
//数据
249+
frame := data.NewFrame("response")
250+
251+
frame.Fields = append(frame.Fields,
252+
data.NewField("level", nil, []float64{0}),
253+
)
254+
frame.Fields = append(frame.Fields,
255+
data.NewField("value", nil, []float64{0}),
256+
)
257+
frame.Fields = append(frame.Fields,
258+
data.NewField("label", nil, []string{"_"}),
259+
)
260+
frame.Fields = append(frame.Fields,
261+
data.NewField("self", nil, []float64{0}),
262+
)
263+
response.Frames = append(response.Frames, frame)
264+
return response, nil
265+
}
266+
//
267+
dataAll := make(map[string]interface{})
268+
269+
values := tracingsqlRes.Result["values"].([]interface{})
270+
271+
for i := 0; i < len(values); i++ {
272+
subValue := values[i].([]interface{})
273+
if len(subValue) != len(columns) {
274+
// value的子值和字段长度不一致
275+
return response, fmt.Errorf(fmt.Sprintf("sub-value: %v and columns: %v have different lengths", subValue, columns))
276+
}
277+
for j := 0; j < len(columns); j++ {
278+
column := columns[j].(string)
279+
switch column {
280+
case "level", "total_value", "self_value":
281+
var filed_name string
282+
if column == "level" {
283+
filed_name = "level"
284+
} else if column == "total_value" {
285+
filed_name = "value"
286+
} else if column == "self_value" {
287+
filed_name = "self"
288+
}
289+
slice, ok := dataAll[filed_name].([]float64)
290+
if !ok {
291+
slice = []float64{}
292+
}
293+
if _, ok := subValue[j].(json.Number); ok {
294+
floatValue, err := subValue[j].(json.Number).Float64()
295+
if err != nil {
296+
return response, fmt.Errorf("unexpected type for %v, assertion failed for float64, type %T", subValue[j], subValue[j])
297+
298+
}
299+
dataAll[filed_name] = append(slice, floatValue)
300+
} else {
301+
return response, fmt.Errorf("unexpected type for %v, assertion failed, type %T", subValue[j], subValue[j])
302+
}
303+
case "function":
304+
filed_name := "label"
305+
// 断言为 []string
306+
slice, ok := dataAll[filed_name].([]string)
307+
if !ok {
308+
slice = []string{}
309+
}
310+
// 追加值并进行类型转换
311+
stringValue, ok := subValue[j].(string)
312+
if !ok {
313+
return response, fmt.Errorf("unexpected type for %v, expected string", subValue[j])
314+
}
315+
dataAll[filed_name] = append(slice, stringValue)
316+
}
317+
}
318+
}
319+
//记录日志
320+
//column和value 匹配后数据
321+
log.DefaultLogger.Info("__________The data after matching columns and value", columns, dataAll)
322+
323+
//数据
324+
frame := data.NewFrame("response")
325+
326+
frame.Fields = append(frame.Fields,
327+
data.NewField("level", nil, dataAll["level"]),
328+
)
329+
frame.Fields = append(frame.Fields,
330+
data.NewField("value", nil, dataAll["value"]),
331+
)
332+
frame.Fields = append(frame.Fields,
333+
data.NewField("label", nil, dataAll["label"]),
334+
)
335+
frame.Fields = append(frame.Fields,
336+
data.NewField("self", nil, dataAll["self"]),
337+
)
338+
response.Frames = append(response.Frames, frame)
339+
return response, nil
340+
}
341+
342+
//
222343
if appType == "appTracingFlame" {
223344

224345
if _, ok := qj["_id"]; !ok {
@@ -262,7 +383,7 @@ func (d *Datasource) query(ctx context.Context, pCtx backend.PluginContext, quer
262383
tagTranslate := make(map[string]interface{})
263384

264385
//获取l7_protocol 翻译
265-
l7_protocol, err := d.querier(ctx, debug, "flow_log", "show tag l7_protocol values from l7_flow_log", sources, token, requestUrl, fromTimeInt64, toTimeInt64)
386+
l7_protocol, err := d.querier(ctx, appType, debug, "flow_log", "show tag l7_protocol values from l7_flow_log", sources, "", token, requestUrl, fromTimeInt64, toTimeInt64)
266387

267388
if err != nil {
268389
return response, err
@@ -277,7 +398,7 @@ func (d *Datasource) query(ctx context.Context, pCtx backend.PluginContext, quer
277398
tagTranslate["l7_protocol"] = tag17Protocol
278399

279400
//获取response_status翻译
280-
response_status, err := d.querier(ctx, debug, "flow_log", "show tag response_status values from l7_flow_log", sources, token, requestUrl, fromTimeInt64, toTimeInt64)
401+
response_status, err := d.querier(ctx, appType, debug, "flow_log", "show tag response_status values from l7_flow_log", sources, "", token, requestUrl, fromTimeInt64, toTimeInt64)
281402
if err != nil {
282403
return response, err
283404
}
@@ -289,7 +410,7 @@ func (d *Datasource) query(ctx context.Context, pCtx backend.PluginContext, quer
289410
tagTranslate["response_status"] = tagResponseStatus
290411

291412
// 获取tap_side翻译
292-
tap_side, err := d.querier(ctx, debug, "flow_log", "show tag tap_side values from l7_flow_log", sources, token, requestUrl, fromTimeInt64, toTimeInt64)
413+
tap_side, err := d.querier(ctx, appType, debug, "flow_log", "show tag tap_side values from l7_flow_log", sources, token, "", requestUrl, fromTimeInt64, toTimeInt64)
293414
if err != nil {
294415
return response, err
295416
}
@@ -339,7 +460,7 @@ func (d *Datasource) query(ctx context.Context, pCtx backend.PluginContext, quer
339460
tracingWhereNew := strings.TrimSuffix(tracingWhere, " or ")
340461
tracingsql := sql + tracingWhereNew + " order by `start_time`"
341462
// 请求数据
342-
tracingsqlRes, err := d.querier(ctx, debug, "flow_log", tracingsql, sources, token, requestUrl, fromTimeInt64, toTimeInt64)
463+
tracingsqlRes, err := d.querier(ctx, appType, debug, "flow_log", tracingsql, sources, token, "", requestUrl, fromTimeInt64, toTimeInt64)
343464

344465
if err != nil {
345466
return response, err
@@ -503,7 +624,7 @@ func (d *Datasource) query(ctx context.Context, pCtx backend.PluginContext, quer
503624
//
504625

505626
// 请求querier
506-
body, err := d.querier(ctx, debug, db, sql, sources, token, requestUrl, fromTimeInt64, toTimeInt64)
627+
body, err := d.querier(ctx, appType, debug, db, sql, sources, token, "", requestUrl, fromTimeInt64, toTimeInt64)
507628

508629
if err != nil {
509630
return response, err
@@ -1120,7 +1241,7 @@ func (d *Datasource) verifyParams(qj, queryText map[string]interface{}) (err err
11201241
}
11211242

11221243
// 三方querier接口查询
1123-
func (d *Datasource) querier(ctx context.Context, debug bool, db, sql, sources, token, requestUrl string, fromTimeInt64, toTimeInt64 int64) (res newtypes.ApiMetrics, err error) {
1244+
func (d *Datasource) querier(ctx context.Context, appType string, debug bool, db, sql, sources, token, profileEventType, requestUrl string, fromTimeInt64, toTimeInt64 int64) (res newtypes.ApiMetrics, err error) {
11241245

11251246
var body newtypes.ApiMetrics
11261247

@@ -1155,14 +1276,25 @@ func (d *Datasource) querier(ctx context.Context, debug bool, db, sql, sources,
11551276
data.Set("data_precision", sources)
11561277
}
11571278

1279+
if profileEventType != "" {
1280+
postData["profile_event_type"] = profileEventType
1281+
data.Set("profile_event_type", profileEventType)
1282+
}
1283+
11581284
// postDataMap, _ := json.Marshal(postData)
11591285
// StrPostData := string(postDataMap)
11601286
//请求querier接口
11611287
log.DefaultLogger.Info("__________request querier interface", "data", data)
11621288

11631289
//请求url
11641290
debugStr := strconv.FormatBool(debug)
1165-
querier := requestUrl + "/v1/query/?debug=" + debugStr
1291+
1292+
var querier string
1293+
if appType == "profiling" {
1294+
querier = requestUrl + "/v1/profile/ProfileGrafana?debug=" + debugStr
1295+
} else {
1296+
querier = requestUrl + "/v1/query/?debug=" + debugStr
1297+
}
11661298

11671299
req, err := http.NewRequestWithContext(ctx, http.MethodPost, querier, bytes.NewReader([]byte(data.Encode())))
11681300

@@ -1208,7 +1340,7 @@ func (d *Datasource) querier(ctx context.Context, debug bool, db, sql, sources,
12081340
}
12091341

12101342
// 记录日志
1211-
//log.DefaultLogger.Info("格式化后接口返回", "数据", body)
1343+
// log.DefaultLogger.Info("格式化后接口返回", "数据", body)
12121344

12131345
return body, nil
12141346
}
@@ -1250,7 +1382,7 @@ func (d *Datasource) CheckHealth(ctx context.Context, _ *backend.CheckHealthRequ
12501382
token = dsj["token"].(string)
12511383
}
12521384

1253-
_, err := d.querier(ctx, false, "", "show databases", "", token, requestUrl, 0, 0)
1385+
_, err := d.querier(ctx, "", false, "", "show databases", "", "", token, requestUrl, 0, 0)
12541386

12551387
if err != nil {
12561388
return newHealthCheckErrorf(err.Error()), nil

0 commit comments

Comments
 (0)