Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### Added

- `DataType.Decimal128` write support on the unary/streaming paths: `Table.addDecimalFieldColumn(name, precision, scale)` ships a proto `DecimalTypeExtension` so auto-create produces a `DECIMAL(precision, scale)` column. Values accept decimal strings (exact), numbers, or bigints; excess fractional digits round half-up. Unsupported on the bulk Arrow path (throws `ValueError`).
- Pluggable `EndpointSelector` for multi-endpoint failover: `RandomSelector` (default), `RoundRobinSelector`, and `OutlierDetectingSelector` (Envoy-style consecutive-failure ejection with exponential back-off). Configure via `ConfigBuilder.withEndpointSelector()`; factory helpers `randomSelector()` / `roundRobinSelector()` / `outlierDetectingSelector()`.
- Retry-time exclusion of already-failed peers: within a single `write()` retry sequence, a peer that just failed is excluded so one dead endpoint cannot burn the whole retry budget.
- `GreptimeStatusCode` enum and `isRetryableStatusCode()` mirroring GreptimeDB's `status_code.rs`. `StreamWriter` / `BulkStreamWriter` now expose `endpoint`.
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
"module": "./dist/esm/index.js",
"scripts": {
"build": "tshy",
"prepare": "tshy",
"clean": "rimraf dist .tshy .tshy-build",
Comment thread
killme2008 marked this conversation as resolved.
"codegen": "buf generate",
"codegen:check": "pnpm codegen && git diff --exit-code src/generated",
Expand Down
8 changes: 8 additions & 0 deletions src/bulk/arrow-encoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ function arrowTypeFor(dt: DataType): ArrowDataType {
return new TimeNanosecond();
case DataType.Json:
return new Binary();
case DataType.Decimal128:
throw new ValueError(
'Decimal128 is not supported on the bulk Arrow path; use the unary or streaming write',
);
Comment thread
killme2008 marked this conversation as resolved.
}
}

Expand Down Expand Up @@ -212,6 +216,10 @@ function normalizeValue(v: unknown, dt: DataType): unknown {
return asBigInt('TimeNanosecond', v, I64_MIN, I64_MAX);
case DataType.Json:
return TEXT_ENCODER.encode(safeStringifyJson(v));
case DataType.Decimal128:
throw new ValueError(
'Decimal128 is not supported on the bulk Arrow path; use the unary or streaming write',
);
}
}

Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ export {
Table,
validateTableSchema,
type ColumnSpec,
type DecimalSpec,
type Semantic,
type TableSchema,
} from './table/index.js';
Expand Down
8 changes: 8 additions & 0 deletions src/table/data-type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ export const DataType = {
TimeMicrosecond: 'TimeMicrosecond',
TimeNanosecond: 'TimeNanosecond',
Json: 'Json',
/**
* Fixed-point decimal backed by a 128-bit integer. A column of this type carries
* `precision`/`scale` metadata (see `ColumnSpec.decimal`) that the encoder ships as a
* proto `DecimalTypeExtension`. Only supported on the unary/streaming write paths.
*/
Decimal128: 'Decimal128',
} as const;

export type DataType = (typeof DataType)[keyof typeof DataType];
Expand Down Expand Up @@ -135,5 +141,7 @@ export function toProtoDataType(t: DataType): ColumnDataType {
return ColumnDataType.TIME_NANOSECOND;
case DataType.Json:
return ColumnDataType.JSON;
case DataType.Decimal128:
return ColumnDataType.DECIMAL128;
}
}
8 changes: 7 additions & 1 deletion src/table/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
export { DataType, Precision, type Semantic } from './data-type.js';
export { Table } from './table.js';
export { validateTableSchema, columnIndex, type ColumnSpec, type TableSchema } from './schema.js';
export {
validateTableSchema,
columnIndex,
type ColumnSpec,
type DecimalSpec,
type TableSchema,
} from './schema.js';
export { toProtoValue } from './value.js';
29 changes: 28 additions & 1 deletion src/table/schema.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
import { SchemaError } from '../errors.js';
import type { DataType, Precision, Semantic } from './data-type.js';
import { DataType, type Precision, type Semantic } from './data-type.js';
import { isValidDecimalParams } from './validators.js';

