Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion service/common/blobstore/store_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,23 @@ func (b *blobStoreImpl) WriteObject(ctx context.Context, workflowId, data string
err = putObject(ctx, b.s3Client, b.activeStorage.S3Bucket, b.pathPrefix+path, data)
if err != nil {
b.writeObjectErrorCounter.Inc(1)
var re s3.ResponseError
if errors.As(err, &re) {
b.logger.Error("PutObject S3 API error",
tag.Key("requestId"), tag.Value(re.ServiceRequestID()),
tag.Key("hostId"), tag.Value(re.ServiceHostID()),
tag.Key("bucket"), tag.Value(b.activeStorage.S3Bucket),
tag.Key("workflowId"), tag.Value(workflowId),
tag.Error(err))
err = fmt.Errorf("failed to write object (requestId=%s, hostId=%s): %w",
re.ServiceRequestID(), re.ServiceHostID(), err)
} else {
b.logger.Error("PutObject error",
tag.Key("bucket"), tag.Value(b.activeStorage.S3Bucket),
tag.Key("workflowId"), tag.Value(workflowId),
tag.Error(err))
err = fmt.Errorf("failed to write object: %w", err)
}
return
}
b.writeObjectSuccessHistogram.Record(time.Duration(len(data)))
Expand All @@ -105,7 +122,24 @@ func (b *blobStoreImpl) ReadObject(ctx context.Context, storeId, path string) (s
data, err := getObject(ctx, b.s3Client, storeConfig.S3Bucket, b.pathPrefix+path)
if err != nil {
b.readObjectErrorCounter.Inc(1)
return "", err
var re s3.ResponseError
if errors.As(err, &re) {
b.logger.Error("GetObject S3 API error",
tag.Key("requestId"), tag.Value(re.ServiceRequestID()),
tag.Key("hostId"), tag.Value(re.ServiceHostID()),
tag.Key("bucket"), tag.Value(storeConfig.S3Bucket),
tag.Key("path"), tag.Value(path),
tag.Key("storeId"), tag.Value(storeId),
tag.Error(err))
return "", fmt.Errorf("failed to read object (requestId=%s, hostId=%s): %w",
re.ServiceRequestID(), re.ServiceHostID(), err)
}
b.logger.Error("GetObject error",
tag.Key("bucket"), tag.Value(storeConfig.S3Bucket),
tag.Key("path"), tag.Value(path),
tag.Key("storeId"), tag.Value(storeId),
tag.Error(err))
return "", fmt.Errorf("failed to read object: %w", err)
}
b.readObjectSuccessHistogram.Record(time.Duration(len(data)))
return data, nil
Expand Down
109 changes: 105 additions & 4 deletions service/interpreter/activityImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package interpreter

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -59,13 +60,27 @@ func StateApiWaitUntil(
if input.Request.StateInput != nil && input.Request.StateInput.ExtStoreId != nil {
_, err = loadStateInputFromExternalStorage(ctx, input.Request.StateInput)
if err != nil {
if activityInfo.IsLocalActivity {
reqBytes, _ := json.Marshal(input.Request)
logger.Warn("StateApiWaitUntil local activity return on error",
"workflowId", activityInfo.WorkflowExecution.ID,
"stateExecutionId", input.Request.Context.GetStateExecutionId(),
"payloadSize", len(reqBytes))
}
return nil, err
}
}

// Load data attributes from external storage
err = blobstore.LoadDataObjectsFromExternalStorage(ctx, input.Request.DataObjects, env.GetBlobStore())
if err != nil {
if activityInfo.IsLocalActivity {
reqBytes, _ := json.Marshal(input.Request)
logger.Warn("StateApiWaitUntil local activity return on error",
"workflowId", activityInfo.WorkflowExecution.ID,
"stateExecutionId", input.Request.Context.GetStateExecutionId(),
"payloadSize", len(reqBytes))
}
return nil, err
}

Expand Down Expand Up @@ -93,6 +108,13 @@ func StateApiWaitUntil(
Details: &errDetails,
},
})
if activityInfo.IsLocalActivity {
reqBytes, _ := json.Marshal(input.Request)
logger.Warn("StateApiWaitUntil local activity return on error",
"workflowId", activityInfo.WorkflowExecution.ID,
"stateExecutionId", input.Request.Context.GetStateExecutionId(),
"payloadSize", len(reqBytes))
}
return nil, stateStartErr
}

