Skip to content

Commit 18ac48b

Browse files
committed
Implement BeginInsert/InsertData/EndInsert flow
Replace the old (and commented-out) code with the new `BeginInsert` / `InsertData` / `EndInsert` pattern in `clickhouse-cpp`. This greatly simplifies the code, as most of the necessary logic for creating the insert Block object and tracking the state of the insert is now handled by the Client object. Remove the old logic for unwrapping `LowCardinality` columns and fix `column_append` to properly append to those columns. This preserves traffic over the wire, I'm told. Thanks to @slabko for working out how to make that work again. Copy the test from the old FDW, but comment out the bit working with arrays, as it has not yet been updated.
1 parent 571b767 commit 18ac48b

9 files changed

Lines changed: 625 additions & 109 deletions

File tree

.gitmodules

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
[submodule "clickhouse-cpp"]
22
path = vendor/clickhouse-cpp
3-
url = https://github.com/ClickHouse/clickhouse-cpp.git
3+
url = https://github.com/theory/clickhouse-cpp.git

src/binary.cpp

Lines changed: 64 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -285,98 +285,70 @@ static Oid get_corr_postgres_type(const TypeRef & type)
285285
void ch_binary_insert_state_free(void * c)
286286
{
287287
auto * state = (ch_binary_insert_state *)c;
288-
if (state->columns)
288+
if (state->insert_block)
289289
{
290-
/* try to send empty block that sets proper ClickHouse state */
291-
if (!state->success)
290+
/* Finish the insert to set the proper ClickHouse state */
291+
Client * client = (Client *)state->conn->client;
292+
try
292293
{
293-
try
294-
{
295-
Client * client = (Client *)state->conn->client;
296-
client->Insert(state->table_name, Block());
297-
}
298-
catch (const std::exception & e)
299-
{
300-
// just ignore, next query will fail
301-
elog(NOTICE, "clickhouse_fdw: could not send empty packet");
302-
}
294+
client->EndInsert();
303295
}
304-
305-
delete (std::vector<clickhouse::ColumnRef> *)state->columns;
296+
catch (const std::exception & e)
297+
{
298+
// just ignore, next query will fail
299+
elog(NOTICE, "clickhouse_fdw: could not finish INSERT: - %s", e.what());
300+
}
301+
delete (Block *)state->insert_block;
306302
}
307303
}
308304

309305
void ch_binary_prepare_insert(void * conn, char * query, ch_binary_insert_state * state)
310306
{
311-
throw std::runtime_error("clickhouse_fdw: XXX ch_binary_prepare_insert not implemented");
312-
313-
// std::vector<clickhouse::ColumnRef> * vec = nullptr;
314-
// Client * client = (Client *)((ch_binary_connection_t *)conn)->client;
315-
316-
// try
317-
// {
318-
// client->PrepareInsert(
319-
// std::string(query) + " VALUES", [&state, &vec](const Block & sample_block) {
320-
// if (sample_block.GetColumnCount() == 0)
321-
// return true;
322-
323-
// vec = new std::vector<clickhouse::ColumnRef>();
324-
325-
// state->len = sample_block.GetColumnCount();
326-
327-
// #if PG_VERSION_NUM < 120000
328-
// state->outdesc = CreateTemplateTupleDesc(state->len, false);
329-
// #else
330-
// state->outdesc = CreateTemplateTupleDesc(state->len);
331-
// #endif
332-
333-
// for (size_t i = 0; i < state->len; i++)
334-
// {
335-
// bool error = false;
336-
// clickhouse::ColumnRef col = sample_block[i];
337-
338-
// auto chtype = col->Type();
339-
// if (chtype->GetCode() == Type::LowCardinality)
340-
// {
341-
// chtype = col->As<ColumnLowCardinality>()->GetNestedType();
342-
// }
343-
344-
// Oid pg_type = get_corr_postgres_type(chtype);
345-
346-
// vec->push_back(clickhouse::CreateColumnByType(col->Type()->GetName()));
347-
// const char * colname = sample_block.GetColumnName(i).c_str();
348-
349-
// /* we can't afford long jumps outside of this function */
350-
// PG_TRY();
351-
// {
352-
// TupleDescInitEntry(
353-
// state->outdesc, (AttrNumber)i + 1, colname, pg_type, -1, 0);
354-
// }
355-
// PG_CATCH();
356-
// {
357-
// error = true;
358-
// }
359-
// PG_END_TRY();
360-
361-
// if (error)
362-
// throw std::runtime_error("could not init tuple descriptor");
363-
// }
364-
365-
// return true;
366-
// });
367-
// }
368-
// catch (const std::exception & e)
369-
// {
370-
// client->ResetConnection();
371-
372-
// if (vec != nullptr)
373-
// delete vec;
374-
375-
// elog(ERROR, "clickhouse_fdw: error while insert preparation - %s", e.what());
376-
// }
377-
378-
// if (vec != nullptr)
379-
// state->columns = (void *)vec;
307+
// Start the INSERT.
308+
Block block;
309+
Client * client = (Client *)((ch_binary_connection_t *)conn)->client;
310+
try
311+
{
312+
block = client->BeginInsert(std::string(query) + " VALUES");
313+
}
314+
catch (const std::exception & e)
315+
{
316+
elog(ERROR, "clickhouse_fdw: could prepare insert - %s", e.what());
317+
}
318+
319+
// Setup the column config (or return if no columns).
320+
state->len = block.GetColumnCount();
321+
if (state->len == 0) return;
322+
state->outdesc = CreateTemplateTupleDesc(state->len);
323+
324+
// Iterate over the list of columns returned by ClickHouse.
325+
auto newBlock = new Block();
326+
AttrNumber i = 0;
327+
for (Block::Iterator bi(block); bi.IsValid(); bi.Next())
328+
{
329+
// Copy the ClickHouse Column.
330+
auto chtype = bi.Type()->GetName();
331+
newBlock->AppendColumn(bi.Name(), clickhouse::CreateColumnByType(chtype));
332+
333+
// Determine the Postgres column type.
334+
Oid pg_type = get_corr_postgres_type(bi.Type());
335+
const char * colname = bi.Name().c_str();
336+
337+
PG_TRY();
338+
{
339+
TupleDescInitEntry(state->outdesc, ++i, colname, pg_type, -1, 0);
340+
}
341+
PG_CATCH();
342+
{
343+
// Clean up and re-throw.
344+
client->ResetConnection();
345+
delete newBlock;
346+
PG_RE_THROW();
347+
}
348+
PG_END_TRY();
349+
}
350+
351+
state->insert_block = (ch_insert_block_h *) newBlock;
380352
}
381353

