Skip to content

Commit a2aa8d6

Browse files
authored
feature: add ability to test concurrent requests (#92)
* feature: add ability to test concurrent requests * use JSON for HTTP request payload since content type header is set to JSON
1 parent b3fb81d commit a2aa8d6

File tree

3 files changed

+218
-37
lines changed

3 files changed

+218
-37
lines changed

client/concurrency.go

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
// Copyright 2022 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package main
16+
17+
import (
18+
"fmt"
19+
"log"
20+
"sync"
21+
"time"
22+
23+
"github.com/GoogleCloudPlatform/functions-framework-conformance/events"
24+
)
25+
26+
func timeExecution(fn func() error) (time.Duration, error) {
27+
start := time.Now()
28+
err := fn()
29+
return time.Since(start), err
30+
}
31+
32+
// validateConcurrency validates a server can handle concurrent requests by
33+
// valdating that the response time for a single request does not increase
34+
// linearly with n concurrent requests, given a function that:
35+
// 1. Is not CPU-bound (e.g. sleeps)
36+
// 2. Executes for at least 1s to ensure non-trivial measurement differences
37+
func validateConcurrency(url string, functionType string) error {
38+
log.Printf("%s validation with concurrent requests...", functionType)
39+
var sendFn func() error
40+
switch functionType {
41+
case "http":
42+
sendFn = func() error {
43+
return sendHTTP(url, []byte(`{"data": "hello"}`))
44+
}
45+
case "cloudevent":
46+
// Arbitrary payload that conforms to CloudEvent schema
47+
sendFn = func() error {
48+
return send(url, events.CloudEvent, []byte(`{
49+
"specversion": "1.0",
50+
"type": "google.firebase.auth.user.v1.created",
51+
"source": "//firebaseauth.googleapis.com/projects/my-project-id",
52+
"subject": "users/UUpby3s4spZre6kHsgVSPetzQ8l2",
53+
"id": "aaaaaa-1111-bbbb-2222-cccccccccccc",
54+
"time": "2020-09-29T11:32:00.123Z",
55+
"datacontenttype": "application/json",
56+
"data": {
57+
"email": "[email protected]",
58+
"metadata": {
59+
"createTime": "2020-05-26T10:42:27Z",
60+
"lastSignInTime": "2020-10-24T11:00:00Z"
61+
},
62+
"providerData": [
63+
{
64+
"email": "[email protected]",
65+
"providerId": "password",
66+
67+
}
68+
],
69+
"uid": "UUpby3s4spZre6kHsgVSPetzQ8l2"
70+
}
71+
}`))
72+
}
73+
case "legacyevent":
74+
// Arbitrary payload that conforms to Background event schema
75+
sendFn = func() error {
76+
return send(url, events.LegacyEvent, []byte(`{
77+
"data": {
78+
"email": "[email protected]",
79+
"metadata": {
80+
"createdAt": "2020-05-26T10:42:27Z",
81+
"lastSignedInAt": "2020-10-24T11:00:00Z"
82+
},
83+
"providerData": [
84+
{
85+
"email": "[email protected]",
86+
"providerId": "password",
87+
88+
}
89+
],
90+
"uid": "UUpby3s4spZre6kHsgVSPetzQ8l2"
91+
},
92+
"eventId": "aaaaaa-1111-bbbb-2222-cccccccccccc",
93+
"eventType": "providers/firebase.auth/eventTypes/user.create",
94+
"notSupported": {
95+
},
96+
"resource": "projects/my-project-id",
97+
"timestamp": "2020-09-29T11:32:00.123Z"
98+
}`))
99+
}
100+
default:
101+
return fmt.Errorf("expected type to be one of 'http', 'cloudevent', or 'legacyevent', got %s", functionType)
102+
}
103+
if err := sendConcurrentRequests(sendFn); err != nil {
104+
return err
105+
}
106+
log.Printf("Concurrency validation passed!")
107+
return nil
108+
}
109+
110+
func sendConcurrentRequests(sendFn func() error) error {
111+
// Get a benchmark for the time it takes for a single request
112+
singleReqTime, singleReqErr := timeExecution(func() error {
113+
return sendFn()
114+
})
115+
if singleReqErr != nil {
116+
return fmt.Errorf("concurrent validation unable to send single request to benchmark response time: %v", singleReqErr)
117+
}
118+
119+
minWait := 1 * time.Second
120+
if singleReqTime < minWait {
121+
return fmt.Errorf("concurrent validation requires a function that waits at least %s before responding, function responded in %s", minWait, singleReqTime)
122+
}
123+
log.Printf("Single request response time benchmarked, took %s for 1 request", singleReqTime)
124+
125+
// Get a benchmark for the time it takes for concurrent requests
126+
const numConReqs = 10
127+
log.Printf("Starting %d concurrent workers to send requests", numConReqs)
128+
129+
type workerResponse struct {
130+
id int
131+
err error
132+
}
133+
var wg sync.WaitGroup
134+
respCh := make(chan workerResponse, numConReqs)
135+
conReqTime, _ := timeExecution(func() error {
136+
for i := 0; i < numConReqs; i++ {
137+
wg.Add(1)
138+
go func(id int) {
139+
defer wg.Done()
140+
err := sendFn()
141+
respCh <- workerResponse{id: id, err: err}
142+
}(i)
143+
}
144+
145+
wg.Wait()
146+
return nil
147+
})
148+
149+
maybeErrMessage := ""
150+
for i := 0; i < numConReqs; i++ {
151+
resp := <-respCh
152+
if resp.err != nil {
153+
maybeErrMessage += fmt.Sprintf("error #%d: %v\n", i, resp.err)
154+
} else {
155+
log.Printf("Worker #%d done", resp.id)
156+
}
157+
}
158+
159+
if maybeErrMessage != "" {
160+
return fmt.Errorf("at least one concurrent request failed:\n%s", maybeErrMessage)
161+
}
162+
163+
// Validate that the concurrent requests were handled faster than if all
164+
// the requests were handled serially, using the single request time
165+
// as a benchmark. The concurrent time should be less than half of the
166+
// time it would have taken to execute all requests serially.
167+
if conReqTime*2 > numConReqs*singleReqTime {
168+
return fmt.Errorf("function took too long to complete %d concurrent requests. %d concurrent request time: %s, single request time: %s", numConReqs, numConReqs, conReqTime, singleReqTime)
169+
}
170+
log.Printf("Concurrent request response time benchmarked, took %s for %d requests", conReqTime, numConReqs)
171+
return nil
172+
}

client/main.go

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,17 @@ import (
2121
)
2222

2323
var (
24-
runCmd = flag.String("cmd", "", "string with command to run a Functions Framework server at localhost:8080. Ignored if -buildpacks=true.")
25-
functionType = flag.String("type", "http", "type of function to validate (must be 'http', 'cloudevent', or 'legacyevent'")
26-
validateMapping = flag.Bool("validate-mapping", true, "whether to validate mapping from legacy->cloud events and vice versa (as applicable)")
27-
outputFile = flag.String("output-file", "function_output.json", "name of file output by function")
28-
useBuildpacks = flag.Bool("buildpacks", true, "whether to use the current release of buildpacks to run the validation. If true, -cmd is ignored and --builder-* flags must be set.")
29-
source = flag.String("builder-source", "", "function source directory to use in building. Required if -buildpacks=true")
30-
target = flag.String("builder-target", "", "function target to use in building. Required if -buildpacks=true")
31-
runtime = flag.String("builder-runtime", "", "runtime to use in building. Required if -buildpacks=true")
32-
tag = flag.String("builder-tag", "latest", "builder image tag to use in building")
33-
startDelay = flag.Uint("start-delay", 1, "Seconds to wait before sending HTTP request to command process")
24+
runCmd = flag.String("cmd", "", "string with command to run a Functions Framework server at localhost:8080. Ignored if -buildpacks=true.")
25+
functionType = flag.String("type", "http", "type of function to validate (must be 'http', 'cloudevent', or 'legacyevent'")
26+
validateMapping = flag.Bool("validate-mapping", true, "whether to validate mapping from legacy->cloud events and vice versa (as applicable)")
27+
outputFile = flag.String("output-file", "function_output.json", "name of file output by function")
28+
useBuildpacks = flag.Bool("buildpacks", true, "whether to use the current release of buildpacks to run the validation. If true, -cmd is ignored and --builder-* flags must be set.")
29+
source = flag.String("builder-source", "", "function source directory to use in building. Required if -buildpacks=true")
30+
target = flag.String("builder-target", "", "function target to use in building. Required if -buildpacks=true")
31+
runtime = flag.String("builder-runtime", "", "runtime to use in building. Required if -buildpacks=true")
32+
tag = flag.String("builder-tag", "latest", "builder image tag to use in building")
33+
startDelay = flag.Uint("start-delay", 1, "Seconds to wait before sending HTTP request to command process")
34+
validateConcurrencyFlag = flag.Bool("validate-concurrency", false, "whether to validate concurrent requests can be handled, requires a function that sleeps for 1 second ")
3435
)
3536

3637
func main() {
@@ -43,15 +44,16 @@ func main() {
4344
}
4445

4546
v := newValidator(validatorParams{
46-
validateMapping: *validateMapping,
47-
useBuildpacks: *useBuildpacks,
48-
runCmd: *runCmd,
49-
outputFile: *outputFile,
50-
source: *source,
51-
target: *target,
52-
runtime: *runtime,
53-
functionType: *functionType,
54-
tag: *tag,
47+
validateMapping: *validateMapping,
48+
useBuildpacks: *useBuildpacks,
49+
runCmd: *runCmd,
50+
outputFile: *outputFile,
51+
source: *source,
52+
target: *target,
53+
runtime: *runtime,
54+
functionType: *functionType,
55+
tag: *tag,
56+
validateConcurrency: *validateConcurrencyFlag,
5557
})
5658

5759
if err := v.runValidation(); err != nil {

client/validate.go

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,31 +25,34 @@ import (
2525
)
2626

2727
type validatorParams struct {
28-
useBuildpacks bool
29-
validateMapping bool
30-
runCmd string
31-
outputFile string
32-
source string
33-
target string
34-
runtime string
35-
tag string
36-
functionType string
28+
useBuildpacks bool
29+
validateMapping bool
30+
runCmd string
31+
outputFile string
32+
source string
33+
target string
34+
runtime string
35+
tag string
36+
functionType string
37+
validateConcurrency bool
3738
}
3839

3940
type validator struct {
40-
funcServer functionServer
41-
validateMapping bool
42-
functionType string
43-
stdoutFile string
44-
stderrFile string
41+
funcServer functionServer
42+
validateMapping bool
43+
validateConcurrency bool
44+
functionType string
45+
stdoutFile string
46+
stderrFile string
4547
}
4648

4749
func newValidator(params validatorParams) *validator {
4850
v := validator{
49-
validateMapping: params.validateMapping,
50-
functionType: params.functionType,
51-
stdoutFile: defaultStdoutFile,
52-
stderrFile: defaultStderrFile,
51+
validateMapping: params.validateMapping,
52+
validateConcurrency: params.validateConcurrency,
53+
functionType: params.functionType,
54+
stdoutFile: defaultStdoutFile,
55+
stderrFile: defaultStderrFile,
5356
}
5457

5558
if !params.useBuildpacks {
@@ -129,6 +132,7 @@ func (v validator) validateHTTP(url string) error {
129132
if err := sendHTTP(url, req); err != nil {
130133
return fmt.Errorf("failed to get response from HTTP function: %v", err)
131134
}
135+
132136
output, err := v.funcServer.OutputFile()
133137
if err != nil {
134138
return fmt.Errorf("reading output file from HTTP function: %v", err)
@@ -176,6 +180,9 @@ func (v validator) validateEvents(url string, inputType, outputType events.Event
176180
}
177181

178182
func (v validator) validate(url string) error {
183+
if v.validateConcurrency {
184+
return validateConcurrency(url, v.functionType)
185+
}
179186
switch v.functionType {
180187
case "http":
181188
// Validate HTTP signature, if provided

0 commit comments

Comments
 (0)