Expand All @@ -115,6 +137,13 @@ func StateApiWaitUntil(
Details: &errDetails,
},
})
if activityInfo.IsLocalActivity {
reqBytes, _ := json.Marshal(input.Request)
logger.Warn("StateApiWaitUntil local activity return on error",
"workflowId", activityInfo.WorkflowExecution.ID,
"stateExecutionId", input.Request.Context.GetStateExecutionId(),
"payloadSize", len(reqBytes))
}
return nil, stateStartErr
}

Expand All @@ -125,9 +154,16 @@ func StateApiWaitUntil(
resp.LocalActivityInput = composeInputForDebug(input.Request.Context.GetStateExecutionId())
}

if env.GetSharedConfig().ExternalStorage.Enabled {
if env.GetSharedConfig().ExternalStorage.Enabled && env.GetBlobStore() != nil {
err = blobstore.WriteDataObjectsToExternalStorage(ctx, resp.UpsertDataObjects, activityInfo.WorkflowExecution.ID, env.GetSharedConfig().ExternalStorage.ThresholdInBytes, env.GetBlobStore(), env.GetSharedConfig().ExternalStorage.Enabled)
if err != nil {
if activityInfo.IsLocalActivity {
reqBytes, _ := json.Marshal(input.Request)
logger.Warn("StateApiWaitUntil local activity return on error",
"workflowId", activityInfo.WorkflowExecution.ID,
"stateExecutionId", input.Request.Context.GetStateExecutionId(),
"payloadSize", len(reqBytes))
}
return nil, err
}
}
Expand All @@ -143,6 +179,13 @@ func StateApiWaitUntil(
EndTimestampInMs: ptr.Any(time.Now().UnixMilli()),
SearchAttributes: searchAttributes,
})
if activityInfo.IsLocalActivity {
respBytes, _ := json.Marshal(resp)
logger.Warn("StateApiWaitUntil local activity return on success",
"workflowId", activityInfo.WorkflowExecution.ID,
"stateExecutionId", input.Request.Context.GetStateExecutionId(),
"payloadSize", len(respBytes))
}
return resp, nil
}

Expand Down Expand Up @@ -187,13 +230,27 @@ func StateApiExecute(
if input.Request.StateInput != nil && input.Request.StateInput.ExtStoreId != nil {
wholeStateInputCopy, err = loadStateInputFromExternalStorage(ctx, input.Request.StateInput)
if err != nil {
if activityInfo.IsLocalActivity {
reqBytes, _ := json.Marshal(input.Request)
logger.Warn("StateApiExecute local activity return on error",
"workflowId", activityInfo.WorkflowExecution.ID,
"stateExecutionId", input.Request.Context.GetStateExecutionId(),
"payloadSize", len(reqBytes))
}
return nil, err
}
}

// Load data attributes from external storage
err = blobstore.LoadDataObjectsFromExternalStorage(ctx, input.Request.DataObjects, env.GetBlobStore())
if err != nil {
if activityInfo.IsLocalActivity {
reqBytes, _ := json.Marshal(input.Request)
logger.Warn("StateApiExecute local activity return on error",
"workflowId", activityInfo.WorkflowExecution.ID,
"stateExecutionId", input.Request.Context.GetStateExecutionId(),
"payloadSize", len(reqBytes))
}
return nil, err
}

Expand Down Expand Up @@ -233,6 +290,13 @@ func StateApiExecute(
Details: &errDetails,
},
})
if activityInfo.IsLocalActivity {
reqBytes, _ := json.Marshal(input.Request)
logger.Warn("StateApiExecute local activity return on error",
"workflowId", activityInfo.WorkflowExecution.ID,
"stateExecutionId", input.Request.Context.GetStateExecutionId(),
"payloadSize", len(reqBytes))
}
return nil, stateApiExecuteErr
}