export interface ColumnSpec {
readonly name: string;
readonly dataType: DataType;
readonly semantic: Semantic;
readonly precision?: Precision;
/** Required for `DataType.Decimal128`; must be `undefined` for every other type. */
readonly decimal?: DecimalSpec;
}

export interface DecimalSpec {
/** Total number of significant digits, 1..38. */
readonly precision: number;
/** Number of digits to the right of the decimal point, 0..precision. */
readonly scale: number;
}

/**
Expand Down Expand Up @@ -33,6 +43,23 @@ export function validateTableSchema(schema: TableSchema): void {
throw new SchemaError(`table "${schema.tableName}" has duplicate column "${c.name}"`);
}
seen.add(c.name);
if (c.dataType === DataType.Decimal128) {
if (c.decimal === undefined) {
throw new SchemaError(
`Decimal128 column "${c.name}" requires precision/scale; use addDecimalFieldColumn()`,
);
}
if (!isValidDecimalParams(c.decimal.precision, c.decimal.scale)) {
throw new SchemaError(
`Decimal128 column "${c.name}" has invalid precision/scale (precision=${c.decimal.precision}, ` +
`scale=${c.decimal.scale}); require 1<=precision<=38 and 0<=scale<=precision`,
);
}
} else if (c.decimal !== undefined) {
throw new SchemaError(
`column "${c.name}" carries decimal metadata but is not a Decimal128 column`,
);
}
if (c.semantic === 'timestamp') timestampCount++;
}
if (timestampCount !== 1) {
Expand Down
33 changes: 32 additions & 1 deletion src/table/table.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { SchemaError } from '../errors.js';
import {
DataType,
Precision,
isTimestampDataType,
precisionToTimestampDataType,
type DataType,
} from './data-type.js';
import { validateTableSchema, type ColumnSpec, type TableSchema } from './schema.js';
import { isValidDecimalParams } from './validators.js';

/**
* Table builder — accumulates column definitions and rows, then hands off to the writer.
Expand Down Expand Up @@ -42,6 +43,26 @@ export class Table {
return this.addColumn({ name, dataType, semantic: 'field' });
}

/**
* Add a `DECIMAL(precision, scale)` FIELD column. Values are passed as decimal strings
* (recommended for exactness), numbers, or bigints; the encoder ships them as a 128-bit
* unscaled integer. Only supported on the unary/streaming write paths.
*/
public addDecimalFieldColumn(name: string, precision: number, scale: number): this {
if (!isValidDecimalParams(precision, scale)) {
throw new SchemaError(
`Decimal128 column "${name}" has invalid precision/scale (precision=${precision}, ` +
`scale=${scale}); require 1<=precision<=38 and 0<=scale<=precision`,
);
}
return this.addColumn({
name,
dataType: DataType.Decimal128,
semantic: 'field',
decimal: { precision, scale },
});
}

