Skip to content

Commit 78a9fa1

Browse files
committed
Allow filtering /metrics/detailed queue metrics by queue name
The `/metrics/detailed` endpoint now accepts an optional `queue` query parameter whose value is an Erlang-compatible regular expression. When provided, only queues whose names match the regex are included in the response.

This applies to all queue-related metric families (queue_coarse_metrics, queue_metrics, queue_consumer_count, queue_delivery_metrics, queue_exchange_metrics, stream_consumer_metrics, ra_metrics) as well as the queue_info metric. The filter can be combined with the existing `vhost` filter.

Examples:

    /metrics/detailed?family=queue_coarse_metrics&queue=^quorum-
    /metrics/detailed?family=queue_metrics&vhost=prod&queue=^orders-

This is useful for deployments with a large number of queues where only a subset needs to be monitored. For example, a cluster with thousands of classic queues but only a handful of quorum queues following a naming convention can now scrape metrics for just those quorum queues, significantly reducing the metrics payload size, Prometheus scrape time, and storage costs.

Only one `queue` parameter is accepted per request. Providing multiple values returns HTTP 400. An invalid regex also returns HTTP 400 with a descriptive error message.
1 parent 8a1f228 commit 78a9fa1

4 files changed

Lines changed: 267 additions & 64 deletions

File tree

deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl

Lines changed: 110 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -290,47 +290,47 @@ register() ->
290290
deregister_cleanup(_) -> ok.
291291

292292
collect_mf('detailed', Callback) ->
293+
VHostsFilter = vhosts_filter_from_pdict(),
294+
QueueFilter = queue_filter_from_pdict(),
293295
IncludedMFs = enabled_mfs_from_pdict(?METRICS_RAW),
294-
collect(true, ?DETAILED_METRIC_NAME_PREFIX, vhosts_filter_from_pdict(), IncludedMFs, Callback),
295-
collect(true, ?CLUSTER_METRIC_NAME_PREFIX, vhosts_filter_from_pdict(), enabled_mfs_from_pdict(?METRICS_CLUSTER), Callback),
296-
%% the detailed endpoint should emit queue_info only if queue metrics were requested
296+
collect(true, ?DETAILED_METRIC_NAME_PREFIX, VHostsFilter, QueueFilter, IncludedMFs, Callback),
297+
collect(true, ?CLUSTER_METRIC_NAME_PREFIX, VHostsFilter, QueueFilter, enabled_mfs_from_pdict(?METRICS_CLUSTER), Callback),
297298
MFs = proplists:get_keys(IncludedMFs),
298299
case lists:member(queue_coarse_metrics, MFs) orelse
299300
lists:member(queue_consumer_count, MFs) orelse
300301
lists:member(queue_metrics, MFs) of
301302
true ->
302-
emit_queue_info(?DETAILED_METRIC_NAME_PREFIX, vhosts_filter_from_pdict(), Callback);
303+
emit_queue_info(?DETAILED_METRIC_NAME_PREFIX, VHostsFilter, QueueFilter, Callback);
303304
false -> ok
304305
end,
305-
%% identity is here to enable filtering on a cluster name (as already happens in existing dashboards)
306306
emit_identity_info(<<"detailed">>, Callback),
307307
ok;
308308
collect_mf('per-object', Callback) ->
309-
collect(true, ?METRIC_NAME_PREFIX, false, ?METRICS_RAW, Callback),
309+
collect(true, ?METRIC_NAME_PREFIX, false, false, ?METRICS_RAW, Callback),
310310
totals(Callback),
311-
emit_queue_info(?METRIC_NAME_PREFIX, false, Callback),
311+
emit_queue_info(?METRIC_NAME_PREFIX, false, false, Callback),
312312
emit_identity_info(<<"per-object">>, Callback),
313313
ok;
314314
collect_mf('memory-breakdown', Callback) ->
315-
collect(false, ?METRIC_NAME_PREFIX, false, ?METRICS_MEMORY_BREAKDOWN, Callback),
315+
collect(false, ?METRIC_NAME_PREFIX, false, false, ?METRICS_MEMORY_BREAKDOWN, Callback),
316316
emit_identity_info(<<"memory-breakdown">>, Callback),
317317
ok;
318318
collect_mf(_Registry, Callback) ->
319319
PerObjectMetrics = application:get_env(rabbitmq_prometheus, return_per_object_metrics, false),
320-
collect(PerObjectMetrics, ?METRIC_NAME_PREFIX, false, ?METRICS_RAW, Callback),
320+
collect(PerObjectMetrics, ?METRIC_NAME_PREFIX, false, false, ?METRICS_RAW, Callback),
321321
totals(Callback),
322322
case PerObjectMetrics of
323323
true ->
324324
emit_identity_info(<<"per-object">>, Callback),
325-
emit_queue_info(?METRIC_NAME_PREFIX, false, Callback);
325+
emit_queue_info(?METRIC_NAME_PREFIX, false, false, Callback);
326326
false ->
327327
emit_identity_info(<<"aggregated">>, Callback)
328328
end,
329329
ok.
330330