Expand All @@ -255,6 +319,13 @@ func StateApiExecute(
Details: &errDetails,
},
})
if activityInfo.IsLocalActivity {
reqBytes, _ := json.Marshal(input.Request)
logger.Warn("StateApiExecute local activity return on error",
"workflowId", activityInfo.WorkflowExecution.ID,
"stateExecutionId", input.Request.Context.GetStateExecutionId(),
"payloadSize", len(reqBytes))
}
return nil, stateApiExecuteErr
}

Expand All @@ -265,13 +336,28 @@ func StateApiExecute(
resp.LocalActivityInput = composeInputForDebug(input.Request.Context.GetStateExecutionId())
}

if env.GetSharedConfig().ExternalStorage.Enabled {
// Externalize only when enabled and blob store is available (nil when e.g. STAGING_LEVEL was empty at worker start).
if env.GetSharedConfig().ExternalStorage.Enabled && env.GetBlobStore() != nil {
resp.StateDecision.NextStates, err = writeNextStateInputsToExternalStorage(ctx, resp.StateDecision.NextStates, wholeStateInputCopy, activityInfo.WorkflowExecution.ID)
if err != nil {
if activityInfo.IsLocalActivity {
reqBytes, _ := json.Marshal(input.Request)
logger.Warn("StateApiExecute local activity return on error",
"workflowId", activityInfo.WorkflowExecution.ID,
"stateExecutionId", input.Request.Context.GetStateExecutionId(),
"payloadSize", len(reqBytes))
}
return nil, err
}
err = blobstore.WriteDataObjectsToExternalStorage(ctx, resp.UpsertDataObjects, activityInfo.WorkflowExecution.ID, env.GetSharedConfig().ExternalStorage.ThresholdInBytes, env.GetBlobStore(), env.GetSharedConfig().ExternalStorage.Enabled)
if err != nil {
if activityInfo.IsLocalActivity {
reqBytes, _ := json.Marshal(input.Request)
logger.Warn("StateApiExecute local activity return on error",
"workflowId", activityInfo.WorkflowExecution.ID,
"stateExecutionId", input.Request.Context.GetStateExecutionId(),
"payloadSize", len(reqBytes))
}
return nil, err
}
}
Expand All @@ -287,6 +373,13 @@ func StateApiExecute(
EndTimestampInMs: ptr.Any(time.Now().UnixMilli()),
SearchAttributes: input.Request.SearchAttributes,
})
if activityInfo.IsLocalActivity {
respBytes, _ := json.Marshal(resp)
logger.Warn("StateApiExecute local activity return on success",
"workflowId", activityInfo.WorkflowExecution.ID,
"stateExecutionId", input.Request.Context.GetStateExecutionId(),
"payloadSize", len(respBytes))
}
return resp, nil
}

Expand Down Expand Up @@ -466,14 +559,22 @@ func InvokeWorkerRpc(
provider := interfaces.GetActivityProviderByType(backendType)
logger := provider.GetLogger(ctx)
logger.Info("InvokeWorkerRpcActivity", "input", log.ToJsonAndTruncateForLogging(req))
activityInfo := provider.GetActivityInfo(ctx)

apiMaxSeconds := env.GetSharedConfig().Api.MaxWaitSeconds

resp, statusErr := rpc.InvokeWorkerRpc(ctx, rpcPrep, req, apiMaxSeconds, env.GetBlobStore(), env.GetSharedConfig().ExternalStorage)
return &interfaces.InvokeRpcActivityOutput{
output := &interfaces.InvokeRpcActivityOutput{
RpcOutput: resp,
StatusError: statusErr,
}, nil
}
if activityInfo.IsLocalActivity {
outputBytes, _ := json.Marshal(output)
logger.Warn("InvokeWorkerRpc local activity return",
"workflowId", activityInfo.WorkflowExecution.ID,
"payloadSize", len(outputBytes))
}
return output, nil
}

func writeNextStateInputsToExternalStorage(ctx context.Context, nextStates []iwfidl.StateMovement, currentInputCopy *iwfidl.EncodedObject, workflowId string) ([]iwfidl.StateMovement, error) {
Expand Down
Loading