Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cmd/localstack/awsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func getBootstrap(args []string) (interop.Bootstrap, string) {
return NewSimpleBootstrap(bootstrapLookupCmd, currentWorkingDir), handler
}

func PrintEndReports(invokeId string, initDuration string, memorySize string, invokeStart time.Time, timeoutDuration time.Duration, w io.Writer) {
func PrintEndReports(invokeId string, initDuration string, status string, memorySize string, invokeStart time.Time, timeoutDuration time.Duration, w io.Writer) {
// Calculate invoke duration
invokeDuration := math.Min(float64(time.Now().Sub(invokeStart).Nanoseconds()),
float64(timeoutDuration.Nanoseconds())) / float64(time.Millisecond)
Expand All @@ -102,11 +102,12 @@ func PrintEndReports(invokeId string, initDuration string, memorySize string, in
// not a clean way to get this information from rapidcore
_, _ = fmt.Fprintf(w,
"REPORT RequestId: %s\t"+
initDuration+
"Duration: %.2f ms\t"+
"Billed Duration: %.f ms\t"+
"Memory Size: %s MB\t"+
"Max Memory Used: %s MB\t\n",
"Max Memory Used: %s MB\t"+
initDuration+
status+"\n",
invokeId, invokeDuration, math.Ceil(invokeDuration), memorySize, memorySize)
}

Expand Down
103 changes: 80 additions & 23 deletions cmd/localstack/custom_interop.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ type CustomInteropServer struct {
localStackAdapter *LocalStackAdapter
port string
upstreamEndpoint string
// initStart is set once in Init() and warmStart is flipped on the first invoke.
// Both are accessed only from the single sequential init -> invoke flow (the RIE
// processes one invocation at a time), so they need no additional synchronization.
initStart time.Time
warmStart bool
}

type LocalStackAdapter struct {
Expand All @@ -43,10 +48,11 @@ const (

func (l *LocalStackAdapter) SendStatus(status LocalStackStatus, payload []byte) error {
statusUrl := fmt.Sprintf("%s/status/%s/%s", l.UpstreamEndpoint, l.RuntimeId, status)
_, err := http.Post(statusUrl, "application/json", bytes.NewReader(payload))
resp, err := http.Post(statusUrl, "application/json", bytes.NewReader(payload))
if err != nil {
return err
}
defer resp.Body.Close()
return nil
}

Expand All @@ -56,8 +62,12 @@ func (l *LocalStackAdapter) SendLogs(invokeId string, logs LogResponse) error {
if err != nil {
return err
}
_, err = http.Post(l.UpstreamEndpoint+"/invocations/"+invokeId+"/logs", "application/json", bytes.NewReader(serialized))
return err
resp, err := http.Post(l.UpstreamEndpoint+"/invocations/"+invokeId+"/logs", "application/json", bytes.NewReader(serialized))
if err != nil {
return err
}
defer resp.Body.Close()
return nil
}

// SendResult posts the invocation result body to LocalStack.
Expand All @@ -77,8 +87,12 @@ func (l *LocalStackAdapter) SendResult(invokeId string, body []byte, isError boo
} else {
log.Infoln("Sending to /response")
}
_, err := http.Post(l.UpstreamEndpoint+endpoint, "application/json", bytes.NewReader(body))
return err
resp, err := http.Post(l.UpstreamEndpoint+endpoint, "application/json", bytes.NewReader(body))
if err != nil {
return err
}
defer resp.Body.Close()
return nil
}

// The InvokeRequest is sent by LocalStack to trigger an invocation
Expand All @@ -87,25 +101,25 @@ type InvokeRequest struct {
InvokedFunctionArn string `json:"invoked-function-arn"`
Payload string `json:"payload"`
TraceId string `json:"trace-id"`
IsInitRetry bool `json:"is-init-retry,omitempty"`
}

// The ErrorResponse is sent TO LocalStack when encountering an error
type ErrorResponse struct {
ErrorMessage string `json:"errorMessage"`
ErrorType string `json:"errorType,omitempty"`
RequestId string `json:"requestId,omitempty"`
StackTrace []string `json:"stackTrace,omitempty"`
ErrorMessage string `json:"errorMessage"`
ErrorType string `json:"errorType,omitempty"`
// RequestId uses *string so that an empty string "" is serialized (not omitted),
// while nil is omitted — init errors always set this field, fault events leave it nil.
RequestId *string `json:"requestId,omitempty"`
StackTrace []string `json:"stackTrace,omitempty"`
}

func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollector *LogCollector) (server *CustomInteropServer) {
func NewCustomInteropServer(lsOpts *LsOpts, adapter *LocalStackAdapter, delegate interop.Server, logCollector *LogCollector) (server *CustomInteropServer) {
server = &CustomInteropServer{
delegate: delegate.(*rapidcore.Server),
port: lsOpts.InteropPort,
upstreamEndpoint: lsOpts.RuntimeEndpoint,
localStackAdapter: &LocalStackAdapter{
UpstreamEndpoint: lsOpts.RuntimeEndpoint,
RuntimeId: lsOpts.RuntimeId,
},
delegate: delegate.(*rapidcore.Server),
port: lsOpts.InteropPort,
upstreamEndpoint: lsOpts.RuntimeEndpoint,
localStackAdapter: adapter,
}

// TODO: extract this
Expand All @@ -128,6 +142,13 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollecto
functionVersion := GetEnvOrDie("AWS_LAMBDA_FUNCTION_VERSION") // default $LATEST
_, _ = fmt.Fprintf(logCollector, "START RequestId: %s Version: %s\n", invokeR.InvokeId, functionVersion)

initDuration := ""
if !server.warmStart && !invokeR.IsInitRetry {
initTimeMS := float64(time.Since(server.initStart).Nanoseconds()) / float64(time.Millisecond)
initDuration = fmt.Sprintf("Init Duration: %.2f ms\t", initTimeMS)
}
server.warmStart = true

invokeStart := time.Now()
err = server.Invoke(invokeResp, &interop.Invoke{
ID: invokeR.InvokeId,
Expand All @@ -149,15 +170,17 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollecto
})
timeout := int(server.delegate.GetInvokeTimeout().Seconds())
isErr := false
status := ""
if err != nil {
switch {
case errors.Is(err, rapidcore.ErrInvokeTimeout):
log.Debugf("Got invoke timeout")
isErr = true
status = "Status: timeout"
errorResponse := ErrorResponse{
ErrorType: "Sandbox.Timedout",
ErrorMessage: fmt.Sprintf(
"%s %s Task timed out after %d.00 seconds",
time.Now().Format("2006-01-02T15:04:05Z"),
"RequestId: %s Error: Task timed out after %d.00 seconds",
invokeR.InvokeId,
timeout,
),
Expand Down Expand Up @@ -186,7 +209,7 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollecto
}
timeoutDuration := time.Duration(timeout) * time.Second
memorySize := GetEnvOrDie("AWS_LAMBDA_FUNCTION_MEMORY_SIZE")
PrintEndReports(invokeR.InvokeId, "", memorySize, invokeStart, timeoutDuration, logCollector)
PrintEndReports(invokeR.InvokeId, initDuration, status, memorySize, invokeStart, timeoutDuration, logCollector)

if err2 := server.localStackAdapter.SendLogs(invokeR.InvokeId, logCollector.getLogs()); err2 != nil {
log.Error("failed to send logs to LocalStack: ", err2)
Expand Down Expand Up @@ -219,12 +242,45 @@ func (c *CustomInteropServer) SendErrorResponse(invokeID string, resp *interop.E
return c.delegate.SendErrorResponse(invokeID, resp)
}

// SendInitErrorResponse writes error response during init to a shared memory and sends GIRD FAULT.
// SendInitErrorResponse forwards the init error to LocalStack and then propagates it to the delegate.
func (c *CustomInteropServer) SendInitErrorResponse(resp *interop.ErrorInvokeResponse) error {
log.Traceln("SendInitErrorResponse called")
if err := c.localStackAdapter.SendStatus(Error, resp.Payload); err != nil {
log.Fatalln("Failed to send init error to LocalStack " + err.Error() + ". Exiting.")

// Deserialize the raw payload so we can include the requestId and structured fields.
var parsed struct {
ErrorMessage string `json:"errorMessage"`
ErrorType string `json:"errorType"`
StackTrace []string `json:"stackTrace,omitempty"`
}
if err := json.Unmarshal(resp.Payload, &parsed); err != nil {
log.WithError(err).Warn("Failed to parse init error payload; forwarding raw payload")
if err := c.localStackAdapter.SendStatus(Error, resp.Payload); err != nil {
log.WithError(err).WithField("runtime-id", c.localStackAdapter.RuntimeId).
Error("Failed to send init error to LocalStack")
}
return c.delegate.SendInitErrorResponse(resp)
}

requestId := c.delegate.GetCurrentInvokeID()
adaptedResp := ErrorResponse{
ErrorMessage: parsed.ErrorMessage,
ErrorType: parsed.ErrorType,
RequestId: &requestId,
StackTrace: parsed.StackTrace,
}
body, err := json.Marshal(adaptedResp)
if err != nil {
log.WithError(err).Error("Failed to marshal adapted init error response")
body = resp.Payload
}

go func() {
if err := c.localStackAdapter.SendStatus(Error, body); err != nil {
log.WithError(err).WithField("runtime-id", c.localStackAdapter.RuntimeId).
Error("Failed to send init error to LocalStack")
}
}()

return c.delegate.SendInitErrorResponse(resp)
}

Expand All @@ -240,6 +296,7 @@ func (c *CustomInteropServer) SendRuntimeReady() error {

func (c *CustomInteropServer) Init(i *interop.Init, invokeTimeoutMs int64) error {
log.Traceln("Init called")
c.initStart = time.Now()
return c.delegate.Init(i, invokeTimeoutMs)
}

Expand Down
62 changes: 62 additions & 0 deletions cmd/localstack/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package main

import (
"encoding/json"
"fmt"
"sync"

"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/interop"
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/rapidcore/standalone/telemetry"
"github.com/google/uuid"
)

// LocalStackEventsAPI intercepts fault events and forwards them to LocalStack as error status callbacks.
type LocalStackEventsAPI struct {
*telemetry.StandaloneEventsAPI
adapter *LocalStackAdapter
requestID string
mu sync.RWMutex
}

func NewLocalStackEventsAPI(adapter *LocalStackAdapter) *LocalStackEventsAPI {
return &LocalStackEventsAPI{
adapter: adapter,
StandaloneEventsAPI: new(telemetry.StandaloneEventsAPI),
}
}

func (ev *LocalStackEventsAPI) SendFault(data interop.FaultData) error {
_ = ev.StandaloneEventsAPI.SendFault(data)

requestID := string(data.RequestID)
if requestID == "" {
ev.mu.RLock()
requestID = ev.requestID
ev.mu.RUnlock()
}
if requestID == "" {
// No invocation is active during the init phase (LocalStack only dispatches an invoke
// after the runtime reports ready), so synthesize an ID to preserve AWS's
// "RequestId: <uuid> Error: ..." message format.
requestID = uuid.NewString()
}

resp := ErrorResponse{
ErrorMessage: fmt.Sprintf("RequestId: %s Error: %s", requestID, data.ErrorMessage),
ErrorType: string(data.ErrorType),
}

payload, err := json.Marshal(resp)
if err != nil {
return err
}

return ev.adapter.SendStatus(Error, payload)
}

func (ev *LocalStackEventsAPI) SetCurrentRequestID(id interop.RequestID) {
ev.mu.Lock()
defer ev.mu.Unlock()
ev.requestID = string(id)
ev.StandaloneEventsAPI.SetCurrentRequestID(id)
}
22 changes: 20 additions & 2 deletions cmd/localstack/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,18 +179,36 @@ func main() {
localStackLogsEgressApi := NewLocalStackLogsEgressAPI(logCollector)
tracer := NewLocalStackTracer()

// Create LocalStack adapter upfront so it can be shared with the events API and interop server
lsAdapter := &LocalStackAdapter{
UpstreamEndpoint: lsOpts.RuntimeEndpoint,
RuntimeId: lsOpts.RuntimeId,
}

// Events API forwards runtime fault events (unexpected exits) to LocalStack as error callbacks
lsEventsAPI := NewLocalStackEventsAPI(lsAdapter)

// Supervisor intercepts runtime process terminations and emits fault events via the events API
supervisorCtx, cancelSupervisor := context.WithCancel(context.Background())

localStackSupv := NewLocalStackSupervisor(supervisorCtx, lsEventsAPI)

// build sandbox
sandbox := rapidcore.
NewSandboxBuilder().
//SetTracer(tracer).
AddShutdownFunc(func() {
log.Debugln("Stopping file watcher")
cancelFileWatcher()
log.Debugln("Stopping supervisor")
cancelSupervisor()
}).
SetExtensionsFlag(true).
SetInitCachingFlag(true).
SetLogsEgressAPI(localStackLogsEgressApi).
SetTracer(tracer)
SetTracer(tracer).
SetEventsAPI(lsEventsAPI).
SetSupervisor(localStackSupv)

// Corresponds to the 'AWS_LAMBDA_RUNTIME_API' environment variable.
// We need to ensure the runtime server is up before the INIT phase,
Expand All @@ -211,7 +229,7 @@ func main() {
runDaemon(d) // async

defaultInterop := sandbox.DefaultInteropServer()
interopServer := NewCustomInteropServer(lsOpts, defaultInterop, logCollector)
interopServer := NewCustomInteropServer(lsOpts, lsAdapter, defaultInterop, logCollector)
sandbox.SetInteropServer(interopServer)
if len(handler) > 0 {
sandbox.SetHandler(handler)
Expand Down
Loading