Skip to content

Commit dca1459

Browse files
zhihonlSwapneil Singh
authored and
Swapneil Singh
committed
filter_kubernetes: allow retrieving kubernetes metadata from CloudWatch Agent
Adds Use_Pod_association option. When running in a kubernetes container, setting this option to On vends an `entity` object with pod metadata from CWA to a cloudwatch_logs output with the new `add_entity` option set to true. (PR #2 in amazon-contributing/upstream-to-fluent-bit)
1 parent 6b3b67f commit dca1459

File tree

49 files changed

+2762
-19
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+2762
-19
lines changed

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@
44
*~
55
_book/
66
lib/jemalloc
7+
cmake-build-debug/
78
tests/internal/flb_tests_internal.h
89
tests/runtime/flb_tests_runtime.h
10+
tests/internal/cmake-build-debug/
11+
tests/runtime/cmake-build-debug/
912
build/*
1013
include/fluent-bit/flb_info.h
1114
include/fluent-bit/flb_plugins.h

include/fluent-bit/flb_hash.h

+3
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ struct flb_hash {
5555
int max_entries;
5656
int total_count;
5757
int cache_ttl;
58+
int force_remove_pointer;
5859
size_t size;
5960
struct mk_list entries;
6061
struct flb_hash_table *table;
@@ -63,6 +64,8 @@ struct flb_hash {
6364
struct flb_hash *flb_hash_create(int evict_mode, size_t size, int max_entries);
6465
struct flb_hash *flb_hash_create_with_ttl(int cache_ttl, int evict_mode,
6566
size_t size, int max_entries);
67+
struct flb_hash *flb_hash_create_with_ttl_force_destroy(int cache_ttl, int evict_mode,
68+
size_t size, int max_entries);
6669
void flb_hash_destroy(struct flb_hash *ht);
6770

6871
int flb_hash_add(struct flb_hash *ht,

plugins/filter_aws/aws.c

+15-1
Original file line numberDiff line numberDiff line change
@@ -558,14 +558,22 @@ static int cb_aws_filter(const void *data, size_t bytes,
558558
ctx->availability_zone_len);
559559
}
560560

561-
if (ctx->instance_id_include) {
561+
if (ctx->instance_id_include && !ctx->enable_entity) {
562562
msgpack_pack_str(&tmp_pck, FLB_FILTER_AWS_INSTANCE_ID_KEY_LEN);
563563
msgpack_pack_str_body(&tmp_pck,
564564
FLB_FILTER_AWS_INSTANCE_ID_KEY,
565565
FLB_FILTER_AWS_INSTANCE_ID_KEY_LEN);
566566
msgpack_pack_str(&tmp_pck, ctx->instance_id_len);
567567
msgpack_pack_str_body(&tmp_pck,
568568
ctx->instance_id, ctx->instance_id_len);
569+
} else if (ctx->instance_id_include && ctx->enable_entity) {
570+
msgpack_pack_str(&tmp_pck, FLB_FILTER_AWS_ENTITY_INSTANCE_ID_KEY_LEN);
571+
msgpack_pack_str_body(&tmp_pck,
572+
FLB_FILTER_AWS_ENTITY_INSTANCE_ID_KEY,
573+
FLB_FILTER_AWS_ENTITY_INSTANCE_ID_KEY_LEN);
574+
msgpack_pack_str(&tmp_pck, ctx->instance_id_len);
575+
msgpack_pack_str_body(&tmp_pck,
576+
ctx->instance_id, ctx->instance_id_len);
569577
}
570578

571579
if (ctx->instance_type_include) {
@@ -740,6 +748,12 @@ static struct flb_config_map config_map[] = {
740748
0, FLB_TRUE, offsetof(struct flb_filter_aws, hostname_include),
741749
"Enable EC2 instance hostname"
742750
},
751+
{
752+
FLB_CONFIG_MAP_BOOL, "enable_entity", "false",
753+
0, FLB_TRUE, offsetof(struct flb_filter_aws, enable_entity),
754+
"Enable entity prefix for fields used for constructing entity."
755+
"This currently only affects instance ID"
756+
},
743757
{0}
744758
};
745759

plugins/filter_aws/aws.h

+8
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
#define FLB_FILTER_AWS_AVAILABILITY_ZONE_KEY_LEN 2
5252
#define FLB_FILTER_AWS_INSTANCE_ID_KEY "ec2_instance_id"
5353
#define FLB_FILTER_AWS_INSTANCE_ID_KEY_LEN 15
54+
#define FLB_FILTER_AWS_ENTITY_INSTANCE_ID_KEY "aws_entity_ec2_instance_id"
55+
#define FLB_FILTER_AWS_ENTITY_INSTANCE_ID_KEY_LEN 26
5456
#define FLB_FILTER_AWS_INSTANCE_TYPE_KEY "ec2_instance_type"
5557
#define FLB_FILTER_AWS_INSTANCE_TYPE_KEY_LEN 17
5658
#define FLB_FILTER_AWS_PRIVATE_IP_KEY "private_ip"
@@ -111,6 +113,12 @@ struct flb_filter_aws {
111113
size_t hostname_len;
112114
int hostname_include;
113115

116+
/*
117+
* Enable entity prefix appending. This appends
118+
* 'aws_entity' to relevant keys
119+
*/
120+
int enable_entity;
121+
114122
/* number of new keys added by this plugin */
115123
int new_keys;
116124

plugins/filter_kubernetes/kube_conf.c

+37
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,12 @@ struct flb_kube *flb_kube_conf_create(struct flb_filter_instance *ins,
9999
ctx->api_https = FLB_FALSE;
100100
}
101101

102+
if (ctx->use_pod_association) {
103+
ctx->kubernetes_api_host = flb_strdup(FLB_API_HOST);
104+
ctx->kubernetes_api_port = FLB_API_PORT;
105+
}
106+
107+
102108
}
103109
else if (!url) {
104110
ctx->api_host = flb_strdup(FLB_API_HOST);
@@ -190,6 +196,12 @@ struct flb_kube *flb_kube_conf_create(struct flb_filter_instance *ins,
190196
flb_plg_info(ctx->ins, "https=%i host=%s port=%i",
191197
ctx->api_https, ctx->api_host, ctx->api_port);
192198
}
199+
200+
201+
ctx->pod_hash_table = flb_hash_create_with_ttl_force_destroy(ctx->pod_service_map_ttl,
202+
FLB_HASH_EVICT_OLDER,
203+
FLB_HASH_TABLE_SIZE,
204+
FLB_HASH_TABLE_SIZE);
193205
return ctx;
194206
}
195207

