Skip to content

Commit 02e840a

Browse files
authored
reinit ingestion client on perm denied (#2)
1 parent c1aa576 commit 02e840a

File tree

6 files changed

+222
-82
lines changed

6 files changed

+222
-82
lines changed

client.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"time"
8+
"unsafe"
9+
10+
"google.golang.org/grpc"
11+
12+
"github.com/yandex-cloud/go-genproto/yandex/cloud/logging/v1"
13+
ycsdk "github.com/yandex-cloud/go-sdk"
14+
)
15+
16+
type client struct {
17+
mu sync.RWMutex
18+
writer logging.LogIngestionServiceClient
19+
init func() error
20+
}
21+
22+
var _ logging.LogIngestionServiceClient = (*client)(nil)
23+
24+
func (c *client) Write(ctx context.Context, in *logging.WriteRequest, opts ...grpc.CallOption) (*logging.WriteResponse, error) {
25+
c.mu.RLock()
26+
defer c.mu.RUnlock()
27+
return c.writer.Write(ctx, in, opts...)
28+
}
29+
30+
var (
31+
PluginVersion string
32+
FluentBitVersion string
33+
)
34+
35+
func clientInit(c *client, plugin unsafe.Pointer) func() error {
36+
var initTime time.Time
37+
return func() error {
38+
c.mu.Lock()
39+
defer c.mu.Unlock()
40+
41+
const initBackoff = 30 * time.Second
42+
passed := time.Since(initTime)
43+
if passed < initBackoff {
44+
return fmt.Errorf("%s since last client init haven't passed, only %s", initBackoff, passed)
45+
}
46+
47+
const (
48+
keyAuthorization = "authorization"
49+
keyEndpoint = "endpoint"
50+
defaultEndpoint = "api.cloud.yandex.net:443"
51+
)
52+
53+
authorization := getConfigKey(plugin, keyAuthorization)
54+
if authorization == "" {
55+
return fmt.Errorf("authorization missing")
56+
}
57+
58+
credentials, err := makeCredentials(authorization)
59+
if err != nil {
60+
return err
61+
}
62+
63+
endpoint := getConfigKey(plugin, keyEndpoint)
64+
if endpoint == "" {
65+
endpoint = defaultEndpoint
66+
}
67+
68+
tlsConfig, err := makeTLSConfig(plugin)
69+
if err != nil {
70+
return fmt.Errorf("error creating tls config: %s", err.Error())
71+
}
72+
73+
sdk, err := ycsdk.Build(context.Background(),
74+
ycsdk.Config{
75+
Credentials: credentials,
76+
Endpoint: endpoint,
77+
TLSConfig: tlsConfig,
78+
},
79+
grpc.WithUserAgent(`fluent-bit-plugin-yandex/`+PluginVersion+`; fluent-bit/`+FluentBitVersion),
80+
)
81+
if err != nil {
82+
return fmt.Errorf("error creating sdk: %s", err.Error())
83+
}
84+
c.writer = sdk.LogIngestion().LogIngestion()
85+
initTime = time.Now()
86+
return nil
87+
}
88+
}
89+
90+
func getIngestionClient(plugin unsafe.Pointer) (*client, error) {
91+
c := new(client)
92+
c.init = clientInit(c, plugin)
93+
return c, c.init()
94+
}

config.go

Lines changed: 0 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,12 @@
11
package main
22

33
import (
4-
"context"
54
"fmt"
65
"strings"
76
"unsafe"
87

98
"github.com/fluent/fluent-bit-go/output"
10-
"google.golang.org/grpc"
11-
129
"github.com/yandex-cloud/go-genproto/yandex/cloud/logging/v1"
13-
ycsdk "github.com/yandex-cloud/go-sdk"
14-
ingest "github.com/yandex-cloud/go-sdk/gen/logingestion"
1510
)
1611

1712
func getConfigKey(plugin unsafe.Pointer, key string) string {
@@ -104,49 +99,3 @@ func getParseKeys(plugin unsafe.Pointer) *parseKeys {
10499
messageTag: getConfigKey(plugin, keyMessageTagKey),
105100
}
106101
}
107-
108-
var (
109-
PluginVersion string
110-
FluentBitVersion string
111-
)
112-
113-
func getIngestionClient(plugin unsafe.Pointer) (*ingest.LogIngestionServiceClient, error) {
114-
const (
115-
keyAuthorization = "authorization"
116-
keyEndpoint = "endpoint"
117-
defaultEndpoint = "api.cloud.yandex.net:443"
118-
)
119-
120-
authorization := getConfigKey(plugin, keyAuthorization)
121-
if authorization == "" {
122-
return nil, fmt.Errorf("authorization missing")
123-
}
124-
125-
credentials, err := makeCredentials(authorization)
126-
if err != nil {
127-
return nil, err
128-
}
129-
130-
endpoint := getConfigKey(plugin, keyEndpoint)
131-
if endpoint == "" {
132-
endpoint = defaultEndpoint
133-
}
134-
135-
tlsConfig, err := makeTLSConfig(plugin)
136-
if err != nil {
137-
return nil, fmt.Errorf("error creating tls config: %s", err.Error())
138-
}
139-
140-
sdk, err := ycsdk.Build(context.Background(),
141-
ycsdk.Config{
142-
Credentials: credentials,
143-
Endpoint: endpoint,
144-
TLSConfig: tlsConfig,
145-
},
146-
grpc.WithUserAgent(`fluent-bit-plugin-yandex/`+PluginVersion+`; fluent-bit/`+FluentBitVersion),
147-
)
148-
if err != nil {
149-
return nil, fmt.Errorf("error creating sdk: %s", err.Error())
150-
}
151-
return sdk.LogIngestion().LogIngestion(), nil
152-
}

go.mod

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,25 @@
11
module github.com/yandex-cloud/fluent-bit-plugin-yandex
22

3-
go 1.16
3+
go 1.17
44

55
require (
66
github.com/fluent/fluent-bit-go v0.0.0-20201210173045-3fd1e0486df2
7-
github.com/yandex-cloud/go-genproto v0.0.0-20210824140926-1bca7bc0c005
8-
github.com/yandex-cloud/go-sdk v0.0.0-20210824141121-182aedd44a25
9-
google.golang.org/grpc v1.28.0
10-
google.golang.org/protobuf v1.26.0
7+
github.com/yandex-cloud/go-genproto v0.0.0-20211202135052-789603780fb2
8+
github.com/yandex-cloud/go-sdk v0.0.0-20211206101223-7c4e7926bf53
9+
google.golang.org/grpc v1.42.0
10+
google.golang.org/protobuf v1.27.1
11+
)
12+
13+
require (
14+
github.com/ghodss/yaml v1.0.0 // indirect
15+
github.com/golang-jwt/jwt/v4 v4.2.0 // indirect
16+
github.com/golang/protobuf v1.5.2 // indirect
17+
github.com/hashicorp/errwrap v1.1.0 // indirect
18+
github.com/hashicorp/go-multierror v1.1.1 // indirect
19+
github.com/ugorji/go/codec v1.1.7 // indirect
20+
golang.org/x/net v0.0.0-20211205041911-012df41ee64c // indirect
21+
golang.org/x/sys v0.0.0-20211205182925-97ca703d548d // indirect
22+
golang.org/x/text v0.3.7 // indirect
23+
google.golang.org/genproto v0.0.0-20211203200212-54befc351ae9 // indirect
24+
gopkg.in/yaml.v2 v2.4.0 // indirect
1125
)

0 commit comments

Comments
 (0)