Skip to content

Commit 462a2d2

Browse files
committed
fix(thrift): recover from transient Mongo timeouts
1 parent 62ad1df commit 462a2d2

2 files changed

Lines changed: 79 additions & 7 deletions

File tree

articlemeta/controller.py

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# coding: utf-8
2+
import os
23
from urllib.parse import urlparse
34
import warnings
45
import json
@@ -173,6 +174,24 @@ def get_date_range_filter(from_date=None, until_date=None):
173174
def get_dbconn(db_dsn):
174175
"""Connects to the MongoDB server and returns a database handler."""
175176

177+
def _mongo_timeouts():
178+
return {
179+
"serverSelectionTimeoutMS": int(os.environ.get("MONGO_SERVER_SELECTION_TIMEOUT_MS", 5000)),
180+
"connectTimeoutMS": int(os.environ.get("MONGO_CONNECT_TIMEOUT_MS", 5000)),
181+
"socketTimeoutMS": int(os.environ.get("MONGO_SOCKET_TIMEOUT_MS", 15000)),
182+
"waitQueueTimeoutMS": int(os.environ.get("MONGO_WAIT_QUEUE_TIMEOUT_MS", 5000)),
183+
"maxPoolSize": int(os.environ.get("MONGO_MAX_POOL_SIZE", 100)),
184+
"retryWrites": False,
185+
}
186+
187+
def _normalize_dsn(raw_dsn):
188+
# Accepts legacy forms like "host:27017/db", "//host:27017/db", or full mongodb URI.
189+
if raw_dsn.startswith("mongodb://") or raw_dsn.startswith("mongodb+srv://"):
190+
return raw_dsn
191+
if raw_dsn.startswith("//"):
192+
return "mongodb:%s" % raw_dsn
193+
return "mongodb://%s" % raw_dsn
194+
176195
def _create_indexes(db):
177196
"""
178197
Ensures that an index exists on specified collections.
@@ -254,10 +273,23 @@ def _create_indexes(db):
254273
db[collection].create_index(index[0], **index[1])
255274

256275
print('End Creation index')
257-
db_url = urlparse(db_dsn)
258-
conn = pymongo.MongoClient('mongodb://%s' % db_url.netloc)
259-
db = conn[db_url.path[1:]]
260-
_create_indexes(db)
276+
mongo_uri = _normalize_dsn(db_dsn)
277+
db_url = urlparse(mongo_uri)
278+
db_name = db_url.path[1:] or "articlemeta"
279+
conn = pymongo.MongoClient(mongo_uri, **_mongo_timeouts())
280+
db = conn[db_name]
281+
try:
282+
_create_indexes(db)
283+
except (
284+
pymongo.errors.AutoReconnect,
285+
pymongo.errors.NetworkTimeout,
286+
pymongo.errors.ServerSelectionTimeoutError,
287+
pymongo.errors.ConnectionFailure,
288+
) as exc:
289+
warnings.warn(
290+
"MongoDB unavailable during index creation; continuing startup "
291+
"without blocking. Error: %s" % exc
292+
)
261293
return db
262294

263295

articlemeta/thrift/server.py

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22
import json
33
import logging
44
import os
5+
import threading
56
import uuid
67

78
import thriftpywrap
89
import thriftpy2
10+
from pymongo import errors as pymongo_errors
911

1012
from articlemeta.controller import DataBroker, get_dbconn
1113
from articlemeta import utils
@@ -64,6 +66,44 @@
6466
articlemeta_thrift = thriftpy2.load(
6567
os.path.join(os.path.dirname(__file__), 'articlemeta.thrift'))
6668

69+
TRANSIENT_MONGO_ERRORS = (
70+
pymongo_errors.AutoReconnect,
71+
pymongo_errors.NetworkTimeout,
72+
pymongo_errors.ServerSelectionTimeoutError,
73+
pymongo_errors.ConnectionFailure,
74+
)
75+
76+
77+
class ResilientDataBroker(object):
78+
"""Wraps DataBroker calls with one reconnect/retry on transient Mongo errors."""
79+
80+
def __init__(self, broker_factory):
81+
self._broker_factory = broker_factory
82+
self._lock = threading.Lock()
83+
self._broker = broker_factory()
84+
85+
def _reset(self):
86+
with self._lock:
87+
self._broker = self._broker_factory()
88+
89+
def __getattr__(self, attr_name):
90+
attr = getattr(self._broker, attr_name)
91+
if not callable(attr):
92+
return attr
93+
94+
def _wrapped(*args, **kwargs):
95+
try:
96+
return getattr(self._broker, attr_name)(*args, **kwargs)
97+
except TRANSIENT_MONGO_ERRORS:
98+
logger.exception(
99+
"Transient MongoDB error on '%s', reconnecting and retrying once",
100+
attr_name,
101+
)
102+
self._reset()
103+
return getattr(self._broker, attr_name)(*args, **kwargs)
104+
105+
return _wrapped
106+
67107

68108
class Dispatcher(object):
69109
def __init__(self):
@@ -73,9 +113,9 @@ def __init__(self):
73113
db_dsn = os.environ.get('MONGODB_HOST', settings.get('mongo_uri', '127.0.0.1:27017'))
74114

75115
self._admintoken = os.environ.get('ADMIN_TOKEN', None) or settings['app:main'].get('admintoken', uuid.uuid4().hex)
76-
77-
db_client = get_dbconn(db_dsn)
78-
self._databroker = DataBroker(db_client)
116+
self._databroker = ResilientDataBroker(
117+
lambda: DataBroker(get_dbconn(db_dsn))
118+
)
79119

80120
def getInterfaceVersion(self):
81121
return articlemeta_thrift.VERSION

0 commit comments

Comments
 (0)