331-
collect(PerObjectMetrics, Prefix, VHostsFilter, IncludedMFs, Callback) ->
331+
collect(PerObjectMetrics, Prefix, VHostsFilter, QueueFilter, IncludedMFs, Callback) ->
332332
_ = [begin
333-
Data = get_data(Table, PerObjectMetrics, VHostsFilter),
333+
Data = get_data(Table, PerObjectMetrics, VHostsFilter, QueueFilter),
334334
mf(Callback, Prefix, Contents, Data)
335335
end || {Table, Contents} <- IncludedMFs, not mutually_exclusive_mf(PerObjectMetrics, Table, IncludedMFs)],
336336
ok.
@@ -440,27 +440,33 @@ membership({Name, Node}, Members) ->
440440
membership(_, _Members) ->
441441
undefined.
442442

443-
emit_queue_info(Prefix, VHostsFilter, Callback) ->
443+
emit_queue_info(Prefix, VHostsFilter, QueueFilter, Callback) ->
444444
Help = <<"A metric with a constant '1' value and labels that provide some queue details">>,
445445
QInfos = lists:foldl(
446446
fun(Q, Acc) ->
447447
#resource{virtual_host = VHost, name = Name} = amqqueue:get_name(Q),
448448
case is_map(VHostsFilter) andalso maps:get(VHost, VHostsFilter) == false of
449-
true -> Acc;
449+
true ->
450+
Acc;
450451
false ->
451-
Type = amqqueue:get_type(Q),
452-
Members = rabbit_queue_type:get_nodes(Q),
453-
case membership(amqqueue:get_pid(Q), Members) of
454-
not_a_member ->
452+
case matches_queue_filter(Name, QueueFilter) of
453+
false ->
455454
Acc;
456-
Membership ->
457-
QInfo = [
458-
{vhost, VHost},
459-
{queue, Name},
460-
{queue_type, Type},
461-
{membership, Membership}
462-
],
463-
[{QInfo, 1}|Acc]
455+
true ->
456+
Type = amqqueue:get_type(Q),
457+
Members = rabbit_queue_type:get_nodes(Q),
458+
case membership(amqqueue:get_pid(Q), Members) of
459+
not_a_member ->
460+
Acc;
461+
Membership ->
462+
QInfo = [
463+
{vhost, VHost},
464+
{queue, Name},
465+
{queue_type, Type},
466+
{membership, Membership}
467+
],
468+
[{QInfo, 1}|Acc]
469+
end
464470
end
465471
end
466472
end, [], rabbit_amqqueue:list()),
@@ -635,7 +641,7 @@ emit_gauge_metric_if_defined(Labels, Value) ->
635641
gauge_metric(Labels, Value)
636642
end.
637643

