@@ -709,30 +709,67 @@ inline std::vector<dsn::http_result> get_metrics(const std::vector<node_desc> &n
709
709
return results;
710
710
}
711
711
712
+ // Adapt the result returned by `get_metrics` into the structure that could be processed by
713
+ // `remote_command`.
714
+ template <typename ... Args>
715
+ inline dsn::error_s process_get_metrics_result (const dsn::http_result &result,
716
+ const node_desc &node,
717
+ const char *what,
718
+ Args &&...args)
719
+ {
720
+ if (dsn_unlikely (!result.error ())) {
721
+ return FMT_ERR (result.error ().code (),
722
+ " ERROR: query {} metrics from node {} failed, msg={}" ,
723
+ fmt::format (what, std::forward<Args>(args)...),
724
+ node.hp ,
725
+ result.error ());
726
+ }
727
+
728
+ if (dsn_unlikely (result.status () != dsn::http_status_code::kOk )) {
729
+ return FMT_ERR (dsn::ERR_HTTP_ERROR,
730
+ " ERROR: query {} metrics from node {} failed, http_status={}, msg={}" ,
731
+ fmt::format (what, std::forward<Args>(args)...),
732
+ node.hp ,
733
+ dsn::get_http_status_message (result.status ()),
734
+ result.body ());
735
+ }
736
+
737
+ return dsn::error_s::ok ();
738
+ }
739
+
712
740
#define RETURN_SHELL_IF_GET_METRICS_FAILED (result, node, what, ...) \
713
741
do { \
714
- if (dsn_unlikely (!result.error ())) { \
715
- std::cout << " ERROR: send http request to query " << fmt::format (what, ##__VA_ARGS__) \
716
- << " metrics from node " << node.hp << " failed: " << result.error () \
717
- << std::endl; \
718
- return true ; \
719
- } \
720
- if (dsn_unlikely (result.status () != dsn::http_status_code::kOk )) { \
721
- std::cout << " ERROR: send http request to query " << what << " metrics from node " \
722
- << node.hp << " failed: " << dsn::get_http_status_message (result.status ()) \
723
- << std::endl \
724
- << result.body () << std::endl; \
742
+ const auto &res = process_get_metrics_result (result, node, what, ##__VA_ARGS__); \
743
+ if (dsn_unlikely (!res)) { \
744
+ fmt::println (res.description ()); \
725
745
return true ; \
726
746
} \
727
747
} while (0 )
728
748
749
+ // Adapt the result of some parsing operations on the metrics returned by `get_metrics` into the
750
+ // structure that could be processed by `remote_command`.
751
+ template <typename ... Args>
752
+ inline dsn::error_s process_parse_metrics_result (const dsn::error_s &result,
753
+ const node_desc &node,
754
+ const char *what,
755
+ Args &&...args)
756
+ {
757
+ if (dsn_unlikely (!result)) {
758
+ return FMT_ERR (result.code (),
759
+ " ERROR: {} metrics response from node {} failed, msg={}" ,
760
+ fmt::format (what, std::forward<Args>(args)...),
761
+ node.hp ,
762
+ result);
763
+ }
764
+
765
+ return dsn::error_s::ok ();
766
+ }
767
+
729
768
#define RETURN_SHELL_IF_PARSE_METRICS_FAILED (expr, node, what, ...) \
730
769
do { \
731
- const auto &res = (expr); \
770
+ const auto &res = process_parse_metrics_result (expr, node, what, ##__VA_ARGS__); \
732
771
if (dsn_unlikely (!res)) { \
733
- std::cout << " ERROR: parse " << fmt::format (what, ##__VA_ARGS__) \
734
- << " metrics response from node " << node.hp << " failed: " << res \
735
- << std::endl; \
772
+ fmt::println (res.description ()); \
736
773
return true ; \
737
774
} \
738
775
} while (0 )
@@ -832,37 +869,59 @@ class aggregate_stats_calcs
832
869
}
833
870
834
871
// Create the aggregations as needed.
872
+ DEF_CALC_CREATOR (assignments)
835
873
DEF_CALC_CREATOR (sums)
836
874
DEF_CALC_CREATOR (increases)
837
875
DEF_CALC_CREATOR (rates)
838
876
839
877
#undef DEF_CALC_CREATOR
840
878
879
+ #define CALC_ASSIGNMENT_STATS (entities ) \
880
+ do { \
881
+ if (_assignments) { \
882
+ RETURN_NOT_OK (_assignments->assign (entities)); \
883
+ } \
884
+ } while (0 )
885
+
841
886
#define CALC_ACCUM_STATS (entities ) \
842
887
do { \
843
888
if (_sums) { \
844
889
RETURN_NOT_OK (_sums->add_assign (entities)); \
845
890
} \
846
891
} while (0 )
847
892
848
- // Perform the chosen accum aggregations on the fetched metrics.
893
+ // Perform the chosen aggregations (both assignment and accum) on the fetched metrics.
849
894
dsn::error_s aggregate_metrics (const std::string &json_string)
850
895
{
851
896
DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT (value, json_string, query_snapshot);
852
897
898
+ return aggregate_metrics (query_snapshot);
899
+ }
900
+
901
+ dsn::error_s aggregate_metrics (const dsn::metric_query_brief_value_snapshot &query_snapshot)
902
+ {
903
+ CALC_ASSIGNMENT_STATS (query_snapshot.entities );
853
904
CALC_ACCUM_STATS (query_snapshot.entities );
854
905
855
906
return dsn::error_s::ok ();
856
907
}
857
908
858
- // Perform all of the chosen aggregations (both accum and delta ) on the fetched metrics.
909
+ // Perform the chosen aggregations (assignement, accum, delta and rate ) on the fetched metrics.
859
910
dsn::error_s aggregate_metrics (const std::string &json_string_start,
860
911
const std::string &json_string_end)
861
912
{
862
913
DESERIALIZE_METRIC_QUERY_BRIEF_2_SAMPLES (
863
914
json_string_start, json_string_end, query_snapshot_start, query_snapshot_end);
864
915
865
- // Apply ending sample to the accum aggregations.
916
+ return aggregate_metrics (query_snapshot_start, query_snapshot_end);
917
+ }
918
+
919
+ dsn::error_s
920
+ aggregate_metrics (const dsn::metric_query_brief_value_snapshot &query_snapshot_start,
921
+ const dsn::metric_query_brief_value_snapshot &query_snapshot_end)
922
+ {
923
+ // Apply ending sample to the assignment and accum aggregations.
924
+ CALC_ASSIGNMENT_STATS (query_snapshot_end.entities );
866
925
CALC_ACCUM_STATS (query_snapshot_end.entities );
867
926
868
927
const std::array deltas_list = {&_increases, &_rates};
@@ -884,9 +943,12 @@ class aggregate_stats_calcs
884
943
885
944
#undef CALC_ACCUM_STATS
886
945
946
+ #undef CALC_ASSIGNMENT_STATS
947
+
887
948
private:
888
949
DISALLOW_COPY_AND_ASSIGN (aggregate_stats_calcs);
889
950
951
+ std::unique_ptr<aggregate_stats> _assignments;
890
952
std::unique_ptr<aggregate_stats> _sums;
891
953
std::unique_ptr<aggregate_stats> _increases;
892
954
std::unique_ptr<aggregate_stats> _rates;
@@ -1940,7 +2002,7 @@ get_table_stats(shell_context *sc, uint32_t sample_interval_ms, std::vector<row_
1940
2002
RETURN_SHELL_IF_PARSE_METRICS_FAILED (
1941
2003
calcs->aggregate_metrics (results_start[i].body (), results_end[i].body ()),
1942
2004
nodes[i],
1943
- " row data requests" );
2005
+ " aggregate row data requests" );
1944
2006
}
1945
2007
1946
2008
return true ;
@@ -1990,7 +2052,7 @@ inline bool get_partition_stats(shell_context *sc,
1990
2052
RETURN_SHELL_IF_PARSE_METRICS_FAILED (
1991
2053
calcs->aggregate_metrics (results_start[i].body (), results_end[i].body ()),
1992
2054
nodes[i],
1993
- " row data requests for table(id={})" ,
2055
+ " aggregate row data requests for table(id={})" ,
1994
2056
table_id);
1995
2057
}
1996
2058
0 commit comments