@@ -48,68 +48,80 @@ type AsyncItemsCursor struct {
48
48
container v3io.Container
49
49
logger logger.Logger
50
50
51
- responseChan chan * v3io.Response
52
- workers int
53
- totalSegments int
54
- lastShards int
55
- Cnt int
51
+ responseChan chan * v3io.Response
52
+ workers int
53
+ totalSegments int
54
+ lastShards int
55
+ Cnt int
56
+ numberOfPartitions int
56
57
}
57
58
58
- func NewAsyncItemsCursor (container v3io.Container , input * v3io.GetItemsInput , workers int , shardingKeys []string , logger logger.Logger ) (* AsyncItemsCursor , error ) {
59
+ func NewAsyncItemsCursorMultiplePartitions (container v3io.Container , input * v3io.GetItemsInput , workers int ,
60
+ shardingKeys []string , logger logger.Logger , partitions []string ) (* AsyncItemsCursor , error ) {
59
61
60
62
// TODO: use workers from Context.numWorkers (if no ShardingKey)
61
63
if workers == 0 || input .ShardingKey != "" {
62
64
workers = 1
63
65
}
64
66
65
67
newAsyncItemsCursor := & AsyncItemsCursor {
66
- container : container ,
67
- input : input ,
68
- responseChan : make (chan * v3io.Response , 1000 ),
69
- workers : workers ,
70
- logger : logger .GetChild ("AsyncItemsCursor" ),
68
+ container : container ,
69
+ input : input ,
70
+ responseChan : make (chan * v3io.Response , 1000 ),
71
+ workers : workers ,
72
+ logger : logger .GetChild ("AsyncItemsCursor" ),
73
+ numberOfPartitions : len (partitions ),
71
74
}
72
75
73
76
if len (shardingKeys ) > 0 {
74
77
newAsyncItemsCursor .workers = len (shardingKeys )
75
78
79
+ for _ , partition := range partitions {
80
+ for i := 0 ; i < newAsyncItemsCursor .workers ; i ++ {
81
+ input := v3io.GetItemsInput {
82
+ Path : partition ,
83
+ AttributeNames : input .AttributeNames ,
84
+ Filter : input .Filter ,
85
+ ShardingKey : shardingKeys [i ],
86
+ }
87
+ _ , err := container .GetItems (& input , & input , newAsyncItemsCursor .responseChan )
88
+
89
+ if err != nil {
90
+ return nil , err
91
+ }
92
+ }
93
+ }
94
+
95
+ return newAsyncItemsCursor , nil
96
+ }
97
+
98
+ for _ , partition := range partitions {
76
99
for i := 0 ; i < newAsyncItemsCursor .workers ; i ++ {
100
+ newAsyncItemsCursor .totalSegments = workers
77
101
input := v3io.GetItemsInput {
78
- Path : input . Path ,
102
+ Path : partition ,
79
103
AttributeNames : input .AttributeNames ,
80
104
Filter : input .Filter ,
81
- ShardingKey : shardingKeys [i ],
105
+ TotalSegments : newAsyncItemsCursor .totalSegments ,
106
+ Segment : i ,
82
107
}
83
108
_ , err := container .GetItems (& input , & input , newAsyncItemsCursor .responseChan )
84
109
85
110
if err != nil {
111
+ // TODO: proper exit, release requests which passed
86
112
return nil , err
87
113
}
88
114
}
89
-
90
- return newAsyncItemsCursor , nil
91
- }
92
-
93
- for i := 0 ; i < newAsyncItemsCursor .workers ; i ++ {
94
- newAsyncItemsCursor .totalSegments = workers
95
- input := v3io.GetItemsInput {
96
- Path : input .Path ,
97
- AttributeNames : input .AttributeNames ,
98
- Filter : input .Filter ,
99
- TotalSegments : newAsyncItemsCursor .totalSegments ,
100
- Segment : i ,
101
- }
102
- _ , err := container .GetItems (& input , & input , newAsyncItemsCursor .responseChan )
103
-
104
- if err != nil {
105
- // TODO: proper exit, release requests which passed
106
- return nil , err
107
- }
108
115
}
109
116
110
117
return newAsyncItemsCursor , nil
111
118
}
112
119
120
+ func NewAsyncItemsCursor (container v3io.Container , input * v3io.GetItemsInput , workers int , shardingKeys []string ,
121
+ logger logger.Logger ) (* AsyncItemsCursor , error ) {
122
+ return NewAsyncItemsCursorMultiplePartitions (container , input , workers , shardingKeys , logger , []string {input .Path })
123
+ }
124
+
113
125
// error returns the last error
114
126
func (ic * AsyncItemsCursor ) Err () error {
115
127
return ic .currentError
@@ -148,7 +160,7 @@ func (ic *AsyncItemsCursor) NextItem() (v3io.Item, error) {
148
160
}
149
161
150
162
// are there any more items up stream? did all the shards complete ?
151
- if ic .lastShards == ic .workers {
163
+ if ic .lastShards == ic .workers * ic . numberOfPartitions {
152
164
ic .currentError = nil
153
165
return nil , nil
154
166
}
0 commit comments