Skip to content

Commit b93e0ac

Browse files
authored
Merge pull request #13 from yandex-cloud/feature/stream
Stream name support
2 parents 4079e9c + 73df12e commit b93e0ac

File tree

10 files changed

+45
-25
lines changed

10 files changed

+45
-25
lines changed

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,13 @@ for
99

1010
All configuration parameters can be templated via [metadata service](#metadata). You can define templated value as follows: `{{key:default}}` or `{{key}}` (in this case `""` used as default value). All strings matched this template will be replaced by metadata values. In `default_payload` parameter you also can use this template as a JSON value (without quotes), so that JSON struct or array will be included in default payload.
1111

12-
| Key | Description |
13-
|:---|:---|
12+
| Key | Description |
13+
|:------------------|:---|
1414
| `group_id` | (_optional_) [Log group](https://cloud.yandex.ru/docs/logging/concepts/log-group) ID. Has higher priority than `folder_id`. |
1515
| `folder_id` | (_optional_) Folder ID. Has lower priority than `group_id`. Can be auto-detected via [metadata service](#metadata) if `group_id` and `folder_id` are not set. |
1616
| `resource_type` | (_optional_) Resource type of log entries. Can be templated via entry payload as follows: `{entry/json/path}`. |
1717
| `resource_id` | (_optional_) Resource id of log entries. Can be templated via entry payload as follows: `{entry/json/path}`. |
18+
| `stream_name` | (_optional_) Resource id of log entries. Can be templated via entry payload as follows: `{entry/json/path}`. |
1819
| `message_tag_key` | Key of the field to be assigned to the message tag. By default, will be skipped. |
1920
| `message_key` | Key of the field, which will go to `message` attribute of LogEntry. |
2021
| `level_key` | Key of the field, which contains log level, optional. |

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ require (
66
github.com/fluent/fluent-bit-go v0.0.0-20220311094233-780004bf5562
77
github.com/startdusk/strnaming v0.7.0
88
github.com/stretchr/testify v1.7.0
9-
github.com/yandex-cloud/go-genproto v0.0.0-20220511112233-d69ecdb30a46
10-
github.com/yandex-cloud/go-sdk v0.0.0-20220511115426-9feb5f1ee87f
9+
github.com/yandex-cloud/go-genproto v0.0.0-20220718095639-7971ba320057
10+
github.com/yandex-cloud/go-sdk v0.0.0-20220718100739-50070cd9746e
1111
google.golang.org/genproto v0.0.0-20220505152158-f39f71e6c8f3
1212
google.golang.org/grpc v1.46.0
1313
google.golang.org/protobuf v1.28.0

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,10 @@ github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLY
9696
github.com/ugorji/go/codec v1.2.7 h1:YPXUKf7fYbp/y8xloBqZOw2qaVggbfwMlI8WM3wZUJ0=
9797
github.com/ugorji/go/codec v1.2.7/go.mod h1:WGN1fab3R1fzQlVQTkfxVtIBhWDRqOviHU95kRgeqEY=
9898
github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
99-
github.com/yandex-cloud/go-genproto v0.0.0-20220511112233-d69ecdb30a46 h1:xDrCEN8UsG7MGlzKYoG/Q7H+2Fp+1pBubVMmuIXbP64=
100-
github.com/yandex-cloud/go-genproto v0.0.0-20220511112233-d69ecdb30a46/go.mod h1:HEUYX/p8966tMUHHT+TsS0hF/Ca/NYwqprC5WXSDMfE=
101-
github.com/yandex-cloud/go-sdk v0.0.0-20220511115426-9feb5f1ee87f h1:Ek9unlbD/EY2D+v/JY5kbGR6AKH5Y5eNScp1o+BLh+c=
102-
github.com/yandex-cloud/go-sdk v0.0.0-20220511115426-9feb5f1ee87f/go.mod h1:lfdaCackTXJEzhnyUdx32rmE7RTA2mGc2/KCNaNl4is=
99+
github.com/yandex-cloud/go-genproto v0.0.0-20220718095639-7971ba320057 h1:E7Hb3za9EHWx/AiXNGjBxOPudLFSelaRtyVsvxturM4=
100+
github.com/yandex-cloud/go-genproto v0.0.0-20220718095639-7971ba320057/go.mod h1:HEUYX/p8966tMUHHT+TsS0hF/Ca/NYwqprC5WXSDMfE=
101+
github.com/yandex-cloud/go-sdk v0.0.0-20220718100739-50070cd9746e h1:p/APVZzglWfiQJY+rIPH94SgAqD7PQzoAuAvvVU0iY0=
102+
github.com/yandex-cloud/go-sdk v0.0.0-20220718100739-50070cd9746e/go.mod h1:NszadvtI2oAulirw9kCjAXjSL31EPqe35CbLi0g5Se8=
103103
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
104104
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
105105
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=

model/model.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type Resource struct {
2424
type Entry struct {
2525
Timestamp time.Time
2626
Level string
27+
StreamName string
2728
Message string
2829
JSONPayload *structpb.Struct
2930
}

plugin/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ func getParseKeys(getConfigValue func(string) string, metadataProvider metadata.
1111
keyMessageTagKey = "message_tag_key"
1212
keyResourceType = "resource_type"
1313
keyResourceID = "resource_id"
14+
keySteamName = "stream_name"
1415
)
1516

1617
level := metadata.Parse(getConfigValue(keyLevelKey), metadataProvider)
@@ -19,12 +20,14 @@ func getParseKeys(getConfigValue func(string) string, metadataProvider metadata.
1920

2021
resourceType := metadata.Parse(getConfigValue(keyResourceType), metadataProvider)
2122
resourceID := metadata.Parse(getConfigValue(keyResourceID), metadataProvider)
23+
streamName := metadata.Parse(getConfigValue(keySteamName), metadataProvider)
2224

2325
return &parseKeys{
2426
level: level,
2527
message: message,
2628
messageTag: messageTag,
2729
resourceType: newTemplate(resourceType),
2830
resourceID: newTemplate(resourceID),
31+
streamName: newTemplate(streamName),
2932
}
3033
}

plugin/key.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type parseKeys struct {
1515
messageTag string
1616
resourceType *template
1717
resourceID *template
18+
streamName *template
1819
}
1920

2021
func (pk *parseKeys) entry(ts time.Time, record map[interface{}]interface{}, tag string) (*model.Entry, model.Resource, error) {
@@ -34,6 +35,10 @@ func (pk *parseKeys) entry(ts time.Time, record map[interface{}]interface{}, tag
3435
if err != nil {
3536
return nil, model.Resource{}, fmt.Errorf("failed to parse resource ID: %s", err.Error())
3637
}
38+
streamName, err := pk.streamName.parse(record)
39+
if err != nil {
40+
return nil, model.Resource{}, fmt.Errorf("failed to parse stream name: %s", err.Error())
41+
}
3742
resource := model.Resource{
3843
Type: resourceType,
3944
ID: resourceID,
@@ -65,6 +70,7 @@ func (pk *parseKeys) entry(ts time.Time, record map[interface{}]interface{}, tag
6570
}
6671
return &model.Entry{
6772
Level: level,
73+
StreamName: streamName,
6874
Message: message,
6975
JSONPayload: payload,
7076
Timestamp: ts,

plugin/key_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ func TestEntry_Success(t *testing.T) {
1616
messageTag: "tag_key",
1717
resourceType: newTemplate("resource_type"),
1818
resourceID: newTemplate("resource_id"),
19+
streamName: newTemplate("stream_name"),
1920
}
2021
ts := time.Now()
2122
record := map[interface{}]interface{}{
@@ -39,6 +40,7 @@ func TestEntry_TemplatedResource_Success(t *testing.T) {
3940
pk := parseKeys{
4041
resourceType: newTemplate("{simple}"),
4142
resourceID: newTemplate("resource_{json/path}"),
43+
streamName: newTemplate("stream_name"),
4244
}
4345
ts := time.Now()
4446
record := map[interface{}]interface{}{
@@ -57,6 +59,7 @@ func TestEntry_TemplatedResourceID_Fail(t *testing.T) {
5759
pk := parseKeys{
5860
resourceType: newTemplate("{simple}"),
5961
resourceID: newTemplate("resource_{json/path}"),
62+
streamName: newTemplate("stream_name"),
6063
}
6164
ts := time.Now()
6265
record := map[interface{}]interface{}{
@@ -71,6 +74,7 @@ func TestEntry_TemplatedResourceType_Fail(t *testing.T) {
7174
pk := parseKeys{
7275
resourceType: newTemplate("{simple}"),
7376
resourceID: newTemplate("resource_{json/path}"),
77+
streamName: newTemplate("stream_name"),
7478
}
7579
ts := time.Now()
7680
record := map[interface{}]interface{}{

plugin/plugin_test.go

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ func TestInit_AllConfig_Success(t *testing.T) {
2727
"message_tag_key": "message_tag",
2828
"resource_type": "resource_type",
2929
"resource_id": "resource_id",
30+
"stream_name": "stream_name",
3031
}
3132
metadataProvider := test.MetadataProvider{}
3233
client := &test.Client{}
@@ -39,6 +40,7 @@ func TestInit_AllConfig_Success(t *testing.T) {
3940
assert.Equal(t, "message_tag", plugin.keys.messageTag)
4041
assert.Equal(t, &template{"resource_type", [][]string{}}, plugin.keys.resourceType)
4142
assert.Equal(t, &template{"resource_id", [][]string{}}, plugin.keys.resourceID)
43+
assert.Equal(t, &template{"stream_name", [][]string{}}, plugin.keys.streamName)
4244
}
4345
func TestInit_AllConfigTemplated_Success(t *testing.T) {
4446
configMap = map[string]string{
@@ -69,8 +71,8 @@ func TestInit_AllConfigTemplated_Success(t *testing.T) {
6971

7072
func TestTransform_Success(t *testing.T) {
7173
records := []map[interface{}]interface{}{
72-
{"type": "1_type", "id": "1_id", "name": 10},
73-
{"type": "2_type", "id": "2_id", "name": 20},
74+
{"type": "1_type", "id": "1_id", "name": 10, "stream": "stream1"},
75+
{"type": "2_type", "id": "2_id", "name": 20, "stream": "stream1"},
7476
}
7577
var cur uint64 = 0
7678
var recordProvider = func() (ret int, ts interface{}, rec map[interface{}]interface{}) {
@@ -84,6 +86,7 @@ func TestTransform_Success(t *testing.T) {
8486
keys: &parseKeys{
8587
resourceType: newTemplate("{type}"),
8688
resourceID: newTemplate("{id}"),
89+
streamName: newTemplate("{stream}"),
8790
},
8891
}
8992

@@ -105,16 +108,16 @@ func TestTransform_Success(t *testing.T) {
105108
}
106109
func TestTransform_IdentifyingResource_Success(t *testing.T) {
107110
records := []map[interface{}]interface{}{
108-
{"type": "1_type", "id": "1_id", "name": 10},
109-
{"type": "1_type", "id": "2_id", "name": 20},
110-
{"type": "1_type", "id": "2_id", "name": 21},
111-
{"type": "2_type", "id": "1_id", "name": 30},
112-
{"type": "2_type", "id": "1_id", "name": 31},
113-
{"type": "2_type", "id": "1_id", "name": 32},
114-
{"type": "2_type", "id": "2_id", "name": 40},
115-
{"type": "2_type", "id": "2_id", "name": 41},
116-
{"type": "2_type", "id": "2_id", "name": 42},
117-
{"type": "2_type", "id": "2_id", "name": 43},
111+
{"type": "1_type", "id": "1_id", "name": 10, "stream": "stream1"},
112+
{"type": "1_type", "id": "2_id", "name": 20, "stream": "stream1"},
113+
{"type": "1_type", "id": "2_id", "name": 21, "stream": "stream1"},
114+
{"type": "2_type", "id": "1_id", "name": 30, "stream": "stream1"},
115+
{"type": "2_type", "id": "1_id", "name": 31, "stream": "stream1"},
116+
{"type": "2_type", "id": "1_id", "name": 32, "stream": "stream1"},
117+
{"type": "2_type", "id": "2_id", "name": 40, "stream": "stream1"},
118+
{"type": "2_type", "id": "2_id", "name": 41, "stream": "stream1"},
119+
{"type": "2_type", "id": "2_id", "name": 42, "stream": "stream1"},
120+
{"type": "2_type", "id": "2_id", "name": 43, "stream": "stream1"},
118121
}
119122
var cur uint64 = 0
120123
var recordProvider = func() (ret int, ts interface{}, rec map[interface{}]interface{}) {
@@ -128,16 +131,17 @@ func TestTransform_IdentifyingResource_Success(t *testing.T) {
128131
keys: &parseKeys{
129132
resourceType: newTemplate("{type}"),
130133
resourceID: newTemplate("{id}"),
134+
streamName: newTemplate("{stream}"),
131135
},
132136
}
133137

134138
resourceToEntries := plugin.Transform(recordProvider, "tag")
135139

136140
expected := map[model.Resource][]*logging.IncomingLogEntry{
137-
{Type: "1_type", ID: "1_id"}: {{}},
138-
{Type: "1_type", ID: "2_id"}: {{}, {}},
139-
{Type: "2_type", ID: "1_id"}: {{}, {}, {}},
140-
{Type: "2_type", ID: "2_id"}: {{}, {}, {}, {}},
141+
{Type: "1_type", ID: "1_id"}: {{StreamName: "stream1"}},
142+
{Type: "1_type", ID: "2_id"}: {{StreamName: "stream1"}, {StreamName: "stream1"}},
143+
{Type: "2_type", ID: "1_id"}: {{StreamName: "stream1"}, {StreamName: "stream1"}, {StreamName: "stream1"}},
144+
{Type: "2_type", ID: "2_id"}: {{StreamName: "stream1"}, {StreamName: "stream1"}, {StreamName: "stream1"}, {StreamName: "stream1"}},
141145
}
142146
assert.NotNil(t, resourceToEntries)
143147
assert.Equal(t, len(expected), len(resourceToEntries))

plugin/util.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func getRecordValue(record map[interface{}]interface{}, path []string) (string,
2727
}
2828
cur = typed[index]
2929
default:
30-
return "", errors.New("incorrect path")
30+
return "", fmt.Errorf("incorrect path")
3131
}
3232
}
3333
if cur == nil {

yclient/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ func (c *client) loggingWriteRequest(req *model.WriteRequest) *logging.WriteRequ
9999
level, _ := levelFromString(entry.Level)
100100
entries = append(entries, &logging.IncomingLogEntry{
101101
Level: level,
102+
StreamName: entry.StreamName,
102103
Message: entry.Message,
103104
JsonPayload: entry.JSONPayload,
104105
Timestamp: timestamppb.New(entry.Timestamp),

0 commit comments

Comments
 (0)