Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/connection_string.ts
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ interface OptionDescriptor {
}

export const OPTIONS = {
adaptiveRetries: {
enableOverloadRetargeting: {
default: false,
type: 'boolean'
},
Expand Down Expand Up @@ -885,6 +885,10 @@ export const OPTIONS = {
default: 15,
type: 'uint'
},
maxAdaptiveRetries: {
default: 2,
type: 'uint'
},
maxConnecting: {
default: 2,
transform({ name, values: [value] }): number {
Expand Down
1 change: 0 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,6 @@ export type {
TimeoutContext,
TimeoutContextOptions
} from './timeout';
export type { MAX_RETRIES, TokenBucket } from './token_bucket';
export type { Transaction, TransactionOptions, TxnState } from './transactions';
export type {
BufferPool,
Expand Down
9 changes: 6 additions & 3 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,10 @@ export interface MongoClientOptions extends BSONSerializeOptions, SupportedNodeC
retryReads?: boolean;
/** Enable retryable writes. */
retryWrites?: boolean;
/** Whether to enable adaptive retry rate limiting using a token bucket. Defaults to false. */
adaptiveRetries?: boolean;
/** The maximum number of retries for overload errors. Set to 0 to disable overload retries. Defaults to 2. */
maxAdaptiveRetries?: number;
/** Whether to deprioritize servers that return overload errors during retry server selection. Defaults to false. */
enableOverloadRetargeting?: boolean;
/** Allow a driver to force a Single topology type with a connection string containing one host */
directConnection?: boolean;
/** Instruct the driver it is connecting to a load balancer fronting a mongos like service */
Expand Down Expand Up @@ -1043,7 +1045,8 @@ export interface MongoOptions
extends Required<
Pick<
MongoClientOptions,
| 'adaptiveRetries'
| 'maxAdaptiveRetries'
| 'enableOverloadRetargeting'
| 'autoEncryption'
| 'connectTimeoutMS'
| 'directConnection'
Expand Down
37 changes: 8 additions & 29 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,6 @@ import {
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { TimeoutContext } from '../timeout';
import {
BASE_BACKOFF_MS,
MAX_BACKOFF_MS,
MAX_RETRIES,
RETRY_COST,
RETRY_TOKEN_RETURN_RATE
} from '../token_bucket';
import { abortable, maxWireVersion, supportsRetryableWrites } from '../utils';
import { AggregateOperation } from './aggregate';
import { AbstractOperation, Aspect } from './operation';
Expand Down Expand Up @@ -176,6 +169,10 @@ type RetryOptions = {
topology: Topology;
timeoutContext: TimeoutContext;
};
/** @internal The base backoff duration in milliseconds */
const BASE_BACKOFF_MS = 100;
/** @internal The maximum backoff duration in milliseconds */
const MAX_BACKOFF_MS = 10_000;

/**
* Executes an operation and retries as appropriate
Expand Down Expand Up @@ -267,13 +264,6 @@ async function executeOperationWithRetries<
try {
try {
const result = await server.command(operation, timeoutContext);
if (topology.s.options.adaptiveRetries) {
topology.tokenBucket.deposit(
attempt > 0
? RETRY_TOKEN_RETURN_RATE + RETRY_COST // on successful retry
: RETRY_TOKEN_RETURN_RATE // otherwise
);
}
return operation.handleOk(result);
} catch (error) {
return operation.handleError(error);
Expand All @@ -282,15 +272,6 @@ async function executeOperationWithRetries<
// Should never happen but if it does - propagate the error.
if (!(operationError instanceof MongoError)) throw operationError;

if (
topology.s.options.adaptiveRetries &&
attempt > 0 &&
!operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)
) {
// if a retry attempt fails with a non-overload error, deposit 1 token.
topology.tokenBucket.deposit(RETRY_COST);
}

// Preserve the original error once a write has been performed.
// Only update to the latest error if no writes were performed.
if (error == null) {
Expand All @@ -317,7 +298,8 @@ async function executeOperationWithRetries<
}

if (operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
maxAttempts = Math.min(MAX_RETRIES + 1, operation.maxAttempts ?? MAX_RETRIES + 1);
const maxOverloadAttempts = topology.s.options.maxAdaptiveRetries + 1;
maxAttempts = Math.min(maxOverloadAttempts, operation.maxAttempts ?? maxOverloadAttempts);
}

if (attempt + 1 >= maxAttempts) {
Expand Down Expand Up @@ -352,16 +334,13 @@ async function executeOperationWithRetries<
throw error;
}

if (topology.s.options.adaptiveRetries && !topology.tokenBucket.consume(RETRY_COST)) {
throw error;
}

await setTimeout(backoffMS);
}

if (
topology.description.type === TopologyType.Sharded ||
operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)
(operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError) &&
topology.s.options.enableOverloadRetargeting)
) {
deprioritizedServers.add(server.description);
}
Expand Down
6 changes: 2 additions & 4 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import { type Abortable, TypedEventEmitter } from '../mongo_types';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import type { ClientSession } from '../sessions';
import { Timeout, TimeoutContext, TimeoutError } from '../timeout';
import { INITIAL_TOKEN_BUCKET_SIZE, TokenBucket } from '../token_bucket';
import type { Transaction } from '../transactions';
import {
addAbortListener,
Expand Down Expand Up @@ -146,7 +145,8 @@ export interface TopologyOptions extends BSONSerializeOptions, ServerOptions {
hosts: HostAddress[];
retryWrites: boolean;
retryReads: boolean;
adaptiveRetries: boolean;
maxAdaptiveRetries: number;
enableOverloadRetargeting: boolean;
/** How long to block for server selection before throwing an error */
serverSelectionTimeoutMS: number;
/** The name of the replica set to connect to */
Expand Down Expand Up @@ -214,8 +214,6 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
hello?: Document;
_type?: string;

tokenBucket = new TokenBucket(INITIAL_TOKEN_BUCKET_SIZE);

client!: MongoClient;

private connectionLock?: Promise<Topology>;
Expand Down
5 changes: 2 additions & 3 deletions src/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import { ReadConcernLevel } from './read_concern';
import { ReadPreference } from './read_preference';
import { _advanceClusterTime, type ClusterTime, TopologyType } from './sdam/common';
import { TimeoutContext } from './timeout';
import { MAX_RETRIES } from './token_bucket';
import {
isTransactionCommand,
Transaction,
Expand Down Expand Up @@ -498,7 +497,7 @@ export class ClientSession
readPreference: ReadPreference.primary,
bypassPinningCheck: true
});
operation.maxAttempts = MAX_RETRIES + 1;
operation.maxAttempts = this.clientOptions.maxAdaptiveRetries + 1;

const timeoutContext =
this.timeoutContext ??
Expand All @@ -517,7 +516,7 @@ export class ClientSession
} catch (firstCommitError) {
this.commitAttempted = true;

const remainingAttempts = MAX_RETRIES + 1 - operation.attemptsMade;
const remainingAttempts = this.clientOptions.maxAdaptiveRetries + 1 - operation.attemptsMade;
if (remainingAttempts <= 0) {
throw firstCommitError;
}
Expand Down
54 changes: 0 additions & 54 deletions src/token_bucket.ts

This file was deleted.

Loading
Loading