75 changes: 65 additions & 10 deletions integration_test/srchydration/src_hydration_test.go
@@ -17,6 +17,8 @@ import (
"testing"
"time"

"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/minio"

"github.com/rudderlabs/rudder-server/processor/isolation"

"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/transformer"
@@ -28,6 +30,7 @@ import (
"github.com/rudderlabs/rudder-server/testhelper/backendconfigtest"
webhookutil "github.com/rudderlabs/rudder-server/testhelper/webhook"

_ "github.com/marcboeker/go-duckdb"
"github.com/samber/lo"

proctypes "github.com/rudderlabs/rudder-server/processor/types"
@@ -111,7 +114,7 @@ func TestSrcHydration(t *testing.T) {

wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error {
err := runRudderServer(t, ctx, gwPort, postgresContainer, bcServer.URL, tr.TransformerURL, t.TempDir())
err := runRudderServer(t, ctx, gwPort, postgresContainer, bcServer.URL, tr.TransformerURL, t.TempDir(), nil)
if err != nil {
t.Logf("rudder-server exited with error: %v", err)
}
@@ -221,6 +224,9 @@ func TestSrcHydration(t *testing.T) {
postgresContainer, err := postgres.Setup(pool, t)
require.NoError(t, err)

minioResource, err := minio.Setup(pool, t)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

@@ -229,7 +235,7 @@

wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error {
err := runRudderServer(t, ctx, gwPort, postgresContainer, bcServer.URL, trServer.URL, t.TempDir())
err := runRudderServer(t, ctx, gwPort, postgresContainer, bcServer.URL, trServer.URL, t.TempDir(), minioResource)
if err != nil {
t.Logf("rudder-server exited with error: %v", err)
}
@@ -280,6 +286,12 @@ func TestSrcHydration(t *testing.T) {
if tt.failOnHydrationFailure {
expectedReports = append(expectedReports, prepareSrcHydrationFailedReports(t, sourceID3, numEvents)...)
expectedReports = append(expectedReports, prepareExpectedReports(t, sourceID3, true, numEvents)...)
requireJobsCount(t, ctx, postgresContainer.DB, "err_idx", jobsdb.Succeeded.State, numEvents)
requireMessagesCount(t, ctx, minioResource, numEvents, []lo.Tuple2[string, string]{
{A: "source_id", B: sourceID3},
{A: "failed_stage", B: "source_hydration"},
{A: "event_type", B: "identify"},
}...)
}
requireReports(t, ctx, postgresContainer.DB, expectedReports)

@@ -507,14 +519,7 @@ func prepareBackendConfigServer(t *testing.T, webhookURL string, internalSecret
Build()
}

func runRudderServer(
t testing.TB,
ctx context.Context,
port int,
postgresContainer *postgres.Resource,
cbURL, transformerURL,
tmpDir string,
) (err error) {
func runRudderServer(t testing.TB, ctx context.Context, port int, postgresContainer *postgres.Resource, cbURL, transformerURL, tmpDir string, minioResource *minio.Resource) (err error) {
config.Reset()
t.Setenv("CONFIG_BACKEND_URL", cbURL)
t.Setenv("WORKSPACE_TOKEN", "token")
@@ -544,6 +549,18 @@ func runRudderServer(
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Gateway.enableSuppressUserFeature"), "false")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Processor.archiveInPreProcess"), "true")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Processor.SourceHydration.maxRetry"), "2")
if minioResource != nil {
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Reporting.errorIndexReporting.enabled"), "true")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Reporting.errorIndexReporting.SleepDuration"), "1s")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Reporting.errorIndexReporting.minWorkerSleep"), "1s")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Reporting.errorIndexReporting.uploadFrequency"), "1s")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "ErrorIndex.storage.Bucket"), minioResource.BucketName)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "ErrorIndex.storage.Endpoint"), fmt.Sprintf("http://%s", minioResource.Endpoint))
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "ErrorIndex.storage.AccessKey"), minioResource.AccessKeyID)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "ErrorIndex.storage.SecretAccessKey"), minioResource.AccessKeySecret)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "ErrorIndex.storage.S3ForcePathStyle"), "true")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "ErrorIndex.storage.DisableSSL"), "true")
}
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panicked: %v", r)
@@ -637,3 +654,41 @@ func requireJobsCount(
"%d %s events should be in %s state", expectedCount, queue, state,
)
}

