Skip to content

Commit ffb16d9

Browse files
authored
fix: multiple find (#186)
1 parent 884a3df commit ffb16d9

3 files changed

Lines changed: 72 additions & 39 deletions

File tree

README.md

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
[![tag](https://img.shields.io/github/tag/manyuanrong/deno_mongo.svg)](https://github.com/manyuanrong/deno_mongo/releases)
66
[![Build Status](https://github.com/manyuanrong/deno_mongo/workflows/ci/badge.svg?branch=master)](https://github.com/manyuanrong/deno_mongo/actions)
77
[![license](https://img.shields.io/github/license/manyuanrong/deno_mongo.svg)](https://github.com/manyuanrong/deno_mongo)
8-
[![tag](https://img.shields.io/badge/deno-v1.5.2-green.svg)](https://github.com/denoland/deno)
8+
[![tag](https://img.shields.io/badge/deno-v1.8.1-green.svg)](https://github.com/denoland/deno)
99

1010
## Links
1111

@@ -16,7 +16,7 @@
1616
## Examples
1717

1818
```ts
19-
import { MongoClient, Bson } from "https://deno.land/x/mongo@v0.21.0/mod.ts";
19+
import { Bson, MongoClient } from "https://deno.land/x/mongo@v0.22.0/mod.ts";
2020

2121
const client = new MongoClient();
2222
await client.connect("mongodb://localhost:27017");
@@ -56,7 +56,9 @@ const user1 = await users.findOne({ _id: insertId });
5656
const all_users = await users.find({ username: { $ne: null } }).toArray();
5757

5858
// find by ObjectId
59-
const user1_id = await users.findOne({ _id: new Bson.ObjectId("SOME OBJECTID STRING") });
59+
const user1_id = await users.findOne({
60+
_id: new Bson.ObjectId("SOME OBJECTID STRING"),
61+
});
6062

6163
// count
6264
const count = await users.count({ username: { $ne: null } });
@@ -70,13 +72,13 @@ const docs = await users.aggregate([
7072
// updateOne
7173
const { matchedCount, modifiedCount, upsertedId } = await users.updateOne(
7274
{ username: { $ne: null } },
73-
{ $set: { username: "USERNAME" } }
75+
{ $set: { username: "USERNAME" } },
7476
);
7577

7678
// updateMany
7779
const { matchedCount, modifiedCount, upsertedId } = await users.updateMany(
7880
{ username: { $ne: null } },
79-
{ $set: { username: "USERNAME" } }
81+
{ $set: { username: "USERNAME" } },
8082
);
8183

8284
// deleteOne

src/protocol/protocol.ts

Lines changed: 48 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,21 @@ import { parseHeader } from "./header.ts";
66
import { deserializeMessage, Message, serializeMessage } from "./message.ts";
77

88
type Socket = Deno.Reader & Deno.Writer;
9+
interface CommandTask {
10+
requestId: number;
11+
db: string;
12+
body: Document;
13+
}
914

1015
let nextRequestId = 0;
1116

1217
export class WireProtocol {
1318
#socket: Socket;
14-
#pending = false;
15-
#pendingOps: Map<number, Deferred<Message>> = new Map();
19+
#isPendingResponse = false;
20+
#isPendingRequest = false;
21+
#pendingResponses: Map<number, Deferred<Message>> = new Map();
1622
#reader: BufReader;
23+
#commandQueue: CommandTask[] = [];
1724

1825
#connectionId: number = 0;
1926

@@ -38,26 +45,18 @@ export class WireProtocol {
3845

3946
async command<T = Document>(db: string, body: Document): Promise<T[]> {
4047
const requestId = nextRequestId++;
41-
const chunks = serializeMessage({
48+
const commandTask = {
4249
requestId,
43-
responseTo: 0,
44-
sections: [
45-
{
46-
document: {
47-
...body,
48-
$db: db,
49-
},
50-
},
51-
],
52-
});
50+
db,
51+
body,
52+
};
5353

54-
for (const chunk of chunks) {
55-
await Deno.writeAll(this.#socket, chunk);
56-
}
54+
this.#commandQueue.push(commandTask);
55+
this.send();
5756

58-
this.#pendingOps.set(requestId, deferred());
57+
this.#pendingResponses.set(requestId, deferred());
5958
this.receive();
60-
const message = await this.#pendingOps.get(requestId);
59+
const message = await this.#pendingResponses.get(requestId);
6160

6261
let documents: T[] = [];
6362

@@ -72,10 +71,35 @@ export class WireProtocol {
7271
return documents;
7372
}
7473

74+
private async send() {
75+
if (this.#isPendingRequest) return;
76+
this.#isPendingRequest = true;
77+
while (this.#commandQueue.length > 0) {
78+
const task = this.#commandQueue.shift()!;
79+
const chunks = serializeMessage({
80+
requestId: task.requestId,
81+
responseTo: 0,
82+
sections: [
83+
{
84+
document: {
85+
...task.body,
86+
$db: task.db,
87+
},
88+
},
89+
],
90+
});
91+
92+
for (const chunk of chunks) {
93+
await Deno.writeAll(this.#socket, chunk);
94+
}
95+
}
96+
this.#isPendingRequest = false;
97+
}
98+
7599
private async receive() {
76-
if (this.#pending) return;
77-
this.#pending = true;
78-
while (this.#pendingOps.size > 0) {
100+
if (this.#isPendingResponse) return;
101+
this.#isPendingResponse = true;
102+
while (this.#pendingResponses.size > 0) {
79103
const headerBuffer = await this.#reader.readFull(new Uint8Array(16));
80104
assert(headerBuffer);
81105
const header = parseHeader(headerBuffer!);
@@ -84,10 +108,10 @@ export class WireProtocol {
84108
);
85109
assert(bodyBuffer);
86110
const reply = deserializeMessage(header, bodyBuffer!);
87-
const pendingMessage = this.#pendingOps.get(header.responseTo);
88-
this.#pendingOps.delete(header.responseTo);
111+
const pendingMessage = this.#pendingResponses.get(header.responseTo);
112+
this.#pendingResponses.delete(header.responseTo);
89113
pendingMessage?.resolve(reply);
90114
}
91-
this.#pending = false;
115+
this.#isPendingResponse = false;
92116
}
93117
}

tests/cases/03_curd.ts

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,23 @@ export default function curdTests() {
190190
assertEquals(notFound, []);
191191
});
192192

193+
testWithClient("test multiple queries at the same time", async (client) => {
194+
const db = client.database("test");
195+
const users = db.collection("mongo_test_users");
196+
197+
const result = await Promise.all([
198+
users.findOne({}, { projection: { username: 1 } }),
199+
users.findOne({}, { projection: { username: 1 } }),
200+
users.findOne({}, { projection: { username: 1 } }),
201+
]);
202+
203+
assertEquals(result, [
204+
{ _id: "aaaaaaaaaaaaaaaaaaaaaaaa", username: "user1" },
205+
{ _id: "aaaaaaaaaaaaaaaaaaaaaaaa", username: "user1" },
206+
{ _id: "aaaaaaaaaaaaaaaaaaaaaaaa", username: "user1" },
207+
]);
208+
});
209+
193210
testWithClient("testCount", async (client) => {
194211
const db = client.database("test");
195212
const users = db.collection("mongo_test_users");
@@ -238,16 +255,6 @@ export default function curdTests() {
238255
assertEquals(user1, ["USER2", "user1"]);
239256
});
240257

241-
// // TODO mongdb_rust official library has not implemented this feature
242-
// // testWithClient("testCreateIndexes", async (client) => {
243-
// // const db = client.database("test");
244-
// // const collection = db.collection("mongo_indexes");
245-
// // const result = await collection.createIndexes([
246-
// // { keys: { created_at: 1 }, options: { expireAfterSeconds: 10000 } }
247-
// // ]);
248-
// // console.log(result);
249-
// // });
250-
251258
testWithClient("testDropConnection", async (client) => {
252259
const db = client.database("test");
253260
await db.collection("mongo_test_users_2").drop();

0 commit comments

Comments
 (0)