Skip to content

Commit 0c89227

Browse files
authored
Add scamper queues for batch and daily (#737)
* add scamper queue for batch and daily * change cloud function. * fix typo * add log * fix unittest * Create app-scamper.yaml * Update queue.yaml * more fix * Update .travis.yml * add log * add log * fix data type * Update pt.go * Update pt_test.go * more log * Update .travis.yml * clean up
1 parent 9a9bd01 commit 0c89227

File tree

7 files changed

+136
-5
lines changed

7 files changed

+136
-5
lines changed

.travis.yml

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,8 @@ deploy:
207207
$TRAVIS_BRANCH == ss-sandbox-* ||
208208
$TRAVIS_BRANCH == fast-sandbox-* ||
209209
$TRAVIS_BRANCH == pt-sandbox-* ||
210-
$TRAVIS_BRANCH == disco-sandbox-*
210+
$TRAVIS_BRANCH == disco-sandbox-* ||
211+
$TRAVIS_BRANCH == scamper-sandbox-*
211212

212213
## Service: etl-ndt-parser -- AppEngine Flexible Environment.
213214
- provider: script
@@ -256,6 +257,17 @@ deploy:
256257
repo: m-lab/etl
257258
all_branches: true
258259
condition: $TRAVIS_BRANCH == pt-sandbox-* || $TRAVIS_BRANCH == sandbox-*
260+
261+
## Service: etl-scamper-parser -- AppEngine Flexible Environment.
262+
- provider: script
263+
script:
264+
$TRAVIS_BUILD_DIR/travis/deploy_app.sh mlab-sandbox
265+
SERVICE_ACCOUNT_mlab_sandbox $TRAVIS_BUILD_DIR/cmd/etl_worker app-scamper.yaml
266+
skip_cleanup: true
267+
on:
268+
repo: m-lab/etl
269+
all_branches: true
270+
condition: $TRAVIS_BRANCH == scamper-sandbox-* || $TRAVIS_BRANCH == sandbox-*
259271

260272
## Service: etl-disco-parser -- AppEngine Flexible Environment.
261273
- provider: script
@@ -303,6 +315,8 @@ deploy:
303315
&& $TRAVIS_BUILD_DIR/travis/deploy_app.sh mlab-staging
304316
SERVICE_ACCOUNT_mlab_staging $TRAVIS_BUILD_DIR/cmd/etl_worker app-traceroute.yaml
305317
&& $TRAVIS_BUILD_DIR/travis/deploy_app.sh mlab-staging
318+
SERVICE_ACCOUNT_mlab_staging $TRAVIS_BUILD_DIR/cmd/etl_worker app-scamper.yaml
319+
&& $TRAVIS_BUILD_DIR/travis/deploy_app.sh mlab-staging
306320
SERVICE_ACCOUNT_mlab_staging $TRAVIS_BUILD_DIR/cmd/etl_worker app-sidestream.yaml
307321
&& $TRAVIS_BUILD_DIR/travis/deploy_app.sh mlab-staging
308322
SERVICE_ACCOUNT_mlab_staging $TRAVIS_BUILD_DIR/cmd/etl_worker fast-sidestream.yaml
@@ -369,6 +383,8 @@ deploy:
369383
&& $TRAVIS_BUILD_DIR/travis/deploy_app.sh mlab-oti
370384
SERVICE_ACCOUNT_mlab_oti $TRAVIS_BUILD_DIR/cmd/etl_worker app-traceroute.yaml
371385
&& $TRAVIS_BUILD_DIR/travis/deploy_app.sh mlab-oti
386+
SERVICE_ACCOUNT_mlab_oti $TRAVIS_BUILD_DIR/cmd/etl_worker app-scamper.yaml
387+
&& $TRAVIS_BUILD_DIR/travis/deploy_app.sh mlab-oti
372388
SERVICE_ACCOUNT_mlab_oti $TRAVIS_BUILD_DIR/cmd/etl_worker app-sidestream.yaml
373389
&& $TRAVIS_BUILD_DIR/travis/deploy_app.sh mlab-oti
374390
SERVICE_ACCOUNT_mlab_oti $TRAVIS_BUILD_DIR/cmd/etl_worker app-disco.yaml

appengine/queue.yaml

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -696,6 +696,49 @@ queue:
696696
max_backoff_seconds: 20
697697
max_doublings: 0
698698

699+
# Scamper JSON batch queue.
700+
- name: etl-scamper-batch-0
701+
target: etl-batch-parser
702+
rate: 1/s
703+
bucket_size: 10
704+
# max concurrent is limited, because we will share the pipeline with other experiment types
705+
max_concurrent_requests: 20
706+
retry_parameters:
707+
task_age_limit: 12h
708+
# min and max are same, so that queue rate only diminishes as tasks are drained
709+
# it is set rather low, because traceroute days have relatively few files compared to NDT
710+
# and we want the rate limited by the rate/concurrent requests, not by the backoff
711+
min_backoff_seconds: 20
712+
max_backoff_seconds: 20
713+
- name: etl-scamper-batch-1
714+
target: etl-batch-parser
715+
rate: 1/s
716+
bucket_size: 10
717+
max_concurrent_requests: 20
718+
retry_parameters:
719+
task_age_limit: 12h
720+
min_backoff_seconds: 20
721+
max_backoff_seconds: 20
722+
- name: etl-scamper-batch-2
723+
target: etl-batch-parser
724+
rate: 1/s
725+
bucket_size: 10
726+
max_concurrent_requests: 20
727+
retry_parameters:
728+
task_age_limit: 12h
729+
min_backoff_seconds: 20
730+
max_backoff_seconds: 20
731+
- name: etl-scamper-batch-3
732+
target: etl-batch-parser
733+
rate: 1/s
734+
bucket_size: 10
735+
max_concurrent_requests: 20
736+
retry_parameters:
737+
task_age_limit: 12h
738+
min_backoff_seconds: 20
739+
max_backoff_seconds: 20
740+
741+
699742
- name: etl-traceroute-queue
700743
target: etl-traceroute-parser
701744
# Average rate at which to release tasks to the service. Default is 5/sec
@@ -707,6 +750,17 @@ queue:
707750
# Maximum number of concurrent requests.
708751
max_concurrent_requests: 5
709752

753+
- name: etl-scamper-queue
754+
target: etl-scamper-parser
755+
# Average rate at which to release tasks to the service. Default is 5/sec
756+
# This is actually the rate at which tokens are added to the bucket.
757+
rate: 1/s
758+
# Number of tokens that can accumulate in the bucket. Default is 5. This should
759+
# have very little impact for our environment.
760+
bucket_size: 10
761+
# Maximum number of concurrent requests.
762+
max_concurrent_requests: 5
763+
710764
- name: etl-sidestream-queue
711765
target: etl-sidestream-parser
712766
# Average rate at which to release tasks to the service. Default is 5/sec

cmd/etl_worker/app-scamper.yaml

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# Use github release to trigger deployment.
2+
3+
runtime: custom
4+
env: flex
5+
service: etl-scamper-parser
6+
7+
# Resource and scaling options. For more background, see:
8+
# https://cloud.google.com/appengine/docs/flexible/go/configuring-your-app-with-app-yaml
9+
10+
# TODO(dev): adjust CPU and memory based on actual requirements.
11+
resources:
12+
cpu: 2
13+
# Instances support between [(cpu * 0.9) - 0.4, (cpu * 6.5) - 0.4]
14+
# Actual memory available is exposed via GAE_MEMORY_MB environment variable.
15+
# even though the parser has extremely limited RAM requirements, the
16+
# minimum RAM assignable to 2 CPUs is around 1.5GB
17+
# We see some OOMs at 3 GB (in etl-batch-parser), so let's try 6
18+
memory_gb: 6
19+
20+
automatic_scaling:
21+
# We expect fairly steady load, so a modest minimum will rarely cost us anything.
22+
min_num_instances: 2
23+
max_num_instances: 20
24+
cool_down_period_sec: 3000
25+
# We don't care much about latency, so a high utilization is desirable.
26+
cpu_utilization:
27+
target_utilization: 0.6
28+
29+
# Note: add a public port for GCE auto discovery by prometheus.
30+
# TODO(dev): are any values redundant or irrelevant?
31+
network:
32+
instance_tag: etl-parser
33+
name: default
34+
# Forward port 9090 on the GCE instance address to the same port in the
35+
# container address. Only forward TCP traffic.
36+
# Note: the default AppEngine container port 8080 cannot be forwarded.
37+
forwarded_ports:
38+
- 9090/tcp
39+
40+
env_variables:
41+
# These should be substituted in the travis deployment script.
42+
RELEASE_TAG: ${TRAVIS_TAG}
43+
COMMIT_HASH: ${TRAVIS_COMMIT}
44+
45+
MAX_WORKERS: 5
46+
BIGQUERY_PROJECT: '${INJECTED_PROJECT}' # Overrides GCLOUD_PROJECT
47+
# BIGQUERY_DATASET: 'base_tables' # Overrides computed dataset.

etl/globals.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ var (
233233
"paris-traceroute": PT,
234234
"switch": SW,
235235
"tcpinfo": TCPINFO,
236+
"traceroute": PT,
236237
}
237238

238239
// DataTypeToTable maps from data type to BigQuery table name.

etl/globals_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,14 @@ func TestValidateTestPath(t *testing.T) {
9292
"archive-mlab-oti", "", "paris-traceroute", "2019/06/11", "20190611", "000002", "", "mlab2", "bom01", "paris-traceroute", "0000", "", ".tgz",
9393
},
9494
},
95+
{
96+
name: "scamper-tgz",
97+
path: `gs://archive-mlab-oti/ndt/traceroute/2019/06/20/20190620T224809.435046Z-traceroute-mlab1-den06-ndt.tgz`,
98+
wantType: etl.PT,
99+
want: &etl.DataPath{
100+
"archive-mlab-oti", "ndt", "traceroute", "2019/06/20", "20190620", "224809", "traceroute", "mlab1", "den06", "ndt", "", "", ".tgz",
101+
},
102+
},
95103
}
96104
for _, tt := range tests {
97105
t.Run(tt.name, func(t *testing.T) {

functions/enqueue.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ exports.queueForFile = function (filename) {
1515
experiment_to_task_queue = {
1616
"switch": "etl-disco-queue",
1717
"ndt": "etl-ndt-queue",
18+
"ndt/traceroute": "etl-scamper-queue",
1819
"sidestream": "etl-sidestream-queue",
19-
"paris-traceroute": "etl-paris-traceroute-queue"
20+
"paris-traceroute": "etl-traceroute-queue"
2021
};
2122
// TODO - fix this.
2223
for (key in experiment_to_task_queue) {
@@ -98,4 +99,4 @@ exports.createProdTaskOnFileNotification = function (event, callback) {
9899

99100
exports.createSandboxTaskOnEmbargoFileNotification = exports.createSandboxTaskOnFileNotification
100101
exports.createStagingTaskOnEmbargoFileNotification = exports.createStagingTaskOnFileNotification
101-
exports.createProdTaskOnEmbargoFileNotification = exports.createProdTaskOnFileNotification
102+
exports.createProdTaskOnEmbargoFileNotification = exports.createProdTaskOnFileNotification

parser/pt.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ func ParseJSON(testName string, rawContent []byte, tableName string, taskFilenam
163163
err = json.Unmarshal([]byte(output), &scamperResult)
164164
if err != nil {
165165
// fail and return here.
166+
log.Printf("extra jasonnet processing failed for %s", testName)
166167
return schema.PTTest{}, err
167168
}
168169
}
@@ -498,7 +499,7 @@ func (pt *PTParser) NumBufferedTests() int {
498499

499500
// IsParsable returns the canonical test type and whether to parse data.
500501
func (pt *PTParser) IsParsable(testName string, data []byte) (string, bool) {
501-
if strings.HasSuffix(testName, ".paris") {
502+
if strings.HasSuffix(testName, ".paris") || strings.HasSuffix(testName, ".jsonl") {
502503
return "paris", true
503504
}
504505
return "unknown", false
@@ -517,7 +518,7 @@ func (pt *PTParser) ParseAndInsert(meta map[string]bigquery.Value, testName stri
517518
}
518519

519520
// Process the json output of Scamper binary.
520-
if strings.Contains(pt.taskFileName, "jsonl") {
521+
if strings.Contains(testName, "jsonl") {
521522
ptTest, err := ParseJSON(testName, rawContent, pt.TableName(), pt.taskFileName)
522523
if err == nil {
523524
err := pt.AddRow(&ptTest)
@@ -527,6 +528,9 @@ func (pt *PTParser) ParseAndInsert(meta map[string]bigquery.Value, testName stri
527528
pt.PutAsync(pt.TakeRows())
528529
pt.AddRow(&ptTest)
529530
}
531+
} else {
532+
// Modify metrics
533+
log.Printf("JSON parsing failed with error %v for %s", err, testName)
530534
}
531535
return nil
532536
}

0 commit comments

Comments
 (0)