41
41
#include "mongoc/mongoc-database-private.h"
42
42
#include "mongoc/mongoc-gridfs-private.h"
43
43
#include "mongoc/mongoc-error.h"
44
+ #include "mongoc/mongoc-error-private.h"
44
45
#include "mongoc/mongoc-log.h"
45
46
#include "mongoc/mongoc-queue-private.h"
46
47
#include "mongoc/mongoc-socket.h"
@@ -1603,7 +1604,8 @@ _mongoc_client_retryable_write_command_with_stream (
1603
1604
& client -> cluster , & parts -> assembled , reply , error );
1604
1605
1605
1606
if (is_retryable ) {
1606
- _mongoc_write_error_update_if_unsupported_storage_engine (ret , error , reply );
1607
+ _mongoc_write_error_update_if_unsupported_storage_engine (
1608
+ ret , error , reply );
1607
1609
}
1608
1610
1609
1611
/* If a retryable error is encountered and the write is retryable, select
@@ -1642,9 +1644,75 @@ _mongoc_client_retryable_write_command_with_stream (
1642
1644
}
1643
1645
1644
1646
1647
+ static bool
1648
+ _mongoc_client_retryable_read_command_with_stream (
1649
+ mongoc_client_t * client ,
1650
+ mongoc_cmd_parts_t * parts ,
1651
+ mongoc_server_stream_t * server_stream ,
1652
+ bson_t * reply ,
1653
+ bson_error_t * error )
1654
+ {
1655
+ mongoc_server_stream_t * retry_server_stream = NULL ;
1656
+ bool is_retryable = true;
1657
+ bool ret ;
1658
+ bson_t reply_local ;
1659
+
1660
+ if (reply == NULL ) {
1661
+ reply = & reply_local ;
1662
+ }
1663
+
1664
+ ENTRY ;
1665
+
1666
+ BSON_ASSERT (parts -> is_retryable_read );
1667
+
1668
+ retry :
1669
+ ret = mongoc_cluster_run_command_monitored (
1670
+ & client -> cluster , & parts -> assembled , reply , error );
1671
+
1672
+ /* If a retryable error is encountered and the read is retryable, select
1673
+ * a new readable stream and retry. If server selection fails or the selected
1674
+ * server does not support retryable reads, fall through and allow the
1675
+ * original error to be reported. */
1676
+ if (is_retryable &&
1677
+ _mongoc_read_error_get_type (ret , error , reply ) ==
1678
+ MONGOC_READ_ERR_RETRY ) {
1679
+ bson_error_t ignored_error ;
1680
+
1681
+ /* each read command may be retried at most once */
1682
+ is_retryable = false;
1683
+
1684
+ if (retry_server_stream ) {
1685
+ mongoc_server_stream_cleanup (retry_server_stream );
1686
+ }
1687
+
1688
+ retry_server_stream =
1689
+ mongoc_cluster_stream_for_reads (& client -> cluster ,
1690
+ parts -> read_prefs ,
1691
+ parts -> assembled .session ,
1692
+ NULL ,
1693
+ & ignored_error );
1694
+
1695
+ if (retry_server_stream &&
1696
+ retry_server_stream -> sd -> max_wire_version >=
1697
+ WIRE_VERSION_RETRY_READS ) {
1698
+ parts -> assembled .server_stream = retry_server_stream ;
1699
+ bson_destroy (reply );
1700
+ GOTO (retry );
1701
+ }
1702
+ }
1703
+
1704
+ if (retry_server_stream ) {
1705
+ mongoc_server_stream_cleanup (retry_server_stream );
1706
+ }
1707
+
1708
+ RETURN (ret );
1709
+ }
1710
+
1711
+
1645
1712
static bool
1646
1713
_mongoc_client_command_with_stream (mongoc_client_t * client ,
1647
1714
mongoc_cmd_parts_t * parts ,
1715
+ const mongoc_read_prefs_t * read_prefs ,
1648
1716
mongoc_server_stream_t * server_stream ,
1649
1717
bson_t * reply ,
1650
1718
bson_error_t * error )
@@ -1662,6 +1730,11 @@ _mongoc_client_command_with_stream (mongoc_client_t *client,
1662
1730
client , parts , server_stream , reply , error ));
1663
1731
}
1664
1732
1733
+ if (parts -> is_retryable_read ) {
1734
+ RETURN (_mongoc_client_retryable_read_command_with_stream (
1735
+ client , parts , server_stream , reply , error ));
1736
+ }
1737
+
1665
1738
RETURN (mongoc_cluster_run_command_monitored (
1666
1739
& client -> cluster , & parts -> assembled , reply , error ));
1667
1740
}
@@ -1705,7 +1778,7 @@ mongoc_client_command_simple (mongoc_client_t *client,
1705
1778
1706
1779
if (server_stream ) {
1707
1780
ret = _mongoc_client_command_with_stream (
1708
- client , & parts , server_stream , reply , error );
1781
+ client , & parts , read_prefs , server_stream , reply , error );
1709
1782
} else {
1710
1783
/* reply initialized by mongoc_cluster_stream_for_reads */
1711
1784
ret = false;
@@ -1914,7 +1987,7 @@ _mongoc_client_command_with_opts (mongoc_client_t *client,
1914
1987
}
1915
1988
1916
1989
ret = _mongoc_client_command_with_stream (
1917
- client , & parts , server_stream , reply_ptr , error );
1990
+ client , & parts , user_prefs , server_stream , reply_ptr , error );
1918
1991
1919
1992
reply_initialized = true;
1920
1993
@@ -2071,7 +2144,7 @@ mongoc_client_command_simple_with_server_id (
2071
2144
parts .read_prefs = read_prefs ;
2072
2145
2073
2146
ret = _mongoc_client_command_with_stream (
2074
- client , & parts , server_stream , reply , error );
2147
+ client , & parts , read_prefs , server_stream , reply , error );
2075
2148
2076
2149
mongoc_cmd_parts_cleanup (& parts );
2077
2150
mongoc_server_stream_cleanup (server_stream );
0 commit comments