/**
* Add the TIMESTAMP column. Exactly one per table is required. Default precision is
* millisecond, matching Go's `types.TIMESTAMP_MILLISECOND` default.
Expand All @@ -62,6 +83,16 @@ export class Table {
`timestamp column "${spec.name}" requires a Timestamp* dataType, got ${spec.dataType}`,
);
}
if (spec.dataType === DataType.Decimal128 && spec.decimal === undefined) {
throw new SchemaError(
`Decimal128 column "${spec.name}" requires precision/scale; use addDecimalFieldColumn()`,
);
}
if (spec.decimal !== undefined && spec.dataType !== DataType.Decimal128) {
throw new SchemaError(
`column "${spec.name}" carries decimal metadata but is not a Decimal128 column`,
);
}
if (this._columns.some((c) => c.name === spec.name)) {
throw new SchemaError(`column "${spec.name}" already exists on table "${this._tableName}"`);
}
Expand Down
128 changes: 128 additions & 0 deletions src/table/validators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,134 @@ export const I64_MAX = (1n << 63n) - 1n;
export const U64_MIN = 0n;
export const U64_MAX = (1n << 64n) - 1n;

/**
* Expand an exponential decimal string (e.g. `"1e-7"`, `"1.5E3"`) into plain form. Both
* `String(1e-7)` and decimal.js `.toString()` can emit exponential notation, which the strict
* decimal parser would otherwise reject. Pass-through when there is no exponent.
*/
// Upper bound on zero-padding when expanding exponential notation. Any value needing more
// digits than this cannot fit DECIMAL(<=38, <=38), so the cap rejects crafted exponents
// (e.g. "1e1000000000") that would otherwise allocate a multi-GB string before the range
// check could reject them.
const MAX_DECIMAL_EXPANSION = 1024;

function expandExponent(s: string): string {
// `\d*` (not `\d+`) on the integer part so leading-dot forms like ".5e1" expand too; an
// empty mantissa ("e5", ".e5") is left for the caller's parser to reject.
const m = /^([+-]?)(\d*)(?:\.(\d*))?[eE]([+-]?\d+)$/.exec(s);
if (!m) return s;
const sign = m[1] ?? '';
const intPart = m[2] ?? '';
const fracPart = m[3] ?? '';
if (intPart === '' && fracPart === '') return s;
const exp = parseInt(m[4] ?? '0', 10);
const digits = intPart + fracPart;
// Position of the decimal point measured from the left, after applying the exponent.
const pointPos = intPart.length + exp;
const pad = pointPos <= 0 ? -pointPos : pointPos - digits.length;
if (pad > MAX_DECIMAL_EXPANSION) {
throw new ValueError(`Decimal128 exponent in "${s}" is out of range`);
}
if (pointPos <= 0) {
return `${sign}0.${'0'.repeat(-pointPos)}${digits}`;
}
if (pointPos >= digits.length) {
return `${sign}${digits}${'0'.repeat(pointPos - digits.length)}`;
}
return `${sign}${digits.slice(0, pointPos)}.${digits.slice(pointPos)}`;
}

/** Decimal128 precision/scale rule: integers with `1<=precision<=38` and `0<=scale<=precision`. */
export function isValidDecimalParams(precision: number, scale: number): boolean {
return (
Number.isInteger(precision) &&
Number.isInteger(scale) &&
precision >= 1 &&
precision <= 38 &&
scale >= 0 &&
scale <= precision
);
}

function decimalDigitCount(n: bigint): number {
const abs = n < 0n ? -n : n;
return abs === 0n ? 0 : abs.toString().length;
}