@@ -203,6 +215,10 @@ void flb_kube_conf_destroy(struct flb_kube *ctx)
203215
flb_hash_destroy(ctx->hash_table);
204216
}
205217

218+
if (ctx->pod_hash_table) {
219+
flb_hash_destroy(ctx->pod_hash_table);
220+
}
221+
206222
if (ctx->merge_log == FLB_TRUE) {
207223
flb_free(ctx->unesc_buf);
208224
}
@@ -211,6 +227,9 @@ void flb_kube_conf_destroy(struct flb_kube *ctx)
211227
if (ctx->parser == NULL && ctx->regex) {
212228
flb_regex_destroy(ctx->regex);
213229
}
230+
if (ctx->deploymentRegex) {
231+
flb_regex_destroy(ctx->deploymentRegex);
232+
}
214233

215234
flb_free(ctx->api_host);
216235
flb_free(ctx->token);
@@ -222,6 +241,24 @@ void flb_kube_conf_destroy(struct flb_kube *ctx)
222241
flb_upstream_destroy(ctx->upstream);
223242
}
224243

244+
if(ctx->pod_association_tls) {
245+
flb_tls_destroy(ctx->pod_association_tls);
246+
}
247+
248+
if (ctx->pod_association_upstream) {
249+
flb_upstream_destroy(ctx->pod_association_upstream);
250+
}
251+
252+
if (ctx->kubernetes_upstream) {
253+
flb_upstream_destroy(ctx->kubernetes_upstream);
254+
}
255+
if (ctx->kubernetes_api_host) {
256+
flb_free(ctx->kubernetes_api_host);
257+
}
258+
if (ctx->platform) {
259+
flb_free(ctx->platform);
260+
}
261+
225262
#ifdef FLB_HAVE_TLS
226263
if (ctx->tls) {
227264
flb_tls_destroy(ctx->tls);

plugins/filter_kubernetes/kube_conf.h

+72
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,40 @@
6464
#define FLB_KUBE_TAG_PREFIX "kube.var.log.containers."
6565
#endif
6666

67+
/*
68+
* Maximum attribute length for Entity's KeyAttributes
69+
* values
70+
* https://docs.aws.amazon.com/applicationsignals/latest/APIReference/API_Service.html#:~:text=Maximum%20length%20of%201024.
71+
*/
72+
#define KEY_ATTRIBUTES_MAX_LEN 1024
73+
#define SERVICE_NAME_SOURCE_MAX_LEN 64
74+
75+
/*
76+
* Configmap used for verifying whether if FluentBit is
77+
* on EKS or native Kubernetes
78+
*/
79+
#define KUBE_SYSTEM_NAMESPACE "kube-system"
80+
#define AWS_AUTH_CONFIG_MAP "aws-auth"
81+
82+
/*
83+
* Possible platform values for Kubernetes plugin
84+
*/
85+
#define NATIVE_KUBERNETES_PLATFORM "k8s"
86+
#define EKS_PLATFORM "eks"
87+
6788
struct kube_meta;
6889

90+
struct service_attributes {
91+
char name[KEY_ATTRIBUTES_MAX_LEN];
92+
int name_len;
93+
char environment[KEY_ATTRIBUTES_MAX_LEN];
94+
int environment_len;
95+
char name_source[SERVICE_NAME_SOURCE_MAX_LEN];
96+
int name_source_len;
97+
int fields;
98+
99+
};
100+
69101
/* Filter context */
70102
struct flb_kube {
71103
/* Configuration parameters */
@@ -119,6 +151,7 @@ struct flb_kube {
119151

120152
/* Regex context to parse records */
121153
struct flb_regex *regex;
154+
struct flb_regex *deploymentRegex;
122155
struct flb_parser *parser;
123156

124157
/* TLS CA certificate file */
@@ -158,6 +191,45 @@ struct flb_kube {
158191

159192
int kube_meta_cache_ttl;
160193

194+
/* Configuration used for enabling pod to service name mapping*/
195+
int use_pod_association;
196+
char *pod_association_host;
197+
char *pod_association_endpoint;
198+
int pod_association_port;
199+
200+
/*
201+
* TTL is used to check how long should the mapped entry
202+
* remain in the hash table
203+
*/
204+
struct flb_hash *pod_hash_table;
205+
int pod_service_map_ttl;
206+
int pod_service_map_refresh_interval;
207+
flb_sds_t pod_service_preload_cache_path;
208+
struct flb_upstream *pod_association_upstream;
209+
/*
210+
* This connection is used for calling Kubernetes configmaps
211+
* endpoint so pod association can determine the environment.
212+
* Example: EKS or Native Kubernetes.
213+
*/
214+
char *kubernetes_api_host;
215+
int kubernetes_api_port;
216+
struct flb_upstream *kubernetes_upstream;
217+
char *platform;
218+
/*
219+
* This value is used for holding the platform config
220+
* value. Platform will be overriden with this variable
221+
* if it's set
222+
*/
223+
char *set_platform;
224+
225+
//Agent TLS certs
226+
struct flb_tls *pod_association_tls;
227+
char *pod_association_host_server_ca_file;
228+
char *pod_association_host_client_cert_file;
229+
char *pod_association_host_client_key_file;
230+
int pod_association_host_tls_debug;
231+
int pod_association_host_tls_verify;
232+
161233
struct flb_tls *tls;
162234

163235
struct flb_config *config;

0 commit comments

Comments
 (0)