Skip to content

Commit 8b2ec00

Browse files
committed
chore: use main context for webhook source
1 parent 12d5613 commit 8b2ec00

File tree

6 files changed

+293
-395
lines changed

6 files changed

+293
-395
lines changed

cmd/webhooksource-adapter/main.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,5 @@ const (
1212
)
1313

1414
func main() {
15-
sctx := signals.NewContext()
16-
17-
ctx := adapter.WithController(sctx, webhooksource.NewController)
18-
ctx = adapter.WithHAEnabled(ctx)
19-
ctx = adapter.WithConfigWatcherEnabled(ctx)
20-
21-
adapter.MainWithContext(ctx, component, webhooksource.NewEnvConfig, webhooksource.NewAdapter)
15+
adapter.MainWithContext(signals.NewContext(), component, webhooksource.NewEnvConfig, webhooksource.NewAdapter)
2216
}

pkg/sources/adapter/webhooksource/adapter.go

Lines changed: 1 addition & 299 deletions
Original file line numberDiff line numberDiff line change
@@ -2,87 +2,16 @@ package webhooksource
22

33
import (
44
"context"
5-
"errors"
6-
"fmt"
7-
"io"
8-
"net/http"
9-
"strings"
10-
"time"
115

126
cloudevents "github.com/cloudevents/sdk-go/v2"
13-
"go.uber.org/zap"
14-
"golang.org/x/sync/errgroup"
157

168
"knative.dev/eventing/pkg/adapter/v2"
179
"knative.dev/pkg/logging"
1810

1911
"github.com/zeiss/pkg/conv"
20-
"github.com/zeiss/pkg/utilx"
2112
"github.com/zeiss/typhoon/pkg/apis/sources"
2213
)
2314

