Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package audience

import (
"archive/zip"
"context"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -121,7 +122,7 @@ var _ = Describe("Bing ads Audience", func() {
}

// making upload function call
received := bulkUploader.Upload(&asyncDestination)
received := bulkUploader.Upload(context.Background(), &asyncDestination)
received.ImportingParameters = json.RawMessage{}

// Remove the directory and its contents
Expand Down Expand Up @@ -178,7 +179,7 @@ var _ = Describe("Bing ads Audience", func() {
return
}
GinkgoT().Setenv("RUDDER_TMPDIR", dir)
received := bulkUploader.Upload(&asyncDestination)
received := bulkUploader.Upload(context.Background(), &asyncDestination)
err = os.RemoveAll(dir)
if err != nil {
fmt.Printf("Failed to remove the temporary directory: %v\n", err)
Expand Down Expand Up @@ -232,7 +233,7 @@ var _ = Describe("Bing ads Audience", func() {
return
}
GinkgoT().Setenv("RUDDER_TMPDIR", dir)
received := bulkUploader.Upload(&asyncDestination)
received := bulkUploader.Upload(context.Background(), &asyncDestination)
err = os.RemoveAll(dir)
if err != nil {
fmt.Printf("Failed to remove the temporary directory: %v\n", err)
Expand Down Expand Up @@ -295,7 +296,7 @@ var _ = Describe("Bing ads Audience", func() {
DestinationID: destination.ID,
ImportingParameters: json.RawMessage(importParameters),
}
received := bulkUploader.Upload(&asyncDestination)
received := bulkUploader.Upload(context.Background(), &asyncDestination)

// Remove the directory and its contents
err = os.RemoveAll(dir)
Expand Down Expand Up @@ -325,7 +326,7 @@ var _ = Describe("Bing ads Audience", func() {
Complete: true,
StatusCode: 200,
}
recievedResponse := bulkUploader.Poll(pollInput)
recievedResponse := bulkUploader.Poll(context.Background(), pollInput)
Expect(recievedResponse).To(Equal(expectedResp))
})

Expand All @@ -344,7 +345,7 @@ var _ = Describe("Bing ads Audience", func() {
StatusCode: 500,
HasFailed: true,
}
recievedResponse := bulkUploader.Poll(pollInput)
recievedResponse := bulkUploader.Poll(context.Background(), pollInput)
Expect(recievedResponse).To(Equal(expectedResp))
})

Expand All @@ -370,7 +371,7 @@ var _ = Describe("Bing ads Audience", func() {
HasFailed: true,
FailedJobParameters: "https://dummy.url.com",
}
recievedResponse := bulkUploader.Poll(pollInput)
recievedResponse := bulkUploader.Poll(context.Background(), pollInput)

os.Remove(expectedResp.FailedJobParameters)

Expand All @@ -397,7 +398,7 @@ var _ = Describe("Bing ads Audience", func() {
InProgress: true,
StatusCode: 200,
}
recievedResponse := bulkUploader.Poll(pollInput)
recievedResponse := bulkUploader.Poll(context.Background(), pollInput)

os.Remove(expectedResp.FailedJobParameters)

Expand All @@ -424,7 +425,7 @@ var _ = Describe("Bing ads Audience", func() {
HasFailed: true,
StatusCode: 500,
}
recievedResponse := bulkUploader.Poll(pollInput)
recievedResponse := bulkUploader.Poll(context.Background(), pollInput)

os.Remove(expectedResp.FailedJobParameters)

Expand Down Expand Up @@ -459,7 +460,7 @@ var _ = Describe("Bing ads Audience", func() {
StatusCode: 500,
FailedJobParameters: ",", // empty file
}
recievedResponse := bulkUploader.Poll(pollInput)
recievedResponse := bulkUploader.Poll(context.Background(), pollInput)

os.Remove(expectedResp.FailedJobParameters)

Expand Down Expand Up @@ -624,7 +625,7 @@ var _ = Describe("Bing ads Audience", func() {
}

// making upload function call
received := bulkUploader.Upload(&asyncDestination)
received := bulkUploader.Upload(context.Background(), &asyncDestination)
received.ImportingParameters = json.RawMessage{}

// Remove the directory and its contents
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package audience

import (
"context"
"fmt"
"os"
"strings"
Expand Down Expand Up @@ -43,7 +44,7 @@ This function create at most 3 zip files from the text file created by the batch
It takes the text file path as input and returns the zip file path
The maximum size of the zip file is 100MB, if the size of the zip file exceeds 100MB then the job is marked as failed
*/
func (b *BingAdsBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStruct) common.AsyncUploadOutput {
func (b *BingAdsBulkUploader) Upload(_ context.Context, asyncDestStruct *common.AsyncDestinationStruct) common.AsyncUploadOutput {
destination := asyncDestStruct.Destination
var failedJobs []int64
var successJobs []int64
Expand Down Expand Up @@ -165,7 +166,7 @@ func (b *BingAdsBulkUploader) pollSingleImport(requestId string) common.PollStat
}
}

func (b *BingAdsBulkUploader) Poll(pollInput common.AsyncPoll) common.PollStatusResponse {
func (b *BingAdsBulkUploader) Poll(_ context.Context, pollInput common.AsyncPoll) common.PollStatusResponse {
var cumulativeResp common.PollStatusResponse
var completionStatus []bool
var failedJobURLs []string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package offline_conversions

import (
"archive/zip"
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -121,7 +122,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
}

// making upload function call
received := bulkUploader.Upload(&asyncDestination)
received := bulkUploader.Upload(context.Background(), &asyncDestination)
received.ImportingParameters = json.RawMessage{}

// Remove the directory and its contents
Expand Down Expand Up @@ -167,7 +168,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
err = os.Mkdir(subDir, 0o755)
Expect(err).ShouldNot(HaveOccurred(), "Creating the directory 'something'")
GinkgoT().Setenv("RUDDER_TMPDIR", dir)
received := bulkUploader.Upload(&asyncDestination)
received := bulkUploader.Upload(context.Background(), &asyncDestination)
err = os.RemoveAll(dir)
Expect(err).ShouldNot(HaveOccurred(), "removing temporary directory")
Expect(received).To(Equal(expected))
Expand Down Expand Up @@ -212,7 +213,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
err = os.Mkdir(subDir, 0o755)
Expect(err).ShouldNot(HaveOccurred(), "Creating the directory 'something'")
GinkgoT().Setenv("RUDDER_TMPDIR", dir)
received := bulkUploader.Upload(&asyncDestination)
received := bulkUploader.Upload(context.Background(), &asyncDestination)
err = os.RemoveAll(dir)
Expect(err).ShouldNot(HaveOccurred(), "removing temporary directory")
Expect(received).To(Equal(expected))
Expand Down Expand Up @@ -267,7 +268,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
DestinationID: destination.ID,
ImportingParameters: json.RawMessage(importParameters),
}
received := bulkUploader.Upload(&asyncDestination)
received := bulkUploader.Upload(context.Background(), &asyncDestination)

// Remove the directory and its contents
err = os.RemoveAll(dir)
Expand All @@ -293,7 +294,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
Complete: true,
StatusCode: 200,
}
recievedResponse := bulkUploader.Poll(pollInput)
recievedResponse := bulkUploader.Poll(context.Background(), pollInput)
Expect(recievedResponse).To(Equal(expectedResp))
})

Expand All @@ -311,7 +312,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
StatusCode: 500,
HasFailed: true,
}
recievedResponse := bulkUploader.Poll(pollInput)
recievedResponse := bulkUploader.Poll(context.Background(), pollInput)
Expect(recievedResponse).To(Equal(expectedResp))
})

Expand All @@ -336,7 +337,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
HasFailed: true,
FailedJobParameters: "https://dummy.url.com",
}
recievedResponse := bulkUploader.Poll(pollInput)
recievedResponse := bulkUploader.Poll(context.Background(), pollInput)

os.Remove(expectedResp.FailedJobParameters)

Expand All @@ -362,7 +363,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
InProgress: true,
StatusCode: 200,
}
recievedResponse := bulkUploader.Poll(pollInput)
recievedResponse := bulkUploader.Poll(context.Background(), pollInput)

os.Remove(expectedResp.FailedJobParameters)

Expand All @@ -388,7 +389,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
HasFailed: true,
StatusCode: 500,
}
recievedResponse := bulkUploader.Poll(pollInput)
recievedResponse := bulkUploader.Poll(context.Background(), pollInput)

os.Remove(expectedResp.FailedJobParameters)

Expand Down Expand Up @@ -422,7 +423,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
StatusCode: 500,
FailedJobParameters: ",", // empty file
}
recievedResponse := bulkUploader.Poll(pollInput)
recievedResponse := bulkUploader.Poll(context.Background(), pollInput)

os.Remove(expectedResp.FailedJobParameters)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package offline_conversions

import (
"context"
"fmt"
"os"
"strings"
Expand Down Expand Up @@ -150,7 +151,7 @@ This function create at most 3 zip files from the text file created by the batch
It takes the text file path as input and returns the zip file path
The maximum size of the zip file is 100MB, if the size of the zip file exceeds 100MB then the job is marked as failed
*/
func (b *BingAdsBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStruct) common.AsyncUploadOutput {
func (b *BingAdsBulkUploader) Upload(_ context.Context, asyncDestStruct *common.AsyncDestinationStruct) common.AsyncUploadOutput {
destination := asyncDestStruct.Destination
var failedJobs []int64
var successJobs []int64
Expand Down Expand Up @@ -276,7 +277,7 @@ func (b *BingAdsBulkUploader) pollSingleImport(requestId string) common.PollStat
}
}

func (b *BingAdsBulkUploader) Poll(pollInput common.AsyncPoll) common.PollStatusResponse {
func (b *BingAdsBulkUploader) Poll(_ context.Context, pollInput common.AsyncPoll) common.PollStatusResponse {
var cumulativeResp common.PollStatusResponse
var completionStatus []bool
var failedJobURLs []string
Expand Down
11 changes: 6 additions & 5 deletions router/batchrouter/asyncdestinationmanager/common/common.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package common

import (
"context"
stdjson "encoding/json"
"net/http"
"sync"
Expand All @@ -14,25 +15,25 @@ import (
)

type AsyncUploadAndTransformManager interface {
Upload(asyncDestStruct *AsyncDestinationStruct) AsyncUploadOutput
Upload(ctx context.Context, asyncDestStruct *AsyncDestinationStruct) AsyncUploadOutput
Transform(job *jobsdb.JobT) (string, error)
}

type AsyncDestinationManager interface {
AsyncUploadAndTransformManager
Poll(pollInput AsyncPoll) PollStatusResponse
Poll(ctx context.Context, pollInput AsyncPoll) PollStatusResponse
GetUploadStats(UploadStatsInput GetUploadStatsInput) GetUploadStatsResponse
}

type SimpleAsyncDestinationManager struct {
UploaderAndTransformer AsyncUploadAndTransformManager
}

func (m SimpleAsyncDestinationManager) Upload(asyncDestStruct *AsyncDestinationStruct) AsyncUploadOutput {
return m.UploaderAndTransformer.Upload(asyncDestStruct)
func (m SimpleAsyncDestinationManager) Upload(ctx context.Context, asyncDestStruct *AsyncDestinationStruct) AsyncUploadOutput {
return m.UploaderAndTransformer.Upload(ctx, asyncDestStruct)
}

func (m SimpleAsyncDestinationManager) Poll(AsyncPoll) PollStatusResponse {
func (m SimpleAsyncDestinationManager) Poll(_ context.Context, _ AsyncPoll) PollStatusResponse {
return PollStatusResponse{
StatusCode: http.StatusOK,
Complete: true,
Expand Down
5 changes: 3 additions & 2 deletions router/batchrouter/asyncdestinationmanager/common/manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package common

import (
"context"
"errors"
"fmt"

Expand All @@ -15,7 +16,7 @@ func (*InvalidManager) Transform(job *jobsdb.JobT) (string, error) {
return "", errors.New("invalid job")
}

func (*InvalidManager) Upload(asyncDestStruct *AsyncDestinationStruct) AsyncUploadOutput {
func (*InvalidManager) Upload(_ context.Context, asyncDestStruct *AsyncDestinationStruct) AsyncUploadOutput {
abortedJobIDs := append(asyncDestStruct.ImportingJobIDs, asyncDestStruct.FailedJobIDs...)
return AsyncUploadOutput{
AbortJobIDs: abortedJobIDs,
Expand All @@ -26,7 +27,7 @@ func (*InvalidManager) Upload(asyncDestStruct *AsyncDestinationStruct) AsyncUplo
}
}

func (*InvalidManager) Poll(_ AsyncPoll) PollStatusResponse {
func (*InvalidManager) Poll(_ context.Context, _ AsyncPoll) PollStatusResponse {
return PollStatusResponse{
StatusCode: 400,
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package eloqua

import (
"context"
"fmt"
"os"
"strings"
Expand Down Expand Up @@ -30,7 +31,7 @@ func (*EloquaBulkUploader) Transform(job *jobsdb.JobT) (string, error) {
return common.GetMarshalledData(gjson.GetBytes(job.EventPayload, "body.JSON").String(), job.JobID)
}

func (b *EloquaBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStruct) common.AsyncUploadOutput {
func (b *EloquaBulkUploader) Upload(_ context.Context, asyncDestStruct *common.AsyncDestinationStruct) common.AsyncUploadOutput {
destination := asyncDestStruct.Destination
uploadRetryableStat := b.statsFactory.NewTaggedStat("events_over_prescribed_limit", stats.CountType, map[string]string{
"module": "batch_router",
Expand Down Expand Up @@ -147,7 +148,7 @@ func (b *EloquaBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStru
}
}

func (b *EloquaBulkUploader) Poll(pollInput common.AsyncPoll) common.PollStatusResponse {
func (b *EloquaBulkUploader) Poll(_ context.Context, pollInput common.AsyncPoll) common.PollStatusResponse {
importIds := strings.Split(pollInput.ImportId, ":")
checkSyncStatusData := HttpRequestData{
DynamicPart: importIds[0],
Expand Down
Loading
Loading