/**
* Convert a decimal value to the unscaled 128-bit integer that GreptimeDB stores for a
* `DECIMAL(precision, scale)` column (i.e. `round(value * 10^scale)`). Strings are parsed
* exactly; numbers are stringified via `String(value)` then {@link expandExponent} (a `number`
* may already have lost precision before reaching us). Excess fractional digits are rounded
* half-up on the magnitude (away from zero for negatives, matching Java `BigDecimal.HALF_UP`).
*/
export function decimalToUnscaled(
value: string | number | bigint,
precision: number,
scale: number,
): bigint {
if (!isValidDecimalParams(precision, scale)) {
throw new ValueError(
`Decimal128 invalid precision/scale (precision=${precision}, scale=${scale}); require 1<=precision<=38 and 0<=scale<=precision`,
);
}

let s: string;
if (typeof value === 'bigint') {
s = value.toString();
} else if (typeof value === 'number') {
if (!Number.isFinite(value)) {
throw new ValueError(`Decimal128 expected a finite number, got ${value}`);
}
s = expandExponent(String(value));
} else if (typeof value === 'string') {
s = expandExponent(value.trim());
} else {
throw new ValueError(`Decimal128 expected string|number|bigint, got ${typeof value}`);
}

const m = /^([+-]?)(\d*)(?:\.(\d*))?$/.exec(s);
if (!m || ((m[2] ?? '') === '' && (m[3] ?? '') === '')) {
throw new ValueError(`Decimal128 cannot parse "${value}" as a decimal`);
}
const negative = m[1] === '-';
const intPart = m[2] ?? '';
const fracPart = m[3] ?? '';

let magnitude: bigint;
if (fracPart.length <= scale) {
magnitude = BigInt(`${intPart}${fracPart}${'0'.repeat(scale - fracPart.length)}` || '0');
} else {
// More fractional digits than the column scale: keep `scale` digits, round half-up
// on the first dropped digit.
const kept = fracPart.slice(0, scale);
const roundDigit = fracPart.charCodeAt(scale) - 48;
magnitude = BigInt(`${intPart}${kept}` || '0');
if (roundDigit >= 5) magnitude += 1n;
}

const unscaled = negative ? -magnitude : magnitude;
if (decimalDigitCount(unscaled) > precision) {
throw new ValueError(
`Decimal128 value "${value}" exceeds DECIMAL(${precision}, ${scale}) range`,
);
}
return unscaled;
}

/**
* Split a signed unscaled integer into the proto `Decimal128 { hi, lo }` pair. `hi`/`lo`
* are the high/low 64 bits of the two's-complement 128-bit representation, each carried as a
* signed int64 on the wire (the server reinterprets `lo` as unsigned when reconstructing).
*/
export function decimal128Parts(unscaled: bigint): { hi: bigint; lo: bigint } {
const u128 = BigInt.asUintN(128, unscaled);
return {
lo: BigInt.asIntN(64, u128),
hi: BigInt.asIntN(64, u128 >> 64n),
};
}

/**
* Return `Date.getTime()` as a finite number or throw `ValueError`. A `Date`
* constructed from bad input (e.g. `new Date('not a date')`) has `.getTime()`
Expand Down
20 changes: 19 additions & 1 deletion src/table/value.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

import { create } from '@bufbuild/protobuf';
import { ValueSchema, type Value } from '../generated/greptime/v1/row_pb.js';
import { Decimal128Schema } from '../generated/greptime/v1/common_pb.js';
import { ValueError } from '../errors.js';
import { DataType } from './data-type.js';
import type { DecimalSpec } from './schema.js';
import {
I64_MAX,
I64_MIN,
Expand All @@ -17,6 +19,8 @@ import {
asNumber,
asString,
dateToMs,
decimal128Parts,
decimalToUnscaled,
safeStringifyJson,
} from './validators.js';

Expand Down Expand Up @@ -63,7 +67,7 @@ function asTimestamp(name: string, v: unknown, dataType: DataType): bigint {
* (the protobuf encoding of "null" for row-insert) — callers must reject null-on-non-nullable
* columns before reaching this layer if the server schema forbids nulls.
*/
export function toProtoValue(ts: unknown, dataType: DataType): Value {
export function toProtoValue(ts: unknown, dataType: DataType, decimal?: DecimalSpec): Value {
if (ts === null || ts === undefined) return EMPTY_VALUE;

switch (dataType) {
Expand Down Expand Up @@ -188,5 +192,19 @@ export function toProtoValue(ts: unknown, dataType: DataType): Value {
valueData: { case: 'stringValue', value: safeStringifyJson(ts) },
});
}
case DataType.Decimal128: {
if (!decimal) {
throw new ValueError('Decimal128 column is missing precision/scale metadata');
}
const unscaled = decimalToUnscaled(
ts as string | number | bigint,
decimal.precision,
decimal.scale,
);
const { hi, lo } = decimal128Parts(unscaled);
return create(ValueSchema, {
valueData: { case: 'decimal128Value', value: create(Decimal128Schema, { hi, lo }) },
});
}
}
}
Loading
Loading