@@ -1754,8 +1754,8 @@ func TestExecuteQuery(t *testing.T) {
1754
1754
newPrepareQueryResp (preparedQuery1 , colFamAddress , colFamInfo ),
1755
1755
},
1756
1756
mockRecvMsgResps : []recvMsgResp {
1757
- newExecQueryRespPartialBatchFirstHalf (true /* reset */ , nil /* sleep */ , colFamAddress ),
1758
- newExecQueryRespPartialBatchSecondHalf (false /* reset */ , nil /* sleep */ , colFamInfo ),
1757
+ newExecQueryRespPartialBatchFirstHalf (true /* reset */ , nil /* sleep */ , [] string { colFamAddress } ),
1758
+ newExecQueryRespPartialBatchSecondHalf (false /* reset */ , nil /* sleep */ , [] string { colFamAddress }, [] string { colFamInfo } ),
1759
1759
newExecQueryRespResumeToken (),
1760
1760
{err : io .EOF },
1761
1761
},
@@ -1764,6 +1764,37 @@ func TestExecuteQuery(t *testing.T) {
1764
1764
},
1765
1765
wantResultRowValues : [][]* btpb.Value {newProtoRowValuesWithKey (colFamAddress , colFamInfo )},
1766
1766
},
1767
+ {
1768
+ /*
1769
+ 1. PrepareQuery
1770
+ 2. ExecuteQuery
1771
+ 3. RecvMsg - gets first half of a batch of data with reset true - half1
1772
+ 3. RecvMsg - receives Unavailable error
1773
+ 3. RecvMsg - gets first half of a batch of data with reset true - half2
1774
+ 4. RecvMsg - gets second half of a batch of data with reset false - half3
1775
+ 5. RecvMsg - gets resume token
1776
+ 6. RecvMsg - gets EOF
1777
+
1778
+ The resulting row should contain only half2 and half3
1779
+ */
1780
+ desc : "success with single batch received across multiple responses with reset" ,
1781
+ mockPrepQueryResps : []prepareQueryResp {
1782
+ newPrepareQueryResp (preparedQuery1 , colFamAddress , colFamInfo ),
1783
+ },
1784
+ mockRecvMsgResps : []recvMsgResp {
1785
+ newExecQueryRespPartialBatchFirstHalf (true /* reset */ , nil /* sleep */ , []string {colFamAddress }),
1786
+ {err : status .Error (codes .Unavailable , "mock unavailable error" )},
1787
+ newExecQueryRespPartialBatchFirstHalf (true /* reset */ , nil /* sleep */ , []string {colFamAddressNew }),
1788
+ newExecQueryRespPartialBatchSecondHalf (false /* reset */ , nil /* sleep */ , []string {colFamAddressNew }, []string {colFamInfo }),
1789
+ newExecQueryRespResumeToken (),
1790
+ {err : io .EOF },
1791
+ },
1792
+ wantExecReqPrepQuerys : [][]byte {
1793
+ []byte (preparedQuery1 ),
1794
+ []byte (preparedQuery1 ),
1795
+ },
1796
+ wantResultRowValues : [][]* btpb.Value {newProtoRowValuesWithKey (colFamAddressNew , colFamInfo )},
1797
+ },
1767
1798
{
1768
1799
/*
1769
1800
1. PrepareQuery
@@ -1776,7 +1807,7 @@ func TestExecuteQuery(t *testing.T) {
1776
1807
newPrepareQueryResp (preparedQuery1 , colFamAddress ),
1777
1808
},
1778
1809
mockRecvMsgResps : []recvMsgResp {
1779
- newExecQueryRespFullBatch (true , nil /* sleep */ , colFamAddress ),
1810
+ newExecQueryRespFullBatch (true , nil /* sleep */ , [] string { colFamAddress } ),
1780
1811
{err : io .EOF },
1781
1812
},
1782
1813
wantExecReqPrepQuerys : [][]byte {
@@ -1802,7 +1833,7 @@ func TestExecuteQuery(t *testing.T) {
1802
1833
},
1803
1834
mockRecvMsgResps : []recvMsgResp {
1804
1835
{err : aePcf },
1805
- newExecQueryRespFullBatch (true , nil /* sleep */ , colFamAddress ),
1836
+ newExecQueryRespFullBatch (true , nil /* sleep */ , [] string { colFamAddress } ),
1806
1837
newExecQueryRespResumeToken (),
1807
1838
{err : io .EOF },
1808
1839
},
@@ -1827,7 +1858,7 @@ func TestExecuteQuery(t *testing.T) {
1827
1858
newPrepareQueryResp (preparedQuery1 , colFamAddress ), // PrepareStatement
1828
1859
},
1829
1860
mockRecvMsgResps : []recvMsgResp {
1830
- newExecQueryRespFullBatch (true , nil /* sleep */ , colFamAddress ),
1861
+ newExecQueryRespFullBatch (true , nil /* sleep */ , [] string { colFamAddress } ),
1831
1862
newExecQueryRespResumeToken (),
1832
1863
{err : status .Error (codes .Unavailable , "transient error" )},
1833
1864
{err : io .EOF },
@@ -1855,7 +1886,7 @@ func TestExecuteQuery(t *testing.T) {
1855
1886
newPrepareQueryResp (preparedQuery1 , colFamAddress ), // From Execute, because expired query
1856
1887
},
1857
1888
mockRecvMsgResps : []recvMsgResp {
1858
- newExecQueryRespFullBatch (true , nil /* sleep */ , colFamAddress ),
1889
+ newExecQueryRespFullBatch (true , nil /* sleep */ , [] string { colFamAddress } ),
1859
1890
{
1860
1891
sleep : ptr (testPreparedQueryTTL + 2 * time .Second ),
1861
1892
err : status .Error (codes .DeadlineExceeded , "context deadline exceeded" ), // retryable
@@ -1889,7 +1920,7 @@ func TestExecuteQuery(t *testing.T) {
1889
1920
},
1890
1921
mockRecvMsgResps : []recvMsgResp {
1891
1922
{err : aePcf },
1892
- newExecQueryRespFullBatch (true , nil /* sleep */ , colFamAddress ),
1923
+ newExecQueryRespFullBatch (true , nil /* sleep */ , [] string { colFamAddress } ),
1893
1924
newExecQueryRespResumeToken (),
1894
1925
{err : io .EOF },
1895
1926
},
@@ -1920,13 +1951,13 @@ func TestExecuteQuery(t *testing.T) {
1920
1951
newPrepareQueryResp (preparedQuery2 , colFamAddress , colFamInfo ), // Step 8
1921
1952
},
1922
1953
mockRecvMsgResps : []recvMsgResp {
1923
- newExecQueryRespFullBatch (true , nil /* sleep */ , colFamAddress ), // Step 3
1924
- newExecQueryRespFullBatch (false , nil /* sleep */ , colFamAddress ), // Step 4
1925
- {err : status .Error (codes .Unavailable , "mock unavailable error" )}, // Step 5, retryable error
1954
+ newExecQueryRespFullBatch (true , nil /* sleep */ , [] string { colFamAddress } ), // Step 3
1955
+ newExecQueryRespFullBatch (false , nil /* sleep */ , [] string { colFamAddress } ), // Step 4
1956
+ {err : status .Error (codes .Unavailable , "mock unavailable error" )}, // Step 5, retryable error
1926
1957
{err : aePcf }, // Step 7, retryable error
1927
- newExecQueryRespFullBatch (true , nil /* sleep */ , colFamAddress , colFamInfo ), // Step 10
1928
- newExecQueryRespResumeToken (), // Step 11
1929
- {err : io .EOF }, // Step 12
1958
+ newExecQueryRespFullBatch (true , nil /* sleep */ , [] string { colFamAddress , colFamInfo } ), // Step 10
1959
+ newExecQueryRespResumeToken (), // Step 11
1960
+ {err : io .EOF }, // Step 12
1930
1961
},
1931
1962
wantExecReqPrepQuerys : [][]byte {
1932
1963
[]byte (preparedQuery1 ), // Step 2
@@ -2021,6 +2052,7 @@ func TestExecuteQuery(t *testing.T) {
2021
2052
2022
2053
const testPreparedQueryTTL = 10 * time .Second
2023
2054
const colFamAddress = "address"
2055
+ const colFamAddressNew = "address-new" // Used only for values and not metadata
2024
2056
const colFamInfo = "info"
2025
2057
2026
2058
var cfToValues = map [string ][]* btpb.Value {
@@ -2062,6 +2094,44 @@ var cfToValues = map[string][]*btpb.Value{
2062
2094
},
2063
2095
},
2064
2096
},
2097
+ colFamAddressNew : {
2098
+ {
2099
+ Kind : & btpb.Value_ArrayValue {
2100
+ ArrayValue : & btpb.ArrayValue {
2101
+ Values : []* btpb.Value {
2102
+ {
2103
+ Kind : & btpb.Value_BytesValue {
2104
+ BytesValue : []byte ("city" ),
2105
+ },
2106
+ },
2107
+ {
2108
+ Kind : & btpb.Value_BytesValue {
2109
+ BytesValue : []byte ("Kirkland" ),
2110
+ },
2111
+ },
2112
+ },
2113
+ },
2114
+ },
2115
+ },
2116
+ {
2117
+ Kind : & btpb.Value_ArrayValue {
2118
+ ArrayValue : & btpb.ArrayValue {
2119
+ Values : []* btpb.Value {
2120
+ {
2121
+ Kind : & btpb.Value_BytesValue {
2122
+ BytesValue : []byte ("state" ),
2123
+ },
2124
+ },
2125
+ {
2126
+ Kind : & btpb.Value_BytesValue {
2127
+ BytesValue : []byte ("WA" ),
2128
+ },
2129
+ },
2130
+ },
2131
+ },
2132
+ },
2133
+ },
2134
+ },
2065
2135
colFamInfo : {
2066
2136
{
2067
2137
Kind : & btpb.Value_ArrayValue {
@@ -2134,9 +2204,14 @@ func newPrepareQueryResp(preparedQuery string, colFams ...string) prepareQueryRe
2134
2204
}
2135
2205
}
2136
2206
2137
- func newRecvMsgResp (reset bool , protoRows * btpb.ProtoRows , blockTime * time.Duration ) recvMsgResp {
2207
+ func newRecvMsgResp (reset bool , protoRows , checksumProtoRows * btpb.ProtoRows , blockTime * time.Duration ) recvMsgResp {
2138
2208
marshalled , _ := proto .Marshal (protoRows )
2139
- checksum := crc32 .Checksum (marshalled , crc32cTable )
2209
+
2210
+ var checksum * uint32
2211
+ if checksumProtoRows != nil {
2212
+ marshalledCk , _ := proto .Marshal (checksumProtoRows )
2213
+ checksum = ptr (crc32 .Checksum (marshalledCk , crc32cTable ))
2214
+ }
2140
2215
return recvMsgResp {
2141
2216
results : & btpb.ExecuteQueryResponse_Results {
2142
2217
Results : & btpb.PartialResultSet {
@@ -2145,7 +2220,7 @@ func newRecvMsgResp(reset bool, protoRows *btpb.ProtoRows, blockTime *time.Durat
2145
2220
BatchData : marshalled ,
2146
2221
},
2147
2222
},
2148
- BatchChecksum : & checksum ,
2223
+ BatchChecksum : checksum ,
2149
2224
Reset_ : reset ,
2150
2225
},
2151
2226
},
@@ -2163,25 +2238,28 @@ func newExecQueryRespResumeToken() recvMsgResp {
2163
2238
}
2164
2239
}
2165
2240
2166
- func newExecQueryRespFullBatch (reset bool , blockTime * time.Duration , colFams ... string ) recvMsgResp {
2241
+ func newExecQueryRespFullBatch (reset bool , blockTime * time.Duration , colFams [] string ) recvMsgResp {
2167
2242
protoRows := & btpb.ProtoRows {
2168
2243
Values : newProtoRowValuesWithKey (colFams ... ),
2169
2244
}
2170
- return newRecvMsgResp (reset , protoRows , blockTime )
2245
+ return newRecvMsgResp (reset , protoRows , protoRows , blockTime )
2171
2246
}
2172
2247
2173
- func newExecQueryRespPartialBatchFirstHalf (reset bool , blockTime * time.Duration , colFams ... string ) recvMsgResp {
2248
+ func newExecQueryRespPartialBatchFirstHalf (reset bool , blockTime * time.Duration , colFams [] string ) recvMsgResp {
2174
2249
protoRows := & btpb.ProtoRows {
2175
2250
Values : newProtoRowValuesWithKey (colFams ... ),
2176
2251
}
2177
- return newRecvMsgResp (reset , protoRows , blockTime )
2252
+ return newRecvMsgResp (reset , protoRows , nil , blockTime )
2178
2253
}
2179
2254
2180
- func newExecQueryRespPartialBatchSecondHalf (reset bool , blockTime * time.Duration , colFams ... string ) recvMsgResp {
2255
+ func newExecQueryRespPartialBatchSecondHalf (reset bool , blockTime * time.Duration , colFamsFirstHalf , colFamsSecondHalf [] string ) recvMsgResp {
2181
2256
protoRows := & btpb.ProtoRows {
2182
- Values : newProtoRowValues (colFams ... ),
2257
+ Values : newProtoRowValues (colFamsSecondHalf ... ),
2258
+ }
2259
+ checksumProtoRows := & btpb.ProtoRows {
2260
+ Values : append (newProtoRowValuesWithKey (colFamsFirstHalf ... ), newProtoRowValues (colFamsSecondHalf ... )... ),
2183
2261
}
2184
- return newRecvMsgResp (reset , protoRows , blockTime )
2262
+ return newRecvMsgResp (reset , protoRows , checksumProtoRows , blockTime )
2185
2263
}
2186
2264
2187
2265
func newProtoRowValuesWithKey (colFams ... string ) []* btpb.Value {
0 commit comments