44#include "logger_private.h"
55#include "unistd.h"
66
7+ #define INT32_STRING_MAX_LENGTH 12
8+
79int check_mutation (HoloMutation mutation );
810int check_partition (HoloClient * client , HoloMutation mutation );
911int check_get (HoloGet get );
@@ -95,6 +97,7 @@ int holo_client_close_client(HoloClient* client) {
9597}
9698
9799HoloTableSchema * holo_client_get_tableschema_by_tablename (HoloClient * client , HoloTableName name , bool withCache , char * * errMsgAddr ) {
100+ LOG_DEBUG ("get table schema by table name:%s" , name .fullName );
98101 Meta meta = NULL ;
99102 HoloTableSchema * schema = NULL ;
100103 if (withCache ) {
@@ -292,23 +295,38 @@ int check_mutation(HoloMutation mutation) {
292295 return HOLO_CLIENT_RET_OK ;
293296}
294297
298+ static int get_int_partition_value_string (char * intValueStr , const char * intPartitionValue ) {
299+ char * copyValue = MALLOC (4 , char );
300+ memcpy (copyValue , intPartitionValue , 4 );
301+ endian_swap (copyValue , 4 );
302+ snprintf (intValueStr , INT32_STRING_MAX_LENGTH , "%d" , * (int32_t * )copyValue );
303+ endian_swap (copyValue , 4 );
304+ FREE (copyValue );
305+ return HOLO_CLIENT_RET_OK ;
306+ }
307+
295308void * find_partition_table_name (PGconn * conn , void * arg ) {
296309 HoloRecord * record = arg ;
297310 const char * findPartitionSql = "with inh as (SELECT i.inhrelid, i.inhparent FROM pg_catalog.pg_class c LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace LEFT JOIN pg_catalog.pg_inherits i on c.oid=i.inhparent where n.nspname = $1 and c.relname= $2) select n.nspname as schema_name, c.relname as table_name, inh.inhrelid, inh.inhparent, p.partstrat, pg_get_expr(c.relpartbound, c.oid, true) as part_expr, p.partdefid, p.partnatts, p.partattrs from inh join pg_catalog.pg_class c on inh.inhrelid = c.oid join pg_catalog.pg_namespace n on c.relnamespace = n.oid join pg_partitioned_table p on p.partrelid = inh.inhparent where pg_get_expr(c.relpartbound, c.oid, true) = $3 limit 1" ;
298311 const char * schemaName = record -> schema -> tableName -> schemaName ;
299312 const char * tableName = record -> schema -> tableName -> tableName ;
300- int length = strlen (record -> values [record -> schema -> partitionColumn ]) + 19 ;
301- char * partitionInfo = MALLOC (length , char );
302- const char * params [3 ] = {schemaName , tableName , partitionInfo };
313+ char * partitionInfo = NULL ;
303314 PGresult * res = NULL ;
304315 HoloTableName * name = NULL ;
305- Oid type = record -> schema -> columns [record -> schema -> partitionColumn ].type ;
306- if (type == HOLO_TYPE_TEXT || type == HOLO_TYPE_VARCHAR ) {
316+ char intValStr [INT32_STRING_MAX_LENGTH ];
317+ if (record -> valueFormats [record -> schema -> partitionColumn ] == 1 ) {
318+ //int
319+ get_int_partition_value_string (intValStr , record -> values [record -> schema -> partitionColumn ]);
320+ int length = strlen (intValStr ) + 17 ;
321+ partitionInfo = MALLOC (length , char );
322+ snprintf (partitionInfo , length , "FOR VALUES IN (%s)" , intValStr );
323+ } else {
307324 //text or varchar
325+ int length = strlen (record -> values [record -> schema -> partitionColumn ]) + 19 ;
326+ partitionInfo = MALLOC (length , char );
308327 snprintf (partitionInfo , length , "FOR VALUES IN ('%s')" , record -> values [record -> schema -> partitionColumn ]);
309- } else {
310- snprintf (partitionInfo , length , "FOR VALUES IN (%s)" , record -> values [record -> schema -> partitionColumn ]);
311328 }
329+ const char * params [3 ] = {schemaName , tableName , partitionInfo };
312330 res = PQexecParams (conn , findPartitionSql , 3 , NULL , params , NULL , NULL , 0 );
313331 if (PQntuples (res ) == 0 ) {
314332 PQclear (res );
@@ -328,13 +346,13 @@ void* retry_create_partition_child_table(PGconn* conn, void* arg) {
328346 int retry = 0 ;
329347 HoloRecord * record = arg ;
330348 HoloTableSchema * parentSchema = record -> schema ;
331- char intVal [ 12 ] ;
332- char * partitionValue = record -> values [ record -> schema -> partitionColumn ];
349+ char * partitionValue = NULL ;
350+ char intValStr [ INT32_STRING_MAX_LENGTH ];
333351 if (record -> valueFormats [parentSchema -> partitionColumn ] == 1 ) {
334- endian_swap ( partitionValue , 4 );
335- snprintf ( intVal , 12 , "%d" , * ( int32_t * ) partitionValue ) ;
336- endian_swap ( partitionValue , 4 );
337- partitionValue = intVal ;
352+ get_int_partition_value_string ( intValStr , record -> values [ record -> schema -> partitionColumn ] );
353+ partitionValue = intValStr ;
354+ } else {
355+ partitionValue = record -> values [ record -> schema -> partitionColumn ] ;
338356 }
339357 int maxTableNameLength = strlen (parentSchema -> tableName -> tableName ) + strlen (partitionValue ) + 22 ;
340358 int maxSqlLength = 2 * strlen (parentSchema -> tableName -> schemaName ) + strlen (parentSchema -> tableName -> tableName ) + maxTableNameLength + strlen (partitionValue ) + 57 ;
@@ -411,15 +429,14 @@ int check_partition(HoloClient* client, HoloMutation mutation) {
411429 HoloTableName * name = NULL ;
412430 Sql sql = NULL ;
413431 char * partition = NULL ;
414- char intVal [ 12 ];
432+ char intValStr [ INT32_STRING_MAX_LENGTH ];
415433 if (mutation -> record -> schema -> partitionColumn == -1 ) return HOLO_CLIENT_RET_OK ;
416434 parentSchema = mutation -> record -> schema ;
417- partitionValue = mutation -> record -> values [parentSchema -> partitionColumn ];
418435 if (mutation -> record -> valueFormats [parentSchema -> partitionColumn ] == 1 ) {
419- endian_swap ( partitionValue , 4 );
420- snprintf ( intVal , 12 , "%d" , * ( int32_t * ) partitionValue ) ;
421- endian_swap ( partitionValue , 4 );
422- partitionValue = intVal ;
436+ get_int_partition_value_string ( intValStr , mutation -> record -> values [ parentSchema -> partitionColumn ] );
437+ partitionValue = intValStr ;
438+ } else {
439+ partitionValue = mutation -> record -> values [ parentSchema -> partitionColumn ] ;
423440 }
424441 partitionSchema = meta_cache_find_partition (client -> workerPool -> metaCache , parentSchema , partitionValue );
425442 if (partitionSchema != NULL ) {
0 commit comments