Skip to content

feat(client-core): add signal option to support abort fetch #9539

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,27 @@ const cubeApi = cube(
);
```

If you need to set up cancellation for all requests made by this API instance:

```js
import cube from '@cubejs-client/core';

// Create a controller for managing request cancellation
const controller = new AbortController();
const { signal } = controller;

const cubeApi = cube(
'CUBE-API-TOKEN',
{
apiUrl: 'http://localhost:4000/cubejs-api/v1',
signal: signal
}
);

// Later when you need to cancel all pending requests:
// controller.abort();
```

**Parameters:**

Name | Type | Description |
Expand Down Expand Up @@ -87,6 +108,41 @@ const context = document.getElementById('myChart');
new Chart(context, chartjsConfig(resultSet));
```

You can also use AbortController to cancel a request:

```js
import cube from '@cubejs-client/core';

const cubeApi = cube('CUBE_TOKEN');

// Create an AbortController instance
const controller = new AbortController();
const { signal } = controller;

try {
// Pass the signal to the load method
const resultSetPromise = cubeApi.load(
{
measures: ['Orders.count'],
dimensions: ['Orders.status']
},
{ signal }
);

// To cancel the request at any time:
// controller.abort();

const resultSet = await resultSetPromise;
// Process the result
} catch (error) {
if (error.name === 'AbortError') {
console.log('Request was cancelled');
} else {
console.error('Error loading data:', error);
}
}
```

**Parameters:**

