
Commit 751aef9

Stream events directly to GCS
This takes a fairly different approach to how we emit our logs to GCS. Previously we received events in one container and wrote them out to the local filesystem, and a sidecar would periodically enumerate the files emitted by that process, concatenate them, and send them up to GCS in a single upload. When we shifted to Cloud Run, this approach became problematic because the filesystem is backed by memory, so under heavy load the event handler could see a lot of memory pressure between rotations; and because the data sits on the in-memory filesystem and is then concatenated for the upload, it ends up in memory twice.

By collapsing the two processes together and uploading directly, we can initiate a new file write, trickle events to that writer, and then flush the active writers. Worst case, the client buffers everything once; in a perfect world this initiates an upload of unknown size and streams events as they come in, which would dramatically reduce our memory pressure to roughly O(active events).

Signed-off-by: Matt Moore <[email protected]>
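A condensed sketch of the flow this change adopts (simplified: error handling, graceful shutdown, and lock granularity are not identical to the real recorder, and the names here are illustrative only; the actual implementation is in the recorder diff below):

```go
// Condensed, hypothetical sketch of the direct-streaming approach.
package sketch

import (
	"context"
	"fmt"
	"sync"
	"time"

	"gocloud.dev/blob"
	_ "gocloud.dev/blob/gcsblob" // registers gs:// bucket URLs
)

type event struct{ Type, Data string }

func record(ctx context.Context, bucket *blob.Bucket, events <-chan event, flushEvery time.Duration) error {
	var mu sync.Mutex
	writers := map[string]*blob.Writer{} // one open object per event type

	// Flush loop: closing a writer commits its object to the bucket,
	// so each interval starts a fresh object per event type.
	go func() {
		for range time.Tick(flushEvery) {
			mu.Lock()
			drain := writers
			writers = map[string]*blob.Writer{}
			mu.Unlock()
			for typ, w := range drain {
				if err := w.Close(); err != nil {
					fmt.Printf("closing writer[%s]: %v\n", typ, err)
				}
			}
		}
	}()

	for ev := range events {
		mu.Lock()
		w, ok := writers[ev.Type]
		if !ok {
			var err error
			w, err = bucket.NewWriter(ctx, fmt.Sprintf("%s/%d", ev.Type, time.Now().UnixNano()), nil)
			if err != nil {
				mu.Unlock()
				return err
			}
			writers[ev.Type] = w
		}
		// Events are appended as newline-delimited records.
		_, err := w.Write([]byte(ev.Data + "\n"))
		mu.Unlock()
		if err != nil {
			return err
		}
	}
	return nil
}
```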
1 parent 652a950 commit 751aef9

File tree: 8 files changed (+87, −523 lines)

modules/cloudevent-recorder/README.md

Lines changed: 2 additions & 1 deletion
@@ -77,6 +77,7 @@ module "foo-emits-events" {
 }
 ```
 <!-- BEGIN_TF_DOCS -->
+
 ## Requirements
 
 No requirements.
@@ -136,7 +137,7 @@ No requirements.
 | <a name="input_cloud_storage_config_max_duration"></a> [cloud\_storage\_config\_max\_duration](#input\_cloud\_storage\_config\_max\_duration) | The maximum duration that can elapse before a new Cloud Storage file is created. Min 1 minute, max 10 minutes, default 5 minutes. | `number` | `300` | no |
 | <a name="input_deletion_protection"></a> [deletion\_protection](#input\_deletion\_protection) | Whether to enable deletion protection on data resources. | `bool` | `true` | no |
 | <a name="input_enable_profiler"></a> [enable\_profiler](#input\_enable\_profiler) | Enable cloud profiler. | `bool` | `false` | no |
-| <a name="input_flush_interval"></a> [flush\_interval](#input\_flush\_interval) | Flush interval for logrotate, as a duration string. | `string` | `""` | no |
+| <a name="input_flush_interval"></a> [flush\_interval](#input\_flush\_interval) | Flush interval for logrotate, as a duration string. | `string` | `"3m"` | no |
 | <a name="input_ignore_unknown_values"></a> [ignore\_unknown\_values](#input\_ignore\_unknown\_values) | Whether to ignore unknown values in the data, when transferring data to BigQuery. | `bool` | `false` | no |
 | <a name="input_limits"></a> [limits](#input\_limits) | Resource limits for the regional go service. | <pre>object({<br> cpu = string<br> memory = string<br> })</pre> | `null` | no |
 | <a name="input_location"></a> [location](#input\_location) | The location to create the BigQuery dataset in, and in which to run the data transfer jobs from GCS. | `string` | `"US"` | no |

modules/cloudevent-recorder/cmd/logrotate/main.go

Lines changed: 0 additions & 36 deletions
This file was deleted.

modules/cloudevent-recorder/cmd/recorder/main.go

Lines changed: 77 additions & 10 deletions
@@ -10,21 +10,29 @@ import (
 	"os"
 	"os/signal"
 	"path/filepath"
+	"strconv"
+	"sync"
 	"syscall"
+	"time"
 
 	cloudevents "github.com/cloudevents/sdk-go/v2"
 	"github.com/sethvargo/go-envconfig"
+	"gocloud.dev/blob"
 
 	"github.com/chainguard-dev/clog"
 	_ "github.com/chainguard-dev/clog/gcp/init"
 	"github.com/chainguard-dev/terraform-infra-common/pkg/httpmetrics"
 	mce "github.com/chainguard-dev/terraform-infra-common/pkg/httpmetrics/cloudevents"
 	"github.com/chainguard-dev/terraform-infra-common/pkg/profiler"
+
+	// Add gcsblob support that we need to support gs:// prefixes
+	_ "gocloud.dev/blob/gcsblob"
 )
 
 var env = envconfig.MustProcess(context.Background(), &struct {
-	Port    int    `env:"PORT, default=8080"`
-	LogPath string `env:"LOG_PATH, required"`
+	Port          int           `env:"PORT, default=8080"`
+	FlushInterval time.Duration `env:"FLUSH_INTERVAL, default=3m"`
+	Bucket        string        `env:"BUCKET, required"`
 }{})
 
 func main() {
@@ -40,20 +48,79 @@ func main() {
 	if err != nil {
 		clog.Fatalf("failed to create event client, %v", err)
 	}
+
+	bucket, err := blob.OpenBucket(ctx, env.Bucket)
+	if err != nil {
+		clog.Fatalf("failed to open bucket, %v", err)
+	}
+	defer bucket.Close()
+
+	var m sync.Mutex
+	writers := make(map[string]*blob.Writer, 10)
+
+	// Periodically flush the writers to commit the data to the bucket.
+	go func() {
+		done := false
+		for {
+			writersToDrain := func() map[string]*blob.Writer {
+				m.Lock()
+				defer m.Unlock()
+				// Swap the writers map so we can safely iterate and close the writers.
+				writersToDrain := writers
+				writers = make(map[string]*blob.Writer, 10)
+				return writersToDrain
+			}()
+
+			for t, w := range writersToDrain {
+				clog.Infof("Flushing writer[%s]", t)
+				if err := w.Close(); err != nil {
+					clog.Errorf("failed to close writer[%s]: %v", t, err)
+				}
+			}
+
+			if done {
+				clog.InfoContextf(ctx, "Exiting flush loop")
+				return
+			}
+			select {
+			case <-time.After(env.FlushInterval):
+			case <-ctx.Done():
+				clog.InfoContext(ctx, "Flushing one more time")
+				done = true
+			}
+		}
+	}()
+
+	// Listen for events and as they come in write them to the appropriate
+	// writer based on event type.
 	if err := c.StartReceiver(ctx, func(_ context.Context, event cloudevents.Event) error {
-		dir := filepath.Join(env.LogPath, event.Type())
-		if err := os.MkdirAll(dir, 0755); err != nil {
+		writer, err := func() (*blob.Writer, error) {
+			m.Lock()
+			defer m.Unlock()
+
+			w, ok := writers[event.Type()]
+			if !ok {
+				w, err = bucket.NewWriter(ctx, filepath.Join(event.Type(), strconv.FormatInt(time.Now().UnixNano(), 10)), nil)
+				if err != nil {
+					clog.Errorf("failed to create writer: %v", err)
+					return nil, err
+				}
+			}
+			writers[event.Type()] = w
+			return w, nil
+		}()
+		if err != nil {
+			clog.Errorf("failed to create writer: %v", err)
 			return err
 		}
 
-		filename := filepath.Join(dir, event.ID())
-		if err := os.WriteFile(filename, event.Data(), 0600); err != nil {
-			clog.Warnf("failed to write file %s; %v", filename, err)
-			if err := os.RemoveAll(filename); err != nil {
-				clog.Warnf("failed to remove failed write file: %s; %v", filename, err)
-			}
+		// Write the event data as a line to the writer.
+		line := string(event.Data())
+		if _, err := writer.Write([]byte(line + "\n")); err != nil {
+			clog.Errorf("failed to write event data: %v", err)
 			return err
 		}
+
 		return nil
 	}); err != nil {
		clog.Fatalf("failed to start event receiver, %v", err)

modules/cloudevent-recorder/recorder.tf

Lines changed: 7 additions & 36 deletions
@@ -20,25 +20,15 @@ resource "google_storage_bucket_iam_binding" "recorder-writes-to-gcs-buckets" {
   members = ["serviceAccount:${google_service_account.recorder.email}"]
 }
 
-locals {
-  lenv = [{
-    name  = "LOG_PATH"
-    value = "/logs"
-  }]
-
-  logrotate_env = var.flush_interval == "" ? local.lenv : concat(local.lenv, [{
-    name  = "FLUSH_INTERVAL"
-    value = var.flush_interval
-  }])
-}
-
 module "this" {
   count  = var.method == "trigger" ? 1 : 0
   source = "../regional-go-service"
   project_id = var.project_id
   name       = var.name
   regions    = var.regions
 
+  deletion_protection = var.deletion_protection
+
   service_account = google_service_account.recorder.email
   containers = {
     "recorder" = {
@@ -48,37 +38,18 @@ module "this" {
       }
       ports = [{ container_port = 8080 }]
       env = [{
-        name  = "LOG_PATH"
-        value = "/logs"
-      }]
-      volume_mounts = [{
-        name       = "logs"
-        mount_path = "/logs"
+        name  = "FLUSH_INTERVAL"
+        value = var.flush_interval
       }]
-      resources = {
-        limits = var.limits
-      }
-    }
-    "logrotate" = {
-      source = {
-        working_dir = path.module
-        importpath  = "./cmd/logrotate"
-      }
-      env = local.logrotate_env
       regional-env = [{
         name  = "BUCKET"
         value = { for k, v in google_storage_bucket.recorder : k => v.url }
       }]
-      volume_mounts = [{
-        name       = "logs"
-        mount_path = "/logs"
-      }]
+      resources = {
+        limits = var.limits
+      }
     }
   }
-  volumes = [{
-    name      = "logs"
-    empty_dir = {}
-  }]
 
   scaling = var.scaling

modules/cloudevent-recorder/variables.tf

Lines changed: 1 addition & 1 deletion
@@ -141,5 +141,5 @@ variable "split_triggers" {
 variable "flush_interval" {
   description = "Flush interval for logrotate, as a duration string."
   type        = string
-  default     = ""
+  default     = "3m"
 }

0 commit comments
