Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions doc/clickhouse_fdw.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ This library contains a single PostgreSQL extension, a [foreign data wrapper]
for [ClickHouse] databases. It supports PostgreSQL 13 and higher and
ClickHouse 22 and higher.



## Usage

### Functions
Expand Down Expand Up @@ -124,6 +122,26 @@ any of these functions cannot be pushed down they will raise an exception.
* [uniqExact](https://clickhouse.com/docs/sql-reference/aggregate-functions/reference/uniqexact)
* [uniqHLL12](https://clickhouse.com/docs/sql-reference/aggregate-functions/reference/uniqhll12)
* [uniqTheta](https://clickhouse.com/docs/sql-reference/aggregate-functions/reference/uniqthetasketch)
* [quantile](https://clickhouse.com/docs/sql-reference/aggregate-functions/reference/quantile)
* [quantileExact](https://clickhouse.com/docs/sql-reference/aggregate-functions/reference/quantileexact)

#### Parametric Aggregates

To pass parameters to ClickHouse [Parametric aggregate functions], pass a call
to the special `params()` function as the first argument. For example, a
ClickHouse query such as

```sql
SELECT quantile(0.25)(val) FROM t;
```

Should be written in PostgreSQL as:

```sql
SELECT quantile(params(0.25), val) FROM t;
```

Omit `params()` to get the default value, where relevant.

## Authors

Expand All @@ -141,3 +159,4 @@ any of these functions cannot be pushed down they will raise an exception.
[foreign data wrapper]: https://www.postgresql.org/docs/current/fdwhandler.html
"PostgreSQL Docs: Writing a Foreign Data Wrapper"
[ClickHouse]: https://clickhouse.com/clickhouse
[Parametric aggregate functions]: https://clickhouse.com/docs/sql-reference/aggregate-functions/parametric-functions
93 changes: 73 additions & 20 deletions sql/clickhouse_fdw.sql
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@ AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;

CREATE FOREIGN DATA WRAPPER clickhouse_fdw
HANDLER clickhouse_fdw_handler
VALIDATOR clickhouse_fdw_validator;
HANDLER clickhouse_fdw_handler
VALIDATOR clickhouse_fdw_validator;

-- Function used for parametric aggregate parameters.
CREATE TYPE params AS ();
CREATE FUNCTION params(VARIADIC "any") RETURNS params
AS 'MODULE_PATHNAME', 'clickhouse_func_push_fail'
LANGUAGE C STRICT;

-- Function used by variadic aggregate functions when pushdown fails. The
-- first argument should describe the operation that should have been pushed
Expand All @@ -34,12 +40,27 @@ CREATE FUNCTION ch_push_func_text(TEXT) RETURNS TEXT
AS 'MODULE_PATHNAME', 'clickhouse_func_push_fail'
LANGUAGE C STRICT;

-- Function used by parametric aggregates that take an array for the
-- parameters plus an item of any type.
CREATE FUNCTION ch_param_any_text(TEXT, params, "any") RETURNS TEXT
AS 'MODULE_PATHNAME', 'clickhouse_func_push_fail'
LANGUAGE C STRICT;

-- Function used by aggregates that take a single value of any type.
CREATE FUNCTION ch_any_text(TEXT, "any") RETURNS TEXT
AS 'MODULE_PATHNAME', 'clickhouse_func_push_fail'
LANGUAGE C STRICT;

-- No-op functions used for aggregate final functions that specific types.
-- Allows their states to be text. Return NULL.
CREATE FUNCTION ch_noop_bigint(TEXT) RETURNS BIGINT
AS 'MODULE_PATHNAME', 'clickhouse_noop'
LANGUAGE C STRICT;

CREATE FUNCTION ch_noop_float8(TEXT) RETURNS float8
AS 'MODULE_PATHNAME', 'clickhouse_noop'
LANGUAGE C STRICT;

-- Create error-raising argMax aggregate that should be pushed down to
-- ClickHouse.
CREATE FUNCTION ch_argmax(anyelement, anyelement, anycompatible)
Expand Down Expand Up @@ -74,54 +95,86 @@ CREATE AGGREGATE argMin(anyelement, anycompatible)
stype = anyelement
);

CREATE AGGREGATE quantile(params, "any")
(
SFUNC = ch_param_any_text, -- raises error
INITCOND = 'aggregate quantile()', -- what to push down
STYPE = TEXT, -- state type
FINALFUNC = ch_noop_float8 -- returns NULL
);

CREATE AGGREGATE quantile("any")
(
SFUNC = ch_any_text, -- raises error
INITCOND = 'aggregate quantile()', -- what to push down
STYPE = TEXT, -- state type
FINALFUNC = ch_noop_float8 -- returns NULL
);

CREATE AGGREGATE quantileExact(params, "any")
(
SFUNC = ch_param_any_text, -- raises error
INITCOND = 'aggregate quantileExact()', -- what to push down
STYPE = TEXT, -- state type
FINALFUNC = ch_noop_float8 -- returns NULL
);

CREATE AGGREGATE quantileExact("any")
(
SFUNC = ch_any_text, -- raises error
INITCOND = 'aggregate quantileExact()', -- what to push down
STYPE = TEXT, -- state type
FINALFUNC = ch_noop_float8 -- returns NULL
);

-- Variadic aggregates that take any number of arguments of any type and
-- return a UINT64 (we settle for BIGINT).
CREATE AGGREGATE uniq(VARIADIC "any")
(
SFUNC = ch_push_agg_text, -- raises error
INITCOND = 'aggregate uniq()', -- what to push down
STYPE = TEXT, -- state type
FINALFUNC = ch_noop_bigint -- returns NULL
INITCOND = 'aggregate uniq()', -- what to push down
STYPE = TEXT, -- state type
FINALFUNC = ch_noop_bigint -- returns NULL
);

CREATE AGGREGATE uniqExact(VARIADIC "any")
(
SFUNC = ch_push_agg_text,
INITCOND = 'aggregate uniqExact()',
STYPE = TEXT,
FINALFUNC = ch_noop_bigint
INITCOND = 'aggregate uniqExact()',
STYPE = TEXT,
FINALFUNC = ch_noop_bigint
);

CREATE AGGREGATE uniqCombined(VARIADIC "any")
(
SFUNC = ch_push_agg_text,
INITCOND = 'aggregate uniqCombined()',
STYPE = TEXT,
FINALFUNC = ch_noop_bigint
INITCOND = 'aggregate uniqCombined()',
STYPE = TEXT,
FINALFUNC = ch_noop_bigint
);

CREATE AGGREGATE uniqCombined64(VARIADIC "any")
(
SFUNC = ch_push_agg_text,
INITCOND = 'aggregate uniqCombined64()',
STYPE = TEXT,
FINALFUNC = ch_noop_bigint
INITCOND = 'aggregate uniqCombined64()',
STYPE = TEXT,
FINALFUNC = ch_noop_bigint
);

CREATE AGGREGATE uniqHLL12(VARIADIC "any")
(
SFUNC = ch_push_agg_text,
INITCOND = 'aggregate uniqHLL12()',
STYPE = TEXT,
FINALFUNC = ch_noop_bigint
INITCOND = 'aggregate uniqHLL12()',
STYPE = TEXT,
FINALFUNC = ch_noop_bigint
);

CREATE AGGREGATE uniqTheta(VARIADIC "any")
(
SFUNC = ch_push_agg_text,
INITCOND = 'aggregate uniqTheta()',
STYPE = TEXT,
FINALFUNC = ch_noop_bigint
INITCOND = 'aggregate uniqTheta()',
STYPE = TEXT,
FINALFUNC = ch_noop_bigint
);

/*
Expand Down
26 changes: 21 additions & 5 deletions src/custom_types.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
#define F_EXTRACT_TEXT_TIMESTAMPTZ 6203
#define F_EXTRACT_TEXT_DATE 6199
#endif
// regexp_like was added in Postgres 15; Mock it for earlier versions.
#if PG_VERSION_NUM < 150000
#define F_REGEXP_LIKE_TEXT_TEXT 6263
#endif


static HTAB *custom_objects_cache = NULL;
Expand Down Expand Up @@ -119,9 +123,7 @@ CustomObjectDef *chfdw_check_for_custom_function(Oid funcid)
case F_STRPOS:
case F_BTRIM_TEXT_TEXT:
case F_BTRIM_TEXT:
#if PG_VERSION_NUM >= 150000
case F_REGEXP_LIKE_TEXT_TEXT:
#endif
special_builtin = true;
break;
default:
Expand All @@ -141,7 +143,6 @@ CustomObjectDef *chfdw_check_for_custom_function(Oid funcid)
entry = hash_search(custom_objects_cache, (void *) &funcid, HASH_ENTER, NULL);
entry->cf_oid = funcid;
init_custom_entry(entry);

switch (funcid)
{
case F_DATE_TRUNC_TEXT_TIMESTAMPTZ:
Expand Down Expand Up @@ -185,14 +186,12 @@ CustomObjectDef *chfdw_check_for_custom_function(Oid funcid)
strcpy(entry->custom_name, "position");
break;
}
#if PG_VERSION_NUM >= 150000
case F_REGEXP_LIKE_TEXT_TEXT:
{
entry->cf_type = CF_MATCH;
strcpy(entry->custom_name, "match");
break;
}
#endif
}

if (special_builtin)
Expand Down Expand Up @@ -313,6 +312,10 @@ CustomObjectDef *chfdw_check_for_custom_function(Oid funcid)
strcpy(entry->custom_name, "uniqTheta");
else if (strcmp(proname, "dictget") == 0)
strcpy(entry->custom_name, "dictGet");
else if (strcmp(proname, "params") == 0)
entry->custom_name[0] = '\1'; // Will have no function name.
else if (strcmp(proname, "quantileexact") == 0)
strcpy(entry->custom_name, "quantileExact");
else
strcpy(entry->custom_name, proname);
}
Expand All @@ -324,6 +327,19 @@ CustomObjectDef *chfdw_check_for_custom_function(Oid funcid)
return entry;
}

FuncExpr * ch_get_params_function(TargetEntry *tle)
{
Node *n = (Node *) tle->expr;
if (nodeTag(n) != T_FuncExpr) return NULL;

FuncExpr *fe = (FuncExpr *) n;
Oid extoid = getExtensionOfObject(ProcedureRelationId, fe->funcid);
if (strcmp(get_extension_name(extoid), "clickhouse_fdw") != 0) return NULL;
if (strcmp(get_func_name(fe->funcid), "params") != 0) return NULL;

return fe;
}

static Oid
find_rowfunc(char *procname, Oid rettype)
{
Expand Down
20 changes: 16 additions & 4 deletions src/deparse.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,9 @@ typedef struct deparse_expr_cxt
StringInfo buf; /* output buffer to append to */
List **params_list; /* exprs that will become remote Params */
CustomObjectDef *func; /* custom function deparse */
void *func_arg; /* custom function context args */
CHFdwRelationInfo *fpinfo; /* fdw relation info */
bool interval_op;
bool array_as_tuple;
bool array_as_tuple; /* determines array output format */
} deparse_expr_cxt;

#define REL_ALIAS_PREFIX "r"
Expand Down Expand Up @@ -3297,6 +3296,7 @@ deparseAggref(Aggref *node, deparse_expr_cxt *context)
uint8 brcount = 1;
bool use_variadic;
bool omit_star;
int first_arg = 0;

/* Only basic, non-split aggregation accepted. */
Assert(node->aggsplit == AGGSPLIT_SIMPLE);
Expand All @@ -3308,6 +3308,18 @@ deparseAggref(Aggref *node, deparse_expr_cxt *context)
cdef = context->func;
context->func = appendFunctionName(node->aggfnoid, context);

// Emit params list if first arg is params() function.
if (context->func && context->func->cf_type == CF_CH_FUNCTION && list_length(node->args) > 0)
{
FuncExpr *fe = ch_get_params_function((TargetEntry *) linitial(node->args));
if (fe)
{
// Deparse params first.
deparseFuncExpr(fe, context);
first_arg = 1;
}
}

/* Omit * for COUNT(*) but not COUNT(DISTINCT *)
* https://github.com/ClickHouse/clickhouse_fdw/issues/25
* To be fixed in ClickHouse 25.11, so can be omitted once relased.
Expand Down Expand Up @@ -3366,7 +3378,7 @@ deparseAggref(Aggref *node, deparse_expr_cxt *context)
else
{
/* default arguments output */
foreach (arg, node->args)
for_each_from (arg, node->args, first_arg)
{
TargetEntry *tle = (TargetEntry *) lfirst(arg);
Node *n = (Node *) tle->expr;
Expand All @@ -3382,7 +3394,7 @@ deparseAggref(Aggref *node, deparse_expr_cxt *context)
if (use_variadic && lnext(node->args, arg) == NULL)
{
// Convert variadic array to list of arguments.
Assert(nodeTag(node) == T_ArrayExpr);
Assert(nodeTag(n) == T_ArrayExpr);
deparseArrayList((ArrayExpr *) n, context);
}
else
Expand Down
1 change: 1 addition & 0 deletions src/include/fdw.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ typedef struct CustomColumnInfo
} CustomColumnInfo;

extern CustomObjectDef *chfdw_check_for_custom_function(Oid funcid);
extern FuncExpr * ch_get_params_function(TargetEntry *tle);
extern CustomObjectDef *chfdw_check_for_custom_type(Oid typeoid);
extern void chfdw_apply_custom_table_options(CHFdwRelationInfo *fpinfo, Oid relid);
extern CustomColumnInfo *chfdw_get_custom_column_info(Oid relid, uint16 varattno);
Expand Down
Loading