Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Poll for results from frontend for async search #8481

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelogs/fragments/8481.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
feat:
- Add logic to poll for async query result ([#8481](https://github.com/opensearch-project/OpenSearch-Dashboards/pull/8481))
50 changes: 46 additions & 4 deletions src/plugins/data/common/data_frames/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,54 @@ export interface IDataFrameWithAggs extends IDataFrame {
aggs: Record<string, DataFrameAgg | DataFrameBucketAgg | DataFrameBucketAgg[]>;
}

export interface IDataFrameResponse extends SearchResponse<any> {
type: DATA_FRAME_TYPES;
body: IDataFrame | IDataFrameWithAggs | IDataFrameError;
export interface IDataFrameDefaultResponse {
type: DATA_FRAME_TYPES.DEFAULT;
body: IDataFrame | IDataFrameWithAggs;
took: number;
}

export type IDataFramePollingResponse = {
type: DATA_FRAME_TYPES.POLLING;
} & (FetchStatusResponse | QueryStartedResponse);

export interface IDataFrameErrorResponse {
type: DATA_FRAME_TYPES.ERROR;
body: IDataFrameError;
took: number;
}

export interface IDataFrameError extends IDataFrameResponse {
export type IDataFrameResponse = SearchResponse<any> &
(IDataFrameDefaultResponse | IDataFramePollingResponse | IDataFrameErrorResponse);

export interface IDataFrameError extends SearchResponse<any> {
error: Error;
}

export interface PollQueryResultsParams {
queryId?: string;
sessionId?: string;
}

export type QueryStatusConfig = PollQueryResultsParams;

export interface QuerySuccessStatusResponse {
status: 'success';
body: IDataFrame | IDataFrameWithAggs | IDataFrameError;
}

export interface QueryStartedResponse {
status: 'started';
body: { queryStatusConfig: QueryStatusConfig };
}

export interface QueryFailedStatusResponse {
status: 'failed';
body: IDataFrameError;
}

export type FetchStatusResponse =
| QueryFailedStatusResponse
| QuerySuccessStatusResponse
| { status?: string };

export type PollQueryResultsHandler = () => Promise<FetchStatusResponse>;
42 changes: 38 additions & 4 deletions src/plugins/data/common/search/search_source/search_source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,14 @@
import { IIndexPattern } from '../../index_patterns';
import {
DATA_FRAME_TYPES,
FetchStatusResponse,
IDataFrame,
IDataFrameDefaultResponse,
IDataFrameError,
IDataFramePollingResponse,
IDataFrameResponse,
QueryStartedResponse,
QuerySuccessStatusResponse,
convertResult,
createDataFrame,
} from '../../data_frames';
Expand All @@ -115,6 +120,7 @@
import { getHighlightRequest } from '../../../common/field_formats';
import { fetchSoon } from './legacy';
import { extractReferences } from './extract_references';
import { handleQueryResults } from '../../utils/helpers';

/** @internal */
export const searchSourceRequiredUiSettings = [
Expand Down Expand Up @@ -436,14 +442,42 @@
return search({ params }, options).then(async (response: any) => {
if (response.hasOwnProperty('type')) {
if ((response as IDataFrameResponse).type === DATA_FRAME_TYPES.DEFAULT) {
const dataFrameResponse = response as IDataFrameResponse;
const dataFrameResponse = response as IDataFrameDefaultResponse;

Check warning on line 445 in src/plugins/data/common/search/search_source/search_source.ts

View check run for this annotation

Codecov / codecov/patch

src/plugins/data/common/search/search_source/search_source.ts#L445

Added line #L445 was not covered by tests
await this.setDataFrame(dataFrameResponse.body as IDataFrame);
return onResponse(searchRequest, convertResult(response as IDataFrameResponse));
}
if ((response as IDataFrameResponse).type === DATA_FRAME_TYPES.POLLING) {
const dataFrameResponse = response as IDataFrameResponse;
await this.setDataFrame(dataFrameResponse.body as IDataFrame);
return onResponse(searchRequest, convertResult(response as IDataFrameResponse));
const startTime = Date.now();
const { status } = response as IDataFramePollingResponse;

Check warning on line 451 in src/plugins/data/common/search/search_source/search_source.ts

View check run for this annotation

Codecov / codecov/patch

src/plugins/data/common/search/search_source/search_source.ts#L450-L451

Added lines #L450 - L451 were not covered by tests
let results;
if (status === 'success') {
results = response as QuerySuccessStatusResponse;

Check warning on line 454 in src/plugins/data/common/search/search_source/search_source.ts

View check run for this annotation

Codecov / codecov/patch

src/plugins/data/common/search/search_source/search_source.ts#L454

Added line #L454 was not covered by tests
} else if (status === 'started') {
const {
body: { queryStatusConfig },
} = response as QueryStartedResponse;

Check warning on line 458 in src/plugins/data/common/search/search_source/search_source.ts

View check run for this annotation

Codecov / codecov/patch

src/plugins/data/common/search/search_source/search_source.ts#L458

Added line #L458 was not covered by tests

if (!queryStatusConfig) {
throw new Error('Cannot poll results for undefined query status config');

Check warning on line 461 in src/plugins/data/common/search/search_source/search_source.ts

View check run for this annotation

Codecov / codecov/patch

src/plugins/data/common/search/search_source/search_source.ts#L461

Added line #L461 was not covered by tests
}

results = await handleQueryResults({

Check warning on line 464 in src/plugins/data/common/search/search_source/search_source.ts

View check run for this annotation

Codecov / codecov/patch

src/plugins/data/common/search/search_source/search_source.ts#L464

Added line #L464 was not covered by tests
pollQueryResults: async () =>
search(

Check warning on line 466 in src/plugins/data/common/search/search_source/search_source.ts

View check run for this annotation

Codecov / codecov/patch

src/plugins/data/common/search/search_source/search_source.ts#L466

Added line #L466 was not covered by tests
{ params: { ...params, pollQueryResultsParams: { ...queryStatusConfig } } },
options
) as Promise<FetchStatusResponse>,
queryId: queryStatusConfig.queryId,
});
} else {
throw new Error('Invalid query state');

Check warning on line 473 in src/plugins/data/common/search/search_source/search_source.ts

View check run for this annotation

Codecov / codecov/patch

src/plugins/data/common/search/search_source/search_source.ts#L473

Added line #L473 was not covered by tests
}

const elapsedMs = Date.now() - startTime;
(results as any).took = elapsedMs;

Check warning on line 477 in src/plugins/data/common/search/search_source/search_source.ts

View check run for this annotation

Codecov / codecov/patch

src/plugins/data/common/search/search_source/search_source.ts#L476-L477

Added lines #L476 - L477 were not covered by tests

await this.setDataFrame((results as QuerySuccessStatusResponse).body as IDataFrame);
return onResponse(searchRequest, convertResult(results as IDataFrameResponse));

Check warning on line 480 in src/plugins/data/common/search/search_source/search_source.ts

View check run for this annotation

Codecov / codecov/patch

src/plugins/data/common/search/search_source/search_source.ts#L479-L480

Added lines #L479 - L480 were not covered by tests
}
if ((response as IDataFrameResponse).type === DATA_FRAME_TYPES.ERROR) {
const dataFrameError = response as IDataFrameError;
Expand Down
70 changes: 70 additions & 0 deletions src/plugins/data/common/utils/helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Any modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { timer } from 'rxjs';
import { filter, mergeMap, take, takeWhile } from 'rxjs/operators';
import {
PollQueryResultsHandler,
FetchStatusResponse,
QueryFailedStatusResponse,
} from '../data_frames';

export interface QueryStatusOptions {
pollQueryResults: PollQueryResultsHandler;
queryId?: string;
interval?: number;
}

export const handleQueryResults = <T>(
options: QueryStatusOptions
): Promise<FetchStatusResponse> => {
const { pollQueryResults, interval = 5000, queryId } = options;

return timer(0, interval)

Check warning on line 50 in src/plugins/data/common/utils/helpers.ts

View check run for this annotation

Codecov / codecov/patch

src/plugins/data/common/utils/helpers.ts#L50

Added line #L50 was not covered by tests
.pipe(
mergeMap(() => pollQueryResults()),

Check warning on line 52 in src/plugins/data/common/utils/helpers.ts

View check run for this annotation

Codecov / codecov/patch

src/plugins/data/common/utils/helpers.ts#L52

Added line #L52 was not covered by tests
takeWhile((response: FetchStatusResponse) => {
const status = response?.status?.toUpperCase();

Check warning on line 54 in src/plugins/data/common/utils/helpers.ts

View check run for this annotation

Codecov / codecov/patch

src/plugins/data/common/utils/helpers.ts#L54

Added line #L54 was not covered by tests
return status !== 'SUCCESS' && status !== 'FAILED';
}, true),
filter((response: FetchStatusResponse) => {
const status = response?.status?.toUpperCase();

Check warning on line 58 in src/plugins/data/common/utils/helpers.ts

View check run for this annotation

Codecov / codecov/patch

src/plugins/data/common/utils/helpers.ts#L58

Added line #L58 was not covered by tests
if (status === 'FAILED') {
throw (

Check warning on line 60 in src/plugins/data/common/utils/helpers.ts

View check run for this annotation

Codecov / codecov/patch

src/plugins/data/common/utils/helpers.ts#L60

Added line #L60 was not covered by tests
(response as QueryFailedStatusResponse).body.error ??
new Error(`Failed to fetch results ${queryId ?? ''}`)
);
}
return status === 'SUCCESS';

Check warning on line 65 in src/plugins/data/common/utils/helpers.ts

View check run for this annotation

Codecov / codecov/patch

src/plugins/data/common/utils/helpers.ts#L65

Added line #L65 was not covered by tests
}),
take(1)
)
.toPromise();
};
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import { debounceTime } from 'rxjs/operators';
import { i18n } from '@osd/i18n';
import { useEffect } from 'react';
import { cloneDeep, isEqual } from 'lodash';
import { cloneDeep } from 'lodash';
import { useLocation } from 'react-router-dom';
import { RequestAdapter } from '../../../../../inspector/public';
import { DiscoverViewServices } from '../../../build_services';
Expand Down Expand Up @@ -243,7 +243,7 @@
elapsedMs,
},
});
} catch (error) {
} catch (error: any) {
// If the request was aborted then no need to surface this error in the UI
if (error instanceof Error && error.name === 'AbortError') return;

Expand All @@ -259,9 +259,9 @@
}
let errorBody;
try {
errorBody = JSON.parse(error.body.message);
errorBody = JSON.parse(error.message);

Check warning on line 262 in src/plugins/discover/public/application/view_components/utils/use_search.ts

View check run for this annotation

Codecov / codecov/patch

src/plugins/discover/public/application/view_components/utils/use_search.ts#L262

Added line #L262 was not covered by tests
} catch (e) {
errorBody = error.body.message;
errorBody = error.message;

Check warning on line 264 in src/plugins/discover/public/application/view_components/utils/use_search.ts

View check run for this annotation

Codecov / codecov/patch

src/plugins/discover/public/application/view_components/utils/use_search.ts#L264

Added line #L264 was not covered by tests
}

data$.next({
Expand Down
2 changes: 2 additions & 0 deletions src/plugins/query_enhancements/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/

import { CoreSetup } from 'opensearch-dashboards/public';
import { PollQueryResultsParams } from '../../data/common';

export interface QueryAggConfig {
[key: string]: {
Expand All @@ -25,6 +26,7 @@ export interface EnhancedFetchContext {
http: CoreSetup['http'];
path: string;
signal?: AbortSignal;
body?: { pollQueryResultsParams: PollQueryResultsParams };
}

export interface QueryStatusOptions<T> {
Expand Down
9 changes: 7 additions & 2 deletions src/plugins/query_enhancements/common/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
*/

import { Query } from 'src/plugins/data/common';
import { from, throwError, timer } from 'rxjs';
import { from, timer } from 'rxjs';
import { filter, mergeMap, take, takeWhile } from 'rxjs/operators';
import { stringify } from '@osd/std';
import {
EnhancedFetchContext,
QueryAggConfig,
Expand Down Expand Up @@ -49,7 +50,11 @@

export const fetch = (context: EnhancedFetchContext, query: Query, aggConfig?: QueryAggConfig) => {
const { http, path, signal } = context;
const body = JSON.stringify({ query: { ...query, format: 'jdbc' }, aggConfig });
const body = stringify({

Check warning on line 53 in src/plugins/query_enhancements/common/utils.ts

View check run for this annotation

Codecov / codecov/patch

src/plugins/query_enhancements/common/utils.ts#L53

Added line #L53 was not covered by tests
query: { ...query, format: 'jdbc' },
aggConfig,
pollQueryResultsParams: context.body?.pollQueryResultsParams,
});
return from(
http.fetch({
method: 'POST',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import { trimEnd } from 'lodash';
import { Observable, throwError } from 'rxjs';
import { catchError, tap } from 'rxjs/operators';
import { catchError } from 'rxjs/operators';
import { CoreStart } from 'opensearch-dashboards/public';
import {
DataPublicPluginStart,
Expand Down Expand Up @@ -36,16 +36,16 @@ export class SQLSearchInterceptor extends SearchInterceptor {
signal?: AbortSignal,
strategy?: string
): Observable<IOpenSearchDashboardsSearchResponse> {
const isAsync = strategy === SEARCH_STRATEGY.SQL_ASYNC;
const context: EnhancedFetchContext = {
http: this.deps.http,
path: trimEnd(`${API.SEARCH}/${strategy}`),
signal,
body: {
pollQueryResultsParams: request.params?.pollQueryResultsParams,
},
};

if (isAsync) this.notifications.toasts.add('Fetching data...');
return fetch(context, this.queryService.queryString.getQuery()).pipe(
tap(() => isAsync && this.notifications.toasts.addSuccess('Fetch complete...')),
catchError((error) => {
return throwError(error);
})
Expand Down
6 changes: 6 additions & 0 deletions src/plugins/query_enhancements/server/routes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ export function defineSearchStrategyRouteProvider(logger: Logger, router: IRoute
format: schema.string(),
}),
aggConfig: schema.nullable(schema.object({}, { unknowns: 'allow' })),
pollQueryResultsParams: schema.maybe(
schema.object({
queryId: schema.maybe(schema.string()),
sessionId: schema.maybe(schema.string()),
})
),
}),
},
},
Expand Down
Loading
Loading