Skip to content

Commit 18b3662

Browse files
committed
enable fargate credentials
Signed-off-by: Tim Miller <[email protected]>
1 parent 6a6369f commit 18b3662

File tree

17 files changed

+1873
-780
lines changed

17 files changed

+1873
-780
lines changed

README.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,20 @@ export IMMUDB_S3_ENDPOINT="https://${IMMUDB_S3_BUCKET_NAME}.s3.${IMMUDB_S3_LOCAT
184184
./immudb
185185
```
186186

187+
If using Fargate, the credentials URL can be sourced automatically:
188+
189+
```bash
190+
export IMMUDB_S3_STORAGE=true
191+
export IMMUDB_S3_ROLE_ENABLED=true
192+
export IMMUDB_S3_USE_FARGATE_CREDENTIALS=true
193+
export IMMUDB_S3_BUCKET_NAME=<BUCKET NAME>
194+
export IMMUDB_S3_LOCATION=<AWS S3 REGION>
195+
export IMMUDB_S3_PATH_PREFIX=testing-001
196+
export IMMUDB_S3_ENDPOINT="https://${IMMUDB_S3_BUCKET_NAME}.s3.${IMMUDB_S3_LOCATION}.amazonaws.com"
197+
198+
./immudb
199+
```
200+
187201
Optionally, you can specify the exact role immudb should be using with:
188202

189203
```bash