382354
static void column_append(clickhouse::ColumnRef col, Datum val, Oid valtype, bool isnull)
@@ -504,14 +476,7 @@ static void column_append(clickhouse::ColumnRef col, Datum val, Oid valtype, boo
504476
col->As<ColumnEnum16>()->Append(s);
505477
break;
506478
case Type::Code::LowCardinality: {
507-
// XXX Figure out proper value to create and pass to
508-
// Append.
509-
throw std::runtime_error(
510-
"clickhouse_fdw: XXX unsupported column type "
511-
+ col->Type()->GetName()
512-
);
513-
// auto item = ItemView{Type::String, std::string_view(s)};
514-
// col->As<ColumnLowCardinality>()->Append(item);
479+
col->AsStrict<ColumnLowCardinalityT<ColumnString>>()->Append(s);
515480
break;
516481
}
517482
default:
@@ -573,7 +538,7 @@ static void column_append(clickhouse::ColumnRef col, Datum val, Oid valtype, boo
573538
{
574539
case Type::Array: {
575540
// XXX Figure out proper value to create and pass to
576-
// Append.
541+
// Append. Then uncomment the test in binary_inserts.sql.
577542
throw std::runtime_error(
578543
"clickhouse_fdw: XXX unsupported column type "
579544
+ col->Type()->GetName()
@@ -607,8 +572,8 @@ void ch_binary_column_append_data(ch_binary_insert_state * state, size_t colidx)
607572
{
608573
try
609574
{
610-
auto columns = *(std::vector<clickhouse::ColumnRef> *)state->columns;
611-
auto col = columns[colidx];
575+
auto block = (Block *)state->insert_block;
576+
auto col = (*block)[colidx];
612577

613578
Datum val = state->values[colidx];
614579
Oid valtype = TupleDescAttr(state->outdesc, colidx)->atttypid;
@@ -626,16 +591,11 @@ void ch_binary_insert_columns(ch_binary_insert_state * state)
626591
{
627592
try
628593
{
629-
Block block;
630-
auto columns = *(std::vector<clickhouse::ColumnRef> *)state->columns;
631-
for (int i = 0; i < state->outdesc->natts; ++i)
632-
{
633-
Form_pg_attribute att = TupleDescAttr(state->outdesc, i);
634-
block.AppendColumn(NameStr(att->attname), columns[i]);
635-
}
636-
637594
Client * client = (Client *)state->conn->client;
638-
client->Insert(state->table_name, block);
595+
auto block = (Block *)state->insert_block;
596+
block->RefreshRowCount();
597+
client->InsertData(*block);
598+
block->Clear();
639599
}
640600
catch (const std::exception & e)
641601
{

src/include/binary.hh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ extern "C" {
66
#endif
77

88
typedef struct ch_binary_connection_t ch_binary_connection_t;
9+
typedef struct ch_insert_block_h ch_insert_block_h;
910
typedef struct ch_binary_response_t
1011
{
1112
void *values;
@@ -48,7 +49,7 @@ typedef struct {
4849
MemoryContextCallback callback;
4950

5051
TupleDesc outdesc;
51-
void *columns; /* std::vector */
52+
ch_insert_block_h *insert_block; /* clickhouse::Block */
5253
size_t len;
5354
void *conversion_states;
5455
char *table_name;

src/pglink.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -729,7 +729,6 @@ binary_insert_tuple(void *istate, TupleTableSlot *slot)
729729
else
730730
{
731731
ch_binary_insert_columns(state);
732-
state->success = true;
733732
}
734733
}
735734

0 commit comments

Comments
 (0)