24-
var _ adapter.Adapter = (*mtWebhookAdapter)(nil)
25-
26-
type mtWebhookAdapter struct {
27-
eventType string
28-
eventSource string
29-
extensionAttributesFrom *ExtensionAttributesFrom
30-
username string
31-
password string
32-
corsAllowOrigin string
33-
34-
ceClient cloudevents.Client
35-
logger *zap.SugaredLogger
36-
mt *adapter.MetricTag
37-
}
38-
39-
// NewEnvConfig satisfies pkgadapter.EnvConfigConstructor.
40-
func NewEnvConfig() adapter.EnvConfigAccessor {
41-
return &envAccessor{}
42-
}
43-
44-
type envAccessor struct {
45-
adapter.EnvConfig
46-
47-
EventType string `envconfig:"WEBHOOK_EVENT_TYPE" required:"true"`
48-
EventSource string `envconfig:"WEBHOOK_EVENT_SOURCE" required:"true"`
49-
EventExtensionAttributesFrom *ExtensionAttributesFrom `envconfig:"WEBHOOK_EVENT_EXTENSION_ATTRIBUTES_FROM"`
50-
BasicAuthUsername string `envconfig:"WEBHOOK_BASICAUTH_USERNAME"`
51-
BasicAuthPassword string `envconfig:"WEBHOOK_BASICAUTH_PASSWORD"`
52-
CORSAllowOrigin string `envconfig:"WEBHOOK_CORS_ALLOW_ORIGIN"`
53-
}
54-
55-
// ExtensionAttributesFrom defines the HTTP elements that should be used to populate CloudEvent extension attributes.
56-
type ExtensionAttributesFrom struct {
57-
headers bool
58-
host bool
59-
method bool
60-
path bool
61-
queries bool
62-
}
63-
64-
// Decode an array of KeyMountedValues
65-
func (ea *ExtensionAttributesFrom) Decode(value string) error {
66-
for _, o := range strings.Split(value, ",") {
67-
switch o {
68-
case "method":
69-
ea.method = true
70-
case "path":
71-
ea.path = true
72-
case "host":
73-
ea.host = true
74-
case "queries":
75-
ea.queries = true
76-
case "headers":
77-
ea.headers = true
78-
default:
79-
return fmt.Errorf("CloudEvent extension from HTTP element not supported: %s", o)
80-
}
81-
}
82-
83-
return nil
84-
}
85-
8615
// NewAdapter satisfies pkgadapter.AdapterConstructor.
8716
func NewAdapter(ctx context.Context, envAcc adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter {
8817
logger := logging.FromContext(ctx)
@@ -95,7 +24,7 @@ func NewAdapter(ctx context.Context, envAcc adapter.EnvConfigAccessor, ceClient
9524

9625
env := envAcc.(*envAccessor)
9726

98-
return &mtWebhookAdapter{
27+
return &webhookHandler{
9928
eventType: env.EventType,
10029
eventSource: env.EventSource,
10130
extensionAttributesFrom: env.EventExtensionAttributesFrom,
@@ -108,230 +37,3 @@ func NewAdapter(ctx context.Context, envAcc adapter.EnvConfigAccessor, ceClient
10837
mt: mt,
10938
}
11039
}
111-
112-
const (
113-
serverPort uint16 = 8080
114-
serverShutdownGracePeriod = time.Second * 10
115-
queryPrefix = "q"
116-
headerPrefix = "h"
117-
)
118-
119-
// Start runs the HTTP event handler.
120-
func (a *mtWebhookAdapter) Start(ctx context.Context) error {
121-
a.logger.Info("Starting webhook event handler")
122-
123-
ctx = adapter.ContextWithMetricTag(ctx, a.mt)
124-
125-
m := http.NewServeMux()
126-
m.HandleFunc("/", a.handleAll(ctx))
127-
m.HandleFunc("/health", healthCheckHandler)
128-
129-
s := &http.Server{
130-
Addr: fmt.Sprintf(":%d", serverPort),
131-
Handler: m,
132-
ReadTimeout: 5 * time.Second,
133-
}
134-
135-
return runHandler(ctx, s)
136-
}
137-
138-
// runHandler runs the HTTP event handler until ctx gets cancelled.
139-
func runHandler(ctx context.Context, s *http.Server) error {
140-
g, ctx := errgroup.WithContext(ctx)
141-
142-
g.Go(func() error {
143-
return s.ListenAndServe()
144-
})
145-
146-
errCh := make(chan error)
147-
go func() {
148-
errCh <- s.ListenAndServe()
149-
}()
150-
151-
handleServerError := func(err error) error {
152-
if errors.Is(err, http.ErrServerClosed) {
153-
return fmt.Errorf("during server runtime: %w", err)
154-
}
155-
156-
return nil
157-
}
158-
159-
select {
160-
case <-ctx.Done():
161-
logging.FromContext(ctx).Info("HTTP event handler is shutting down")
162-
163-
ctx, cancel := context.WithTimeout(context.Background(), serverShutdownGracePeriod)
164-
defer cancel()
165-
166-
// nolint:contextcheck
167-
if err := s.Shutdown(ctx); err != nil {
168-
return fmt.Errorf("during server shutdown: %w", err)
169-
}
170-
171-
return handleServerError(<-errCh)
172-
173-
case err := <-errCh:
174-
return handleServerError(err)
175-
}
176-
}
177-
178-
// handleAll receives all webhook events at a single resource, it
179-
// is up to this function to parse event wrapper and dispatch.
180-
// nolint:gocyclo
181-
func (h *mtWebhookAdapter) handleAll(ctx context.Context) http.HandlerFunc {
182-
return func(w http.ResponseWriter, r *http.Request) {
183-
if utilx.NotEmpty(h.corsAllowOrigin) {
184-
w.Header().Set("Access-Control-Allow-Origin", h.corsAllowOrigin)
185-
}
186-
187-
if r.Body == nil {
188-
h.handleError(errors.New("request without body not supported"), http.StatusBadRequest, w)
189-
return
190-
}
191-
192-
if utilx.And(utilx.NotEmpty(h.username), utilx.NotEmpty(h.password)) {
193-
us, ps, ok := r.BasicAuth()
194-
if !ok {
195-
h.handleError(errors.New("wrong authentication header"), http.StatusBadRequest, w)
196-
return
197-
}
198-
199-
if utilx.Or(us != h.username, ps != h.password) {
200-
h.handleError(errors.New("credentials are not valid"), http.StatusUnauthorized, w)
201-
return
202-
}
203-
}
204-
205-
defer r.Body.Close()
206-
body, err := io.ReadAll(r.Body)
207-
if err != nil {
208-
h.handleError(err, http.StatusInternalServerError, w)
209-
return
210-
}
211-
212-
event := cloudevents.NewEvent(cloudevents.VersionV1)
213-
event.SetType(h.eventType)
214-
event.SetSource(h.eventSource)
215-
216-
// Add extension attributes if configured
217-
if h.extensionAttributesFrom != nil {
218-
if h.extensionAttributesFrom.path {
219-
event.SetExtension("path", r.URL.Path)
220-
}
221-
if h.extensionAttributesFrom.method {
222-
event.SetExtension("method", r.Method)
223-
}
224-
if h.extensionAttributesFrom.host {
225-
event.SetExtension("host", r.Host)
226-
}
227-
if h.extensionAttributesFrom.queries {
228-
for k, v := range r.URL.Query() {
229-
if len(v) == 1 {
230-
event.SetExtension(sanitizeCloudEventAttributeName(queryPrefix+k), v[0])
231-
} else {
232-
for i := range v {
233-
event.SetExtension(sanitizeCloudEventAttributeName(
234-
fmt.Sprintf("%s%s%d", queryPrefix, k, i)), v[i])
235-
}
236-
}
237-
}
238-
}
239-
240-
if h.extensionAttributesFrom.headers {
241-
for k, v := range r.Header {
242-
// Prevent Authorization header from being added
243-
// as a CloudEvent attribute
244-
if k == "Authorization" {
245-
continue
246-
}
247-
if k == "Ce-Id" {
248-
if len(v) != 0 {
249-
event.SetID(v[0])
250-
}
251-
continue
252-
}
253-
254-
if k == "Ce-Subject" {
255-
if len(v) != 0 {
256-
event.SetSubject(v[0])
257-
}
258-
continue
259-
}
260-
261-
if len(v) == 1 {
262-
event.SetExtension(sanitizeCloudEventAttributeName(headerPrefix+k), v[0])
263-
} else {
264-
for i := range v {
265-
event.SetExtension(sanitizeCloudEventAttributeName(
266-
fmt.Sprintf("%s%s%d", headerPrefix, k, i)), v[i])
267-
}
268-
}
269-
}
270-
}
271-
}
272-
273-
if err := event.SetData(r.Header.Get("Content-Type"), body); err != nil {
274-
h.handleError(fmt.Errorf("failed to set event data: %w", err), http.StatusInternalServerError, w)
275-
return
276-
}
277-
278-
rEvent, result := h.ceClient.Request(ctx, event)
279-
if !cloudevents.IsACK(result) {
280-
h.handleError(fmt.Errorf("could not send Cloud Event: %w", result), http.StatusInternalServerError, w)
281-
return
282-
}
283-
if rEvent == nil || rEvent.Data() == nil {
284-
w.WriteHeader(http.StatusNoContent)
285-
return
286-
}
287-
288-
w.WriteHeader(http.StatusOK)
289-
}
290-
}
291-
292-
func (h *mtWebhookAdapter) handleError(err error, code int, w http.ResponseWriter) {
293-
h.logger.Error("An error occurred", zap.Error(err))
294-
http.Error(w, err.Error(), code)
295-
}
296-
297-
func healthCheckHandler(w http.ResponseWriter, _ *http.Request) {
298-
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
299-
w.WriteHeader(http.StatusOK)
300-
}
301-
302-
// nolint:gocyclo
303-
func sanitizeCloudEventAttributeName(name string) string {
304-
// only lowercase accepted
305-
name = strings.ToLower(name)
306-
307-
// strip non valid characters
308-
needsStripping := false
309-
for i := range name {
310-
if !((name[i] >= 'a' && name[i] <= 'z') || (name[i] >= '0' && name[i] <= '9')) {
311-
needsStripping = true
312-
break
313-
}
314-
}
315-
316-
if needsStripping {
317-
stripped := []byte{}
318-
for i := range name {
319-
if (name[i] >= 'a' && name[i] <= 'z') || (name[i] >= '0' && name[i] <= '9') {
320-
stripped = append(stripped, name[i])
321-
}
322-
}
323-
name = string(stripped)
324-
}
325-
326-
// truncate if longer than 20 characters
327-
if len(name) > 20 {
328-
name = name[:20]
329-
}
330-
331-
// data is a reserved element at CloudEvents
332-
if name == "data" || name == "path" || name == "method" || name == "host" {
333-
return "data0"
334-
}
335-
336-
return name
337-
}

pkg/sources/adapter/webhooksource/controller.go

Lines changed: 0 additions & 54 deletions
This file was deleted.

0 commit comments

Comments
 (0)