Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/packages/gax/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"dependencies": {
"@grpc/grpc-js": "^1.12.6",
"@grpc/proto-loader": "^0.8.0",
"@opentelemetry/api": "^1.9.0",
"duplexify": "^4.1.3",
"google-auth-library": "10.5.0",
"google-logging-utils": "1.1.3",
Expand Down
88 changes: 58 additions & 30 deletions core/packages/gax/src/normalCalls/retries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

import {Status} from '../status';
import {trace, metrics, SpanStatusCode} from '@opentelemetry/api';

import {
APICallback,
Expand All @@ -29,6 +30,12 @@ import {GoogleError} from '../googleError';

import {addTimeoutArg} from './timeout';

const meter = metrics.getMeter('google-gax');
const attemptDurationRecorder = meter.createHistogram('gcp-e.attempt_duration', {
description: 'Duration of a single gRPC attempt',
unit: 'ms',
});

/**
* Creates a function equivalent to func, but that retries on certain
* exceptions.
Expand Down Expand Up @@ -143,36 +150,57 @@ export function retryable(
retries++;
let lastError = err;
const toCall = addTimeoutArg(func, timeout!, otherArgs);
canceller = toCall(argument, (err, response, next, rawResponse) => {
if (err) {
lastError = err;
}
if (!err) {
callback(null, response, next, rawResponse);
return;
}
canceller = null;
if (
retry.retryCodes.length > 0 &&
retry.retryCodes.indexOf(err!.code!) < 0
) {
err.note =
'Exception occurred in retry method that was ' +
'not classified as transient';
callback(err);
} else {
const toSleep = Math.random() * delay;
timeoutId = setTimeout(() => {
now = new Date();
delay = Math.min(delay * delayMult, maxDelay);
const timeoutCal =
timeout && timeoutMult ? timeout * timeoutMult : 0;
const rpcTimeout = maxTimeout ? maxTimeout : 0;
const newDeadline = deadline ? deadline - now.getTime() : Infinity;
timeout = Math.min(timeoutCal, rpcTimeout, newDeadline);
repeat(lastError);
}, toSleep);
}

const tracer = trace.getTracer('google-gax');
const spanName = apiName ? `grpc.attempt.${apiName}` : 'grpc.attempt';
const startTime = Date.now();

canceller = tracer.startActiveSpan(spanName, (span) => {
const result = toCall(argument, (err, response, next, rawResponse) => {
const duration = Date.now() - startTime;
attemptDurationRecorder.record(duration, {
'rpc.method': apiName || 'unknown',
'rpc.grpc.status_code': err ? err.code || -1 : 0, // 0 is OK
});

if (err) {
lastError = err;
span.setStatus({ code: SpanStatusCode.ERROR, message: err.message });
console.error(`[GAPIC Attempt Error] in ${spanName}:`, err);
}
if (!err) {
span.setStatus({ code: SpanStatusCode.OK });
}
span.end();

if (!err) {
callback(null, response, next, rawResponse);
return;
}
canceller = null;
if (
retry.retryCodes.length > 0 &&
retry.retryCodes.indexOf(err!.code!) < 0
) {
err.note =
'Exception occurred in retry method that was ' +
'not classified as transient';
callback(err);
} else {
const toSleep = Math.random() * delay;
timeoutId = setTimeout(() => {
now = new Date();
delay = Math.min(delay * delayMult, maxDelay);
const timeoutCal =
timeout && timeoutMult ? timeout * timeoutMult : 0;
const rpcTimeout = maxTimeout ? maxTimeout : 0;
const newDeadline = deadline ? deadline - now.getTime() : Infinity;
timeout = Math.min(timeoutCal, rpcTimeout, newDeadline);
repeat(lastError);
}, toSleep);
}
});
return result;
});
if (canceller instanceof Promise) {
canceller.catch(err => {
Expand Down
Loading