This repository has been archived by the owner on Sep 2, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathkinesis.go
178 lines (160 loc) · 5.12 KB
/
kinesis.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package main
import (
"crypto/sha256"
"crypto/tls"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
uuid "github.com/nu7hatch/gouuid"
)
const (
// defaultRegion is the AWS region getRegion returns when the AWS_REGION
// environment variable is empty or unset.
defaultRegion = "us-west-2"
// chunkSize is the maximum number of bytes of input that buildMessages puts
// into a single chunk.
// NOTE(review): 1048576 is 1 MiB; presumably this tracks the Kinesis
// per-record size limit and the 200 bytes leave headroom for the rest of the
// serialized message - confirm against the service limits.
chunkSize = 1048576 - 200
// kinesaliteStreamName is the stream name that makes buildClient return a
// local Kinesalite client instead of a real AWS Kinesis client.
kinesaliteStreamName = "replay-zero-dev"
// kinesaliteEndpoint is the endpoint the local Kinesalite client is
// configured to talk to.
kinesaliteEndpoint = "https://localhost:4567"
)
// getRegion returns the AWS region to use: the value of the AWS_REGION
// environment variable when it is non-empty, otherwise defaultRegion.
func getRegion() string {
	if fromEnv := os.Getenv("AWS_REGION"); fromEnv != "" {
		return fromEnv
	}
	return defaultRegion
}
// kinesisWrapper pairs a Kinesis API client with the log function used to
// report on the records sent through it.
type kinesisWrapper struct {
// client performs the Kinesis API calls. It is held as the
// kinesisiface.KinesisAPI interface rather than a concrete client type.
client kinesisiface.KinesisAPI
// logger lets different clients log certain messages at different levels.
// Ex. Regular Kinesis handler for sending HTTP data is "INFO" (log.Printf)
// Telemetry Kinesis handler is set at "DEBUG" (logDebug)
// It takes a printf-style format string followed by its arguments.
logger func(string, ...interface{})
}
// buildClient picks the kind of Kinesis client to create from the stream name.
//
// When streamName equals kinesaliteStreamName a local Kinesalite client is
// returned; streamRole and logFunc are not passed on in that case. For any
// other stream name a real AWS Kinesis client is built with streamRole and
// logFunc.
func buildClient(streamName, streamRole string, logFunc func(string, ...interface{})) *kinesisWrapper {
	if streamName != kinesaliteStreamName {
		return buildKinesisClient(streamName, streamRole, logFunc)
	}
	return buildKinesaliteClient(streamName)
}
// buildKinesisClient uses STS to assume an IAM role for credentials to write
// records to a real Kinesis stream in AWS.
//
// Parameters:
//   - streamName: stream the client is intended for; used for log messages and
//     for the Server-Side Encryption check.
//   - streamRole: role handed to stscreds.NewCredentials.
//   - logFunc: printf-style logger used during setup and stored on the
//     returned wrapper.
//
// Returns a kinesisWrapper around the new client. A failed or negative SSE
// check is only logged; it never prevents the client from being returned.
// Session creation goes through session.Must, so a session error panics.
func buildKinesisClient(streamName, streamRole string, logFunc func(string, ...interface{})) *kinesisWrapper {
	logFunc("Creating AWS Kinesis client (stream=%s)", streamName)
	userSession := session.Must(session.NewSession(&aws.Config{
		CredentialsChainVerboseErrors: aws.Bool(verboseCredentialErrors),
		Region:                        aws.String(getRegion()),
	}))

	// NOTE(review): stscreds.NewCredentials appears to only construct the
	// credentials provider; confirm whether the role is really assumed before
	// "Success!" is logged.
	logFunc("Fetching temp credentials...")
	kinesisTempCreds := stscreds.NewCredentials(userSession, streamRole)
	logFunc("Success!")

	kinesisHandle := kinesis.New(userSession, &aws.Config{
		CredentialsChainVerboseErrors: aws.Bool(verboseCredentialErrors),
		Credentials:                   kinesisTempCreds,
		Region:                        aws.String(getRegion()),
	})

	sseEnabled, err := streamHasSSE(streamName, kinesisHandle)
	if err != nil {
		// Pass the error as an argument, never as the format string: a '%'
		// inside the error text would otherwise be parsed as a formatting verb.
		logFunc("%v", err)
	}
	// streamHasSSE reports false when it cannot read the stream description,
	// so this warning is also emitted when the check itself failed.
	if !sseEnabled {
		logWarn(fmt.Sprintf("Kinesis stream '%s' does NOT have Server-Side Encryption (SSE) enabled", streamName))
	}

	return &kinesisWrapper{
		client: kinesisHandle,
		logger: logFunc,
	}
}
// streamHasSSE reports whether the named stream's encryption type is KMS,
// based on a DescribeStream call made through client.
//
// It returns (false, error) when the DescribeStream call fails or when the
// response carries no stream description. On a nil error the boolean is true
// only if the reported encryption type equals kinesis.EncryptionTypeKms; a
// description without an encryption type counts as not encrypted.
func streamHasSSE(streamName string, client kinesisiface.KinesisAPI) (bool, error) {
	streamInfo, err := client.DescribeStream(&kinesis.DescribeStreamInput{
		StreamName: aws.String(streamName),
	})
	if err != nil {
		return false, fmt.Errorf("Could not determine if SSE is enabled for stream %s: %w", streamName, err)
	}
	// Guard separately from the error case so a nil error is never wrapped
	// with %w (which would render as "%!w(<nil>)").
	if streamInfo == nil || streamInfo.StreamDescription == nil {
		return false, fmt.Errorf("Could not determine if SSE is enabled for stream %s: empty DescribeStream response", streamName)
	}
	// aws.StringValue yields "" for a nil pointer, avoiding a nil dereference
	// when the response omits EncryptionType.
	encryption := aws.StringValue(streamInfo.StreamDescription.EncryptionType)
	return encryption == kinesis.EncryptionTypeKms, nil
}
// buildKinesaliteClient creates a client for a local Kinesalite instance.
//
// Kinesalite is a lightweight implementation of Kinesis
// useful for development scenarios.
// https://github.com/mhart/kinesalite
//
// The client targets kinesaliteEndpoint with placeholder static credentials
// and an HTTP transport that skips TLS certificate verification. The returned
// wrapper always logs through log.Printf. streamName is accepted but not used
// here.
func buildKinesaliteClient(streamName string) *kinesisWrapper {
	log.Printf("Creating local Kinesalite client")
	log.Printf("Sending unverified traffic to stream endpoint=" + kinesaliteEndpoint)

	localSession := session.Must(session.NewSession())

	// Certificate verification is switched off on this transport, hence the
	// "unverified traffic" notice logged above.
	insecureTransport := &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}
	localConfig := &aws.Config{
		Endpoint:    aws.String(kinesaliteEndpoint),
		Credentials: credentials.NewStaticCredentials("x", "x", "x"),
		Region:      aws.String(getRegion()),
		HTTPClient:  &http.Client{Transport: insecureTransport},
	}

	wrapper := &kinesisWrapper{
		client: kinesis.New(localSession, localConfig),
		logger: log.Printf,
	}
	return wrapper
}
// chunkData splits data into consecutive pieces of at most size bytes whose
// concatenation, in order, equals data.
//
// A cut is never placed in the middle of a UTF-8 encoded character when that
// can be avoided: if the byte after a cut would be a UTF-8 continuation byte,
// the cut is moved back (by at most 3 bytes) to the start of that character,
// so the chunk is slightly shorter than size. Halves of a split character are
// invalid UTF-8 on their own, and encoding/json replaces invalid bytes in
// strings with U+FFFD, which would corrupt the data once chunks are
// serialized. If no character start is found within 3 bytes (data is not
// valid UTF-8, or size is smaller than the character) the cut stays at size.
//
// Empty data yields an empty, non-nil slice. A size of zero or less means
// "do not chunk": non-empty data comes back as a single chunk instead of
// looping forever or panicking.
func chunkData(data string, size int) []string {
	if size <= 0 {
		if data == "" {
			return []string{}
		}
		return []string{data}
	}

	// A UTF-8 character is at most 4 bytes long, so from any continuation
	// byte its first byte is at most 3 positions back.
	const maxBackoff = 3

	chunks := make([]string, 0, len(data)/size+1)
	for start := 0; start < len(data); {
		end := len(data)
		// Compare remaining length instead of computing start+size first so a
		// very large size cannot overflow.
		if len(data)-start > size {
			limit := start + size
			end = limit
			// cut > start keeps every chunk non-empty, which guarantees progress.
			for cut := limit; cut > start && limit-cut <= maxBackoff; cut-- {
				if data[cut]&0xC0 != 0x80 { // not a continuation byte: a character starts here
					end = cut
					break
				}
			}
		}
		chunks = append(chunks, data[start:end])
		start = end
	}
	return chunks
}
// min returns the smaller of the two integers; when they are equal that
// shared value is returned.
func min(i1, i2 int) int {
	if i2 <= i1 {
		return i2
	}
	return i1
}
// buildMessages splits line into chunks of at most chunkSize bytes and wraps
// each chunk in an EventChunk.
//
// Every EventChunk carries its zero-based position (ChunkNumber), the total
// number of chunks (NumChunks) and a correlation ID (UUID) shared by all
// chunks of the same line. The correlation ID is a freshly generated v4 UUID;
// if UUID generation fails, the hex-encoded SHA-256 digest of line is used
// instead. The returned slice has one entry per chunk, in order.
func buildMessages(line string) []EventChunk {
	chunks := chunkData(line, chunkSize)
	numChunks := len(chunks)

	// Multiple chunks need a sort of "group ID"
	var correlation string
	eventUUID, err := uuid.NewV4()
	if err != nil {
		// The fallback below is SHA-256, so say so in the log message.
		msg := fmt.Sprintf("UUID generation failed: %s\nFalling back to SHA-256 of input string for chunk correlation", err)
		logDebug(msg)
		correlation = fmt.Sprintf("%x", sha256.Sum256([]byte(line)))
	} else {
		correlation = eventUUID.String()
	}

	// The final length is known up front, so allocate once.
	messages := make([]EventChunk, 0, numChunks)
	for chunkID, chunk := range chunks {
		messages = append(messages, EventChunk{
			ChunkNumber: chunkID,
			NumChunks:   numChunks,
			UUID:        correlation,
			Data:        chunk,
		})
	}
	return messages
}
// sendToStream JSON-encodes message and puts it on the named stream as a
// single record.
//
// The partition key is a fixed prefix followed by the current time rendered
// as a string. A marshalling error or a PutRecord error is returned to the
// caller unchanged; on success the PutRecord response is reported through the
// wrapper's logger and nil is returned.
func (c *kinesisWrapper) sendToStream(message interface{}, stream string) error {
	payload, marshalErr := json.Marshal(message)
	if marshalErr != nil {
		return marshalErr
	}

	partitionKey := "replay-partition-key-" + time.Now().String()
	record := &kinesis.PutRecordInput{
		StreamName:   aws.String(stream),
		Data:         payload,
		PartitionKey: aws.String(partitionKey),
	}

	result, putErr := c.client.PutRecord(record)
	if putErr != nil {
		return putErr
	}

	c.logger("Successfully put record to stream=%s\n%+v\n", stream, result)
	return nil
}