Commit cb97897

feat: HTTP Data API NDJSON streaming endpoint (#1248)
* feat(reader): generalize report readers to ReportLike and guard sqlOverrideParams

  Introduce a ReportLikeReadPlan/ReportLike union and widen the storage report reader interface and all five readers (BigQuery, Athena, Snowflake, Redshift, Databricks) from Report to ReportLike, so a transient read plan can drive prepareReportData without a persisted Report. Athena, Snowflake, Redshift and Databricks fail closed on parameterized sqlOverride; BigQuery binds the params.

* feat(api): add HTTP Data API NDJSON streaming endpoint

  GET /api/external/http-data/data-marts/{id}.ndjson streams a published Data Mart's selected columns as pure NDJSON for project members authenticated with their ODM member token. Action.USE is enforced on the requested Data Mart and every contributing Data Mart: blended columns, post-join filters, sort columns and pre-join filter alias paths. Supports base64-encoded filter/sort and an integer limit; pageToken/offset are rejected.

  Each pull is recorded as an HTTP_DATA Data Mart run (metadata in additionalParams.httpData, whitelisted in run history) and counted for consumption; no persisted Report or Data Destination is created.

  The stream handles backpressure, request timeout and client disconnect via AbortController without leaking an unhandled rejection, retries SUCCESS persistence so a transient write failure never flips a completed stream to FAILED, and recovers orphaned RUNNING runs on bootstrap.

* feat(web): show HTTP Data runs in data mart run history

  Render HTTP_DATA runs with a dedicated icon and "HTTP Data" label, and surface the httpData parameters block in the run configuration view independently of the run definition snapshot.
1 parent 353810d commit cb97897

46 files changed

Lines changed: 2618 additions & 81 deletions

Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
+---
+'owox': minor
+---
+
+# Add the HTTP Data API — stream a published Data Mart's rows over HTTP as NDJSON
+
+A new `GET /api/external/http-data/data-marts/{id}.ndjson` endpoint streams the selected
+columns of a published Data Mart as newline-delimited JSON for project members
+authenticated with their ODM member token (`x-owox-authorization`). Callers explicitly
+choose columns and may pass optional base64-encoded `filter`/`sort` and a `limit`. Every
+pull is recorded as an `HTTP_DATA` Data Mart run (visible in run history) and counted for
+consumption; no persisted Report or Data Destination is created.
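
For illustration, a client might assemble the request URL roughly as sketched below. This is a sketch under stated assumptions, not code from this commit: `toBase64Url`, `buildHttpDataUrl`, the `HttpDataRequest` shape and the `https://odm.example.com` host are hypothetical names. Only the path, the `column`/`filter`/`sort`/`limit` parameters and the base64url encoding come from the endpoint description in this commit. It assumes a Node.js runtime whose `Buffer` supports the `'base64url'` encoding.

```typescript
import { Buffer } from 'node:buffer';

/** Hypothetical request shape; not part of the commit. */
export interface HttpDataRequest {
  baseUrl: string; // e.g. 'https://odm.example.com' (placeholder host)
  dataMartId: string;
  columns: string[];
  filter?: unknown; // JSON value expected to match the FilterConfig schema
  sort?: unknown; // JSON value expected to match the SortConfig schema
  limit?: number;
}

/** Serializes a value to JSON and encodes it as URL-safe base64 without padding. */
export function toBase64Url(value: unknown): string {
  return Buffer.from(JSON.stringify(value), 'utf8').toString('base64url');
}

/** Builds the endpoint URL with one `column` parameter per selected column. */
export function buildHttpDataUrl(req: HttpDataRequest): string {
  const path = `/api/external/http-data/data-marts/${encodeURIComponent(req.dataMartId)}.ndjson`;
  const url = new URL(path, req.baseUrl);
  for (const column of req.columns) {
    // Repeating the parameter preserves the requested column order.
    url.searchParams.append('column', column);
  }
  if (req.filter !== undefined) {
    url.searchParams.set('filter', toBase64Url(req.filter));
  }
  if (req.sort !== undefined) {
    url.searchParams.set('sort', toBase64Url(req.sort));
  }
  if (req.limit !== undefined) {
    url.searchParams.set('limit', String(req.limit));
  }
  return url.toString();
}
```

The resulting URL would then be requested with the member token in the `x-owox-authorization` header, and the response body read line by line, one JSON object per line.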
Lines changed: 29 additions & 0 deletions
@@ -0,0 +1,29 @@
+import { Controller, Get, Param, Query, Res } from '@nestjs/common';
+import { ApiTags } from '@nestjs/swagger';
+import type { Response } from 'express';
+import { Auth, AuthContext, AuthorizationContext, Role } from '../../../idp';
+import { HttpDataMapper } from '../../mappers/http-data.mapper';
+import { StreamHttpDataService } from '../../use-cases/stream-http-data.service';
+import { StreamHttpDataSpec } from '../spec/external/http-data.api';
+
+@Controller('external/http-data')
+@ApiTags('HTTP Data')
+export class HttpDataController {
+  constructor(
+    private readonly mapper: HttpDataMapper,
+    private readonly streamHttpDataService: StreamHttpDataService
+  ) {}
+
+  @Auth(Role.viewer())
+  @Get('data-marts/:dataMartId.ndjson')
+  @StreamHttpDataSpec()
+  async stream(
+    @Param('dataMartId') dataMartId: string,
+    @Query() rawQuery: Record<string, unknown>,
+    @AuthContext() ctx: AuthorizationContext,
+    @Res() res: Response
+  ): Promise<void> {
+    const command = this.mapper.toStreamHttpDataCommand(dataMartId, ctx, rawQuery);
+    await this.streamHttpDataService.stream(command, res);
+  }
+}
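
The controller hands the Express `Response` to `StreamHttpDataService.stream`, whose implementation is not visible in this view. As a rough sketch of the backpressure handling the commit message describes, one possible way to write NDJSON to a Node.js `Writable` is shown below. `writeNdjson` and its parameters are hypothetical and are not the service's actual code; the sketch assumes rows arrive as an async iterable and that the caller owns the `AbortController` tied to request timeout and client disconnect.

```typescript
import { once } from 'node:events';
import { Writable } from 'node:stream';

/**
 * Writes each row as one JSON line and pauses whenever the destination
 * reports a full buffer. Returns the number of rows handed to the stream.
 *
 * Hypothetical helper: if `signal` aborts while waiting for 'drain', the
 * returned promise rejects with an AbortError, so the caller should catch it.
 */
export async function writeNdjson(
  rows: AsyncIterable<Record<string, unknown>>,
  out: Writable,
  signal?: AbortSignal
): Promise<number> {
  let written = 0;
  for await (const row of rows) {
    // Stop pulling rows as soon as the request has been aborted.
    if (signal?.aborted) break;

    // One complete JSON object per line, no envelope.
    const accepted = out.write(JSON.stringify(row) + '\n');
    written += 1;

    // write() returning false means the internal buffer is full:
    // wait for 'drain' before producing more rows.
    if (!accepted) {
      await once(out, 'drain', { signal });
    }
  }
  return written;
}
```

Waiting on `'drain'` keeps memory bounded when the client reads more slowly than the storage produces rows, and passing the signal to `once` avoids waiting forever on a connection that has already gone away.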
Lines changed: 120 additions & 0 deletions
@@ -0,0 +1,120 @@
+import { applyDecorators } from '@nestjs/common';
+import {
+  ApiHeader,
+  ApiOkResponse,
+  ApiOperation,
+  ApiParam,
+  ApiProduces,
+  ApiQuery,
+  ApiResponse,
+} from '@nestjs/swagger';
+
+export function StreamHttpDataSpec() {
+  return applyDecorators(
+    ApiOperation({
+      summary: 'Stream Data Mart data as NDJSON',
+      description:
+        'Streams rows of a published Data Mart as newline-delimited JSON ' +
+        '(one data row per line, no envelope). The caller must explicitly select ' +
+        'columns via the repeated `column` query parameter; row objects use those ' +
+        'column names as keys in the requested order. Authenticated with the ODM ' +
+        'member token via `x-owox-authorization`. Creates one DataMartRun of type ' +
+        'HTTP_DATA per request, available through the run history endpoint.',
+    }),
+    ApiHeader({
+      name: 'x-owox-authorization',
+      description: 'ODM member token',
+      required: true,
+    }),
+    ApiParam({
+      name: 'dataMartId',
+      description:
+        'Data Mart identifier (UUID for native Data Marts, free-form string for legacy ones)',
+    }),
+    ApiQuery({
+      name: 'column',
+      description:
+        'Column to include in the output. Repeat the parameter to select multiple ' +
+        'columns; the order of repetition is preserved in row objects.',
+      required: true,
+      isArray: true,
+      type: String,
+      example: ['date', 'revenue'],
+    }),
+    ApiQuery({
+      name: 'filter',
+      description:
+        'Optional base64url-encoded JSON matching the `FilterConfig` schema (same shape as Reports), ' +
+        'applied server-side before streaming. Encode the JSON array with base64url ' +
+        '(URL-safe base64: `-`/`_` instead of `+`/`/`, no `=` padding) so it survives the query string. ' +
+        'Example JSON: `[{"column":"date","operator":"gte","value":"2026-01-01"}]` → ' +
+        'base64url `W3siY29sdW1uIjoiZGF0ZSIsIm9wZXJhdG9yIjoiZ3RlIiwidmFsdWUiOiIyMDI2LTAxLTAxIn1d`.',
+      required: false,
+      type: String,
+      example: 'W3siY29sdW1uIjoiZGF0ZSIsIm9wZXJhdG9yIjoiZ3RlIiwidmFsdWUiOiIyMDI2LTAxLTAxIn1d',
+    }),
+    ApiQuery({
+      name: 'sort',
+      description:
+        'Optional base64url-encoded JSON matching the `SortConfig` schema, applied server-side. ' +
+        'Encode the JSON array with base64url ' +
+        '(URL-safe base64: `-`/`_` instead of `+`/`/`, no `=` padding). ' +
+        'Example JSON: `[{"column":"date","direction":"desc"}]` → ' +
+        'base64url `W3siY29sdW1uIjoiZGF0ZSIsImRpcmVjdGlvbiI6ImRlc2MifV0`.',
+      required: false,
+      type: String,
+      example: 'W3siY29sdW1uIjoiZGF0ZSIsImRpcmVjdGlvbiI6ImRlc2MifV0',
+    }),
+    ApiQuery({
+      name: 'limit',
+      description: 'Optional row cap (1..10_000_000).',
+      required: false,
+      type: Number,
+    }),
+    ApiProduces('application/x-ndjson'),
+    ApiOkResponse({
+      description:
+        'NDJSON stream of row objects. Each line is a complete JSON object whose ' +
+        'keys match the requested columns. Response headers include `x-owox-run-id` ' +
+        'with the created DataMartRun ID and `x-owox-columns` with the requested ' +
+        'column list as a base64url-encoded JSON array.',
+      headers: {
+        'x-owox-run-id': {
+          description: 'ID of the created DataMartRun (HTTP_DATA) for traceability',
+          schema: { type: 'string' },
+        },
+        'x-owox-columns': {
+          description:
+            'Requested columns, in row-object order, as a base64url-encoded JSON array ' +
+            '(lets clients recover the column list even for an empty result stream)',
+          schema: { type: 'string' },
+        },
+      },
+      content: {
+        'application/x-ndjson': {
+          schema: {
+            type: 'string',
+            example:
+              '{"date":"2026-05-01","revenue":42.5}\n' + '{"date":"2026-05-02","revenue":51.0}\n',
+          },
+        },
+      },
+    }),
+    ApiResponse({
+      status: 400,
+      description:
+        'Invalid request: missing or unknown column, duplicate columns, ' +
+        'forbidden pagination parameter (`pageToken`/`offset`), malformed filter/sort/limit, ' +
+        'or unsupported storage type.',
+    }),
+    ApiResponse({ status: 401, description: 'Missing or invalid `x-owox-authorization` token.' }),
+    ApiResponse({
+      status: 403,
+      description: 'Caller is authenticated but lacks `Action.USE` on the requested Data Mart.',
+    }),
+    ApiResponse({
+      status: 404,
+      description: 'Data Mart not visible in the caller’s project, or not published.',
+    })
+  );
+}
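
The 400 conditions listed in the spec above can be read as a small validation routine. The sketch below illustrates the column, pagination and limit checks only. It is not the commit's `HttpDataRequestValidator` or `HttpDataColumnValidator`, whose code is not shown here; `BadRequestError`, `validateHttpDataQuery`, `ValidatedHttpDataQuery` and the `knownColumns` argument are hypothetical names, and filter/sort decoding is left out.

```typescript
/** Hypothetical error type; a real handler would map it to HTTP 400. */
export class BadRequestError extends Error {}

/** Hypothetical result shape for the validated query. */
export interface ValidatedHttpDataQuery {
  columns: string[];
  limit?: number;
}

// Upper bound documented for the `limit` query parameter.
const MAX_LIMIT = 10_000_000;

export function validateHttpDataQuery(
  raw: Record<string, unknown>,
  knownColumns: ReadonlySet<string>
): ValidatedHttpDataQuery {
  // Pagination is rejected outright rather than silently ignored.
  for (const forbidden of ['pageToken', 'offset']) {
    if (raw[forbidden] !== undefined) {
      throw new BadRequestError(`Pagination parameter "${forbidden}" is not supported`);
    }
  }

  // A repeated query parameter typically arrives as an array, a single one as a string.
  const rawColumn = raw['column'];
  const requested: unknown[] =
    rawColumn === undefined ? [] : Array.isArray(rawColumn) ? rawColumn : [rawColumn];
  if (requested.length === 0) {
    throw new BadRequestError('At least one "column" parameter is required');
  }

  // Preserve the requested order while rejecting unknown and duplicate columns.
  const columns: string[] = [];
  for (const column of requested) {
    if (typeof column !== 'string' || !knownColumns.has(column)) {
      throw new BadRequestError(`Unknown column: ${String(column)}`);
    }
    if (columns.indexOf(column) !== -1) {
      throw new BadRequestError(`Duplicate column: ${column}`);
    }
    columns.push(column);
  }

  const result: ValidatedHttpDataQuery = { columns };
  if (raw['limit'] !== undefined) {
    const text = String(raw['limit']);
    const limit = Number(text);
    // Digits only, so '1.5', '-3' and '1e3' are all treated as malformed.
    if (!/^\d+$/.test(text) || limit < 1 || limit > MAX_LIMIT) {
      throw new BadRequestError(`"limit" must be an integer between 1 and ${MAX_LIMIT}`);
    }
    result.limit = limit;
  }
  return result;
}
```

Rejecting `pageToken` and `offset` explicitly, instead of ignoring them, makes it obvious to a caller that each request returns a single stream and that `limit` is the only way to cap it.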

apps/backend/src/data-marts/data-marts.module.ts

Lines changed: 12 additions & 0 deletions
@@ -8,6 +8,12 @@ import { AccessDecisionService } from './services/access-decision';
 import { UpdateAvailabilityService } from './use-cases/update-availability.service';
 import { MemberOwnershipWarningsService } from './services/member-ownership-warnings.service';
 import { LookerStudioConnectorController } from './controllers/external/looker-studio-connector.controller';
+import { HttpDataController } from './controllers/external/http-data.controller';
+import { HttpDataMapper } from './mappers/http-data.mapper';
+import { StreamHttpDataService } from './use-cases/stream-http-data.service';
+import { HttpDataStreamWriter } from './services/http-data/http-data-stream-writer.service';
+import { HttpDataRequestValidator } from './services/http-data/http-data-request-validator.service';
+import { HttpDataColumnValidator } from './services/http-data/http-data-column-validator.service';
 import { MarkdownParserController } from './controllers/markdown-parser.controller';
 import { ReportController } from './controllers/report.controller';
 import { InsightController } from './controllers/insight.controller';
@@ -441,6 +447,7 @@ import { ProjectMemberApiKeysModule } from '../project-member-api-keys/project-m
     ContextController,
     ProjectMembersController,
     ProjectMemberApiKeysController,
+    HttpDataController,
   ],
   providers: [
     ...dataStorageResolverProviders,
@@ -727,6 +734,11 @@ import { ProjectMemberApiKeysModule } from '../project-member-api-keys/project-m
     UpdateProjectMemberApiKeyService,
     RevokeProjectMemberApiKeyService,
     ProjectMemberApiKeysMapper,
+    HttpDataMapper,
+    StreamHttpDataService,
+    HttpDataStreamWriter,
+    HttpDataRequestValidator,
+    HttpDataColumnValidator,
   ],
 })
 export class DataMartsModule {

apps/backend/src/data-marts/data-storage-types/athena/services/athena-report-reader.service.ts

Lines changed: 6 additions & 2 deletions
@@ -11,7 +11,7 @@ import {
 } from '../interfaces/athena-reader-state.interface';
 import { Injectable, Logger, Scope } from '@nestjs/common';
 import { DataStorageType } from '../../enums/data-storage-type.enum';
-import { Report } from '../../../entities/report.entity';
+import { ReportLike } from '../../../dto/domain/report-like-read-plan';
 import { ReportDataDescription } from '../../../dto/domain/report-data-description.dto';
 import { ReportDataBatch } from '../../../dto/domain/report-data-batch.dto';
 import { DataMartDefinition } from '../../../dto/schemas/data-mart-table-definitions/data-mart-definition';
@@ -46,14 +46,18 @@ export class AthenaReportReader implements DataStorageReportReader {
   ) {}
 
   async prepareReportData(
-    report: Report,
+    report: ReportLike,
     options?: PrepareReportDataOptions
   ): Promise<ReportDataDescription> {
     const { storage, definition, schema } = report.dataMart;
     if (!storage || !definition) {
       throw new Error('Data Mart is not properly configured');
     }
 
+    if (options?.sqlOverrideParams?.length) {
+      throw new Error('Athena report reader does not support parameterized sqlOverride');
+    }
+
     if (!schema) {
       throw new Error('Athena data mart schema is required for header generation');
     }

apps/backend/src/data-marts/data-storage-types/bigquery/services/bigquery-report-reader.service.ts

Lines changed: 2 additions & 2 deletions
@@ -9,7 +9,7 @@ import {
   isConnectorDefinition,
   isTableDefinition,
 } from '../../../dto/schemas/data-mart-table-definitions/data-mart-definition.guards';
-import { Report } from '../../../entities/report.entity';
+import { ReportLike } from '../../../dto/domain/report-like-read-plan';
 import { DataMartDefinitionType } from '../../../enums/data-mart-definition-type.enum';
 import { isBigQueryDataMartSchema } from '../../data-mart-schema.guards';
 import { isBigQueryConfig } from '../../data-storage-config.guards';
@@ -61,7 +61,7 @@ export class BigQueryReportReader implements DataStorageReportReader {
   ) {}
 
   public async prepareReportData(
-    report: Report,
+    report: ReportLike,
     options?: PrepareReportDataOptions
   ): Promise<ReportDataDescription> {
     const { storage, definitionType, definition, schema } = report.dataMart;

apps/backend/src/data-marts/data-storage-types/databricks/services/databricks-report-reader.service.ts

Lines changed: 6 additions & 2 deletions
@@ -8,7 +8,7 @@ import {
 import { ReportDataBatch } from '../../../dto/domain/report-data-batch.dto';
 import { ReportDataDescription } from '../../../dto/domain/report-data-description.dto';
 import { ReportDataHeader } from '../../../dto/domain/report-data-header.dto';
-import { Report } from '../../../entities/report.entity';
+import { ReportLike } from '../../../dto/domain/report-like-read-plan';
 import { DataMartDefinition } from '../../../dto/schemas/data-mart-table-definitions/data-mart-definition';
 import { DataStorageReportReaderState } from '../../interfaces/data-storage-report-reader-state.interface';
 import { DataStorage } from '../../../entities/data-storage.entity';
@@ -44,14 +44,18 @@ export class DatabricksReportReader implements DataStorageReportReader {
   ) {}
 
   async prepareReportData(
-    report: Report,
+    report: ReportLike,
     options?: PrepareReportDataOptions
   ): Promise<ReportDataDescription> {
     const { storage, definition, schema } = report.dataMart;
     if (!storage || !definition) {
       throw new Error('Data Mart is not properly configured');
     }
 
+    if (options?.sqlOverrideParams?.length) {
+      throw new Error('Databricks report reader does not support parameterized sqlOverride');
+    }
+
     if (!schema) {
       throw new Error('Databricks data mart schema is required for header generation');
     }

apps/backend/src/data-marts/data-storage-types/interfaces/data-storage-report-reader.interface.ts

Lines changed: 2 additions & 2 deletions
@@ -1,7 +1,7 @@
 import { TypedComponent } from '../../../common/resolver/typed-component.resolver';
 import { ReportDataHeader } from '../../dto/domain/report-data-header.dto';
 import { DataStorageType } from '../enums/data-storage-type.enum';
-import { Report } from '../../entities/report.entity';
+import { ReportLike } from '../../dto/domain/report-like-read-plan';
 import { ReportDataDescription } from '../../dto/domain/report-data-description.dto';
 import { ReportDataBatch } from '../../dto/domain/report-data-batch.dto';
 import { DataStorageReportReaderState } from './data-storage-report-reader-state.interface';
@@ -44,7 +44,7 @@ export interface DataStorageReportReader extends TypedComponent<DataStorageType>
    * Prepares report data for reading
    */
   prepareReportData(
-    report: Report,
+    report: ReportLike,
     options?: PrepareReportDataOptions
   ): Promise<ReportDataDescription>;

apps/backend/src/data-marts/data-storage-types/redshift/services/redshift-report-reader.service.ts

Lines changed: 6 additions & 2 deletions
@@ -4,7 +4,7 @@ import {
   PrepareReportDataOptions,
 } from '../../interfaces/data-storage-report-reader.interface';
 import { DataStorageType } from '../../enums/data-storage-type.enum';
-import { Report } from '../../../entities/report.entity';
+import { ReportLike } from '../../../dto/domain/report-like-read-plan';
 import { ReportDataDescription } from '../../../dto/domain/report-data-description.dto';
 import { ReportDataBatch } from '../../../dto/domain/report-data-batch.dto';
 import { ReportDataHeader } from '../../../dto/domain/report-data-header.dto';
@@ -40,7 +40,7 @@ export class RedshiftReportReader implements DataStorageReportReader {
   ) {}
 
   async prepareReportData(
-    report: Report,
+    report: ReportLike,
     options?: PrepareReportDataOptions
   ): Promise<ReportDataDescription> {
     const { storage, definition, schema } = report.dataMart;
@@ -49,6 +49,10 @@ export class RedshiftReportReader implements DataStorageReportReader {
       throw new Error('Data Mart is not properly configured');
     }
 
+    if (options?.sqlOverrideParams?.length) {
+      throw new Error('Redshift report reader does not support parameterized sqlOverride');
+    }
+
     if (!schema || !isRedshiftDataMartSchema(schema)) {
       throw new Error('Redshift data mart schema is required');
     }

apps/backend/src/data-marts/data-storage-types/snowflake/services/snowflake-report-reader.service.ts

Lines changed: 6 additions & 2 deletions
@@ -7,7 +7,7 @@ import {
 import { ReportDataBatch } from '../../../dto/domain/report-data-batch.dto';
 import { ReportDataDescription } from '../../../dto/domain/report-data-description.dto';
 import { ReportDataHeader } from '../../../dto/domain/report-data-header.dto';
-import { Report } from '../../../entities/report.entity';
+import { ReportLike } from '../../../dto/domain/report-like-read-plan';
 import { DataMartDefinition } from '../../../dto/schemas/data-mart-table-definitions/data-mart-definition';
 import { DataStorageReportReaderState } from '../../interfaces/data-storage-report-reader-state.interface';
 import { DataStorage } from '../../../entities/data-storage.entity';
@@ -39,14 +39,18 @@ export class SnowflakeReportReader implements DataStorageReportReader {
   ) {}
 
   async prepareReportData(
-    report: Report,
+    report: ReportLike,
     options?: PrepareReportDataOptions
   ): Promise<ReportDataDescription> {
     const { storage, definition, schema } = report.dataMart;
     if (!storage || !definition) {
       throw new Error('Data Mart is not properly configured');
     }
 
+    if (options?.sqlOverrideParams?.length) {
+      throw new Error('Snowflake report reader does not support parameterized sqlOverride');
+    }
+
     if (!schema) {
       throw new Error('Snowflake data mart schema is required for header generation');
     }
