diff --git a/config/config.go b/config/config.go index 522254b7..ff6d0672 100644 --- a/config/config.go +++ b/config/config.go @@ -111,6 +111,9 @@ type ( InterpreterActivityConfig InterpreterActivityConfig `yaml:"interpreterActivityConfig"` VerboseDebug bool FailAtMemoIncompatibility bool + // LogLocalActivityThresholdBytes enables warn-level logging of local activity inputs/outputs when the + // serialized payload size meets or exceeds this value. Set to 0 (default) to disable. + LogLocalActivityThresholdBytes int `yaml:"logLocalActivityThresholdBytes"` } TemporalConfig struct { diff --git a/service/common/blobstore/store_impl.go b/service/common/blobstore/store_impl.go index 0f94cf4e..06cd0b13 100644 --- a/service/common/blobstore/store_impl.go +++ b/service/common/blobstore/store_impl.go @@ -90,6 +90,23 @@ func (b *blobStoreImpl) WriteObject(ctx context.Context, workflowId, data string err = putObject(ctx, b.s3Client, b.activeStorage.S3Bucket, b.pathPrefix+path, data) if err != nil { b.writeObjectErrorCounter.Inc(1) + var re s3.ResponseError + if errors.As(err, &re) { + b.logger.Error("PutObject S3 API error", + tag.Key("requestId"), tag.Value(re.ServiceRequestID()), + tag.Key("hostId"), tag.Value(re.ServiceHostID()), + tag.Key("bucket"), tag.Value(b.activeStorage.S3Bucket), + tag.Key("workflowId"), tag.Value(workflowId), + tag.Error(err)) + err = fmt.Errorf("failed to write object (requestId=%s, hostId=%s): %w", + re.ServiceRequestID(), re.ServiceHostID(), err) + } else { + b.logger.Error("PutObject error", + tag.Key("bucket"), tag.Value(b.activeStorage.S3Bucket), + tag.Key("workflowId"), tag.Value(workflowId), + tag.Error(err)) + err = fmt.Errorf("failed to write object: %w", err) + } return } b.writeObjectSuccessHistogram.Record(time.Duration(len(data))) @@ -105,7 +122,24 @@ func (b *blobStoreImpl) ReadObject(ctx context.Context, storeId, path string) (s data, err := getObject(ctx, b.s3Client, storeConfig.S3Bucket, b.pathPrefix+path) if err != nil { b.readObjectErrorCounter.Inc(1) - return "", err + var re s3.ResponseError + if errors.As(err, &re) { + b.logger.Error("GetObject S3 API error", + tag.Key("requestId"), tag.Value(re.ServiceRequestID()), + tag.Key("hostId"), tag.Value(re.ServiceHostID()), + tag.Key("bucket"), tag.Value(storeConfig.S3Bucket), + tag.Key("path"), tag.Value(path), + tag.Key("storeId"), tag.Value(storeId), + tag.Error(err)) + return "", fmt.Errorf("failed to read object (requestId=%s, hostId=%s): %w", + re.ServiceRequestID(), re.ServiceHostID(), err) + } + b.logger.Error("GetObject error", + tag.Key("bucket"), tag.Value(storeConfig.S3Bucket), + tag.Key("path"), tag.Value(path), + tag.Key("storeId"), tag.Value(storeId), + tag.Error(err)) + return "", fmt.Errorf("failed to read object: %w", err) } b.readObjectSuccessHistogram.Record(time.Duration(len(data))) return data, nil diff --git a/service/common/blobstore/store_impl_test.go b/service/common/blobstore/store_impl_test.go index 87275867..da439dea 100644 --- a/service/common/blobstore/store_impl_test.go +++ b/service/common/blobstore/store_impl_test.go @@ -275,5 +275,10 @@ func TestBlobStoreIntegration(t *testing.T) { err = blobStore.DeleteWorkflowObjects(ctx, "invalid-store-id", "some-workflow-path") assert.Error(t, err) assert.Contains(t, err.Error(), "store not found") + + // Test reading a non-existent key from a valid store triggers the new error wrapping + _, err = blobStore.ReadObject(ctx, testStorageId, "nonexistent/path/that/does/not/exist") + assert.Error(t, err) + assert.Contains(t, err.Error(), "failed to read object") }) } diff --git a/service/interpreter/activityImpl.go b/service/interpreter/activityImpl.go index df9d39b5..2b987b96 100644 --- a/service/interpreter/activityImpl.go +++ b/service/interpreter/activityImpl.go @@ -2,6 +2,7 @@ package interpreter import ( "context" + "encoding/json" "fmt" "io" "net/http" @@ -39,9 +40,9 @@ func StateApiWaitUntil( logger.Info("StateWaitUntilActivity", "input", log.ToJsonAndTruncateForLogging(input)) iwfWorkerBaseUrl := urlautofix.FixWorkerUrl(input.IwfWorkerUrl) - svcCfg := env.GetSharedConfig() + sharedCfg := env.GetSharedConfig() apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{ - DefaultHeader: svcCfg.Interpreter.InterpreterActivityConfig.DefaultHeaders, + DefaultHeader: sharedCfg.Interpreter.InterpreterActivityConfig.DefaultHeaders, Servers: []iwfidl.ServerConfiguration{ { URL: iwfWorkerBaseUrl, @@ -59,6 +60,15 @@ func StateApiWaitUntil( if input.Request.StateInput != nil && input.Request.StateInput.ExtStoreId != nil { _, err = loadStateInputFromExternalStorage(ctx, input.Request.StateInput) if err != nil { + if activityInfo.IsLocalActivity { + reqBytes, _ := json.Marshal(input.Request) + if sharedCfg.Interpreter.LogLocalActivityThresholdBytes > 0 { + logger.Warn("StateApiWaitUntil local activity return on error", + "workflowId", activityInfo.WorkflowExecution.ID, + "stateExecutionId", input.Request.Context.GetStateExecutionId(), + "payloadSize", len(reqBytes)) + } + } return nil, err } } @@ -66,6 +76,15 @@ func StateApiWaitUntil( // Load data attributes from external storage err = blobstore.LoadDataObjectsFromExternalStorage(ctx, input.Request.DataObjects, env.GetBlobStore()) if err != nil { + if activityInfo.IsLocalActivity { + reqBytes, _ := json.Marshal(input.Request) + if sharedCfg.Interpreter.LogLocalActivityThresholdBytes > 0 { + logger.Warn("StateApiWaitUntil local activity return on error", + "workflowId", activityInfo.WorkflowExecution.ID, + "stateExecutionId", input.Request.Context.GetStateExecutionId(), + "payloadSize", len(reqBytes)) + } + } return nil, err } @@ -93,6 +112,15 @@ func StateApiWaitUntil( Details: &errDetails, }, }) + if activityInfo.IsLocalActivity { + reqBytes, _ := json.Marshal(input.Request) + if sharedCfg.Interpreter.LogLocalActivityThresholdBytes > 0 { + logger.Warn("StateApiWaitUntil local activity return on error", + "workflowId", activityInfo.WorkflowExecution.ID, + "stateExecutionId", input.Request.Context.GetStateExecutionId(), + "payloadSize", len(reqBytes)) + } + } return nil, stateStartErr } @@ -115,6 +143,15 @@ func StateApiWaitUntil( Details: &errDetails, }, }) + if activityInfo.IsLocalActivity { + reqBytes, _ := json.Marshal(input.Request) + if sharedCfg.Interpreter.LogLocalActivityThresholdBytes > 0 { + logger.Warn("StateApiWaitUntil local activity return on error", + "workflowId", activityInfo.WorkflowExecution.ID, + "stateExecutionId", input.Request.Context.GetStateExecutionId(), + "payloadSize", len(reqBytes)) + } + } return nil, stateStartErr } @@ -125,9 +162,18 @@ func StateApiWaitUntil( resp.LocalActivityInput = composeInputForDebug(input.Request.Context.GetStateExecutionId()) } - if env.GetSharedConfig().ExternalStorage.Enabled { + if env.GetSharedConfig().ExternalStorage.Enabled && env.GetBlobStore() != nil { err = blobstore.WriteDataObjectsToExternalStorage(ctx, resp.UpsertDataObjects, activityInfo.WorkflowExecution.ID, env.GetSharedConfig().ExternalStorage.ThresholdInBytes, env.GetBlobStore(), env.GetSharedConfig().ExternalStorage.Enabled) if err != nil { + if activityInfo.IsLocalActivity { + reqBytes, _ := json.Marshal(input.Request) + if sharedCfg.Interpreter.LogLocalActivityThresholdBytes > 0 { + logger.Warn("StateApiWaitUntil local activity return on error", + "workflowId", activityInfo.WorkflowExecution.ID, + "stateExecutionId", input.Request.Context.GetStateExecutionId(), + "payloadSize", len(reqBytes)) + } + } return nil, err } } @@ -143,6 +189,15 @@ func StateApiWaitUntil( EndTimestampInMs: ptr.Any(time.Now().UnixMilli()), SearchAttributes: searchAttributes, }) + if activityInfo.IsLocalActivity { + respBytes, _ := json.Marshal(resp) + if threshold := sharedCfg.Interpreter.LogLocalActivityThresholdBytes; threshold > 0 && len(respBytes) >= threshold { + logger.Warn("StateApiWaitUntil local activity return on success", + "workflowId", activityInfo.WorkflowExecution.ID, + "stateExecutionId", input.Request.Context.GetStateExecutionId(), + "payloadSize", len(respBytes)) + } + } return resp, nil } @@ -166,9 +221,9 @@ func StateApiExecute( logger.Info("StateExecuteActivity", "input", log.ToJsonAndTruncateForLogging(input)) iwfWorkerBaseUrl := urlautofix.FixWorkerUrl(input.IwfWorkerUrl) - svcCfg := env.GetSharedConfig() + sharedCfg := env.GetSharedConfig() apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{ - DefaultHeader: svcCfg.Interpreter.InterpreterActivityConfig.DefaultHeaders, + DefaultHeader: sharedCfg.Interpreter.InterpreterActivityConfig.DefaultHeaders, Servers: []iwfidl.ServerConfiguration{ { URL: iwfWorkerBaseUrl, @@ -187,6 +242,15 @@ func StateApiExecute( if input.Request.StateInput != nil && input.Request.StateInput.ExtStoreId != nil { wholeStateInputCopy, err = loadStateInputFromExternalStorage(ctx, input.Request.StateInput) if err != nil { + if activityInfo.IsLocalActivity { + reqBytes, _ := json.Marshal(input.Request) + if sharedCfg.Interpreter.LogLocalActivityThresholdBytes > 0 { + logger.Warn("StateApiExecute local activity return on error", + "workflowId", activityInfo.WorkflowExecution.ID, + "stateExecutionId", input.Request.Context.GetStateExecutionId(), + "payloadSize", len(reqBytes)) + } + } return nil, err } } @@ -194,6 +258,15 @@ func StateApiExecute( // Load data attributes from external storage err = blobstore.LoadDataObjectsFromExternalStorage(ctx, input.Request.DataObjects, env.GetBlobStore()) if err != nil { + if activityInfo.IsLocalActivity { + reqBytes, _ := json.Marshal(input.Request) + if sharedCfg.Interpreter.LogLocalActivityThresholdBytes > 0 { + logger.Warn("StateApiExecute local activity return on error", + "workflowId", activityInfo.WorkflowExecution.ID, + "stateExecutionId", input.Request.Context.GetStateExecutionId(), + "payloadSize", len(reqBytes)) + } + } return nil, err } @@ -233,6 +306,15 @@ func StateApiExecute( Details: &errDetails, }, }) + if activityInfo.IsLocalActivity { + reqBytes, _ := json.Marshal(input.Request) + if sharedCfg.Interpreter.LogLocalActivityThresholdBytes > 0 { + logger.Warn("StateApiExecute local activity return on error", + "workflowId", activityInfo.WorkflowExecution.ID, + "stateExecutionId", input.Request.Context.GetStateExecutionId(), + "payloadSize", len(reqBytes)) + } + } return nil, stateApiExecuteErr } @@ -255,6 +337,15 @@ func StateApiExecute( Details: &errDetails, }, }) + if activityInfo.IsLocalActivity { + reqBytes, _ := json.Marshal(input.Request) + if sharedCfg.Interpreter.LogLocalActivityThresholdBytes > 0 { + logger.Warn("StateApiExecute local activity return on error", + "workflowId", activityInfo.WorkflowExecution.ID, + "stateExecutionId", input.Request.Context.GetStateExecutionId(), + "payloadSize", len(reqBytes)) + } + } return nil, stateApiExecuteErr } @@ -265,13 +356,32 @@ func StateApiExecute( resp.LocalActivityInput = composeInputForDebug(input.Request.Context.GetStateExecutionId()) } - if env.GetSharedConfig().ExternalStorage.Enabled { + // Externalize only when enabled and blob store is available (nil when e.g. STAGING_LEVEL was empty at worker start). + if env.GetSharedConfig().ExternalStorage.Enabled && env.GetBlobStore() != nil { resp.StateDecision.NextStates, err = writeNextStateInputsToExternalStorage(ctx, resp.StateDecision.NextStates, wholeStateInputCopy, activityInfo.WorkflowExecution.ID) if err != nil { + if activityInfo.IsLocalActivity { + reqBytes, _ := json.Marshal(input.Request) + if sharedCfg.Interpreter.LogLocalActivityThresholdBytes > 0 { + logger.Warn("StateApiExecute local activity return on error", + "workflowId", activityInfo.WorkflowExecution.ID, + "stateExecutionId", input.Request.Context.GetStateExecutionId(), + "payloadSize", len(reqBytes)) + } + } return nil, err } err = blobstore.WriteDataObjectsToExternalStorage(ctx, resp.UpsertDataObjects, activityInfo.WorkflowExecution.ID, env.GetSharedConfig().ExternalStorage.ThresholdInBytes, env.GetBlobStore(), env.GetSharedConfig().ExternalStorage.Enabled) if err != nil { + if activityInfo.IsLocalActivity { + reqBytes, _ := json.Marshal(input.Request) + if sharedCfg.Interpreter.LogLocalActivityThresholdBytes > 0 { + logger.Warn("StateApiExecute local activity return on error", + "workflowId", activityInfo.WorkflowExecution.ID, + "stateExecutionId", input.Request.Context.GetStateExecutionId(), + "payloadSize", len(reqBytes)) + } + } return nil, err } } @@ -287,6 +397,15 @@ func StateApiExecute( EndTimestampInMs: ptr.Any(time.Now().UnixMilli()), SearchAttributes: input.Request.SearchAttributes, }) + if activityInfo.IsLocalActivity { + respBytes, _ := json.Marshal(resp) + if threshold := sharedCfg.Interpreter.LogLocalActivityThresholdBytes; threshold > 0 && len(respBytes) >= threshold { + logger.Warn("StateApiExecute local activity return on success", + "workflowId", activityInfo.WorkflowExecution.ID, + "stateExecutionId", input.Request.Context.GetStateExecutionId(), + "payloadSize", len(respBytes)) + } + } return resp, nil } @@ -438,11 +557,11 @@ func DumpWorkflowInternal( logger := provider.GetLogger(ctx) logger.Info("DumpWorkflowInternalActivity", "input", log.ToJsonAndTruncateForLogging(req)) - svcCfg := env.GetSharedConfig() - apiAddress := svcCfg.GetApiServiceAddressWithDefault() + sharedCfg := env.GetSharedConfig() + apiAddress := sharedCfg.GetApiServiceAddressWithDefault() apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{ - DefaultHeader: svcCfg.Interpreter.InterpreterActivityConfig.DefaultHeaders, + DefaultHeader: sharedCfg.Interpreter.InterpreterActivityConfig.DefaultHeaders, Servers: []iwfidl.ServerConfiguration{ { URL: apiAddress, @@ -466,14 +585,25 @@ func InvokeWorkerRpc( provider := interfaces.GetActivityProviderByType(backendType) logger := provider.GetLogger(ctx) logger.Info("InvokeWorkerRpcActivity", "input", log.ToJsonAndTruncateForLogging(req)) + activityInfo := provider.GetActivityInfo(ctx) + sharedCfg := env.GetSharedConfig() - apiMaxSeconds := env.GetSharedConfig().Api.MaxWaitSeconds + apiMaxSeconds := sharedCfg.Api.MaxWaitSeconds - resp, statusErr := rpc.InvokeWorkerRpc(ctx, rpcPrep, req, apiMaxSeconds, env.GetBlobStore(), env.GetSharedConfig().ExternalStorage) - return &interfaces.InvokeRpcActivityOutput{ + resp, statusErr := rpc.InvokeWorkerRpc(ctx, rpcPrep, req, apiMaxSeconds, env.GetBlobStore(), sharedCfg.ExternalStorage) + output := &interfaces.InvokeRpcActivityOutput{ RpcOutput: resp, StatusError: statusErr, - }, nil + } + if activityInfo.IsLocalActivity { + outputBytes, _ := json.Marshal(output) + if threshold := sharedCfg.Interpreter.LogLocalActivityThresholdBytes; threshold > 0 && len(outputBytes) >= threshold { + logger.Warn("InvokeWorkerRpc local activity return", + "workflowId", activityInfo.WorkflowExecution.ID, + "payloadSize", len(outputBytes)) + } + } + return output, nil } func writeNextStateInputsToExternalStorage(ctx context.Context, nextStates []iwfidl.StateMovement, currentInputCopy *iwfidl.EncodedObject, workflowId string) ([]iwfidl.StateMovement, error) {