Skip to content

Commit 7e118ce

Browse files
authored
[Cosmos] some tidy up to Cosmos query engine API (Azure#24485)
* some tidy up to Cosmos query engine API * pass completed state through PipelineResult
1 parent 7de8056 commit 7e118ce

File tree

3 files changed

+19
-12
lines changed

3 files changed

+19
-12
lines changed

sdk/data/azcosmos/cosmos_container_query_engine.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -122,24 +122,24 @@ func (c *ContainerClient) executeQueryWithEngine(queryEngine queryengine.QueryEn
122122
}, nil
123123
}
124124
// Fetch more data from the pipeline
125-
items, requests, err := queryPipeline.NextBatch()
125+
result, err := queryPipeline.Run()
126126
if err != nil {
127127
queryPipeline.Close()
128128
return QueryItemsResponse{}, err
129129
}
130130

131131
// If we got items, we can return them, and we should do so now, to avoid making unnecessary requests.
132132
// Even if there are requests in the queue, the pipeline should return the same requests again on the next call to NextBatch.
133-
if len(items) > 0 {
133+
if len(result.Items) > 0 {
134134
return QueryItemsResponse{
135135
Response: lastResponse,
136-
Items: items,
136+
Items: result.Items,
137137
}, nil
138138
}
139139

140140
// If we didn't have any items to return, we need to make requests for the items in the queue.
141141
// If there are no requests, the pipeline should return true for IsComplete, so we'll stop on the next iteration.
142-
for _, request := range requests {
142+
for _, request := range result.Requests {
143143
// Make the single-partition query request
144144
qryRequest := queryRequest(request) // Cast to our type, which has toHeaders defined on it.
145145
azResponse, err := c.database.client.sendQueryRequest(

sdk/data/azcosmos/internal/mock_query_engine.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,9 @@ func (m *MockQueryPipeline) IsComplete() bool {
139139
}
140140

141141
// NextBatch returns the next batch of items from the pipeline, as well as any requests needed to collect more data.
142-
func (m *MockQueryPipeline) NextBatch() ([][]byte, []queryengine.QueryRequest, error) {
142+
func (m *MockQueryPipeline) Run() (*queryengine.PipelineResult, error) {
143143
if m.IsClosed {
144-
return nil, nil, fmt.Errorf("pipeline is closed")
144+
return nil, fmt.Errorf("pipeline is closed")
145145
}
146146

147147
items := make([][]byte, 0)
@@ -154,7 +154,11 @@ func (m *MockQueryPipeline) NextBatch() ([][]byte, []queryengine.QueryRequest, e
154154
for i := range m.partitionState {
155155
// If any partition hasn't started yet, we can't return any items.
156156
if !m.partitionState[i].started {
157-
return nil, m.getRequests(), nil
157+
return &queryengine.PipelineResult{
158+
IsCompleted: false,
159+
Items: nil,
160+
Requests: m.getRequests(),
161+
}, nil
158162
}
159163

160164
if m.partitionState[i].IsExhausted() {
@@ -175,7 +179,7 @@ func (m *MockQueryPipeline) NextBatch() ([][]byte, []queryengine.QueryRequest, e
175179
// Add the item to the result set and remove it from the queue.
176180
item, err := lowestPartition.PopItem()
177181
if err != nil {
178-
return nil, nil, err
182+
return nil, err
179183
}
180184
items = append(items, item)
181185
}
@@ -190,7 +194,11 @@ func (m *MockQueryPipeline) NextBatch() ([][]byte, []queryengine.QueryRequest, e
190194
m.completed = true
191195
}
192196

193-
return items, requests, nil
197+
return &queryengine.PipelineResult{
198+
IsCompleted: m.completed,
199+
Items: items,
200+
Requests: requests,
201+
}, nil
194202
}
195203

196204
// getRequests returns a list of all the QueryRequests that are needed to get the next batch of items.

sdk/data/azcosmos/unstable/queryengine/cosmos_query_engine.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,8 @@ type QueryPipeline interface {
5656
Query() string
5757
// IsComplete gets a boolean indicating if the pipeline has concluded
5858
IsComplete() bool
59-
// NextBatch gets the next batch of items, which will be empty if there are no more items in the buffer, and the next set of QueryRequests which must be fulfilled, which will be empty if there are no more requests.
60-
// If both the items and requests are empty, the pipeline has concluded.
61-
NextBatch() ([][]byte, []QueryRequest, error)
59+
// Run executes a single turn of the pipeline, yielding a PipelineResult containing the items and requests for more data.
60+
Run() (*PipelineResult, error)
6261
// ProvideData provides more data for a given partition key range ID, using data retrieved from the server in response to making a DataRequest.
6362
ProvideData(data QueryResult) error
6463
// Close frees the resources associated with the pipeline.

0 commit comments

Comments
 (0)