Skip to content

Commit 59d1ade

Browse files
committed
Implement BeginInsert/InsertData/EndInsert flow
Replace the old (and commented-out) code with the new `BeginInsert` / `InsertData` / `EndInsert` pattern in `clickhouse-cpp`. This greatly simplifies the code, as the most of the necessary logic for creating the insert Block object and tracking the state of the insert is now handled by the Client object. Copy the test from the old FDW, but comment out the bit working with arrays, as it has not yet been updated.
1 parent fe2d232 commit 59d1ade

9 files changed

Lines changed: 612 additions & 101 deletions

File tree

.gitmodules

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
[submodule "clickhouse-cpp"]
22
path = vendor/clickhouse-cpp
3-
url = https://github.com/ClickHouse/clickhouse-cpp.git
3+
url = https://github.com/theory/clickhouse-cpp.git

src/binary.cpp

Lines changed: 51 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -285,98 +285,60 @@ static Oid get_corr_postgres_type(const TypeRef & type)
285285
void ch_binary_insert_state_free(void * c)
286286
{
287287
auto * state = (ch_binary_insert_state *)c;
288-
if (state->columns)
288+
if (state->insert_block)
289289
{
290-
/* try to send empty block that sets proper ClickHouse state */
291-
if (!state->success)
290+
/* Finish the insert to set the proper ClickHouse state */
291+
Client * client = (Client *)state->conn->client;
292+
try
292293
{
293-
try
294-
{
295-
Client * client = (Client *)state->conn->client;
296-
client->Insert(state->table_name, Block());
297-
}
298-
catch (const std::exception & e)
299-
{
300-
// just ignore, next query will fail
301-
elog(NOTICE, "clickhouse_fdw: could not send empty packet");
302-
}
294+
client->EndInsert();
303295
}
304-
305-
delete (std::vector<clickhouse::ColumnRef> *)state->columns;
296+
catch (const std::exception & e)
297+
{
298+
// just ignore, next query will fail
299+
elog(NOTICE, "clickhouse_fdw: could not finish INSERT: - %s", e.what());
300+
}
301+
delete (Block *)state->insert_block;
306302
}
307303
}
308304

309305
void ch_binary_prepare_insert(void * conn, char * query, ch_binary_insert_state * state)
310306
{
311-
throw std::runtime_error("clickhouse_fdw: XXX ch_binary_prepare_insert not implemented");
312-
313-
// std::vector<clickhouse::ColumnRef> * vec = nullptr;
314-
// Client * client = (Client *)((ch_binary_connection_t *)conn)->client;
315-
316-
// try
317-
// {
318-
// client->PrepareInsert(
319-
// std::string(query) + " VALUES", [&state, &vec](const Block & sample_block) {
320-
// if (sample_block.GetColumnCount() == 0)
321-
// return true;
322-
323-
// vec = new std::vector<clickhouse::ColumnRef>();
324-
325-
// state->len = sample_block.GetColumnCount();
326-
327-
// #if PG_VERSION_NUM < 120000
328-
// state->outdesc = CreateTemplateTupleDesc(state->len, false);
329-
// #else
330-
// state->outdesc = CreateTemplateTupleDesc(state->len);
331-
// #endif
332-
333-
// for (size_t i = 0; i < state->len; i++)
334-
// {
335-
// bool error = false;
336-
// clickhouse::ColumnRef col = sample_block[i];
337-
338-
// auto chtype = col->Type();
339-
// if (chtype->GetCode() == Type::LowCardinality)
340-
// {
341-
// chtype = col->As<ColumnLowCardinality>()->GetNestedType();
342-
// }
343-
344-
// Oid pg_type = get_corr_postgres_type(chtype);
345-
346-
// vec->push_back(clickhouse::CreateColumnByType(col->Type()->GetName()));
347-
// const char * colname = sample_block.GetColumnName(i).c_str();
348-
349-
// /* we can't afford long jumps outside of this function */
350-
// PG_TRY();
351-
// {
352-
// TupleDescInitEntry(
353-
// state->outdesc, (AttrNumber)i + 1, colname, pg_type, -1, 0);
354-
// }
355-
// PG_CATCH();
356-
// {
357-
// error = true;
358-
// }
359-
// PG_END_TRY();
360-
361-
// if (error)
362-
// throw std::runtime_error("could not init tuple descriptor");
363-
// }
364-
365-
// return true;
366-
// });
367-
// }
368-
// catch (const std::exception & e)
369-
// {
370-
// client->ResetConnection();
371-
372-
// if (vec != nullptr)
373-
// delete vec;
374-
375-
// elog(ERROR, "clickhouse_fdw: error while insert preparation - %s", e.what());
376-
// }
377-
378-
// if (vec != nullptr)
379-
// state->columns = (void *)vec;
307+
Block *block = nullptr;
308+
Client * client = (Client *)((ch_binary_connection_t *)conn)->client;
309+
try
310+
{
311+
// The Client retursn a `unique_ptr`. Grab the internal pointer to
312+
// take ownership of it and manage its memory ourselves.
313+
block = client->BeginInsert(std::string(query) + " VALUES").release();
314+
}
315+
catch (const std::exception & e)
316+
{
317+
elog(ERROR, "clickhouse_fdw: could prepare insert - %s", e.what());
318+
}
319+
state->len = block->GetColumnCount();
320+
if (state->len == 0) return;
321+
state->outdesc = CreateTemplateTupleDesc(state->len);
322+
323+
AttrNumber i = 0;
324+
for (Block::Iterator bi(*block); bi.IsValid(); bi.Next())
325+
{
326+
Oid pg_type = get_corr_postgres_type(bi.Type());
327+
const char * colname = bi.Name().c_str();
328+
329+
PG_TRY();
330+
{
331+
TupleDescInitEntry(state->outdesc, ++i, colname, pg_type, -1, 0);
332+
}
333+
PG_CATCH();
334+
{
335+
client->ResetConnection();
336+
delete block;
337+
}
338+
PG_END_TRY();
339+
}
340+
341+
state->insert_block = (ch_insert_block_h *) block;
380342
}
381343

