@@ -164,6 +164,8 @@ type ClickHouseReader struct {
164
164
165
165
fluxIntervalForTraceDetail time.Duration
166
166
cache cache.Cache
167
+ metadataDB string
168
+ metadataTable string
167
169
}
168
170
169
171
// NewTraceReader returns a TraceReader for the database
@@ -259,6 +261,8 @@ func NewReaderFromClickhouseConnection(
259
261
260
262
fluxIntervalForTraceDetail : fluxIntervalForTraceDetail ,
261
263
cache : cache ,
264
+ metadataDB : options .primary .MetadataDB ,
265
+ metadataTable : options .primary .MetadataTable ,
262
266
}
263
267
}
264
268
@@ -4126,6 +4130,97 @@ func (r *ClickHouseReader) GetLogAttributeKeys(ctx context.Context, req *v3.Filt
4126
4130
return & response , nil
4127
4131
}
4128
4132
4133
+ func (r * ClickHouseReader ) FetchRelatedValues (ctx context.Context , req * v3.FilterAttributeValueRequest ) ([]string , error ) {
4134
+ var andConditions []string
4135
+
4136
+ andConditions = append (andConditions , fmt .Sprintf ("unix_milli >= %d" , req .StartTimeMillis ))
4137
+ andConditions = append (andConditions , fmt .Sprintf ("unix_milli <= %d" , req .EndTimeMillis ))
4138
+
4139
+ if len (req .ExistingFilterItems ) != 0 {
4140
+ for _ , item := range req .ExistingFilterItems {
4141
+ // we only support string for related values
4142
+ if item .Key .DataType != v3 .AttributeKeyDataTypeString {
4143
+ continue
4144
+ }
4145
+
4146
+ var colName string
4147
+ switch item .Key .Type {
4148
+ case v3 .AttributeKeyTypeResource :
4149
+ colName = "resource_attributes"
4150
+ case v3 .AttributeKeyTypeTag :
4151
+ colName = "attributes"
4152
+ default :
4153
+ // we only support resource and tag for related values as of now
4154
+ continue
4155
+ }
4156
+ // IN doesn't make use of map value index, we convert it to = or !=
4157
+ operator := item .Operator
4158
+ if v3 .FilterOperator (strings .ToLower (string (item .Operator ))) == v3 .FilterOperatorIn {
4159
+ operator = "="
4160
+ } else if v3 .FilterOperator (strings .ToLower (string (item .Operator ))) == v3 .FilterOperatorNotIn {
4161
+ operator = "!="
4162
+ }
4163
+ addCondition := func (val string ) {
4164
+ andConditions = append (andConditions , fmt .Sprintf ("mapContains(%s, '%s') AND %s['%s'] %s %s" , colName , item .Key .Key , colName , item .Key .Key , operator , val ))
4165
+ }
4166
+ switch v := item .Value .(type ) {
4167
+ case string :
4168
+ fmtVal := utils .ClickHouseFormattedValue (v )
4169
+ addCondition (fmtVal )
4170
+ case []string :
4171
+ for _ , val := range v {
4172
+ fmtVal := utils .ClickHouseFormattedValue (val )
4173
+ addCondition (fmtVal )
4174
+ }
4175
+ case []interface {}:
4176
+ for _ , val := range v {
4177
+ fmtVal := utils .ClickHouseFormattedValue (val )
4178
+ addCondition (fmtVal )
4179
+ }
4180
+ }
4181
+ }
4182
+ }
4183
+ whereClause := strings .Join (andConditions , " AND " )
4184
+
4185
+ var selectColumn string
4186
+ switch req .TagType {
4187
+ case v3 .TagTypeResource :
4188
+ selectColumn = "resource_attributes" + "['" + req .FilterAttributeKey + "']"
4189
+ case v3 .TagTypeTag :
4190
+ selectColumn = "attributes" + "['" + req .FilterAttributeKey + "']"
4191
+ default :
4192
+ selectColumn = "attributes" + "['" + req .FilterAttributeKey + "']"
4193
+ }
4194
+
4195
+ filterSubQuery := fmt .Sprintf (
4196
+ "SELECT DISTINCT %s FROM %s.%s WHERE %s LIMIT 100" ,
4197
+ selectColumn ,
4198
+ r .metadataDB ,
4199
+ r .metadataTable ,
4200
+ whereClause ,
4201
+ )
4202
+ zap .L ().Debug ("filterSubQuery for related values" , zap .String ("query" , filterSubQuery ))
4203
+
4204
+ rows , err := r .db .Query (ctx , filterSubQuery )
4205
+ if err != nil {
4206
+ return nil , fmt .Errorf ("error while executing query: %s" , err .Error ())
4207
+ }
4208
+ defer rows .Close ()
4209
+
4210
+ var attributeValues []string
4211
+ for rows .Next () {
4212
+ var value string
4213
+ if err := rows .Scan (& value ); err != nil {
4214
+ return nil , fmt .Errorf ("error while scanning rows: %s" , err .Error ())
4215
+ }
4216
+ if value != "" {
4217
+ attributeValues = append (attributeValues , value )
4218
+ }
4219
+ }
4220
+
4221
+ return attributeValues , nil
4222
+ }
4223
+
4129
4224
func (r * ClickHouseReader ) GetLogAttributeValues (ctx context.Context , req * v3.FilterAttributeValueRequest ) (* v3.FilterAttributeValueResponse , error ) {
4130
4225
var err error
4131
4226
var filterValueColumn string
@@ -4227,6 +4322,13 @@ func (r *ClickHouseReader) GetLogAttributeValues(ctx context.Context, req *v3.Fi
4227
4322
}
4228
4323
}
4229
4324
4325
+ if req .IncludeRelated {
4326
+ relatedValues , _ := r .FetchRelatedValues (ctx , req )
4327
+ attributeValues .RelatedValues = & v3.FilterAttributeValueResponse {
4328
+ StringAttributeValues : relatedValues ,
4329
+ }
4330
+ }
4331
+
4230
4332
return & attributeValues , nil
4231
4333
4232
4334
}
@@ -4907,6 +5009,13 @@ func (r *ClickHouseReader) GetTraceAttributeValues(ctx context.Context, req *v3.
4907
5009
}
4908
5010
}
4909
5011
5012
+ if req .IncludeRelated {
5013
+ relatedValues , _ := r .FetchRelatedValues (ctx , req )
5014
+ attributeValues .RelatedValues = & v3.FilterAttributeValueResponse {
5015
+ StringAttributeValues : relatedValues ,
5016
+ }
5017
+ }
5018
+
4910
5019
return & attributeValues , nil
4911
5020
}
4912
5021
0 commit comments