Name | Type | Description |
Expand Down Expand Up @@ -728,6 +784,7 @@ headers? | Record‹string, string› | - |
parseDateMeasures? | boolean | - |
pollInterval? | number | - |
transport? | [ITransport](#i-transport) | Transport implementation to use. [HttpTransport](#http-transport) will be used by default. |
signal? | AbortSignal | AbortSignal to cancel requests |

### `DateRange`

Expand Down Expand Up @@ -757,6 +814,7 @@ Name | Type | Optional? | Description |
`mutexObj` | `Object` | ✅ Yes | Object to store MUTEX |
`progressCallback` | | ✅ Yes | |
`subscribe` | `boolean` | ✅ Yes | Pass `true` to use continuous fetch behavior. |
`signal` | `AbortSignal` | ✅ Yes | AbortSignal to cancel the request. This allows you to manually abort requests using AbortController. |

### `LoadResponse`

Expand Down Expand Up @@ -1035,6 +1093,7 @@ apiUrl | string | path to `/cubejs-api/v1` |
authorization | string | [jwt auth token][ref-security] |
credentials? | "omit" | "same-origin" | "include" | - |
headers? | Record‹string, string› | custom headers |
signal? | AbortSignal | AbortSignal to cancel requests |

### `UnaryFilter`

Expand Down
16 changes: 16 additions & 0 deletions packages/cubejs-client-core/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ declare module '@cubejs-client/core' {
* Fetch timeout in milliseconds. Would be passed as AbortSignal.timeout()
*/
fetchTimeout?: number;
/**
* AbortSignal to cancel requests
*/
signal?: AbortSignal;
};

export interface ITransportResponse<R> {
Expand Down Expand Up @@ -64,6 +68,10 @@ declare module '@cubejs-client/core' {
* @hidden
*/
protected credentials: TransportOptions['credentials'];
/**
* @hidden
*/
protected signal?: TransportOptions['signal'];

constructor(options: TransportOptions);

Expand All @@ -89,6 +97,10 @@ declare module '@cubejs-client/core' {
* How many network errors would be retried before returning to users. Default to 0.
*/
networkErrorRetries?: number;
/**
* AbortSignal to cancel requests
*/
signal?: AbortSignal;
};

export type LoadMethodOptions = {
Expand Down Expand Up @@ -116,6 +128,10 @@ declare module '@cubejs-client/core' {
* Function that receives `ProgressResult` on each `Continue wait` message.
*/
progressCallback?(result: ProgressResult): void;
/**
* AbortSignal to cancel the request
*/
signal?: AbortSignal;
};

export type LoadMethodCallback<T> = (error: Error | null, resultSet: T) => void;
Expand Down
7 changes: 4 additions & 3 deletions packages/cubejs-client-core/src/HttpTransport.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ import fetch from 'cross-fetch';
import 'url-search-params-polyfill';

class HttpTransport {
constructor({ authorization, apiUrl, method, headers = {}, credentials, fetchTimeout }) {
constructor({ authorization, apiUrl, method, headers = {}, credentials, fetchTimeout, signal }) {
this.authorization = authorization;
this.apiUrl = apiUrl;
this.method = method;
this.headers = headers;
this.credentials = credentials;
this.fetchTimeout = fetchTimeout;
this.signal = signal;
}

request(method, { baseRequestId, ...params }) {
request(method, { baseRequestId, signal, ...params }) {
let spanCounter = 1;
const searchParams = new URLSearchParams(
params && Object.keys(params)
Expand All @@ -38,7 +39,7 @@ class HttpTransport {
},
credentials: this.credentials,
body: requestMethod === 'POST' ? JSON.stringify(params) : null,
signal: this.fetchTimeout ? AbortSignal.timeout(this.fetchTimeout) : undefined,
signal: signal || this.signal || (this.fetchTimeout ? AbortSignal.timeout(this.fetchTimeout) : undefined),
});

return {
Expand Down
176 changes: 175 additions & 1 deletion packages/cubejs-client-core/src/HttpTransport.test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* eslint-disable import/first */
/* eslint-disable import/newline-after-import */
/* globals describe,test,expect,jest,afterEach,beforeAll */
/* globals describe,test,expect,jest,afterEach,beforeAll,beforeEach */
import '@babel/runtime/regenerator';
jest.mock('cross-fetch');
import fetch from 'cross-fetch';
Expand Down Expand Up @@ -114,4 +114,178 @@ describe('HttpTransport', () => {
body: largeQueryJson
});
});

// Signal tests from src/tests/HttpTransport.test.js
describe('Signal functionality', () => {
beforeEach(() => {
fetch.mockClear();
// Default mock implementation for signal tests
fetch.mockImplementation(() => Promise.resolve({
json: () => Promise.resolve({ data: 'test data' }),
ok: true,
status: 200
}));
});

test('should pass the signal to fetch when provided in constructor', async () => {
const controller = new AbortController();
const { signal } = controller;

const transport = new HttpTransport({
authorization: 'token',
apiUrl: 'http://localhost:4000/cubejs-api/v1',
signal
});

const request = transport.request('load', { query: { measures: ['Orders.count'] } });

// Start the request
const promise = request.subscribe((result) => result);

// Wait for fetch to be called
await Promise.resolve();

// Ensure fetch was called with the signal
expect(fetch).toHaveBeenCalledTimes(1);
expect(fetch.mock.calls[0][1].signal).toBe(signal);

await promise;
});

test('should pass the signal to fetch when provided in request method', async () => {
const controller = new AbortController();
const { signal } = controller;

const transport = new HttpTransport({
authorization: 'token',
apiUrl: 'http://localhost:4000/cubejs-api/v1'
});

const request = transport.request('load', {
query: { measures: ['Orders.count'] },
signal
});

// Start the request
const promise = request.subscribe((result) => result);

// Wait for fetch to be called
await Promise.resolve();

// Ensure fetch was called with the signal
expect(fetch).toHaveBeenCalledTimes(1);
expect(fetch.mock.calls[0][1].signal).toBe(signal);

await promise;
});

test('should prioritize request signal over constructor signal', async () => {
const controller1 = new AbortController();
const controller2 = new AbortController();

const transport = new HttpTransport({
authorization: 'token',
apiUrl: 'http://localhost:4000/cubejs-api/v1',
signal: controller1.signal
});

const request = transport.request('load', {
query: { measures: ['Orders.count'] },
signal: controller2.signal
});

// Start the request
const promise = request.subscribe((result) => result);

// Wait for fetch to be called
await Promise.resolve();

// Ensure fetch was called with the request signal, not the constructor signal
expect(fetch).toHaveBeenCalledTimes(1);
expect(fetch.mock.calls[0][1].signal).toBe(controller2.signal);
expect(fetch.mock.calls[0][1].signal).not.toBe(controller1.signal);

await promise;
});

test('should create AbortSignal.timeout from fetchTimeout if signal not provided', async () => {
// Mock AbortSignal.timeout
const originalTimeout = AbortSignal.timeout;
const mockTimeoutSignal = {};
AbortSignal.timeout = jest.fn().mockReturnValue(mockTimeoutSignal);

const transport = new HttpTransport({
authorization: 'token',
apiUrl: 'http://localhost:4000/cubejs-api/v1',
fetchTimeout: 5000
});

const request = transport.request('load', {
query: { measures: ['Orders.count'] }
});

// Start the request
const promise = request.subscribe((result) => result);

// Wait for fetch to be called
await Promise.resolve();

// Ensure fetch was called with the timeout signal
expect(fetch).toHaveBeenCalledTimes(1);
expect(fetch.mock.calls[0][1].signal).toBe(mockTimeoutSignal);
expect(AbortSignal.timeout).toHaveBeenCalledWith(5000);

// Restore original implementation
AbortSignal.timeout = originalTimeout;

await promise;
});

test('should handle request abortion', async () => {
// Create a mock Promise and resolver function to control Promise completion
let resolveFetch;
const fetchPromise = new Promise(resolve => {
resolveFetch = resolve;
});

// Mock fetch to return our controlled Promise
fetch.mockImplementationOnce(() => fetchPromise);

const controller = new AbortController();
const { signal } = controller;

const transport = new HttpTransport({
authorization: 'token',
apiUrl: 'http://localhost:4000/cubejs-api/v1'
});

const request = transport.request('load', {
query: { measures: ['Orders.count'] },
signal
});

// Start the request but don't wait for it to complete
const requestPromise = request.subscribe((result) => result);

// Wait for fetch to be called
await Promise.resolve();

// Ensure fetch was called with the signal
expect(fetch).toHaveBeenCalledTimes(1);
expect(fetch.mock.calls[0][1].signal).toBe(signal);

// Abort the request
controller.abort();

// Resolve the fetch Promise, simulating request completion
resolveFetch({
json: () => Promise.resolve({ data: 'aborted data' }),
ok: true,
status: 200
});

// Wait for the request Promise to complete
await requestPromise;
}, 10000); // Set 10-second timeout
});
});
Loading