// nolint: unparam
func requireMessagesCount(
t *testing.T,
ctx context.Context,
mr *minio.Resource,
expectedCount int,
filters ...lo.Tuple2[string, string],
) {
t.Helper()

db, err := sql.Open("duckdb", "")
require.NoError(t, err)

_, err = db.Exec(fmt.Sprintf(`INSTALL parquet; LOAD parquet; INSTALL httpfs; LOAD httpfs;SET s3_region='%s';SET s3_endpoint='%s';SET s3_access_key_id='%s';SET s3_secret_access_key='%s';SET s3_use_ssl= false;SET s3_url_style='path';`,
mr.Region,
mr.Endpoint,
mr.AccessKeyID,
mr.AccessKeySecret,
))
require.NoError(t, err)

query := fmt.Sprintf("SELECT count(*) FROM read_parquet('%s') WHERE 1 = 1", fmt.Sprintf("s3://%s/**/**/**/*.parquet", mr.BucketName))
query += strings.Join(lo.Map(filters, func(t lo.Tuple2[string, string], _ int) string {
return fmt.Sprintf(" AND %s = '%s'", t.A, t.B)
}), "")

require.Eventually(t, func() bool {
var messagesCount int
require.NoError(t, db.QueryRowContext(ctx, query).Scan(&messagesCount))
t.Logf("messagesCount: %d", messagesCount)
return messagesCount == expectedCount
},
10*time.Second,
1*time.Second,
fmt.Sprintf("%d messages should be in the bucket", expectedCount),
)
}
@@ -64,9 +64,9 @@ func New(conf *config.Config, log logger.Logger, stat stats.Stats, opts ...Opt)
handle.client = transformerclient.NewClient(transformerutils.TransformerClientConfig(conf, "SourceHydration"))
handle.config.sourceHydrationURL = handle.conf.GetString("DEST_TRANSFORM_URL", "http://localhost:9090")
handle.config.logLongRunningTransformAfter = conf.GetDuration("HttpClient.procTransformer.logLongRunningTransformAfter", 600, time.Second)
handle.config.maxRetry = conf.GetReloadableIntVar(30, 1, "Processor.SourceHydration.maxRetry", "Processor.maxRetry")
handle.config.maxRetry = conf.GetReloadableIntVar(50, 1, "Processor.SourceHydration.maxRetry", "Processor.maxRetry")
handle.config.failOnError = conf.GetReloadableBoolVar(false, "Processor.SourceHydration.failOnError", "Processor.Transformer.failOnError")
handle.config.maxRetryBackoffInterval = conf.GetReloadableDurationVar(30, time.Second, "Processor.SourceHydration.maxRetryBackoffInterval", "Processor.maxRetryBackoffInterval")
handle.config.maxRetryBackoffInterval = conf.GetReloadableDurationVar(60, time.Second, "Processor.SourceHydration.maxRetryBackoffInterval", "Processor.maxRetryBackoffInterval")
handle.config.batchSize = conf.GetReloadableIntVar(100, 1, "Processor.SourceHydration.batchSize", "Processor.transformBatchSize")

for _, opt := range opts {
17 changes: 15 additions & 2 deletions processor/src_hydration_stage.go
@@ -5,7 +5,9 @@ import (
"net/http"
"strconv"
"sync"
"time"

"github.com/rudderlabs/rudder-server/utils/crash"
"github.com/rudderlabs/rudder-server/utils/misc"

kitctx "github.com/rudderlabs/rudder-go-kit/context"
@@ -70,7 +72,7 @@ func (proc *Handle) srcHydrationStage(partition string, message *srcHydrationMes
var sharedMapsMutex sync.Mutex
srcHydrationEnabledMap := make(map[SourceIDT]bool)
for sourceId, jobs := range message.groupedEventsBySourceId {
g.Go(func() error {
g.Go(crash.Wrapper(func() error {
var hydrationFailedReports []*reportingtypes.PUReportedMetric
source, err := proc.getSourceBySourceID(string(sourceId))
if err != nil {
@@ -145,7 +147,7 @@
}
}
return nil
})
}))
}

// Wait for all goroutines to complete and check for errors
@@ -219,6 +221,7 @@ func (proc *Handle) getHydrationFailedReports(source *backendconfig.SourceT, job
return lo.FilterMap(jobs, func(job types.TransformerEvent, _ int) (*reportingtypes.PUReportedMetric, bool) {
eventName, _ := misc.MapLookup(job.Message, "event").(string)
eventType, _ := misc.MapLookup(job.Message, "type").(string)
receivedAt, _ := time.Parse(misc.RFC3339Milli, job.Metadata.ReceivedAt)
if _, ok := metricsMap[eventName]; !ok {
metricsMap[eventName] = make(map[string]*reportingtypes.PUReportedMetric)
}
@@ -239,11 +242,21 @@
SampleEvent: sampleEvent,
EventName: eventName,
EventType: eventType,
FailedMessages: []*reportingtypes.FailedMessage{
{
MessageID: job.Metadata.MessageID,
ReceivedAt: receivedAt,
},
},
},
}
return metricsMap[eventName][eventType], true
}
metricsMap[eventName][eventType].StatusDetail.Count += 1
metricsMap[eventName][eventType].StatusDetail.FailedMessages = append(metricsMap[eventName][eventType].StatusDetail.FailedMessages, &reportingtypes.FailedMessage{
MessageID: job.Metadata.MessageID,
ReceivedAt: receivedAt,
})
return nil, false
})
}