Skip to content

Commit cbd47d8

Browse files
committed
enable Geo with SS.
1 parent 6689388 commit cbd47d8

File tree

3 files changed

+14
-9
lines changed

3 files changed

+14
-9
lines changed

cmd/etl_worker/app-sidestream.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,6 @@ network:
3737

3838
env_variables:
3939
MAX_WORKERS: 50
40+
BIGQUERY_PROJECT: 'mlab-sandbox'
41+
BIGQUERY_DATASET: 'mlab_sandbox'
42+
ANNOTATE_IP: 'true'

parser/ss.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,19 +31,19 @@ func NewSSParser(ins etl.Inserter) *SSParser {
3131
// The legacy filename is like "20170203T00:00:00Z_ALL0.web100"
3232
// The current filename is like "20170315T01:00:00Z_173.205.3.39_0.web100"
3333
// Return time stamp if the filename is in right format
34-
func ExtractLogtimeFromFilename(fileName string) (int64, error) {
34+
func ExtractLogtimeFromFilename(fileName string) (time.Time, error) {
3535
testName := filepath.Base(fileName)
3636
if len(testName) < 19 || !strings.Contains(testName, ".web100") {
3737
log.Println(testName)
38-
return 0, errors.New("Invalid sidestream filename")
38+
return time.Time{}, errors.New("Invalid sidestream filename")
3939
}
4040

4141
t, err := time.Parse("20060102T15:04:05.999999999Z_", testName[0:17]+".000000000Z_")
4242
if err != nil {
43-
return 0, err
43+
return time.Time{}, err
4444
}
4545

46-
return t.Unix(), nil
46+
return t, nil
4747
}
4848

4949
// the first line of SS test is in format "K: cid PollTime LocalAddress LocalPort ... other_web100_variables_separated_by_space"
@@ -89,7 +89,7 @@ func (ss *SSParser) Flush() error {
8989
}
9090

9191
// Prepare data into sidestream BigQeury schema and insert it.
92-
func PackDataIntoSchema(ss_value map[string]string, log_time int64, testName string) (schema.SS, error) {
92+
func PackDataIntoSchema(ss_value map[string]string, log_time time.Time, testName string) (schema.SS, error) {
9393
local_port, err := strconv.Atoi(ss_value["LocalPort"])
9494
if err != nil {
9595
return schema.SS{}, err
@@ -106,12 +106,14 @@ func PackDataIntoSchema(ss_value map[string]string, log_time int64, testName str
106106
Remote_ip: ss_value["RemAddress"],
107107
Remote_port: int64(remote_port),
108108
}
109+
110+
AddGeoDataSSConnSpec(conn_spec, log_time)
109111
snap, err := PopulateSnap(ss_value)
110112
if err != nil {
111113
return schema.SS{}, err
112114
}
113115
web100_log := &schema.Web100LogEntry{
114-
Log_time: log_time,
116+
Log_time: log_time.Unix(),
115117
Version: "unknown",
116118
Group_name: "read",
117119
Connection_spec: *conn_spec,
@@ -120,7 +122,7 @@ func PackDataIntoSchema(ss_value map[string]string, log_time int64, testName str
120122

121123
ss_test := &schema.SS{
122124
Test_id: testName,
123-
Log_time: log_time,
125+
Log_time: log_time.Unix(),
124126
Type: int64(1),
125127
Project: int64(2),
126128
Web100_log_entry: *web100_log,

parser/ss_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ import (
1010

1111
func TestExtractLogtimeFromFilename(t *testing.T) {
1212
log_time, _ := parser.ExtractLogtimeFromFilename("20170315T01:00:00Z_173.205.3.39_0.web100")
13-
if log_time != 1489539600 {
14-
fmt.Println(log_time)
13+
if log_time.Unix() != 1489539600 {
14+
fmt.Println(log_time.Unix())
1515
t.Fatalf("log time not parsed correctly.")
1616
}
1717
}

0 commit comments

Comments
 (0)