Skip to content

Pull upstream changes 2023/04 #87

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions cmd/aws-lambda-rie/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ import (
"strings"
"time"

"go.amzn.com/lambda/core/statejson"
"go.amzn.com/lambda/interop"
"go.amzn.com/lambda/rapidcore"
"go.amzn.com/lambda/rapidcore/env"

"github.com/google/uuid"

Expand All @@ -27,6 +29,19 @@ type Sandbox interface {
Invoke(responseWriter http.ResponseWriter, invoke *interop.Invoke) error
}

type InteropServer interface {
Init(i *interop.Init, invokeTimeoutMs int64) error
AwaitInitialized() error
FastInvoke(w http.ResponseWriter, i *interop.Invoke, direct bool) error
Reserve(id string, traceID, lambdaSegmentID string) (*rapidcore.ReserveResponse, error)
Reset(reason string, timeoutMs int64) (*statejson.ResetDescription, error)
AwaitRelease() (*statejson.InternalStateDescription, error)
Shutdown(shutdown *interop.Shutdown) *statejson.InternalStateDescription
InternalState() (*statejson.InternalStateDescription, error)
CurrentToken() *interop.Token
Restore(restore *interop.Restore) error
}

var initDone bool

func GetenvWithDefault(key string, defaultValue string) string {
Expand Down Expand Up @@ -57,7 +72,7 @@ func printEndReports(invokeId string, initDuration string, memorySize string, in
invokeId, invokeDuration, math.Ceil(invokeDuration), memorySize, memorySize)
}

func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox) {
func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox, bs interop.Bootstrap) {
log.Debugf("invoke: -> %s %s %v", r.Method, r.URL, r.Header)
bodyBytes, err := ioutil.ReadAll(r.Body)
if err != nil {
Expand All @@ -80,7 +95,7 @@ func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox) {

if !initDone {

initStart, initEnd := InitHandler(sandbox, functionVersion, timeout)
initStart, initEnd := InitHandler(sandbox, functionVersion, timeout, bs)

// Calculate InitDuration
initTimeMS := math.Min(float64(initEnd.Sub(initStart).Nanoseconds()),
Expand All @@ -99,7 +114,6 @@ func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox) {
TraceID: r.Header.Get("X-Amzn-Trace-Id"),
LambdaSegmentID: r.Header.Get("X-Amzn-Segment-Id"),
Payload: bytes.NewReader(bodyBytes),
CorrelationID: "invokeCorrelationID",
}
fmt.Println("START RequestId: " + invokePayload.ID + " Version: " + functionVersion)

Expand Down Expand Up @@ -166,7 +180,7 @@ func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox) {
w.Write(invokeResp.Body)
}

func InitHandler(sandbox Sandbox, functionVersion string, timeout int64) (time.Time, time.Time) {
func InitHandler(sandbox Sandbox, functionVersion string, timeout int64, bs interop.Bootstrap) (time.Time, time.Time) {
additionalFunctionEnvironmentVariables := map[string]string{}

// Add default Env Vars if they were not defined. This is a required otherwise 1p Python2.7, Python3.6, and
Expand All @@ -189,15 +203,20 @@ func InitHandler(sandbox Sandbox, functionVersion string, timeout int64) (time.T
// pass to rapid
sandbox.Init(&interop.Init{
Handler: GetenvWithDefault("AWS_LAMBDA_FUNCTION_HANDLER", os.Getenv("_HANDLER")),
CorrelationID: "initCorrelationID",
AwsKey: os.Getenv("AWS_ACCESS_KEY_ID"),
AwsSecret: os.Getenv("AWS_SECRET_ACCESS_KEY"),
AwsSession: os.Getenv("AWS_SESSION_TOKEN"),
XRayDaemonAddress: "0.0.0.0:0", // TODO
FunctionName: GetenvWithDefault("AWS_LAMBDA_FUNCTION_NAME", "test_function"),
FunctionVersion: functionVersion,

RuntimeInfo: interop.RuntimeInfo{
ImageJSON: "{}",
Arn: "",
Version: ""},
CustomerEnvironmentVariables: additionalFunctionEnvironmentVariables,
SandboxType: interop.SandboxClassic,
Bootstrap: bs,
EnvironmentVariables: env.NewEnvironment(),
}, timeout*1000)
initEnd := time.Now()
return initStart, initEnd
Expand Down
6 changes: 4 additions & 2 deletions cmd/aws-lambda-rie/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ import (
"net/http"

log "github.com/sirupsen/logrus"
"go.amzn.com/lambda/interop"
"go.amzn.com/lambda/rapidcore"
)

func startHTTPServer(ipport string, sandbox Sandbox) {
func startHTTPServer(ipport string, sandbox *rapidcore.SandboxBuilder, bs interop.Bootstrap) {
srv := &http.Server{
Addr: ipport,
}

// Pass a channel
http.HandleFunc("/2015-03-31/functions/function/invocations", func(w http.ResponseWriter, r *http.Request) {
InvokeHandler(w, r, sandbox)
InvokeHandler(w, r, sandbox.LambdaInvokeAPI(), bs)
})

// go routine (main thread waits)
Expand Down
51 changes: 44 additions & 7 deletions cmd/aws-lambda-rie/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package main
import (
"context"
"fmt"
"net"
"os"
"runtime/debug"

Expand All @@ -21,20 +22,49 @@ const (
)

type options struct {
LogLevel string `long:"log-level" default:"info" description:"log level"`
LogLevel string `long:"log-level" description:"The level of AWS Lambda Runtime Interface Emulator logs to display. Can also be set by the environment variable 'LOG_LEVEL'. Defaults to the value 'info'."`
InitCachingEnabled bool `long:"enable-init-caching" description:"Enable support for Init Caching"`
// Do not have a default value so we do not need to keep it in sync with the default value in lambda/rapidcore/sandbox_builder.go
RuntimeAPIAddress string `long:"runtime-api-address" description:"The address of the AWS Lambda Runtime API to communicate with the Lambda execution environment."`
RuntimeInterfaceEmulatorAddress string `long:"runtime-interface-emulator-address" default:"0.0.0.0:8080" description:"The address for the AWS Lambda Runtime Interface Emulator to accept HTTP request upon."`
}

func main() {
// More frequent GC reduces the tail latencies, equivalent to export GOGC=33
debug.SetGCPercent(33)

opts, args := getCLIArgs()
rapidcore.SetLogLevel(opts.LogLevel)

logLevel := "info"

// If you specify an option by using a parameter on the CLI command line, it overrides any value from either the corresponding environment variable.
//
// https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html
if opts.LogLevel != "" {
logLevel = opts.LogLevel
} else if envLogLevel, envLogLevelSet := os.LookupEnv("LOG_LEVEL"); envLogLevelSet {
logLevel = envLogLevel
}

rapidcore.SetLogLevel(logLevel)

if opts.RuntimeAPIAddress != "" {
_, _, err := net.SplitHostPort(opts.RuntimeAPIAddress)

if err != nil {
log.WithError(err).Fatalf("The command line value for \"--runtime-api-address\" is not a valid network address %q.", opts.RuntimeAPIAddress)
}
}

_, _, err := net.SplitHostPort(opts.RuntimeInterfaceEmulatorAddress)

if err != nil {
log.WithError(err).Fatalf("The command line value for \"--runtime-interface-emulator-address\" is not a valid network address %q.", opts.RuntimeInterfaceEmulatorAddress)
}

bootstrap, handler := getBootstrap(args, opts)
sandbox := rapidcore.
NewSandboxBuilder(bootstrap).
NewSandboxBuilder().
AddShutdownFunc(context.CancelFunc(func() { os.Exit(0) })).
SetExtensionsFlag(true).
SetInitCachingFlag(opts.InitCachingEnabled)
Expand All @@ -43,10 +73,17 @@ func main() {
sandbox.SetHandler(handler)
}

go sandbox.Create()
if opts.RuntimeAPIAddress != "" {
sandbox.SetRuntimeAPIAddress(opts.RuntimeAPIAddress)
}

sandboxContext, internalStateFn := sandbox.Create()
// Since we have not specified a custom interop server for standalone, we can
// directly reference the default interop server, which is a concrete type
sandbox.DefaultInteropServer().SetSandboxContext(sandboxContext)
sandbox.DefaultInteropServer().SetInternalStateGetter(internalStateFn)

testAPIipport := "0.0.0.0:8080"
startHTTPServer(testAPIipport, sandbox)
startHTTPServer(opts.RuntimeInterfaceEmulatorAddress, sandbox, bootstrap)
}

func getCLIArgs() (options, []string) {
Expand Down Expand Up @@ -112,5 +149,5 @@ func getBootstrap(args []string, opts options) (*rapidcore.Bootstrap, string) {
log.Panic("insufficient arguments: bootstrap not provided")
}

return rapidcore.NewBootstrapSingleCmd(bootstrapLookupCmd, currentWorkingDir), handler
return rapidcore.NewBootstrapSingleCmd(bootstrapLookupCmd, currentWorkingDir, ""), handler
}
75 changes: 18 additions & 57 deletions lambda/agents/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,77 +4,38 @@
package agents

import (
"fmt"
"io"
"io/ioutil"
"os/exec"
"os"
"path"
"syscall"
"path/filepath"

log "github.com/sirupsen/logrus"
)

// AgentProcess is the common interface exposed by both internal and external agent processes
type AgentProcess interface {
Name() string
}

// ExternalAgentProcess represents an external agent process
type ExternalAgentProcess struct {
cmd *exec.Cmd
}

// NewExternalAgentProcess returns a new external agent process
func NewExternalAgentProcess(path string, env []string, stdoutWriter io.Writer, stderrWriter io.Writer) ExternalAgentProcess {
command := exec.Command(path)
command.Env = env

command.Stdout = NewNewlineSplitWriter(stdoutWriter)
command.Stderr = NewNewlineSplitWriter(stderrWriter)
command.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

return ExternalAgentProcess{
cmd: command,
}
}

// Name returns the name of the agent
// For external agents is the executable name
func (a *ExternalAgentProcess) Name() string {
return path.Base(a.cmd.Path)
}

func (a *ExternalAgentProcess) Pid() int {
return a.cmd.Process.Pid
}

// Start starts an external agent process
func (a *ExternalAgentProcess) Start() error {
return a.cmd.Start()
}

// Wait waits for the external agent process to exit
func (a *ExternalAgentProcess) Wait() error {
return a.cmd.Wait()
}

// String is used to print values passed as an operand to any format that accepts a string or to an unformatted printer such as Print.
func (a *ExternalAgentProcess) String() string {
return fmt.Sprintf("%s (%s)", a.Name(), a.cmd.Path)
}

// ListExternalAgentPaths return a list of external agents found in a given directory
func ListExternalAgentPaths(root string) []string {
func ListExternalAgentPaths(dir string, root string) []string {
var agentPaths []string
files, err := ioutil.ReadDir(root)
if !isCanonical(dir) || !isCanonical(root) {
log.Warningf("Agents base paths are not absolute and in canonical form: %s, %s", dir, root)
return agentPaths
}
fullDir := path.Join(root, dir)
files, err := os.ReadDir(fullDir)
if err != nil {
log.WithError(err).Warning("Cannot list external agents")
return agentPaths
}
for _, file := range files {
if !file.IsDir() {
agentPaths = append(agentPaths, path.Join(root, file.Name()))
// The returned path is absolute wrt to `root`. This allows
// to exec the agents in their own mount namespace
p := path.Join("/", dir, file.Name())
agentPaths = append(agentPaths, p)
}
}
return agentPaths
}

func isCanonical(path string) bool {
absPath, err := filepath.Abs(path)
return err == nil && absPath == path
}
Loading