Skip to content

Commit 608bcb5

Browse files
Remove unused functions from TaskAckManager (#4872)
1 parent dc5230f commit 608bcb5

File tree

2 files changed

+0
-236
lines changed

2 files changed

+0
-236
lines changed

service/history/replication/task_ack_manager.go

Lines changed: 0 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,11 @@ import (
3434
"github.com/uber/cadence/common"
3535
"github.com/uber/cadence/common/backoff"
3636
"github.com/uber/cadence/common/cache"
37-
"github.com/uber/cadence/common/collection"
3837
"github.com/uber/cadence/common/dynamicconfig"
3938
"github.com/uber/cadence/common/log"
4039
"github.com/uber/cadence/common/log/tag"
4140
"github.com/uber/cadence/common/metrics"
4241
"github.com/uber/cadence/common/persistence"
43-
persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils"
4442
"github.com/uber/cadence/common/quotas"
4543
"github.com/uber/cadence/common/types"
4644
exec "github.com/uber/cadence/service/history/execution"
@@ -51,7 +49,6 @@ import (
5149
var (
5250
errUnknownQueueTask = errors.New("unknown task type")
5351
errUnknownReplicationTask = errors.New("unknown replication task")
54-
defaultHistoryPageSize = 1000
5552
minReadTaskSize = 20
5653
)
5754

@@ -368,33 +365,6 @@ func (t *taskAckManagerImpl) getEventsBlob(
368365
return eventBatchBlobs[0].ToInternal(), nil
369366
}
370367

371-
func (t *taskAckManagerImpl) isNewRunNDCEnabled(
372-
ctx context.Context,
373-
domainID string,
374-
workflowID string,
375-
runID string,
376-
) (isNDCWorkflow bool, retError error) {
377-
378-
context, release, err := t.executionCache.GetOrCreateWorkflowExecution(
379-
ctx,
380-
domainID,
381-
types.WorkflowExecution{
382-
WorkflowID: workflowID,
383-
RunID: runID,
384-
},
385-
)
386-
if err != nil {
387-
return false, err
388-
}
389-
defer func() { release(retError) }()
390-
391-
mutableState, err := context.LoadWorkflowExecution(ctx)
392-
if err != nil {
393-
return false, err
394-
}
395-
return mutableState.GetVersionHistories() != nil, nil
396-
}
397-
398368
func (t *taskAckManagerImpl) readTasksWithBatchSize(
399369
ctx context.Context,
400370
readLevel int64,
@@ -422,74 +392,6 @@ func (t *taskAckManagerImpl) readTasksWithBatchSize(
422392
return tasks, len(response.NextPageToken) != 0, nil
423393
}
424394

425-
func (t *taskAckManagerImpl) getAllHistory(
426-
ctx context.Context,
427-
firstEventID int64,
428-
nextEventID int64,
429-
branchToken []byte,
430-
) (*types.History, error) {
431-
432-
// overall result
433-
shardID := t.shard.GetShardID()
434-
var historyEvents []*types.HistoryEvent
435-
historySize := 0
436-
iterator := collection.NewPagingIterator(
437-
t.getPaginationFunc(
438-
ctx,
439-
firstEventID,
440-
nextEventID,
441-
branchToken,
442-
shardID,
443-
&historySize,
444-
),
445-
)
446-
for iterator.HasNext() {
447-
event, err := iterator.Next()
448-
if err != nil {
449-
return nil, err
450-
}
451-
historyEvents = append(historyEvents, event.(*types.HistoryEvent))
452-
}
453-
t.metricsClient.RecordTimer(metrics.ReplicatorQueueProcessorScope, metrics.HistorySize, time.Duration(historySize))
454-
history := &types.History{
455-
Events: historyEvents,
456-
}
457-
return history, nil
458-
}
459-
460-
func (t *taskAckManagerImpl) getPaginationFunc(
461-
ctx context.Context,
462-
firstEventID int64,
463-
nextEventID int64,
464-
branchToken []byte,
465-
shardID int,
466-
historySize *int,
467-
) collection.PaginationFn {
468-
469-
return func(paginationToken []byte) ([]interface{}, []byte, error) {
470-
events, _, pageToken, pageHistorySize, err := persistenceutils.PaginateHistory(
471-
ctx,
472-
t.historyManager,
473-
false,
474-
branchToken,
475-
firstEventID,
476-
nextEventID,
477-
paginationToken,
478-
defaultHistoryPageSize,
479-
common.IntPtr(shardID),
480-
)
481-
if err != nil {
482-
return nil, nil, err
483-
}
484-
*historySize += pageHistorySize
485-
var paginateItems []interface{}
486-
for _, event := range events {
487-
paginateItems = append(paginateItems, event)
488-
}
489-
return paginateItems, pageToken, nil
490-
}
491-
}
492-
493395
func (t *taskAckManagerImpl) generateFailoverMarkerTask(
494396
taskInfo *persistence.ReplicationTaskInfo,
495397
) *types.ReplicationTask {

service/history/replication/task_ack_manager_test.go

Lines changed: 0 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -114,64 +114,6 @@ func (s *taskAckManagerSuite) TearDownTest() {
114114
s.mockShard.Finish(s.T())
115115
}
116116

117-
func (s *taskAckManagerSuite) TestGetPaginationFunc() {
118-
firstEventID := int64(0)
119-
nextEventID := int64(1)
120-
var branchToken []byte
121-
shardID := 0
122-
historyCount := 0
123-
pagingFunc := s.ackManager.getPaginationFunc(context.Background(), firstEventID, nextEventID, branchToken, shardID, &historyCount)
124-
125-
pageToken := []byte{1}
126-
event := &types.HistoryEvent{
127-
ID: 1,
128-
}
129-
s.mockHistoryMgr.On("ReadHistoryBranch", mock.Anything, mock.Anything).Return(&persistence.ReadHistoryBranchResponse{
130-
HistoryEvents: []*types.HistoryEvent{event},
131-
NextPageToken: pageToken,
132-
Size: 1,
133-
LastFirstEventID: 1,
134-
}, nil)
135-
events, token, err := pagingFunc(nil)
136-
s.NoError(err)
137-
s.Equal(pageToken, token)
138-
s.Len(events, 1)
139-
s.Equal(events[0].(*types.HistoryEvent), event)
140-
s.Equal(historyCount, 1)
141-
}
142-
143-
func (s *taskAckManagerSuite) TestGetAllHistory_OK() {
144-
firstEventID := int64(0)
145-
nextEventID := int64(1)
146-
var branchToken []byte
147-
event := &types.HistoryEvent{
148-
ID: 1,
149-
}
150-
151-
s.mockHistoryMgr.On("ReadHistoryBranch", mock.Anything, mock.Anything).Return(&persistence.ReadHistoryBranchResponse{
152-
HistoryEvents: []*types.HistoryEvent{event},
153-
NextPageToken: nil,
154-
Size: 1,
155-
LastFirstEventID: 1,
156-
}, nil)
157-
158-
history, err := s.ackManager.getAllHistory(context.Background(), firstEventID, nextEventID, branchToken)
159-
s.NoError(err)
160-
s.Len(history.GetEvents(), 1)
161-
s.Equal(event, history.GetEvents()[0])
162-
}
163-
164-
func (s *taskAckManagerSuite) TestGetAllHistory_Error() {
165-
firstEventID := int64(0)
166-
nextEventID := int64(1)
167-
var branchToken []byte
168-
s.mockHistoryMgr.On("ReadHistoryBranch", mock.Anything, mock.Anything).Return(nil, errors.New("test"))
169-
170-
history, err := s.ackManager.getAllHistory(context.Background(), firstEventID, nextEventID, branchToken)
171-
s.Error(err)
172-
s.Nil(history)
173-
}
174-
175117
func (s *taskAckManagerSuite) TestReadTasksWithBatchSize_OK() {
176118
task := &persistence.ReplicationTaskInfo{
177119
DomainID: uuid.New(),
@@ -197,86 +139,6 @@ func (s *taskAckManagerSuite) TestReadTasksWithBatchSize_Error() {
197139
s.Len(taskInfo, 0)
198140
}
199141

200-
func (s *taskAckManagerSuite) TestIsNewRunNDCEnabled_True() {
201-
domainID := uuid.New()
202-
workflowID := uuid.New()
203-
runID := uuid.New()
204-
workflowContext, release, _ := s.ackManager.executionCache.GetOrCreateWorkflowExecutionForBackground(
205-
domainID,
206-
types.WorkflowExecution{
207-
WorkflowID: workflowID,
208-
RunID: runID,
209-
},
210-
)
211-
workflowContext.SetWorkflowExecution(s.mockMutableState)
212-
release(nil)
213-
214-
s.mockMutableState.EXPECT().StartTransaction(gomock.Any(), gomock.Any()).Return(false, nil).Times(1)
215-
s.mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(false).AnyTimes()
216-
s.mockMutableState.EXPECT().GetVersionHistories().Return(&persistence.VersionHistories{})
217-
s.mockDomainCache.EXPECT().GetDomainByID(domainID).Return(cache.NewGlobalDomainCacheEntryForTest(
218-
&persistence.DomainInfo{ID: domainID, Name: "domainName"},
219-
&persistence.DomainConfig{Retention: 1},
220-
&persistence.DomainReplicationConfig{
221-
ActiveClusterName: cluster.TestCurrentClusterName,
222-
Clusters: []*persistence.ClusterReplicationConfig{
223-
{ClusterName: cluster.TestCurrentClusterName},
224-
{ClusterName: cluster.TestAlternativeClusterName},
225-
},
226-
},
227-
1,
228-
), nil).AnyTimes()
229-
230-
isNDC, err := s.ackManager.isNewRunNDCEnabled(
231-
context.Background(),
232-
domainID,
233-
workflowID,
234-
runID,
235-
)
236-
s.NoError(err)
237-
s.True(isNDC)
238-
}
239-
240-
func (s *taskAckManagerSuite) TestIsNewRunNDCEnabled_False() {
241-
domainID := uuid.New()
242-
workflowID := uuid.New()
243-
runID := uuid.New()
244-
workflowContext, release, _ := s.ackManager.executionCache.GetOrCreateWorkflowExecutionForBackground(
245-
domainID,
246-
types.WorkflowExecution{
247-
WorkflowID: workflowID,
248-
RunID: runID,
249-
},
250-
)
251-
workflowContext.SetWorkflowExecution(s.mockMutableState)
252-
release(nil)
253-
254-
s.mockMutableState.EXPECT().StartTransaction(gomock.Any(), gomock.Any()).Return(false, nil).Times(1)
255-
s.mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(false).AnyTimes()
256-
s.mockMutableState.EXPECT().GetVersionHistories().Return(nil)
257-
s.mockDomainCache.EXPECT().GetDomainByID(domainID).Return(cache.NewGlobalDomainCacheEntryForTest(
258-
&persistence.DomainInfo{ID: domainID, Name: "domainName"},
259-
&persistence.DomainConfig{Retention: 1},
260-
&persistence.DomainReplicationConfig{
261-
ActiveClusterName: cluster.TestCurrentClusterName,
262-
Clusters: []*persistence.ClusterReplicationConfig{
263-
{ClusterName: cluster.TestCurrentClusterName},
264-
{ClusterName: cluster.TestAlternativeClusterName},
265-
},
266-
},
267-
1,
268-
), nil).AnyTimes()
269-
270-
isNDC, err := s.ackManager.isNewRunNDCEnabled(
271-
context.Background(),
272-
domainID,
273-
workflowID,
274-
runID,
275-
)
276-
s.NoError(err)
277-
s.False(isNDC)
278-
}
279-
280142
func (s *taskAckManagerSuite) TestGetVersionHistoryItems_Error() {
281143
_, _, err := getVersionHistoryItems(nil, 0, 0)
282144
s.Error(err)

0 commit comments

Comments
 (0)