cmd/immudb/command/init.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ func (cl *Commandline) setupFlags(cmd *cobra.Command, options *server.Options) {
8989
cmd.Flags().String("s3-path-prefix", "", "s3 path prefix (multiple immudb instances can share the same bucket if they have different prefixes)")
9090
cmd.Flags().Bool("s3-external-identifier", false, "use the remote identifier if there is no local identifier")
9191
cmd.Flags().String("s3-instance-metadata-url", "http://169.254.169.254", "s3 instance metadata url")
92+
cmd.Flags().String("s3-use-fargate-credentials", "false", "use fargate credentials for s3 authentication: true/false")
9293
cmd.Flags().Int("max-sessions", 100, "maximum number of simultaneously opened sessions")
9394
cmd.Flags().Duration("max-session-inactivity-time", 3*time.Minute, "max session inactivity time is a duration after which an active session is declared inactive by the server. A session is kept active if server is still receiving requests from client (keep-alive or other methods)")
9495
cmd.Flags().Duration("max-session-age-time", 0, "the current default value is infinity. max session age time is a duration after which session will be forcibly closed")

cmd/immudb/command/parse_options.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ func parseOptions() (options *server.Options, err error) {
106106
s3PathPrefix := viper.GetString("s3-path-prefix")
107107
s3ExternalIdentifier := viper.GetBool("s3-external-identifier")
108108
s3MetadataURL := viper.GetString("s3-instance-metadata-url")
109+
s3UseFargateCredentials := viper.GetBool("s3-use-fargate-credentials")
109110

110111
remoteStorageOptions := server.DefaultRemoteStorageOptions().
111112
WithS3Storage(s3Storage).
@@ -118,7 +119,8 @@ func parseOptions() (options *server.Options, err error) {
118119
WithS3Location(s3Location).
119120
WithS3PathPrefix(s3PathPrefix).
120121
WithS3ExternalIdentifier(s3ExternalIdentifier).
121-
WithS3InstanceMetadataURL(s3MetadataURL)
122+
WithS3InstanceMetadataURL(s3MetadataURL).
123+
WithS3UseFargateCredentials(s3UseFargateCredentials)
122124

123125
sessionOptions := sessions.DefaultOptions().
124126
WithMaxSessions(viper.GetInt("max-sessions")).

embedded/remotestorage/s3/s3.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ type Storage struct {
5656

5757
awsInstanceMetadataURL string
5858
awsCredsRefreshPeriod time.Duration
59+
60+
useFargateCredentials bool
5961
}
6062

6163
var (
@@ -99,6 +101,7 @@ func Open(
99101
location string,
100102
prefix string,
101103
awsInstanceMetadataURL string,
104+
useFargateCredentials bool,
102105
) (remotestorage.Storage, error) {
103106

104107
// Endpoint must always end with '/'
@@ -137,6 +140,7 @@ func Open(
137140
},
138141
awsInstanceMetadataURL: awsInstanceMetadataURL,
139142
awsCredsRefreshPeriod: time.Minute,
143+
useFargateCredentials: useFargateCredentials,
140144
}
141145

142146
err := s3storage.getRoleCredentials()
@@ -807,6 +811,45 @@ func (s *Storage) getRoleCredentials() error {
807811
}
808812

809813
func (s *Storage) requestCredentials() (string, string, string, error) {
814+
if s.useFargateCredentials {
815+
// Use Fargate credentials
816+
817+
const fargateMetadataEndpoint = "http://169.254.170.2"
818+
819+
fargateCredentialsRelativeURI := os.Getenv("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI")
820+
if fargateCredentialsRelativeURI == "" {
821+
return "", "", "", errors.New("environment variable AWS_CONTAINER_CREDENTIALS_RELATIVE_URI is not set or empty")
822+
}
823+
fargateCredentialsURL := fargateMetadataEndpoint + fargateCredentialsRelativeURI
824+
825+
fargateReq, err := http.NewRequest("GET", fargateCredentialsURL, nil)
826+
if err != nil {
827+
return "", "", "", errors.New("cannot form fargate credentials request")
828+
}
829+
830+
fargateResp, err := http.DefaultClient.Do(fargateReq)
831+
if err != nil {
832+
return "", "", "", errors.New("cannot get fargate credentials")
833+
}
834+
defer fargateResp.Body.Close()
835+
836+
creds, err := io.ReadAll(fargateResp.Body)
837+
if err != nil {
838+
return "", "", "", errors.New("cannot read fargate credentials")
839+
}
840+
841+
var credentials struct {
842+
AccessKeyID string `json:"AccessKeyId"`
843+
SecretAccessKey string `json:"SecretAccessKey"`
844+
SessionToken string `json:"Token"`
845+
}
846+
if err := json.Unmarshal(creds, &credentials); err != nil {
847+
return "", "", "", errors.New("cannot parse fargate credentials")
848+
}
849+
850+
return credentials.AccessKeyID, credentials.SecretAccessKey, credentials.SessionToken, nil
851+
}
852+
810853
tokenReq, err := http.NewRequest("PUT", fmt.Sprintf("%s%s",
811854
s.awsInstanceMetadataURL,
812855
"/latest/api/token",

embedded/remotestorage/s3/s3_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ func TestOpen(t *testing.T) {
4141
"",
4242
"prefix",
4343
"",
44+
false,
4445
)
4546
require.NoError(t, err)
4647
require.NotNil(t, s)
@@ -90,6 +91,7 @@ func TestCornerCases(t *testing.T) {
9091
"",
9192
"",
9293
"",
94+
false,
9395
)
9496
require.ErrorIs(t, err, ErrInvalidArguments)
9597
require.ErrorIs(t, err, ErrInvalidArgumentsBucketEmpty)
@@ -107,6 +109,7 @@ func TestCornerCases(t *testing.T) {
107109
"",
108110
"",
109111
"",
112+
false,
110113
)
111114
require.ErrorIs(t, err, ErrInvalidArguments)
112115
require.ErrorIs(t, err, ErrInvalidArgumentsBucketSlash)
@@ -124,6 +127,7 @@ func TestCornerCases(t *testing.T) {
124127
"",
125128
"",
126129
"",
130+
false,
127131
)
128132
require.NoError(t, err)
129133
require.Equal(t, "", s.(*Storage).prefix)
@@ -138,6 +142,7 @@ func TestCornerCases(t *testing.T) {
138142
"",
139143
"/test/",
140144
"",
145+
false,
141146
)
142147
require.NoError(t, err)
143148
require.Equal(t, "test/", s.(*Storage).prefix)
@@ -152,6 +157,7 @@ func TestCornerCases(t *testing.T) {
152157
"",
153158
"/test",
154159
"",
160+
false,
155161
)
156162
require.NoError(t, err)
157163
require.Equal(t, "test/", s.(*Storage).prefix)
@@ -168,6 +174,7 @@ func TestCornerCases(t *testing.T) {
168174
"",
169175
"",
170176
"",
177+
false,
171178
)
172179
require.NoError(t, err)
173180
require.Equal(t, "s3(misconfigured)", s.String())
@@ -184,6 +191,7 @@ func TestCornerCases(t *testing.T) {
184191
"",
185192
"",
186193
"",
194+
false,
187195
)
188196
require.NoError(t, err)
189197

@@ -211,6 +219,7 @@ func TestCornerCases(t *testing.T) {
211219
"",
212220
"",
213221
"",
222+
false,
214223
)
215224
require.NoError(t, err)
216225

@@ -230,6 +239,7 @@ func TestCornerCases(t *testing.T) {
230239
"",
231240
"",
232241
"",
242+
false,
233243
)
234244
require.Error(t, err)
235245
require.Nil(t, s)
@@ -246,6 +256,7 @@ func TestCornerCases(t *testing.T) {
246256
"",
247257
"",
248258
"",
259+
false,
249260
)
250261
require.NoError(t, err)
251262

@@ -300,6 +311,7 @@ func TestSignatureV4(t *testing.T) {
300311
"us-east-1",
301312
"",
302313
"",
314+
false,
303315
)
304316
require.NoError(t, err)
305317

embedded/remotestorage/s3/s3_with_minio_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ func TestS3WithServer(t *testing.T) {
4545
"",
4646
fmt.Sprintf("prefix_%x", randomBytes),
4747
"",
48+
false,
4849
)
4950
require.NoError(t, err)
5051

embedded/sql/engine_test.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3202,6 +3202,69 @@ func TestQuery(t *testing.T) {
32023202
})
32033203
}
32043204

3205+
func TestExtractFromTimestamp(t *testing.T) {
3206+
st, err := store.Open(t.TempDir(), store.DefaultOptions().WithMultiIndexing(true))
3207+
require.NoError(t, err)
3208+
defer closeStore(t, st)
3209+
3210+
engine, err := NewEngine(st, DefaultOptions().WithPrefix(sqlPrefix))
3211+
require.NoError(t, err)
3212+
3213+
t.Run("extract from constant expressions", func(t *testing.T) {
3214+
assertQueryShouldProduceResults(
3215+
t,
3216+
engine,
3217+
`SELECT
3218+
EXTRACT(YEAR FROM '2020-01-15'),
3219+
EXTRACT(MONTH FROM '2020-01-15'),
3220+
EXTRACT(DAY FROM '2020-01-15'::TIMESTAMP),
3221+
EXTRACT(HOUR FROM '2020-01-15 12:30:24'),
3222+
EXTRACT(MINUTE FROM '2020-01-15 12:30:24'),
3223+
EXTRACT(SECOND FROM '2020-01-15 12:30:24'::TIMESTAMP)
3224+
`,
3225+
`SELECT * FROM (
3226+
VALUES (2020, 01, 15, 12, 30, 24)
3227+
)`,
3228+
)
3229+
})
3230+
3231+
t.Run("extract from table", func(t *testing.T) {
3232+
_, _, err := engine.Exec(
3233+
context.Background(),
3234+
nil,
3235+
`CREATE TABLE events(ts TIMESTAMP PRIMARY KEY);
3236+
3237+
INSERT INTO events(ts) VALUES
3238+
('2021-07-04 14:45:30'::TIMESTAMP),
3239+
('1999-12-31 23:59:59'::TIMESTAMP);
3240+
`,
3241+
nil,
3242+
)
3243+
require.NoError(t, err)
3244+
3245+
assertQueryShouldProduceResults(
3246+
t,
3247+
engine,
3248+
`SELECT
3249+
EXTRACT(YEAR FROM ts),
3250+
EXTRACT(MONTH FROM ts),
3251+
EXTRACT(DAY FROM ts),
3252+
EXTRACT(HOUR FROM ts),
3253+
EXTRACT(MINUTE FROM ts),
3254+
EXTRACT(SECOND FROM ts)
3255+
FROM events
3256+
ORDER BY ts
3257+
`,
3258+
`SELECT * FROM (
3259+
VALUES
3260+
(1999, 12, 31, 23, 59, 59),
3261+
(2021, 07, 04, 14, 45, 30)
3262+
)`,
3263+
)
3264+
})
3265+
3266+
}
3267+
32053268
func TestJSON(t *testing.T) {
32063269
opts := store.DefaultOptions().WithMultiIndexing(true)
32073270
opts.WithIndexOptions(opts.IndexOpts.WithMaxActiveSnapshots(1))
@@ -9450,6 +9513,51 @@ func TestFunctions(t *testing.T) {
94509513
)
94519514
require.NoError(t, err)
94529515

9516+
t.Run("coalesce", func(t *testing.T) {
9517+
type testCase struct {
9518+
query string
9519+
expectedValues string
9520+
err error
9521+
}
9522+
9523+
cases := []testCase{
9524+
{
9525+
query: "SELECT COALESCE (NULL)",
9526+
expectedValues: "NULL",
9527+
},
9528+
{
9529+
query: "SELECT COALESCE (NULL, NULL)",
9530+
expectedValues: "NULL",
9531+
},
9532+
{
9533+
query: "SELECT COALESCE(NULL, 1, 1.5, 3)",
9534+
expectedValues: "1",
9535+
},
9536+
{
9537+
query: "SELECT COALESCE('one', 'two', 'three')",
9538+
expectedValues: "'one'",
9539+
},
9540+
{
9541+
query: "SELECT COALESCE(1, 'test')",
9542+
err: ErrInvalidTypes,
9543+
},
9544+
}
9545+
9546+
for _, tc := range cases {
9547+
if tc.err != nil {
9548+
_, err := engine.queryAll(context.Background(), nil, tc.query, nil)
9549+
require.ErrorIs(t, err, tc.err)
9550+
continue
9551+
}
9552+
9553+
assertQueryShouldProduceResults(
9554+
t,
9555+
engine,
9556+
tc.query,
9557+
fmt.Sprintf("SELECT * FROM (VALUES (%s))", tc.expectedValues))
9558+
}
9559+
})
9560+
94539561
t.Run("timestamp functions", func(t *testing.T) {
94549562
_, err := engine.queryAll(context.Background(), nil, "SELECT NOW(1) FROM mytable", nil)
94559563
require.ErrorIs(t, err, ErrIllegalArguments)

embedded/sql/functions.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
)
2626

2727
const (
28+
CoalesceFnCall string = "COALESCE"
2829
LengthFnCall string = "LENGTH"
2930
SubstringFnCall string = "SUBSTRING"
3031
ConcatFnCall string = "CONCAT"
@@ -47,6 +48,7 @@ const (
4748
)
4849

4950
var builtinFunctions = map[string]Function{
51+
CoalesceFnCall: &CoalesceFn{},
5052
LengthFnCall: &LengthFn{},
5153
SubstringFnCall: &SubstringFn{},
5254
ConcatFnCall: &ConcatFn{},
@@ -67,6 +69,37 @@ type Function interface {
6769
Apply(tx *SQLTx, params []TypedValue) (TypedValue, error)
6870
}
6971

72+
type CoalesceFn struct{}
73+
74+
func (f *CoalesceFn) InferType(cols map[string]ColDescriptor, params map[string]SQLValueType, implicitTable string) (SQLValueType, error) {
75+
return AnyType, nil
76+
}
77+
78+
func (f *CoalesceFn) RequiresType(t SQLValueType, cols map[string]ColDescriptor, params map[string]SQLValueType, implicitTable string) error {
79+
return nil
80+
}
81+
82+
func (f *CoalesceFn) Apply(tx *SQLTx, params []TypedValue) (TypedValue, error) {
83+
t := AnyType
84+
85+
for _, p := range params {
86+
if !p.IsNull() {
87+
if t == AnyType {
88+
t = p.Type()
89+
} else if p.Type() != t && !(IsNumericType(t) && IsNumericType(p.Type())) {
90+
return nil, fmt.Errorf("coalesce: %w", ErrInvalidTypes)
91+
}
92+
}
93+
}
94+
95+
for _, p := range params {
96+
if !p.IsNull() {
97+
return p, nil
98+
}
99+
}
100+
return NewNull(t), nil
101+
}
102+
70103
// -------------------------------------
71104
// String Functions
72105
// -------------------------------------

0 commit comments

Comments
 (0)