|
| 1 | +-module(prometheus_rabbitmq_core_metrics_collector). |
| 2 | +-export([register/0, |
| 3 | + register/1, |
| 4 | + deregister_cleanup/1, |
| 5 | + collect_mf/2, |
| 6 | + collect_metrics/2]). |
| 7 | + |
| 8 | +-import(prometheus_model_helpers, [create_mf/4, |
| 9 | + create_mf/5, |
| 10 | + label_pairs/1, |
| 11 | + gauge_metrics/1, |
| 12 | + gauge_metric/1, |
| 13 | + gauge_metric/2, |
| 14 | + counter_metric/1, |
| 15 | + counter_metric/2, |
| 16 | + untyped_metric/1, |
| 17 | + untyped_metric/2]). |
| 18 | + |
| 19 | +-include("prometheus_rabbitmq_exporter.hrl"). |
| 20 | +-behaviour(prometheus_collector). |
| 21 | + |
%% Prefix prepended to every metric name emitted by this collector;
%% presumably consumed by the ?METRIC_NAME/1 macro pulled in via the
%% included headers — confirm against prometheus_rabbitmq_exporter.hrl.
-define(METRIC_NAME_PREFIX, "rabbitmq_core_").
| 23 | + |
%% Per-object metric definitions, one entry per rabbit_core_metrics ETS table:
%%   {Table, [{Index, Name, Type, Help} | {Index, Name, Type, Help, Key}]}
%% A 4-tuple reads the value directly as element(Index, Row); a 5-tuple
%% expects element(Index, Row) to be a proplist and looks Key up in it
%% (see mf/3). Metric-name and key atoms are part of the exported metric
%% interface and must not be renamed casually.
-define(METRICS,
        [
         {connection_coarse_metrics,
          [{2, connection_recv_oct, counter, "Count of octets received on the connection."},
           {3, connection_send_oct, counter, "Count of octets sent on the connection."},
           %% help text fixed: these reductions belong to the connection, not a queue
           {4, connection_reductions, counter, "Count of reductions that take place on the connection process."}
          ]},
         {queue_coarse_metrics,
          %% NOTE(review): "unacknowledge" reads like a typo for "unacknowledged",
          %% but the atom is the published metric name — renaming it would break
          %% existing scrape consumers, so it is left unchanged.
          [{2, queue_messages_ready, gauge, "Number of messages ready to be delivered to clients."},
           {3, queue_messages_unacknowledge, gauge, "Number of messages delivered to clients but not yet acknowledged."},
           {4, queue_messages, gauge, "Sum of ready and unacknowledged messages (queue depth)."},
           {5, queue_reductions, counter, "Count of reductions that take place on the queue process."}
          ]},
         {channel_exchange_metrics,
          [{2, channel_exchange_publish, gauge, "Count of messages published."},
           {3, channel_exchange_confirm, gauge, "Count of messages confirmed."},
           {4, channel_exchange_return_unroutable, gauge, "Count of messages returned to publisher as unroutable."}
          ]},
         {channel_process_metrics,
          [{2, channel_process_reductions, counter, "Count of reductions that take place on the channel process."}
          ]},
         {queue_metrics,
          [{2, queue_disk_reads, gauge, "Total number of times messages have been read from disk by this queue.", disk_reads},
           {2, queue_disk_writes, gauge, "Total number of times messages have been written to disk by this queue.", disk_writes}
          ]},
         {node_persister_metrics,
          [{2, node_io_read_count, counter, "Read operations since node start.", io_read_count},
           {2, node_io_read_bytes, gauge, "Bytes read since node start.", io_read_bytes},
           {2, node_io_read_time, gauge, "Total time of read operations.", io_read_time},
           {2, node_io_write_count, counter, "Write operations since node start.", io_write_count},
           {2, node_io_write_bytes, gauge, "Bytes written since node start.", io_write_bytes},
           {2, node_io_write_time, gauge, "Total time of write operations.", io_write_time},
           {2, node_io_sync_count, counter, "Sync operations since node start.", io_sync_count},
           {2, node_io_sync_time, gauge, "Total time of sync operations.", io_sync_time},
           {2, node_io_seek_count, counter, "Seek operations since node start.", io_seek_count},
           {2, node_io_seek_time, gauge, "Total time of seek operations.", io_seek_time},
           {2, node_io_reopen_count, counter, "Times files have been reopened by the file handle cache.", io_reopen_count},
           {2, node_mnesia_ram_tx_count, counter, "Mnesia transactions in RAM since node start.", mnesia_ram_tx_count},
           {2, node_mnesia_disk_tx_count, counter, "Mnesia transactions on disk since node start.", mnesia_disk_tx_count},
           {2, node_msg_store_read_count, counter, "Read operations in the message store since node start.", msg_store_read_count},
           {2, node_msg_store_write_count, counter, "Write operations in the message store since node start.", msg_store_write_count},
           {2, queue_index_journal_write_count, counter, "Write operations in the queue index journal since node start.", queue_index_journal_write_count},
           {2, queue_index_write_count, counter, "Queue index write operations since node start.", index_write_count},
           {2, queue_index_read_count, counter, "Queue index read operations since node start.", index_read_count},
           {2, queue_io_file_handle_open_attempt_count, counter, "File descriptor open attempts.", io_file_handle_open_attempt_count},
           {2, queue_io_file_handle_open_attempt_time, gauge, "Total time of file descriptor open attempts.", io_file_handle_open_attempt_time}
          ]},
         {node_coarse_metrics,
          [{2, node_fd_used, gauge, "File descriptors used.", fd_used},
           {2, node_sockets_used, gauge, "Sockets used.", sockets_used},
           {2, node_mem_used, gauge, "Memory used in bytes.", mem_used},
           {2, node_disk_free, gauge, "Disk free in bytes.", disk_free},
           {2, node_proc_used, gauge, "Erlang processes used.", proc_used},
           {2, node_gc_num, counter, "GC runs.", gc_num},
           {2, node_gc_bytes_reclaimed, counter, "Bytes reclaimed by GC.", gc_bytes_reclaimed},
           {2, node_context_switches, counter, "Context switches since node start.", context_switches}
          ]},
         {connection_churn_metrics,
          [{2, connection_created, counter, "Connections created."},
           {3, connection_closed, counter, "Connections closed."},
           {4, channel_created, counter, "Channels created."},
           {5, channel_closed, counter, "Channels closed."},
           {6, queue_declared, counter, "Queues declared."},
           {7, queue_deleted, counter, "Queues deleted."}
          ]},
         {node_node_metrics,
          [{2, node_node_send_bytes, counter, "Count of bytes sent to node.", send_bytes},
           {2, node_node_recv_bytes, counter, "Count of bytes received from node.", recv_bytes}
          ]},
         {channel_queue_metrics,
          [{2, channel_queue_get, counter, "Count of messages delivered in acknowledgement mode in response to basic.get.", get},
           {2, channel_queue_get_no_ack, counter, "Count of messages delivered in no-acknowledgement mode in response to basic.get.", get_no_ack},
           {2, channel_queue_deliver, counter, "Count of messages delivered in acknowledgement mode to consumers.", deliver},
           {2, channel_queue_deliver_no_ack, counter, "Count of messages delivered in no-acknowledgement mode to consumers.", deliver_no_ack},
           {2, channel_queue_redeliver, counter, "Count of subset of delivered messages which had the redelivered flag set.", redeliver},
           {2, channel_queue_ack, counter, "Count of messages acknowledged.", ack},
           {2, channel_queue_get_empty, counter, "Count of basic.get operations on empty queues.", get_empty}
          ]},
         {channel_metrics,
          %% NOTE(review): the proplist key 'messages_uncommited' (single "t") must
          %% match whatever rabbit stores in the channel_metrics table; rabbit
          %% spells it "uncommitted" elsewhere — verify the key before relying on it.
          [{2, channel_consumer_count, gauge, "Consumers count.", consumer_count},
           {2, channel_messages_unacknowledged, gauge, "Count of messages unacknowledged.", messages_unacknowledged},
           {2, channel_messages_unconfirmed, gauge, "Count of messages unconfirmed.", messages_unconfirmed},
           {2, channel_messages_uncommited, gauge, "Count of messages uncommitted.", messages_uncommited},
           {2, channel_messages_prefetch_count, gauge, "Limit to the number of unacknowledged messages on every connection on a channel.", prefetch_count},
           {2, channel_messages_global_prefetch_count, gauge, "Global limit to the number of unacknowledged messages shared between all connections on a channel.", global_prefetch_count}
          ]},
         {connection_metrics,
          %% NOTE(review): the *_count keys look like operation counts
          %% (cf. rabbit's recv_cnt/send_cnt) rather than byte totals, yet the
          %% help text says "bytes" — confirm against the connection_metrics table.
          [{2, connection_recv_count, counter, "Count of bytes received on the connection.", recv_count},
           {2, connection_send_count, counter, "Count of bytes sent on the connection.", send_count}
          ]},
         {node_metrics,
          [{2, node_fd_total, gauge, "File descriptors available.", fd_total},
           {2, node_sockets_total, gauge, "Sockets available.", sockets_total},
           {2, node_mem_limit, gauge, "Memory usage high watermark.", mem_limit},
           {2, node_disk_free_limit, gauge, "Free disk space low watermark.", disk_free_limit},
           {2, node_proc_total, gauge, "Erlang processes limit.", proc_total},
           {2, node_uptime, gauge, "Time in milliseconds since node start.", uptime},
           {2, node_run_queue, gauge, "Runtime run queue.", run_queue},
           {2, node_processors, gauge, "Logical processors.", processors},
           {2, node_net_ticktime, gauge, "Periodic tick interval between all pairs of nodes to maintain the connections and to detect disconnections.", net_ticktime}
          ]},
         {channel_queue_exchange_metrics,
          [{2, channel_queue_exchange_publish, counter, "Count of messages published."}
          ]}
        ]).
| 130 | + |
%% Aggregate gauges: {Table, MetricName, Type, Help}.
%% The metric value is the row count of Table (ets:info(Table, size) in
%% collect_mf/2) — assumes each table holds one row per live object;
%% e.g. connection_created is expected to drop rows when connections close.
-define(TOTALS, [
                 {connection_created, connections, gauge, "RabbitMQ Connections count."},
                 {channel_created, channels, gauge, "RabbitMQ Channels count."},
                 {consumer_created, consumers, gauge, "RabbitMQ Consumers count."},
                 {queue_metrics, queues, gauge, "RabbitMQ Queues count."}
                ]).
| 137 | + |
| 138 | +%%==================================================================== |
| 139 | +%% Collector API |
| 140 | +%%==================================================================== |
| 141 | + |
%% @doc Register this collector with the default prometheus registry.
register() ->
    register(default).
| 144 | + |
%% @doc Register this collector with the given prometheus registry.
%% Crashes with badmatch if the registry does not return ok.
register(Registry) ->
    ok = prometheus_registry:register_collector(Registry, ?MODULE).
| 147 | + |
%% @doc prometheus_collector callback. Nothing to clean up: this module
%% only reads rabbit's ETS tables and owns no state of its own.
deregister_cleanup(_) -> ok.
| 149 | + |
%% @doc prometheus_collector callback. Streams one metric family per
%% ?METRICS entry (per-object metrics read out of the corresponding ETS
%% table) and one per ?TOTALS entry (gauge valued by the table's row
%% count) through Callback.
collect_mf(_Registry, Callback) ->
    lists:foreach(
      fun({Table, Contents}) ->
              mf(Callback, Contents, ets:tab2list(Table))
      end,
      ?METRICS),
    lists:foreach(
      fun({Table, Name, Type, Help}) ->
              mf_totals(Callback, Name, Type, Help, ets:info(Table, size))
      end,
      ?TOTALS),
    ok.
| 160 | + |
%% Emit one metric family per definition tuple over the same Data rows.
%% 4-tuples pull the value straight out of the row at Index; 5-tuples
%% treat element(Index, Row) as a proplist and read Key from it.
%% Two separate passes (with pass-through clauses for the other shape)
%% preserve the original emission order: positional metrics first,
%% proplist-backed metrics second.
mf(Callback, Contents, Data) ->
    lists:foreach(
      fun({Index, Name, Type, Help}) ->
              Extract = fun(Row) -> element(Index, Row) end,
              Callback(create_mf(?METRIC_NAME(Name), Help, catch_boolean(Type),
                                 ?MODULE, {Type, Extract, Data}));
         (_) ->
              ok
      end,
      Contents),
    lists:foreach(
      fun({Index, Name, Type, Help, Key}) ->
              Extract = fun(Row) -> proplists:get_value(Key, element(Index, Row)) end,
              Callback(create_mf(?METRIC_NAME(Name), Help, catch_boolean(Type),
                                 ?MODULE, {Type, Extract, Data}));
         (_) ->
              ok
      end,
      Contents).
| 172 | + |
%% Emit one label-less metric family whose single value is Size —
%% used for the ?TOTALS table-size gauges.
mf_totals(Callback, Name, Type, Help, Size) ->
    Callback(create_mf(?METRIC_NAME(Name), Help, catch_boolean(Type), Size)).
| 175 | + |
%% @doc prometheus_collector callback. Turns each ETS row into one
%% metric sample: labels come from the row key (element 1, see labels/1)
%% and the value from the extractor fun packed by mf/3.
collect_metrics(_, {Type, Fun, Items}) ->
    lists:map(fun(Item) -> metric(Type, labels(Item), Fun(Item)) end, Items).
| 178 | + |
%% Build prometheus label pairs from a row's key, which is always
%% stored at element 1 of the ETS row.
labels(Item) ->
    label(element(1, Item)).
| 181 | + |
%% Map a row key to label pairs. Clause order is significant: the
%% {Pid, {QueueRes, ExchangeRes}} clause must match before the generic
%% {I1, I2} pair clause further down.
label(#resource{virtual_host = VHost, kind = exchange, name = Name}) ->
    [{vhost, VHost}, {exchange, Name}];
label(#resource{virtual_host = VHost, kind = queue, name = Name}) ->
    [{vhost, VHost}, {queue, Name}];
label({P, {#resource{virtual_host = QVHost, kind = queue, name = QName},
           #resource{virtual_host = EVHost, kind = exchange, name = EName}}}) when is_pid(P) ->
    %% channel_queue_exchange_metrics {channel_id, {queue_id, exchange_id}}
    [{channel, P}, {queue_vhost, QVHost}, {queue_name, QName},
     {exchange_vhost, EVHost}, {exchange_name, EName}];
label({I1, I2}) ->
    %% composite key (e.g. {channel_pid, queue_or_exchange_resource}):
    %% label both halves and concatenate
    label(I1) ++ label(I2);
label(P) when is_pid(P) ->
    %% any bare pid key is labelled as a channel
    [{channel, P}];
label(A) when is_atom(A) ->
    %% bare atom key is a node name
    [{node, A}].
| 197 | + |
%% Render one sample of the given declared type. Prometheus has no
%% boolean type, so booleans are mapped to 1/0 (undefined stays
%% undefined) and emitted as untyped samples; counters and gauges go
%% through the emit_*_if_defined helpers, which skip absent values.
metric(boolean, Labels, Raw) ->
    Mapped = case Raw of
                 true -> 1;
                 false -> 0;
                 undefined -> undefined
             end,
    untyped_metric(Labels, Mapped);
metric(untyped, Labels, Value) ->
    untyped_metric(Labels, Value);
metric(counter, Labels, Value) ->
    emit_counter_metric_if_defined(Labels, Value);
metric(gauge, Labels, Value) ->
    emit_gauge_metric_if_defined(Labels, Value).
| 211 | + |
| 212 | + |
| 213 | +%%==================================================================== |
| 214 | +%% Private Parts |
| 215 | +%%==================================================================== |
%% Prometheus has no native boolean metric type; a boolean-typed
%% definition is declared as untyped in the metric family header
%% (its values are mapped to 1/0 by metric/3). All other types pass
%% through unchanged.
catch_boolean(boolean) -> untyped;
catch_boolean(Type) -> Type.
| 220 | + |
%% Emit a counter sample unless the value is entirely absent:
%%   undefined -> no sample is produced (returns undefined);
%%   ''        -> a sample is still emitted, with an undefined value;
%%   anything else is emitted as-is.
emit_counter_metric_if_defined(_Labels, undefined) ->
    undefined;
emit_counter_metric_if_defined(Labels, '') ->
    counter_metric(Labels, undefined);
emit_counter_metric_if_defined(Labels, Value) ->
    counter_metric(Labels, Value).
| 229 | + |
%% Emit a gauge sample unless the value is entirely absent:
%%   undefined -> no sample is produced (returns undefined);
%%   ''        -> a sample is still emitted, with an undefined value;
%%   anything else is emitted as-is.
emit_gauge_metric_if_defined(_Labels, undefined) ->
    undefined;
emit_gauge_metric_if_defined(Labels, '') ->
    gauge_metric(Labels, undefined);
emit_gauge_metric_if_defined(Labels, Value) ->
    gauge_metric(Labels, Value).
0 commit comments