Skip to content

Commit 41f9856

Browse files
committed
merge main
2 parents a591d3f + b1947ec commit 41f9856

File tree

17 files changed

+259
-200
lines changed

17 files changed

+259
-200
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ require (
8181
go.uber.org/multierr v1.11.0
8282
go.uber.org/zap v1.27.0
8383
golang.org/x/mod v0.29.0
84+
golang.org/x/sync v0.17.0
8485
google.golang.org/protobuf v1.36.10
8586
)
8687

@@ -352,7 +353,6 @@ require (
352353
golang.org/x/arch v0.20.0 // indirect
353354
golang.org/x/exp v0.0.0-20251009144603-d2f985daa21b // indirect
354355
golang.org/x/oauth2 v0.32.0 // indirect
355-
golang.org/x/sync v0.17.0 // indirect
356356
golang.org/x/telemetry v0.0.0-20251008203120-078029d740a8 // indirect
357357
golang.org/x/time v0.14.0 // indirect
358358
golang.org/x/tools v0.38.0 // indirect

internal/collector/otel_collector_plugin.go

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -613,7 +613,7 @@ func (oc *Collector) updateNginxAppProtectTcplogReceivers(
613613
oc.config.Collector.Receivers.TcplogReceivers = make(map[string]*config.TcplogReceiver)
614614
}
615615

616-
napSysLogServer := oc.findAvailableSyslogServers(ctx, nginxConfigContext.NAPSysLogServers)
616+
napSysLogServer := oc.findAvailableSyslogServer(ctx, nginxConfigContext.NAPSysLogServer)
617617

618618
if napSysLogServer != "" {
619619
if !oc.doesTcplogReceiverAlreadyExist(napSysLogServer) {
@@ -746,40 +746,29 @@ func (oc *Collector) updateResourceAttributes(
746746
return actionUpdated
747747
}
748748

749-
func (oc *Collector) findAvailableSyslogServers(ctx context.Context, napSyslogServers []string) string {
750-
napSyslogServersMap := make(map[string]bool)
751-
for _, server := range napSyslogServers {
752-
napSyslogServersMap[server] = true
753-
}
754-
755-
if oc.previousNAPSysLogServer != "" {
756-
if _, ok := napSyslogServersMap[oc.previousNAPSysLogServer]; ok {
757-
return oc.previousNAPSysLogServer
758-
}
749+
func (oc *Collector) findAvailableSyslogServer(ctx context.Context, napSyslogServer string) string {
750+
if oc.previousNAPSysLogServer != "" &&
751+
normaliseAddress(oc.previousNAPSysLogServer) == normaliseAddress(napSyslogServer) {
752+
return napSyslogServer
759753
}
760754

761-
for _, napSyslogServer := range napSyslogServers {
762-
listenConfig := &net.ListenConfig{}
763-
ln, err := listenConfig.Listen(ctx, "tcp", napSyslogServer)
764-
if err != nil {
765-
slog.DebugContext(ctx, "NAP syslog server is not reachable", "address", napSyslogServer,
766-
"error", err)
767-
768-
continue
769-
}
770-
closeError := ln.Close()
771-
if closeError != nil {
772-
slog.DebugContext(ctx, "Failed to close syslog server", "address", napSyslogServer, "error", closeError)
773-
}
774-
775-
slog.DebugContext(ctx, "Found valid NAP syslog server", "address", napSyslogServer)
755+
listenConfig := &net.ListenConfig{}
756+
ln, err := listenConfig.Listen(ctx, "tcp", napSyslogServer)
757+
if err != nil {
758+
slog.DebugContext(ctx, "NAP syslog server is not reachable", "address", napSyslogServer,
759+
"error", err)
776760

777-
oc.previousNAPSysLogServer = napSyslogServer
761+
return ""
762+
}
778763

779-
return napSyslogServer
764+
closeError := ln.Close()
765+
if closeError != nil {
766+
slog.DebugContext(ctx, "Failed to close syslog server", "address", napSyslogServer, "error", closeError)
780767
}
781768

782-
return ""
769+
oc.previousNAPSysLogServer = napSyslogServer
770+
771+
return napSyslogServer
783772
}
784773

785774
func isOSSReceiverChanged(nginxReceiver config.NginxReceiver, nginxConfigContext *model.NginxConfigContext) bool {
@@ -865,3 +854,16 @@ func setProxyWithBasicAuth(ctx context.Context, proxy *config.Proxy, parsedProxy
865854
proxyURL := parsedProxyURL.String()
866855
setProxyEnvs(ctx, proxyURL, "Setting Proxy with basic auth")
867856
}
857+
858+
func normaliseAddress(address string) string {
859+
host, port, err := net.SplitHostPort(address)
860+
if err != nil {
861+
return address
862+
}
863+
864+
if host == "localhost" {
865+
host = "127.0.0.1"
866+
}
867+
868+
return net.JoinHostPort(host, port)
869+
}

internal/collector/otel_collector_plugin_test.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -753,7 +753,7 @@ func TestCollector_updateNginxAppProtectTcplogReceivers(t *testing.T) {
753753
require.NoError(t, err)
754754

755755
nginxConfigContext := &model.NginxConfigContext{
756-
NAPSysLogServers: []string{"localhost:15632"},
756+
NAPSysLogServer: "localhost:15632",
757757
}
758758

759759
assert.Empty(t, conf.Collector.Receivers.TcplogReceivers)
@@ -786,7 +786,7 @@ func TestCollector_updateNginxAppProtectTcplogReceivers(t *testing.T) {
786786
t.Run("Test 4: NewCollector tcplogReceiver added and deleted another", func(tt *testing.T) {
787787
tcplogReceiverDeleted := collector.updateNginxAppProtectTcplogReceivers(ctx,
788788
&model.NginxConfigContext{
789-
NAPSysLogServers: []string{"localhost:1555"},
789+
NAPSysLogServer: "localhost:1555",
790790
},
791791
)
792792

@@ -939,49 +939,49 @@ func TestCollector_findAvailableSyslogServers(t *testing.T) {
939939
name string
940940
expectedSyslogServer string
941941
previousNAPSysLogServer string
942-
syslogServers []string
942+
syslogServers string
943943
portInUse bool
944944
}{
945945
{
946946
name: "Test 1: port available",
947947
expectedSyslogServer: "localhost:15632",
948948
previousNAPSysLogServer: "",
949-
syslogServers: []string{"localhost:15632"},
949+
syslogServers: "localhost:15632",
950950
portInUse: false,
951951
},
952952
{
953953
name: "Test 2: port in use",
954954
expectedSyslogServer: "",
955955
previousNAPSysLogServer: "",
956-
syslogServers: []string{"localhost:15632"},
956+
syslogServers: "localhost:15632",
957957
portInUse: true,
958958
},
959959
{
960960
name: "Test 3: syslog server already configured",
961961
expectedSyslogServer: "localhost:15632",
962962
previousNAPSysLogServer: "localhost:15632",
963-
syslogServers: []string{"localhost:15632"},
963+
syslogServers: "localhost:15632",
964964
portInUse: false,
965965
},
966966
{
967967
name: "Test 4: new syslog server",
968968
expectedSyslogServer: "localhost:15632",
969969
previousNAPSysLogServer: "localhost:1122",
970-
syslogServers: []string{"localhost:15632"},
970+
syslogServers: "localhost:15632",
971971
portInUse: false,
972972
},
973973
{
974-
name: "Test 5: port in use find next server",
974+
name: "Test 6: port hasn't changed",
975975
expectedSyslogServer: "localhost:1122",
976-
previousNAPSysLogServer: "",
977-
syslogServers: []string{"localhost:15632", "localhost:1122"},
976+
previousNAPSysLogServer: "localhost:1122",
977+
syslogServers: "localhost:1122",
978978
portInUse: true,
979979
},
980980
{
981-
name: "Test 6: port hasn't changed",
981+
name: "Test 7: port hasn't changed, but is now localhost",
982982
expectedSyslogServer: "localhost:1122",
983-
previousNAPSysLogServer: "localhost:1122",
984-
syslogServers: []string{"localhost:1122"},
983+
previousNAPSysLogServer: "127.0.0.1:1122",
984+
syslogServers: "localhost:1122",
985985
portInUse: true,
986986
},
987987
}
@@ -994,12 +994,12 @@ func TestCollector_findAvailableSyslogServers(t *testing.T) {
994994

995995
if test.portInUse {
996996
listenConfig := &net.ListenConfig{}
997-
ln, listenError := listenConfig.Listen(ctx, "tcp", "localhost:15632")
997+
ln, listenError := listenConfig.Listen(ctx, "tcp", test.syslogServers)
998998
require.NoError(t, listenError)
999999
defer ln.Close()
10001000
}
10011001

1002-
actual := collector.findAvailableSyslogServers(ctx, test.syslogServers)
1002+
actual := collector.findAvailableSyslogServer(ctx, test.syslogServers)
10031003
assert.Equal(tt, test.expectedSyslogServer, actual)
10041004
})
10051005
}

internal/config/config.go

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ func ResolveConfig() (*Config, error) {
157157
Features: viperInstance.GetStringSlice(FeaturesKey),
158158
Labels: resolveLabels(),
159159
LibDir: viperInstance.GetString(LibDirPathKey),
160+
SyslogServer: resolveSyslogServer(),
160161
}
161162

162163
defaultCollector(collector, config)
@@ -462,6 +463,12 @@ func registerFlags() {
462463
"A comma-separated list of features enabled for the agent.",
463464
)
464465

466+
fs.String(
467+
SyslogServerPort,
468+
DefSyslogServerPort,
469+
"The port Agent will start the syslog server on for logs collection",
470+
)
471+
465472
registerCommonFlags(fs)
466473
registerCommandFlags(fs)
467474
registerAuxiliaryCommandFlags(fs)
@@ -627,6 +634,12 @@ func registerClientFlags(fs *flag.FlagSet) {
627634
DefResponseTimeout,
628635
"Duration to wait for a response before retrying request",
629636
)
637+
638+
fs.Int(
639+
ClientGRPCMaxParallelFileOperationsKey,
640+
DefMaxParallelFileOperations,
641+
"Maximum number of file downloads or uploads performed in parallel",
642+
)
630643
}
631644

632645
func registerCommandFlags(fs *flag.FlagSet) {
@@ -953,6 +966,12 @@ func resolveLog() *Log {
953966
}
954967
}
955968

969+
func resolveSyslogServer() *SyslogServer {
970+
return &SyslogServer{
971+
Port: viperInstance.GetString(SyslogServerPort),
972+
}
973+
}
974+
956975
func resolveLabels() map[string]interface{} {
957976
input := viperInstance.GetStringMapString(LabelsRootKey)
958977

@@ -1093,12 +1112,13 @@ func resolveClient() *Client {
10931112
Time: viperInstance.GetDuration(ClientKeepAliveTimeKey),
10941113
PermitWithoutStream: viperInstance.GetBool(ClientKeepAlivePermitWithoutStreamKey),
10951114
},
1096-
MaxMessageSize: viperInstance.GetInt(ClientGRPCMaxMessageSizeKey),
1097-
MaxMessageReceiveSize: viperInstance.GetInt(ClientGRPCMaxMessageReceiveSizeKey),
1098-
MaxMessageSendSize: viperInstance.GetInt(ClientGRPCMaxMessageSendSizeKey),
1099-
MaxFileSize: viperInstance.GetUint32(ClientGRPCMaxFileSizeKey),
1100-
ResponseTimeout: viperInstance.GetDuration(ClientGRPCResponseTimeoutKey),
1101-
FileChunkSize: viperInstance.GetUint32(ClientGRPCFileChunkSizeKey),
1115+
MaxMessageSize: viperInstance.GetInt(ClientGRPCMaxMessageSizeKey),
1116+
MaxMessageReceiveSize: viperInstance.GetInt(ClientGRPCMaxMessageReceiveSizeKey),
1117+
MaxMessageSendSize: viperInstance.GetInt(ClientGRPCMaxMessageSendSizeKey),
1118+
MaxFileSize: viperInstance.GetUint32(ClientGRPCMaxFileSizeKey),
1119+
FileChunkSize: viperInstance.GetUint32(ClientGRPCFileChunkSizeKey),
1120+
ResponseTimeout: viperInstance.GetDuration(ClientGRPCResponseTimeoutKey),
1121+
MaxParallelFileOperations: viperInstance.GetInt(ClientGRPCMaxParallelFileOperationsKey),
11021122
},
11031123
Backoff: &BackOff{
11041124
InitialInterval: viperInstance.GetDuration(ClientBackoffInitialIntervalKey),

internal/config/config_test.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1165,6 +1165,9 @@ func createConfig() *Config {
11651165
Level: "debug",
11661166
Path: "./test-path",
11671167
},
1168+
SyslogServer: &SyslogServer{
1169+
Port: "1512",
1170+
},
11681171
Client: &Client{
11691172
HTTP: &HTTP{
11701173
Timeout: 15 * time.Second,
@@ -1175,12 +1178,13 @@ func createConfig() *Config {
11751178
Time: 10 * time.Second,
11761179
PermitWithoutStream: false,
11771180
},
1178-
MaxMessageSize: 1048575,
1179-
MaxMessageReceiveSize: 1048575,
1180-
MaxMessageSendSize: 1048575,
1181-
MaxFileSize: 485753,
1182-
FileChunkSize: 48575,
1183-
ResponseTimeout: 30 * time.Second,
1181+
MaxMessageSize: 1048575,
1182+
MaxMessageReceiveSize: 1048575,
1183+
MaxMessageSendSize: 1048575,
1184+
MaxFileSize: 485753,
1185+
FileChunkSize: 48575,
1186+
MaxParallelFileOperations: 10,
1187+
ResponseTimeout: 30 * time.Second,
11841188
},
11851189
Backoff: &BackOff{
11861190
InitialInterval: 200 * time.Millisecond,

internal/config/defaults.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ const (
2323
DefNginxReloadBackoffMaxInterval = 3 * time.Second
2424
DefNginxReloadBackoffMaxElapsedTime = 10 * time.Second
2525

26+
DefSyslogServerPort = "1514"
27+
2628
DefCommandServerHostKey = ""
2729
DefCommandServerPortKey = 0
2830
DefCommandServerTypeKey = "grpc"
@@ -58,12 +60,13 @@ const (
5860
DefAuxiliaryCommandTLServerNameKey = ""
5961

6062
// Client GRPC Settings
61-
DefMaxMessageSize = 0 // 0 = unset
62-
DefMaxMessageRecieveSize = 4194304 // default 4 MB
63-
DefMaxMessageSendSize = 4194304 // default 4 MB
64-
DefMaxFileSize uint32 = 1048576 // 1MB
65-
DefFileChunkSize uint32 = 524288 // 0.5MB
66-
DefResponseTimeout = 10 * time.Second
63+
DefMaxMessageSize = 0 // 0 = unset
64+
DefMaxMessageRecieveSize = 4194304 // default 4 MB
65+
DefMaxMessageSendSize = 4194304 // default 4 MB
66+
DefMaxFileSize uint32 = 1048576 // 1MB
67+
DefFileChunkSize uint32 = 524288 // 0.5MB
68+
DefMaxParallelFileOperations = 5
69+
DefResponseTimeout = 10 * time.Second
6770

6871
// Client HTTP Settings
6972
DefHTTPTimeout = 10 * time.Second

internal/config/flags.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,14 @@ var (
3434
ClientKeepAliveTimeKey = pre(GrpcKeepAlive) + "time"
3535
ClientKeepAliveTimeoutKey = pre(GrpcKeepAlive) + "timeout"
3636

37-
ClientHTTPTimeoutKey = pre(ClientRootKey) + "http_timeout"
38-
ClientGRPCMaxMessageSendSizeKey = pre(ClientRootKey) + "grpc_max_message_send_size"
39-
ClientGRPCMaxMessageReceiveSizeKey = pre(ClientRootKey) + "grpc_max_message_receive_size"
40-
ClientGRPCMaxMessageSizeKey = pre(ClientRootKey) + "grpc_max_message_size"
41-
ClientGRPCMaxFileSizeKey = pre(ClientRootKey) + "grpc_max_file_size"
42-
ClientGRPCFileChunkSizeKey = pre(ClientRootKey) + "grpc_file_chunk_size"
43-
ClientGRPCResponseTimeoutKey = pre(ClientRootKey) + "grpc_response_timeout"
37+
ClientHTTPTimeoutKey = pre(ClientRootKey) + "http_timeout"
38+
ClientGRPCMaxMessageSendSizeKey = pre(ClientRootKey) + "grpc_max_message_send_size"
39+
ClientGRPCMaxMessageReceiveSizeKey = pre(ClientRootKey) + "grpc_max_message_receive_size"
40+
ClientGRPCMaxMessageSizeKey = pre(ClientRootKey) + "grpc_max_message_size"
41+
ClientGRPCMaxFileSizeKey = pre(ClientRootKey) + "grpc_max_file_size"
42+
ClientGRPCFileChunkSizeKey = pre(ClientRootKey) + "grpc_file_chunk_size"
43+
ClientGRPCMaxParallelFileOperationsKey = pre(ClientRootKey) + "grpc_max_parallel_file_operations"
44+
ClientGRPCResponseTimeoutKey = pre(ClientRootKey) + "grpc_response_timeout"
4445

4546
ClientBackoffInitialIntervalKey = pre(ClientRootKey) + "backoff_initial_interval"
4647
ClientBackoffMaxIntervalKey = pre(ClientRootKey) + "backoff_max_interval"
@@ -137,6 +138,8 @@ var (
137138
NginxExcludeLogsKey = pre(DataPlaneConfigRootKey, "nginx") + "exclude_logs"
138139
NginxApiTlsCa = pre(DataPlaneConfigRootKey, "nginx") + "api_tls_ca"
139140

141+
SyslogServerPort = pre("syslog_server") + "port"
142+
140143
FileWatcherMonitoringFrequencyKey = pre(FileWatcherKey) + "monitoring_frequency"
141144
NginxExcludeFilesKey = pre(FileWatcherKey) + "exclude_files"
142145
)

internal/config/testdata/nginx-agent.conf

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,11 @@ features:
2323
- metrics
2424
- api-action
2525
- logs-nap
26-
26+
27+
28+
syslog_server:
29+
port: 1512
30+
2731
data_plane_config:
2832
nginx:
2933
reload_monitoring_period: 30s
@@ -51,6 +55,7 @@ client:
5155
max_file_size: 485753
5256
response_timeout: 30s
5357
file_chunk_size: 48575
58+
max_parallel_file_operations: 10
5459
backoff:
5560
initial_interval: 200ms
5661
max_interval: 10s

0 commit comments

Comments
 (0)