382344
static void column_append(clickhouse::ColumnRef col, Datum val, Oid valtype, bool isnull)
@@ -573,7 +535,7 @@ static void column_append(clickhouse::ColumnRef col, Datum val, Oid valtype, boo
573535
{
574536
case Type::Array: {
575537
// XXX Figure out proper value to create and pass to
576-
// Append.
538+
// Append. Then uncomment the test in binary_inserts.sql.
577539
throw std::runtime_error(
578540
"clickhouse_fdw: XXX unsupported column type "
579541
+ col->Type()->GetName()
@@ -607,8 +569,8 @@ void ch_binary_column_append_data(ch_binary_insert_state * state, size_t colidx)
607569
{
608570
try
609571
{
610-
auto columns = *(std::vector<clickhouse::ColumnRef> *)state->columns;
611-
auto col = columns[colidx];
572+
auto block = (Block *)state->insert_block;
573+
auto col = (*block)[colidx];
612574

613575
Datum val = state->values[colidx];
614576
Oid valtype = TupleDescAttr(state->outdesc, colidx)->atttypid;
@@ -626,16 +588,9 @@ void ch_binary_insert_columns(ch_binary_insert_state * state)
626588
{
627589
try
628590
{
629-
Block block;
630-
auto columns = *(std::vector<clickhouse::ColumnRef> *)state->columns;
631-
for (int i = 0; i < state->outdesc->natts; ++i)
632-
{
633-
Form_pg_attribute att = TupleDescAttr(state->outdesc, i);
634-
block.AppendColumn(NameStr(att->attname), columns[i]);
635-
}
636-
637591
Client * client = (Client *)state->conn->client;
638-
client->Insert(state->table_name, block);
592+
auto block = (Block *)state->insert_block;
593+
client->InsertData(*block);
639594
}
640595
catch (const std::exception & e)
641596
{

src/include/binary.hh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ extern "C" {
66
#endif
77

88
typedef struct ch_binary_connection_t ch_binary_connection_t;
9+
typedef struct ch_insert_block_h ch_insert_block_h;
910
typedef struct ch_binary_response_t
1011
{
1112
void *values;
@@ -48,7 +49,7 @@ typedef struct {
4849
MemoryContextCallback callback;
4950

5051
TupleDesc outdesc;
51-
void *columns; /* std::vector */
52+
ch_insert_block_h *insert_block; /* clickhouse::Block */
5253
size_t len;
5354
void *conversion_states;
5455
char *table_name;

src/pglink.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -729,7 +729,6 @@ binary_insert_tuple(void *istate, TupleTableSlot *slot)
729729
else
730730
{
731731
ch_binary_insert_columns(state);
732-
state->success = true;
733732
}
734733
}
735734

0 commit comments

Comments
 (0)