638-
get_data(connection_metrics = Table, false, _) ->
644+
get_data(connection_metrics = Table, false, _, _) ->
639645
{Table, A1, A2, A3, A4} = ets:foldl(fun({_, Props}, {T, A1, A2, A3, A4}) ->
640646
{T,
641647
sum(proplists:get_value(recv_cnt, Props), A1),
@@ -644,7 +650,7 @@ get_data(connection_metrics = Table, false, _) ->
644650
sum(proplists:get_value(channels, Props), A4)}
645651
end, empty(Table), Table),
646652
[{Table, [{recv_cnt, A1}, {send_cnt, A2}, {send_pend, A3}, {channels, A4}]}];
647-
get_data(channel_metrics = Table, false, _) ->
653+
get_data(channel_metrics = Table, false, _, _) ->
648654
{Table, A1, A2, A3, A4, A5, A6, A7} =
649655
ets:foldl(fun({_, Props}, {T, A1, A2, A3, A4, A5, A6, A7}) ->
650656
{T,
@@ -659,7 +665,7 @@ get_data(channel_metrics = Table, false, _) ->
659665
[{Table, [{consumer_count, A1}, {messages_unacknowledged, A2}, {messages_unconfirmed, A3},
660666
{messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6},
661667
{global_prefetch_count, A7}]}];
662-
get_data(stream_consumer_metrics = MF, false, _) ->
668+
get_data(stream_consumer_metrics = MF, false, _, _) ->
663669
Table = rabbit_stream_consumer_created, %% real table name
664670
try ets:foldl(fun({_, Props}, OldMax) ->
665671
erlang:max(proplists:get_value(offset_lag, Props, 0), OldMax)
@@ -670,7 +676,7 @@ get_data(stream_consumer_metrics = MF, false, _) ->
670676
%% rabbitmq_stream plugin is not enabled
671677
[]
672678
end;
673-
get_data(queue_consumer_count = MF, false, VHostsFilter) ->
679+
get_data(queue_consumer_count = MF, false, VHostsFilter, _) ->
674680
Table = queue_metrics, %% Real table name
675681
{_, A1} = ets:foldl(fun
676682
({#resource{kind = queue, virtual_host = VHost}, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
@@ -681,7 +687,7 @@ get_data(queue_consumer_count = MF, false, VHostsFilter) ->
681687
}
682688
end, empty(MF), Table),
683689
[{Table, [{consumers, A1}]}];
684-
get_data(queue_metrics = Table, false, VHostsFilter) ->
690+
get_data(queue_metrics = Table, false, VHostsFilter, _) ->
685691
{Table, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15} =
686692
ets:foldl(fun
687693
({#resource{kind = queue, virtual_host = VHost}, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
@@ -695,7 +701,7 @@ get_data(queue_metrics = Table, false, VHostsFilter) ->
695701
{messages_bytes_persistent, A9}, {message_bytes, A10},
696702
{message_bytes_ready, A11}, {message_bytes_unacknowledged, A12},
697703
{disk_reads, A13}, {disk_writes, A14}, {segments, A15}]}];
698-
get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics;
704+
get_data(Table, false, VHostsFilter, _) when Table == channel_exchange_metrics;
699705
Table == queue_coarse_metrics;
700706
Table == queue_delivery_metrics;
701707
Table == channel_queue_metrics;
@@ -728,7 +734,7 @@ get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics;
728734
{T, V1 + A1, V2 + A2, V3 + A3, V4 + A4, V5 + A5, V6 + A6, V7 + A7}
729735
end, empty(Table), Table),
730736
[Result];
731-
get_data(exchange_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter)->
737+
get_data(exchange_metrics = Table, true, VHostsFilter, _) when is_map(VHostsFilter)->
732738
ets:foldl(fun
733739
({#resource{kind = exchange, virtual_host = VHost}, _, _, _, _, _} = Row, Acc) when
734740
map_get(VHost, VHostsFilter)
@@ -737,45 +743,65 @@ get_data(exchange_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter)
737743
(_Row, Acc) ->
738744
Acc
739745
end, [], Table);
740-
get_data(queue_delivery_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) ->
746+
get_data(queue_delivery_metrics = Table, true, VHostsFilter, QueueFilter) when is_map(VHostsFilter) ->
741747
ets:foldl(fun
742-
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _, _, _, _, _} = Row, Acc) when
748+
({#resource{kind = queue, virtual_host = VHost, name = QName}, _, _, _, _, _, _, _, _} = Row, Acc) when
743749
map_get(VHost, VHostsFilter)
744750
->
745-
[Row | Acc];
751+
case matches_queue_filter(QName, QueueFilter) of
752+
true -> [Row | Acc];
753+
false -> Acc
754+
end;
746755
(_Row, Acc) ->
747756
Acc
748757
end, [], Table);
749-
get_data(queue_exchange_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) ->
758+
get_data(queue_exchange_metrics = Table, true, VHostsFilter, QueueFilter) when is_map(VHostsFilter) ->
750759
ets:foldl(fun
751760
({{
752-
#resource{kind = queue, virtual_host = VHost},
761+
#resource{kind = queue, virtual_host = VHost, name = QName},
753762
#resource{kind = exchange, virtual_host = VHost}
754763
}, _, _} = Row, Acc) when
755764
map_get(VHost, VHostsFilter)
756765
->
757-
[Row | Acc];
766+
case matches_queue_filter(QName, QueueFilter) of
767+
true -> [Row | Acc];
768+
false -> Acc
769+
end;
758770
(_Row, Acc) ->
759771
Acc
760772
end, [], Table);
761-
get_data(queue_coarse_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) ->
773+
get_data(queue_coarse_metrics = Table, true, VHostsFilter, QueueFilter) when is_map(VHostsFilter) ->
762774
ets:foldl(fun
763-
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) ->
764-
[Row|Acc];
775+
({#resource{kind = queue, virtual_host = VHost, name = QName}, _, _, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) ->
776+
case matches_queue_filter(QName, QueueFilter) of
777+
true -> [Row|Acc];
778+
false -> Acc
779+
end;
765780
(_, Acc) ->
766781
Acc
767782
end, [], Table);
768-
get_data(MF, true, VHostsFilter) when is_map(VHostsFilter), MF == queue_metrics orelse MF == queue_consumer_count ->
783+
get_data(MF, true, VHostsFilter, QueueFilter) when is_map(VHostsFilter), MF == queue_metrics orelse MF == queue_consumer_count ->
769784
Table = queue_metrics,
770785
ets:foldl(fun
771-
({#resource{kind = queue, virtual_host = VHost}, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) ->
772-
[Row|Acc];
786+
({#resource{kind = queue, virtual_host = VHost, name = QName}, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) ->
787+
case matches_queue_filter(QName, QueueFilter) of
788+
true -> [Row|Acc];
789+
false -> Acc
790+
end;
773791
(_, Acc) ->
774792
Acc
775793
end, [], Table);
776-
get_data(queue_consumer_count, true, _) ->
777-
ets:tab2list(queue_metrics);
778-
get_data(stream_consumer_metrics, true, _) ->
794+
get_data(queue_consumer_count, true, _, QueueFilter) ->
795+
filter_by_queue_name(ets:tab2list(queue_metrics), QueueFilter);
796+
get_data(queue_delivery_metrics = Table, true, _, QueueFilter) ->
797+
filter_by_queue_name(ets:tab2list(Table), QueueFilter);
798+
get_data(queue_coarse_metrics = Table, true, _, QueueFilter) ->
799+
filter_by_queue_name(ets:tab2list(Table), QueueFilter);
800+
get_data(MF, true, _, QueueFilter) when MF == queue_metrics ->
801+
filter_by_queue_name(ets:tab2list(queue_metrics), QueueFilter);
802+
get_data(queue_exchange_metrics = Table, true, _, QueueFilter) ->
803+
filter_by_queue_name(ets:tab2list(Table), QueueFilter);
804+
get_data(stream_consumer_metrics, true, _, QueueFilter) ->
779805
Table = rabbit_stream_consumer_created, %% real table name
780806
try ets:foldl(fun({{QueueName, _Pid, _SubId}, Props}, Map0) ->
781807
Value = proplists:get_value(offset_lag, Props, 0),
@@ -786,19 +812,20 @@ get_data(stream_consumer_metrics, true, _) ->
786812
Map0)
787813
end, #{}, Table) of
788814
Map1 ->
789-
maps:to_list(Map1)
815+
Rows = maps:to_list(Map1),
816+
filter_by_queue_name(Rows, QueueFilter)
790817
catch error:badarg ->
791818
%% rabbitmq_stream plugin is not enabled
792819
[]
793820
end;
794-
get_data(vhost_status, _, _) ->
821+
get_data(vhost_status, _, _, _) ->
795822
[ { #{<<"vhost">> => VHost},
796823
case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of
797824
true -> 1;
798825
false -> 0
799826
end}
800827
|| VHost <- rabbit_vhost:list() ];
801-
get_data(node_memory, _, _) ->
828+
get_data(node_memory, _, _, _) ->
802829
BreakdownPL = rabbit_vm:memory(),
803830
KeysOfInterest = [
804831
code,
@@ -830,7 +857,7 @@ get_data(node_memory, _, _) ->
830857
],
831858
Data = maps:to_list(maps:with(KeysOfInterest, maps:from_list(BreakdownPL))),
832859
[{node_memory, Data}];
833-
get_data(exchange_bindings, _, _) ->
860+
get_data(exchange_bindings, _, _, _) ->
834861
Exchanges = lists:foldl(fun
835862
(#exchange{internal = true}, Acc) ->
836863
Acc;
@@ -854,7 +881,7 @@ get_data(exchange_bindings, _, _) ->
854881
[{<<"vhost=\"", VHost/binary, "\",exchange=\"", Name/binary, "\",type=\"", Type/binary, "\"">>,
855882
Bindings}|Acc]
856883
end, [], WithCount);
857-
get_data(exchange_names, _, _) ->
884+
get_data(exchange_names, _, _, _) ->
858885
lists:foldl(fun
859886
(#exchange{internal = true}, Acc) ->
860887
Acc;
@@ -864,7 +891,7 @@ get_data(exchange_names, _, _) ->
864891
Label = <<"vhost=\"", VHost/binary, "\",exchange=\"", Name/binary, "\",type=\"", (atom_to_binary(EType))/binary, "\"">>,
865892
[{Label, 1}|Acc]
866893
end, [], rabbit_exchange:list());
867-
get_data(Table, _, _) ->
894+
get_data(Table, _, _, _) ->
868895
ets:tab2list(Table).
869896

870897

@@ -925,3 +952,32 @@ vhosts_filter_from_pdict() ->
925952
Enabled = maps:from_list([ {VHost, true} || VHost <- L ]),
926953
maps:merge(All, Enabled)
927954
end.
955+
956+
queue_filter_from_pdict() ->
957+
case get(prometheus_queue_filter) of
958+
undefined -> false;
959+
MP -> MP
960+
end.
961+
962+
matches_queue_filter(_QName, false) ->
963+
true;
964+
matches_queue_filter(QName, MP) ->
965+
re:run(QName, MP, [{capture, none}]) =:= match.
966+
967+
filter_by_queue_name(Rows, false) ->
968+
Rows;
969+
filter_by_queue_name(Rows, QueueFilter) ->
970+
lists:filter(fun(Row) -> row_matches_queue_filter(Row, QueueFilter) end, Rows).
971+
972+
row_matches_queue_filter({#resource{kind = queue, name = QName}, _}, QF) ->
973+
matches_queue_filter(QName, QF);
974+
row_matches_queue_filter({#resource{kind = queue, name = QName}, _, _}, QF) ->
975+
matches_queue_filter(QName, QF);
976+
row_matches_queue_filter({#resource{kind = queue, name = QName}, _, _, _, _}, QF) ->
977+
matches_queue_filter(QName, QF);
978+
row_matches_queue_filter({#resource{kind = queue, name = QName}, _, _, _, _, _, _, _, _}, QF) ->
979+
matches_queue_filter(QName, QF);
980+
row_matches_queue_filter({{#resource{kind = queue, name = QName}, #resource{kind = exchange}}, _, _}, QF) ->
981+
matches_queue_filter(QName, QF);
982+
row_matches_queue_filter(_, _) ->
983+
true.

deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_raft_metrics_collector.erl

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,14 +96,15 @@ collect_detailed_metrics(Prefix, Callback) ->
9696
false
9797
end
9898
end,
99-
collect_all_matching_metrics(Prefix, Callback, VHostFilterFun).
99+
QueueFilter = get(prometheus_queue_filter),
100+
collect_all_matching_metrics(Prefix, Callback, VHostFilterFun, QueueFilter).
100101

101-
collect_all_matching_metrics(Prefix, Callback, VHostFilterFun) ->
102+
collect_all_matching_metrics(Prefix, Callback, VHostFilterFun, QueueFilter) ->
102103
maps:foreach(
103104
fun(Name, #{type := Type, help := Help, values := Values0}) ->
104-
Values = maps:filter(fun(#{vhost := V}, _) ->
105-
VHostFilterFun(V);
106-
(_, _) -> true
105+
Values = maps:filter(fun(Labels, _) ->
106+
matches_vhost(Labels, VHostFilterFun)
107+
andalso matches_queue(Labels, QueueFilter)
107108
end, Values0),
108109
Callback(
109110
create_mf(<<Prefix/binary, (prometheus_model_helpers:metric_name(Name))/binary>>,
@@ -116,6 +117,16 @@ collect_all_matching_metrics(Prefix, Callback, VHostFilterFun) ->
116117
metrics => all,
117118
filter_fun => VHostFilterFun})).
118119

120+
matches_vhost(#{vhost := V}, VHostFilterFun) ->
121+
VHostFilterFun(V);
122+
matches_vhost(_, _) ->
123+
true.
124+
125+
matches_queue(#{queue := Q}, QueueFilter) when QueueFilter =/= undefined ->
126+
re:run(Q, QueueFilter, [{capture, none}]) =:= match;
127+
matches_queue(_, _) ->
128+
true.
129+
119130
collect_max_values(Prefix, Callback) ->
120131
%% max values for QQ metrics
121132
%% eg.

0 commit comments

Comments
 (0)