
Commit c127c1c

Merge pull request #426 from target/S3Redundancy
Optional Redundancy logging to remote S3 location
2 parents cbc086a + 9704d8a

File tree

12 files changed: +406 −16 lines changed

build/go/frontend/Dockerfile

Lines changed: 4 additions & 0 deletions
@@ -30,6 +30,10 @@ RUN mkdir /var/log/strelka/ && \
     chgrp -R 0 /var/log/strelka/ && \
     chmod -R g=u /var/log/strelka/
 
+# Create blank strelka.log file to make sure the watcher has something to start with
+RUN touch /var/log/strelka/strelka.log
+RUN chmod -R 777 /var/log/strelka/strelka.log
+
 # Set container entrypoint. This could be set/overridden elsewhere in deployment (e.g. k8s, docker-compose, etc.)
 # Currently overwritten in ./build/docker-compose.yml
 ENTRYPOINT ["strelka-frontend", "-locallog=true", "-kafkalog=false"]

configs/go/frontend/frontend.yaml

Lines changed: 7 additions & 0 deletions
@@ -20,3 +20,10 @@ broker:
   keylocation: "path to key location"
   calocation: "path to target ca bundle"
   topic: "topic name here"
+  s3redundancy: "Boolean to pipe logs to S3 if Kafka connection interrupted"
+s3:
+  accesskey: "S3 Access Key"
+  secretkey: "S3 Secret Key"
+  bucketName: "S3 bucket name"
+  region: "Region that the S3 bucket resides in"
+  endpoint: "Endpoint that the S3 bucket refers to"
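
Once filled in, these keys map onto the ConfKafka and ConfS3 structs added in src/go/pkg/structs/structs.go later in this diff. A minimal sketch of the populated configuration as Go values (all credentials, names, and endpoints below are placeholders, not part of the commit):

    package main

    import (
        "fmt"

        "github.com/target/strelka/src/go/pkg/structs"
    )

    func main() {
        // Placeholder values only; real deployments load these from frontend.yaml.
        cfg := structs.Frontend{
            Broker: structs.ConfKafka{
                Bootstrap:    "kafka-broker:9092",
                Topic:        "strelka-events",
                S3redundancy: "true", // parsed with strconv.ParseBool in main.go
            },
            S3: structs.ConfS3{
                AccessKey:  "EXAMPLEACCESSKEY",
                SecretKey:  "EXAMPLESECRETKEY",
                BucketName: "strelka-redundancy",
                Region:     "us-east-1",
                Endpoint:   "s3.us-east-1.amazonaws.com",
            },
        }
        fmt.Println("S3 redundancy flag:", cfg.Broker.S3redundancy)
    }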

docs/README.md

Lines changed: 14 additions & 0 deletions
@@ -479,6 +479,11 @@ For the options below, only one response setting may be configured.
 * "broker.keylocation": File Path to key file to be used to authenticate to Kafka Topic (Optional)
 * "broker.calocation": File Path to CA Certificate bundle to be used to authenticate to Kafka Topic (Optional)
 * "broker.topic": Full topic name of the Kafka Topic to connect to (Optional)
+* "s3.accesskey": Access Key of the bucket to send redundancy files to (Optional)
+* "s3.secretkey": Secret Key of the bucket to send redundancy files to (Optional)
+* "s3.bucketName": Name of the bucket to send redundancy files to (Optional)
+* "s3.region": Region in which the bucket for redundancy files resides (Optional)
+* "s3.endpoint": Endpoint of the bucket to send redundancy files to (Optional)
 
 #### manager
 * "coordinator.addr": network address of the coordinator (defaults to strelka_coordinator_1:6379)

@@ -750,6 +755,15 @@ Currently this is toggled on and off in the Frontend Dockerfile, which is overwr
 
 The Kafka producer created with the above command line options is fully configurable, and placeholder fields have already been added to the frontend.yaml configuration file. This file will need to be updated to point to an existing Kafka topic, as desired. Where some fields are not used (e.g. when security has not been enabled on the desired Kafka topic), unused fields in the broker configuration section of the frontend.yaml file may simply be replaced with an empty string.
 
+#### Optional: S3 Redundancy
+When a Kafka producer has been created and the corresponding boolean in the Kafka config is set to true, S3 redundancy can be toggled on to account for any issues with the Kafka connection. S3 here refers to either an AWS S3 bucket or a Ceph open-source object storage bucket.
+
+If S3 redundancy is toggled on and the Kafka connection described in the Kafka logging section of this document is interrupted, then each time the local log file is updated its contents are uploaded to the configurable S3 location. By default, logs are kept for three hours after the start of the interruption, and logs in S3 are rotated on the hour to keep the remote bucket location relevant.
+
+Once the connection to the original Kafka broker is re-established, the stored logs are sent to the Kafka broker in parallel with new logs. If a restart of the Frontend is required to reset the connection, the stored logs (if they are not stale) are sent to the Kafka broker at the next startup.
+
+This option is set to false by default.
+
 ## Scanners
 Each scanner parses files of a specific flavor and performs data collection and/or file extraction on them. Scanners are typically named after the type of file they are intended to scan (e.g. "ScanHtml", "ScanPe", "ScanRar") but may also be named after the type of function or tool they use to perform their tasks (e.g. "ScanExiftool", "ScanHeader", "ScanOcr").
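
A note on the s3redundancy value: the frontend reads it as a string and parses it with Go's strconv.ParseBool (see src/go/cmd/strelka-frontend/main.go below), so the accepted literals are 1, t, T, TRUE, true, True, 0, f, F, FALSE, false, and False; anything else fails to parse and leaves redundancy disabled. A minimal sketch of that behavior:

    package main

    import (
        "log"
        "strconv"
    )

    func main() {
        for _, raw := range []string{"true", "TRUE", "1", "yes", ""} {
            v, err := strconv.ParseBool(raw)
            if err != nil {
                // Mirrors the frontend: on a parse failure the flag stays false.
                log.Printf("%q is not a valid boolean, defaulting to false", raw)
                continue
            }
            log.Printf("%q parsed as %v", raw, v)
        }
    }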

go.mod

Lines changed: 6 additions & 0 deletions
@@ -13,6 +13,12 @@ require (
 )
 
 require (
+	github.com/fsnotify/fsnotify v1.5.4 // indirect
+	github.com/jmespath/go-jmespath v0.4.0 // indirect
+)
+
+require (
+	github.com/aws/aws-sdk-go v1.44.55
 	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
 	golang.org/x/net v0.17.0 // indirect

go.sum

Lines changed: 5 additions & 0 deletions
@@ -663,6 +663,8 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmV
 github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
 github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
 github.com/aws/aws-sdk-go v1.15.11/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0=
+github.com/aws/aws-sdk-go v1.44.55 h1:h+p61sPEsLOpnQ2mKnGPrIe1MFUKwwA0X5eQYAcjOMU=
+github.com/aws/aws-sdk-go v1.44.55/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
 github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
 github.com/beorn7/perks v0.0.0-20160804104726-4c0e84591b9a/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=

@@ -1180,6 +1182,9 @@ github.com/jhump/protoreflect v1.11.0/go.mod h1:U7aMIjN0NWq9swDP7xDdoMfRHb35uiuT
 github.com/jhump/protoreflect v1.14.1/go.mod h1:JytZfP5d0r8pVNLZvai7U/MCuTWITgrI4tTg7puQFKI=
 github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
 github.com/jmespath/go-jmespath v0.0.0-20160803190731-bd40a432e4c7/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
+github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
+github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
+github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
 github.com/joefitzgerald/rainbow-reporter v0.1.0/go.mod h1:481CNgqmVHQZzdIbN52CupLJyoVwB10FQ/IQlF1pdL8=
 github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
 github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=

src/go/cmd/strelka-frontend/main.go

Lines changed: 102 additions & 2 deletions
@@ -1,6 +1,7 @@
 package main
 
 import (
+    "bytes"
     "context"
     "crypto/sha256"
     "encoding/json"

@@ -10,8 +11,11 @@ import (
     "io/ioutil"
     "log"
     "net"
+    "os"
+    "strconv"
     "time"
 
+    "github.com/fsnotify/fsnotify"
     "github.com/go-redis/redis/v8"
     "github.com/google/uuid"
     "google.golang.org/grpc"

@@ -22,6 +26,7 @@ import (
     "github.com/target/strelka/src/go/api/strelka"
     "github.com/target/strelka/src/go/pkg/rpc"
     "github.com/target/strelka/src/go/pkg/structs"
+    tosss3 "github.com/target/strelka/src/go/pkg/tossS3"
 
     "github.com/confluentinc/confluent-kafka-go/v2/kafka"
 )

@@ -279,6 +284,13 @@ func main() {
         log.Fatalf("failed to listen: %v", err)
     }
 
+    // Check whether redundancy is toggled on for the Kafka producer; defaults to false
+    var boolS3 = false
+    boolS3, err = strconv.ParseBool(conf.Broker.S3redundancy)
+    if err != nil {
+        log.Printf("failed to parse boolean for S3 redundancy, setting to default (false). %v", err)
+    }
+
     responses := make(chan *strelka.ScanResponse, 100)
     defer close(responses)
     if conf.Response.Log != "" {

@@ -290,6 +302,9 @@
         }
         if !*locallog && *kafkalog {
             log.Printf("Creating new Kafka producer.")
+
+            // Full Kafka configuration documentation:
+            // https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
             p, err := kafka.NewProducer(&kafka.ConfigMap{
                 "bootstrap.servers": conf.Broker.Bootstrap,
                 "security.protocol": conf.Broker.Protocol,

@@ -321,9 +336,8 @@
                 }
             }()
 
-            // Produce to Kafka from logs
+            // Produce messages to topic (asynchronously)
             go func() {
-                // Produce messages to topic (asynchronously)
                 topic := conf.Broker.Topic
                 for r := range responses {
                     rawIn := json.RawMessage(r.Event)

@@ -346,6 +360,92 @@
                     }, nil)
                 }
             }()
+
+            // Optional: pipe logs to S3 if a change is detected in the local log file
+            if boolS3 {
+                // Create a watcher to see whether the strelka.log file has been changed
+                watcher, err := fsnotify.NewWatcher()
+                if err != nil {
+                    log.Fatal(err)
+                }
+
+                defer watcher.Close()
+
+                // Watch strelka.log to make sure that logs go to S3 if Kafka fails
+                err = watcher.Add("/var/log/strelka/strelka.log")
+                if err != nil {
+                    log.Printf("An error occurred adding the watcher")
+                    log.Fatal(err)
+                }
+
+                // Additional goroutine to upload to S3 whenever a change is detected in the strelka.log file
+                go func() {
+                    for {
+                        select {
+                        case event, ok := <-watcher.Events:
+                            if !ok {
+                                return
+                            }
+                            if event.Op&fsnotify.Write == fsnotify.Write {
+                                localLog, err := os.Open("/var/log/strelka/strelka.log") // For read access.
+                                if err != nil {
+                                    log.Println("ERROR failed to open strelka.log for size verification:", err)
+                                }
+
+                                logMetadata, err := localLog.Stat()
+                                if err != nil {
+                                    log.Println("ERROR failed to retrieve strelka.log metadata:", err)
+                                }
+
+                                // Make sure that strelka.log hasn't just been truncated before uploading
+                                if logMetadata.Size() != 0 {
+                                    tosss3.UploadToS3(conf.S3.AccessKey, conf.S3.SecretKey, conf.S3.BucketName, conf.S3.Region, conf.S3.Endpoint)
+                                    log.Println("Change to strelka.log file detected, upload to S3 in progress.")
+                                }
+                            }
+                        case err, ok := <-watcher.Errors:
+                            if !ok {
+                                return
+                            }
+                            log.Println("ERROR:", err)
+                        }
+                    }
+                }()
+
+                // Produce messages to the topic from stored S3 logs
+                go func() {
+                    topic := conf.Broker.Topic
+                    s3logs := tosss3.ListS3BucketContents(conf.S3.AccessKey, conf.S3.SecretKey, conf.S3.BucketName, conf.S3.Region, conf.S3.Endpoint)
+                    for _, item := range s3logs.Contents {
+                        // Marshal the JSON message
+                        log.Println("item key is: " + *item.Key)
+                        var rawCurrData = tosss3.DownloadFromS3(conf.S3.AccessKey, conf.S3.SecretKey, conf.S3.BucketName, *item.Key, conf.S3.Region, conf.S3.Endpoint)
+                        for _, splitLog := range bytes.Split(rawCurrData, []byte("\n")) {
+                            rawIn := json.RawMessage(string(splitLog))
+                            bytesMess, err := rawIn.MarshalJSON()
+                            if err != nil {
+                                log.Printf("Unable to marshal byte-encoded event for S3 log, check the error message for more details: %v", err)
+                            }
+
+                            p.Produce(&kafka.Message{
+                                TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: -1},
+                                Value:          bytesMess,
+                                Headers: []kafka.Header{
+                                    {Key: "@timestamp", Value: []byte(time.Now().Format("2006-01-02T15:04:05-0700"))},
+                                },
+                            }, nil)
+                        }
+                    }
+
+                    // Truncate the local strelka.log file once everything has been sent to Kafka
+                    log.Printf("Beginning to truncate local strelka log.")
+                    err := os.Truncate("/var/log/strelka/strelka.log", 0)
+                    if err != nil {
+                        log.Printf("Failed to truncate strelka.log file after sending messages to Kafka: %v", err)
+                    }
+                }()
+            }
         }
     } else if conf.Response.Report != 0 {
         go func() {
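
The watcher goroutine above calls tosss3.UploadToS3, whose source file is part of this commit but not shown in this excerpt. As a rough sketch only (the object key and upload details are assumptions, not the commit's actual code), an uploader built on the same session pattern as the tossS3 helpers below might look like:

    package tosss3

    import (
        "log"
        "os"

        "github.com/aws/aws-sdk-go/aws"
        "github.com/aws/aws-sdk-go/aws/credentials"
        "github.com/aws/aws-sdk-go/aws/session"
        "github.com/aws/aws-sdk-go/service/s3/s3manager"
    )

    // Sketch only: the commit's real UploadToS3 is not shown in this excerpt.
    func UploadToS3(AccessKey string, AccessSecret string, myBucket string, region string, endpoint string) {
        sess := session.Must(session.NewSession(&aws.Config{
            Region:      aws.String(region),
            Endpoint:    aws.String(endpoint),
            Credentials: credentials.NewStaticCredentials(AccessKey, AccessSecret, ""),
        }))

        f, err := os.Open("/var/log/strelka/strelka.log")
        if err != nil {
            log.Printf("failed to open strelka.log for upload: %v", err)
            return
        }
        defer f.Close()

        // The object key here is an assumption; the commit may key logs differently (e.g. by hour).
        uploader := s3manager.NewUploader(sess)
        if _, err := uploader.Upload(&s3manager.UploadInput{
            Bucket: aws.String(myBucket),
            Key:    aws.String("strelka.log"),
            Body:   f,
        }); err != nil {
            log.Printf("failed to upload strelka.log to S3: %v", err)
        }
    }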

src/go/pkg/structs/structs.go

Lines changed: 24 additions & 14 deletions
@@ -28,12 +28,12 @@ type ConfThroughput struct {
 
 type ConfFiles struct {
     Patterns     []string // required
-    Mimetypes    []string //optional
-    Minsize      int      //optional
-    Maxsize      int      //optional
-    LimitPattern int      //optional
-    LimitTotal   int      //optional
-    Modified     int      //optional
+    Mimetypes    []string // optional
+    Minsize      int      // optional
+    Maxsize      int      // optional
+    LimitPattern int      // optional
+    LimitTotal   int      // optional
+    Modified     int      // optional
     Delete       bool     // optional
     Gatekeeper   bool     // required
     Processed    string   // optional

@@ -48,20 +48,29 @@ type ConfCoordinator struct {
 }
 
 type ConfKafka struct {
-    Bootstrap    string //required
-    Protocol     string //required
-    Certlocation string //required
-    Keylocation  string //required
-    Calocation   string //required
-    Topic        string //required
+    Bootstrap    string // required
+    Protocol     string // required
+    Certlocation string // required
+    Keylocation  string // required
+    Calocation   string // required
+    Topic        string // required
+    S3redundancy string // optional, defaults to false
+}
+
+type ConfS3 struct {
+    AccessKey  string // optional, can be left blank if S3redundancy set to false in ConfKafka
+    SecretKey  string // optional, can be left blank if S3redundancy set to false in ConfKafka
+    BucketName string // optional, can be left blank if S3redundancy set to false in ConfKafka
+    Region     string // optional, can be left blank if S3redundancy set to false in ConfKafka
+    Endpoint   string // optional, can be left blank if S3redundancy set to false in ConfKafka
 }
 
 type ConfGatekeeper struct {
     Addr string        // required
     DB   int           // required
     Pool int           // required
     Read time.Duration // required
-    TTL  time.Duration //required
+    TTL  time.Duration // required
 }
 
 // determines what action the client takes with responses, defaults to discarding messages

@@ -95,7 +104,8 @@ type Frontend struct {
     Coordinator ConfCoordinator // required
     Gatekeeper  ConfGatekeeper  // required
     Response    ConfResponse    // optional
-    Broker      ConfKafka       //required
+    Broker      ConfKafka       // required
+    S3          ConfS3          // optional
 }
 
 type Manager struct {

src/go/pkg/tossS3/tossS3Delete.go

Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
+package tosss3
+
+import (
+    "log"
+
+    "github.com/aws/aws-sdk-go/aws"
+    "github.com/aws/aws-sdk-go/aws/credentials"
+    "github.com/aws/aws-sdk-go/aws/session"
+    "github.com/aws/aws-sdk-go/service/s3"
+)
+
+func tossS3Delete(AccessKey string, AccessSecret string, myBucket string, filename string, region string, endpoint string) {
+
+    // Create a session with custom credentials
+    var awsConfig = &aws.Config{
+        Region:      aws.String(region),
+        Endpoint:    aws.String(endpoint),
+        Credentials: credentials.NewStaticCredentials(AccessKey, AccessSecret, ""),
+    }
+
+    // The session the S3 client will use
+    sess := session.Must(session.NewSession(awsConfig))
+
+    // Create the S3 service client
+    svc := s3.New(sess)
+
+    // Delete the log from S3 now that it has been read
+    _, err := svc.DeleteObject(&s3.DeleteObjectInput{Bucket: aws.String(myBucket), Key: aws.String(filename)})
+    if err != nil {
+        log.Printf("Unable to delete object %q from bucket %q, %v", filename, myBucket, err)
+    }
+
+    err = svc.WaitUntilObjectNotExists(&s3.HeadObjectInput{
+        Bucket: aws.String(myBucket),
+        Key:    aws.String(filename),
+    })
+    if err != nil {
+        log.Printf("Failed to delete file from S3, %v", err)
+    }
+}
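
WaitUntilObjectNotExists above blocks with the SDK's default waiter budget while it polls HeadObject. If a bounded wait were preferred, the SDK's WithContext variant accepts a cancellable context; a sketch, not part of the commit:

    package tosss3

    import (
        "context"
        "log"
        "time"

        "github.com/aws/aws-sdk-go/aws"
        "github.com/aws/aws-sdk-go/service/s3"
    )

    // Sketch only: a bounded variant of the delete confirmation above.
    func waitUntilDeleted(svc *s3.S3, myBucket string, filename string) {
        // Cap the wait at 30 seconds instead of the waiter's default attempt budget.
        ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
        defer cancel()

        err := svc.WaitUntilObjectNotExistsWithContext(ctx, &s3.HeadObjectInput{
            Bucket: aws.String(myBucket),
            Key:    aws.String(filename),
        })
        if err != nil {
            log.Printf("object %q not confirmed deleted before timeout: %v", filename, err)
        }
    }
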
Lines changed: 46 additions & 0 deletions
@@ -0,0 +1,46 @@
+package tosss3
+
+import (
+    "log"
+
+    "github.com/aws/aws-sdk-go/aws"
+    "github.com/aws/aws-sdk-go/aws/credentials"
+    "github.com/aws/aws-sdk-go/aws/session"
+    "github.com/aws/aws-sdk-go/service/s3"
+    "github.com/aws/aws-sdk-go/service/s3/s3manager"
+)
+
+func DownloadFromS3(AccessKey string, AccessSecret string, myBucket string, filename string, region string, endpoint string) []byte {
+
+    // Create a session with custom credentials
+    var awsConfig = &aws.Config{
+        Region:      aws.String(region),
+        Endpoint:    aws.String(endpoint),
+        Credentials: credentials.NewStaticCredentials(AccessKey, AccessSecret, ""),
+    }
+
+    // The session the S3 downloader will use
+    sess := session.Must(session.NewSession(awsConfig))
+
+    // Create a downloader in order to retrieve the log files (there should usually be only one)
+    downloader := s3manager.NewDownloader(sess)
+
+    // Prune out old logs before downloading to reduce time to catch up
+    tossS3PruneLogs(AccessKey, AccessSecret, myBucket, region, endpoint)
+
+    buff := &aws.WriteAtBuffer{}
+
+    // Download the requested log object into the buffer
+    numBytes, err := downloader.Download(buff, &s3.GetObjectInput{
+        Bucket: aws.String(myBucket),
+        Key:    aws.String(filename),
+    })
+    if err != nil {
+        log.Printf("failed to download file, %v", err)
+    }
+    log.Printf("Persistence log downloaded from S3, %d bytes\n", numBytes)
+
+    tossS3Delete(AccessKey, AccessSecret, myBucket, filename, region, endpoint)
+
+    return buff.Bytes()
+}
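
DownloadFromS3 calls tossS3PruneLogs, which is also part of this commit but not shown in this excerpt. Given the README's description of a three-hour retention window, a sketch of such pruning (an assumption about the implementation, not the commit's actual code) could be:

    package tosss3

    import (
        "log"
        "time"

        "github.com/aws/aws-sdk-go/aws"
        "github.com/aws/aws-sdk-go/aws/credentials"
        "github.com/aws/aws-sdk-go/aws/session"
        "github.com/aws/aws-sdk-go/service/s3"
    )

    // Sketch only: the commit's real tossS3PruneLogs is not shown in this excerpt.
    func tossS3PruneLogs(AccessKey string, AccessSecret string, myBucket string, region string, endpoint string) {
        sess := session.Must(session.NewSession(&aws.Config{
            Region:      aws.String(region),
            Endpoint:    aws.String(endpoint),
            Credentials: credentials.NewStaticCredentials(AccessKey, AccessSecret, ""),
        }))
        svc := s3.New(sess)

        // List the bucket and delete anything older than the three-hour window the README describes.
        out, err := svc.ListObjectsV2(&s3.ListObjectsV2Input{Bucket: aws.String(myBucket)})
        if err != nil {
            log.Printf("failed to list bucket %q for pruning: %v", myBucket, err)
            return
        }
        for _, obj := range out.Contents {
            if obj.LastModified != nil && time.Since(*obj.LastModified) > 3*time.Hour {
                _, err := svc.DeleteObject(&s3.DeleteObjectInput{Bucket: aws.String(myBucket), Key: obj.Key})
                if err != nil {
                    log.Printf("failed to prune stale log %q: %v", *obj.Key, err)
                }
            }
        }
    }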
