@@ -15,7 +15,6 @@ import (
1515 "github.com/vkcom/statshouse/internal/data_model/gen2/tlmetadata"
1616 "github.com/vkcom/statshouse/internal/data_model/gen2/tlstatshouse"
1717 "github.com/vkcom/statshouse/internal/format"
18- "github.com/vkcom/statshouse/internal/pcache"
1918 "github.com/vkcom/statshouse/internal/vkgo/rpc"
2019 "github.com/vkcom/statshouse/internal/vkgo/srvfunc"
2120)
@@ -101,70 +100,6 @@ func (s *Agent) LoadMetaMetricJournal(ctxParent context.Context, version int64,
101100 return ret .Events , ret .CurrentVersion , nil
102101}
103102
104- func (s * Agent ) LoadOrCreateMapping (ctxParent context.Context , key string , floodLimitKey interface {}) (pcache.Value , time.Duration , error ) {
105- extra := rpc.InvokeReqExtra {FailIfNoConnection : true }
106- // Use 2 alive random aggregators for mapping
107- s0 , s1 := s .getRandomLiveShardReplicas ()
108- if s0 == nil {
109- s .AddValueCounter (0 , format .BuiltinMetricMetaAgentMapping ,
110- []int32 {0 , format .TagValueIDAggMappingMetaMetrics , format .TagValueIDAgentMappingStatusAllDead },
111- 0 , 1 )
112- return nil , 0 , fmt .Errorf ("all aggregators are dead" )
113- }
114- now := time .Now ()
115-
116- args := tlstatshouse.GetTagMapping2 {
117- Key : key ,
118- }
119- s0 .fillProxyHeader (& args .FieldsMask , & args .Header )
120- args .SetCreate (true )
121-
122- if floodLimitKey != nil {
123- // cache passes nil floodLimitKey when updating existing records, so in theory, we will never need to actually create key
124- // when extra is nil. But if we attempt to do it, will record attempts in common key for all metrics.
125- e := floodLimitKey .(format.CreateMappingExtra )
126- args .Metric = e .Metric
127- args .ClientEnv = e .ClientEnv
128- args .TagIdKey = e .TagIDKey
129- }
130-
131- var ret tlstatshouse.GetTagMappingResult
132-
133- ctx , cancel := context .WithTimeout (ctxParent , data_model .AgentMappingTimeout1 )
134- defer cancel ()
135- s0client := s0 .client ()
136- err := s0client .GetTagMapping2 (ctx , args , & extra , & ret )
137- if err == nil {
138- s .AddValueCounter (0 , format .BuiltinMetricMetaAgentMapping ,
139- []int32 {0 , format .TagValueIDAggMappingMetaMetrics , format .TagValueIDAgentMappingStatusOKFirst },
140- time .Since (now ).Seconds (), 1 )
141- return pcache .Int32ToValue (ret .Value ), time .Duration (ret .TtlNanosec ), nil
142- }
143- if s1 == nil {
144- s .AddValueCounter (0 , format .BuiltinMetricMetaAgentMapping ,
145- []int32 {0 , format .TagValueIDAggMappingMetaMetrics , format .TagValueIDAgentMappingStatusErrSingle },
146- time .Since (now ).Seconds (), 1 )
147- return nil , 0 , fmt .Errorf ("the only live aggregator %q returned error: %w" , s0client .Address , err )
148- }
149-
150- s1 .fillProxyHeader (& args .FieldsMask , & args .Header )
151-
152- ctx2 , cancel2 := context .WithTimeout (ctxParent , data_model .AgentMappingTimeout2 )
153- defer cancel2 ()
154- s1client := s1 .client ()
155- err2 := s1client .GetTagMapping2 (ctx2 , args , & extra , & ret )
156- if err2 == nil {
157- s .AddValueCounter (0 , format .BuiltinMetricMetaAgentMapping ,
158- []int32 {0 , format .TagValueIDAggMappingMetaMetrics , format .TagValueIDAgentMappingStatusOKSecond },
159- time .Since (now ).Seconds (), 1 )
160- return pcache .Int32ToValue (ret .Value ), time .Duration (ret .TtlNanosec ), nil
161- }
162- s .AddValueCounter (0 , format .BuiltinMetricMetaAgentMapping ,
163- []int32 {0 , format .TagValueIDAggMappingMetaMetrics , format .TagValueIDAgentMappingStatusErrBoth },
164- time .Since (now ).Seconds (), 1 )
165- return nil , 0 , fmt .Errorf ("two live aggregators %q %q returned errors: %v %w" , s0client .Address , s1client .Address , err , err2 )
166- }
167-
168103func (s * Agent ) GetTagMappingBootstrap (ctxParent context.Context ) ([]tlstatshouse.Mapping , time.Duration , error ) {
169104 extra := rpc.InvokeReqExtra {FailIfNoConnection : true }
170105 // Use 2 alive random aggregators for mapping
0 commit comments