Skip to content

Commit ddc21c0

Browse files
authored
Merge pull request #1628 from orozery/objectstore_marshaller
add objectstore marshaller
2 parents 723e8e4 + 94e888e commit ddc21c0

File tree

5 files changed

+97
-4
lines changed

5 files changed

+97
-4
lines changed

Diff for: contrib/objectstore/main.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/skydive-project/skydive/common"
1010
"github.com/skydive-project/skydive/config"
1111
"github.com/skydive-project/skydive/contrib/objectstore/subscriber"
12+
"github.com/skydive-project/skydive/contrib/objectstore/subscriber/flowtransformer"
1213
shttp "github.com/skydive-project/skydive/http"
1314
"github.com/skydive-project/skydive/logging"
1415
"github.com/skydive-project/skydive/websocket"
@@ -39,6 +40,13 @@ func main() {
3940
subscriberUsername := cfg.GetString("subscriber_username")
4041
subscriberPassword := cfg.GetString("subscriber_password")
4142
maxSecondsPerStream := cfg.GetInt("max_seconds_per_stream")
43+
flowTransformerName := cfg.GetString("flow_transformer")
44+
45+
flowTransformer, err := flowtransformer.New(flowTransformerName)
46+
if err != nil {
47+
logging.GetLogger().Errorf("Failed to initialize flow transformer: %s", err.Error())
48+
os.Exit(1)
49+
}
4250

4351
authOpts := &shttp.AuthenticationOpts{
4452
Username: subscriberUsername,
@@ -58,7 +66,7 @@ func main() {
5866
}
5967
structClient := wsClient.UpgradeToStructSpeaker()
6068

61-
s := subscriber.New(endpoint, region, bucket, accessKey, secretKey, objectPrefix, maxSecondsPerStream)
69+
s := subscriber.New(endpoint, region, bucket, accessKey, secretKey, objectPrefix, maxSecondsPerStream, flowTransformer)
6270

6371
// subscribe to the flow updates
6472
structClient.AddStructMessageHandler(s, []string{"flow"})

Diff for: contrib/objectstore/skydive-objectstore.yml.default

+1
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@
88
# subscriber_username:
99
# subscriber_password:
1010
# max_seconds_per_stream: 86400
11+
# flow_transformer: custom1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package custom1
2+
3+
import (
4+
"github.com/skydive-project/skydive/flow"
5+
)
6+
7+
// Flow represents a transformed flow
8+
type Flow struct {
9+
UUID string
10+
LayersPath string
11+
Network *flow.FlowLayer
12+
Transport *flow.TransportLayer
13+
LastUpdateMetric *flow.FlowMetric
14+
Metric *flow.FlowMetric
15+
Start int64
16+
Last int64
17+
FinishType flow.FlowFinishType
18+
}
19+
20+
// FlowTransformer is a custom transformer for flows
21+
type FlowTransformer struct {
22+
}
23+
24+
// Transform transforms a flow before being stored
25+
func (m *FlowTransformer) Transform(f *flow.Flow) interface{} {
26+
return &Flow{
27+
UUID: f.UUID,
28+
LayersPath: f.LayersPath,
29+
Network: f.Network,
30+
Transport: f.Transport,
31+
LastUpdateMetric: f.LastUpdateMetric,
32+
Metric: f.Metric,
33+
Start: f.Start,
34+
Last: f.Last,
35+
FinishType: f.FinishType,
36+
}
37+
}
38+
39+
// New returns a new Custom1Marshaller
40+
func New() *FlowTransformer {
41+
return &FlowTransformer{}
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package flowtransformer
2+
3+
import (
4+
"fmt"
5+
"github.com/skydive-project/skydive/contrib/objectstore/subscriber/flowtransformer/custom1"
6+
"github.com/skydive-project/skydive/flow"
7+
)
8+
9+
// FlowTransformer allows generic transformations of a flow
10+
type FlowTransformer interface {
11+
// Transform transforms a flow before being stored
12+
Transform(flow *flow.Flow) interface{}
13+
}
14+
15+
// New creates a new flow transformer based on a name string
16+
func New(flowTransformerName string) (FlowTransformer, error) {
17+
switch flowTransformerName {
18+
case "custom1":
19+
return custom1.New(), nil
20+
case "":
21+
return nil, nil
22+
default:
23+
return nil, fmt.Errorf("Marshaller '%s' is not supported", flowTransformerName)
24+
}
25+
}

Diff for: contrib/objectstore/subscriber/objectstore.go

+20-3
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/aws/aws-sdk-go/aws"
1414

1515
"github.com/skydive-project/skydive/contrib/objectstore/subscriber/client"
16+
"github.com/skydive-project/skydive/contrib/objectstore/subscriber/flowtransformer"
1617
"github.com/skydive-project/skydive/flow"
1718
"github.com/skydive-project/skydive/logging"
1819
ws "github.com/skydive-project/skydive/websocket"
@@ -33,6 +34,7 @@ type Subscriber struct {
3334
currentStream stream
3435
maxStreamDuration time.Duration
3536
objectStoreClient client.Client
37+
flowTransformer flowtransformer.FlowTransformer
3638
}
3739

3840
// OnStructMessage is triggered when WS server sends us a message.
@@ -58,7 +60,21 @@ func (s *Subscriber) StoreFlows(flows []*flow.Flow) error {
5860
return nil
5961
}
6062

61-
flowsString, err := json.Marshal(flows)
63+
var jsonMarshalInput interface{}
64+
if s.flowTransformer == nil {
65+
jsonMarshalInput = flows
66+
} else {
67+
transformedFlows := make([]interface{}, 0, len(flows))
68+
for _, f := range flows {
69+
transformedFlow := s.flowTransformer.Transform(f)
70+
if transformedFlow != nil {
71+
transformedFlows = append(transformedFlows, transformedFlow)
72+
}
73+
}
74+
jsonMarshalInput = transformedFlows
75+
}
76+
77+
flowsBytes, err := json.Marshal(jsonMarshalInput)
6278
if err != nil {
6379
logging.GetLogger().Error("Error encoding flows: ", err)
6480
return err
@@ -83,7 +99,7 @@ func (s *Subscriber) StoreFlows(flows []*flow.Flow) error {
8399
// gzip
84100
var b bytes.Buffer
85101
w := gzip.NewWriter(&b)
86-
w.Write([]byte(flowsString))
102+
w.Write(flowsBytes)
87103
w.Close()
88104

89105
currentStream := s.currentStream
@@ -106,13 +122,14 @@ func (s *Subscriber) StoreFlows(flows []*flow.Flow) error {
106122
}
107123

108124
// New returns a new flows subscriber writing to an object storage service
109-
func New(endpoint, region, bucket, accessKey, secretKey, objectPrefix string, maxSecondsPerStream int) *Subscriber {
125+
func New(endpoint, region, bucket, accessKey, secretKey, objectPrefix string, maxSecondsPerStream int, flowTransformer flowtransformer.FlowTransformer) *Subscriber {
110126
objectStoreClient := client.New(endpoint, region, accessKey, secretKey)
111127
s := &Subscriber{
112128
bucket: bucket,
113129
objectPrefix: objectPrefix,
114130
maxStreamDuration: time.Second * time.Duration(maxSecondsPerStream),
115131
objectStoreClient: objectStoreClient,
132+
flowTransformer: flowTransformer,
116133
}
117134
return s
118135
}

0 commit comments

Comments
 (0)