 import com.conveyal.analysis.UserPermissions;
 import com.conveyal.analysis.persistence.AnalysisDB;
 import com.google.common.collect.Lists;
+import com.mongodb.client.FindIterable;
 import com.mongodb.client.MongoCollection;
-import com.mongodb.util.JSON;
-import org.bson.BsonArray;
 import org.bson.Document;
 import org.bson.conversions.Bson;
-import org.bson.json.JsonWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import spark.Request;
 import spark.Response;
 
-import java.io.IOException;
 import java.io.OutputStream;
-import java.io.PrintWriter;
-import java.io.Writer;
 import java.lang.invoke.MethodHandles;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static com.conveyal.analysis.util.JsonUtil.toJson;
+import static com.google.common.base.Preconditions.checkNotNull;
 import static com.mongodb.client.model.Filters.and;
 import static com.mongodb.client.model.Filters.eq;
 
@@ -38,61 +36,72 @@ public class DatabaseController implements HttpController {
 
     private final AnalysisDB database;
 
-    private final MongoCollection<Document> regions;
-    private final MongoCollection<Document> bundles;
+    private final Map<String, MongoCollection<Document>> mongoCollections;
+
+    // Preloading these avoids synchronization while handling HTTP requests, by reading from an immutable map.
+    // TODO verify whether it is threadsafe to reuse a MongoCollection across all threads.
+    // Amazingly there seems to be no documentation on this at all. Drilling down into the function calls, it seems
+    // to create a new session on each find() call, so it should presumably go through synchronization there.
+    // In testing with siege and other HTTP benchmarking tools, reusing the MongoCollection seems to result in much
+    // smoother operation; creating a new MongoCollection on each request seems to jam up after a certain number
+    // of requests (perhaps while waiting for idle MongoCollections to be cleaned up).
+    public Map<String, MongoCollection<Document>> mongoCollectionMap (String... collectionNames) {
+        Map<String, MongoCollection<Document>> map = new HashMap<>();
+        for (String name : collectionNames) {
+            map.put(name, database.getBsonCollection(name));
+        }
+        // Make the map immutable for threadsafe reading, then return it.
+        return Map.copyOf(map);
+    }
 
     public DatabaseController (AnalysisDB database) {
         this.database = database;
-        // TODO verify if it is threadsafe to reuse this collection in all threads
-        // Also verify whether it's any slower to just get the collection on every GET operation.
-        // Testing with Apache bench, retaining and reusing the collection seems much smoother.
-        this.regions = database.getBsonCollection("regions");
-        this.bundles = database.getBsonCollection("bundles");
+        this.mongoCollections = mongoCollectionMap("regions", "bundles");
     }
 
-    /**
-     * Fetch anything from database. Buffers in memory so not suitable for huge responses.
-     * register serialization with sparkService.get("/api/db/:collection", this::getDocuments, toJson);
-     */
-    private Iterable<Document> getDocuments (Request req, Response res) {
+    /** Factored out for experimenting with streaming and non-streaming approaches to serialization. */
+    private FindIterable<Document> getDocuments (Request req) {
         String accessGroup = UserPermissions.from(req).accessGroup;
         final String collectionName = req.params("collection");
-        MongoCollection<Document> collection = collectionName.equals("bundles") ? bundles :
-                database.getBsonCollection(collectionName);
+        MongoCollection<Document> collection = mongoCollections.get(collectionName);
+        checkNotNull(collection, "Collection not available: " + collectionName);
         List<Bson> filters = Lists.newArrayList(eq("accessGroup", accessGroup));
         req.queryMap().toMap().forEach((key, values) -> {
             for (String value : values) {
                 filters.add(eq(key, value));
             }
         });
+        return collection.find(and(filters));
+    }
+
+    /**
+     * Fetch anything from the database. Buffers all documents in memory, so may not be suitable for large responses.
+     * Register result serialization with: sparkService.get("/api/db/:collection", this::getDocuments, toJson);
+     */
+    private Iterable<Document> getDocuments (Request req, Response res) {
+        FindIterable<Document> docs = getDocuments(req);
         List<Document> documents = new ArrayList<>();
-        collection.find(and(filters)).into(documents);
+        docs.into(documents);
         return documents;
     }
 
     /**
      * Fetch anything from database. Streaming processing, no in-memory buffering of the BsonDocuments.
      * The output stream does buffer to some extent but should stream chunks instead of serializing into memory.
+     * Anecdotally, in testing with siege this does seem to almost double the response rate and allow double the
+     * concurrent connections without stalling (though that number is still low at 20, and it eventually does stall).
      */
     private Object getDocumentsStreaming (Request req, Response res) {
-        String accessGroup = UserPermissions.from(req).accessGroup;
-        final String collectionName = req.params("collection");
-        MongoCollection<Document> collection = collectionName.equals("bundles") ? bundles :
-                database.getBsonCollection(collectionName);
-        List<Bson> filters = Lists.newArrayList(eq("accessGroup", accessGroup));
-        req.queryMap().toMap().forEach((key, values) -> {
-            for (String value : values) {
-                filters.add(eq(key, value));
-            }
-        });
+        FindIterable<Document> docs = getDocuments(req);
         // getOutputStream returns a ServletOutputStream, usually the Jetty implementation HttpOutputStream, which
         // buffers the output. doc.toJson() creates a lot of short-lived objects which could be factored out.
         // The Mongo driver says to use JsonWriter or toJson() rather than utility methods:
         // https://github.com/mongodb/mongo-java-driver/commit/63409f9cb3bbd0779dd5139355113d9b227dfa05
-        try (OutputStream out = res.raw().getOutputStream()) {
+        try {
+            OutputStream out = res.raw().getOutputStream();
             out.write('['); // Begin JSON array.
             boolean firstElement = true;
-            for (Document doc : collection.find(and(filters))) {
+            for (Document doc : docs) {
                 if (firstElement) {
                     firstElement = false;
                 } else {
@@ -101,17 +110,16 @@ private Object getDocumentsStreaming (Request req, Response res) {
                 out.write(doc.toJson().getBytes(StandardCharsets.UTF_8));
             }
             out.write(']'); // Close JSON array.
-        } catch (IOException e) {
+            // We do not close the OutputStream, even implicitly with a try-with-resources.
+            // The thinking is that closing the stream might close the underlying connection, which might be keepalive.
+        } catch (Exception e) {
             throw new RuntimeException("Failed to write database records as JSON.", e);
         }
         // Since we're directly writing to the OutputStream, no need to return anything.
         // But do not return null or Spark will complain cryptically.
         return "";
     }
 
-    // Testing with Apache bench shows some stalling
-    // -k keepalive connections fails immediately
-
     @Override
     public void registerEndpoints (spark.Service sparkService) {
         sparkService.get("/api/db/:collection", this::getDocuments, toJson);
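
Not shown in this excerpt: how (or whether) the streaming variant gets registered. As a minimal sketch only, assuming the same Spark service and path, getDocumentsStreaming writes the JSON array directly to the raw servlet OutputStream, so it would presumably be registered without the toJson response transformer:

    // Hypothetical registration of the streaming handler (not part of this commit).
    // No response transformer is passed because the handler serializes to the OutputStream itself.
    sparkService.get("/api/db/:collection", this::getDocumentsStreaming);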
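
For reference, a usage sketch of the filter building shared by both handlers: every query-string parameter is ANDed with the caller's access group before the find(). The collection, field name, and value below are purely illustrative:

    // Hypothetical request: GET /api/db/bundles?regionId=abc123
    // The handler would build a Mongo filter roughly equivalent to:
    Bson filter = and(
            eq("accessGroup", accessGroup),  // always applied, taken from UserPermissions
            eq("regionId", "abc123")         // one eq() clause per query parameter value
    );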