55import com .clickhouse .client .api .data_formats .ClickHouseBinaryFormatReader ;
66import com .clickhouse .client .api .data_formats .NativeFormatReader ;
77import com .clickhouse .client .api .data_formats .RowBinaryFormatReader ;
8- import com .clickhouse .client .api .data_formats .RowBinaryFormatSerializer ;
98import com .clickhouse .client .api .data_formats .RowBinaryWithNamesAndTypesFormatReader ;
109import com .clickhouse .client .api .data_formats .RowBinaryWithNamesFormatReader ;
1110import com .clickhouse .client .api .data_formats .internal .BinaryStreamReader ;
1211import com .clickhouse .client .api .data_formats .internal .MapBackedRecord ;
1312import com .clickhouse .client .api .data_formats .internal .ProcessParser ;
14- import com .clickhouse .client .api .data_formats .internal .SerializerUtils ;
1513import com .clickhouse .client .api .enums .Protocol ;
1614import com .clickhouse .client .api .enums .ProxyType ;
1715import com .clickhouse .client .api .http .ClickHouseHttpProto ;
18- import com .clickhouse .client .api .insert .DataSerializationException ;
1916import com .clickhouse .client .api .insert .InsertResponse ;
2017import com .clickhouse .client .api .insert .InsertSettings ;
21- import com .clickhouse .client .api .insert .POJOSerializer ;
2218import com .clickhouse .client .api .internal .ClickHouseLZ4OutputStream ;
2319import com .clickhouse .client .api .internal .ClientStatisticsHolder ;
2420import com .clickhouse .client .api .internal .HttpAPIClientHelper ;
3127import com .clickhouse .client .api .metrics .ClientMetrics ;
3228import com .clickhouse .client .api .metrics .OperationMetrics ;
3329import com .clickhouse .client .api .query .GenericRecord ;
34- import com .clickhouse .client .api .query .POJOSetter ;
3530import com .clickhouse .client .api .query .QueryResponse ;
3631import com .clickhouse .client .api .query .QuerySettings ;
3732import com .clickhouse .client .api .query .Records ;
33+ import com .clickhouse .client .api .serde .DataSerializationException ;
34+ import com .clickhouse .client .api .serde .POJOFieldDeserializer ;
35+ import com .clickhouse .client .api .serde .POJOFieldSerializer ;
36+ import com .clickhouse .client .api .serde .POJOSerDe ;
3837import com .clickhouse .client .api .transport .Endpoint ;
3938import com .clickhouse .client .api .transport .HttpEndpoint ;
4039import com .clickhouse .client .config .ClickHouseClientOption ;
5453import java .io .InputStream ;
5554import java .io .OutputStream ;
5655import java .lang .reflect .InvocationTargetException ;
57- import java .lang .reflect .Method ;
5856import java .net .URL ;
5957import java .nio .charset .StandardCharsets ;
6058import java .time .Duration ;
116114 *
117115 */
118116public class Client implements AutoCloseable {
117+ private static final Logger LOG = LoggerFactory .getLogger (Client .class );
119118
120119 private HttpAPIClientHelper httpClientHelper = null ;
121120
122121 private final List <Endpoint > endpoints ;
123-
124122 private final Map <String , String > configuration ;
125123
126124 private final Map <String , String > readOnlyConfig ;
125+
126+ private final POJOSerDe pojoSerDe ;
127127
128- // POJO serializer mapping (class -> (schema -> (format -> serializer)))
129- private final Map <Class <?>, Map <String , Map <String , POJOSerializer >>> serializers ;
130-
131- // POJO deserializer mapping (class -> (schema -> (format -> deserializer)))
132- private final Map <Class <?>, Map <String , Map <String , POJOSetter >>> deserializers ;
133-
134- private static final Logger LOG = LoggerFactory .getLogger (Client .class );
135128 private final ExecutorService sharedOperationExecutor ;
136129
137130 private final boolean isSharedOpExecutorOwned ;
138131
139132 private final Map <String , ClientStatisticsHolder > globalClientStats = new ConcurrentHashMap <>();
140133
141- private Map <String , TableSchema > tableSchemaCache = new ConcurrentHashMap <>();
142- private Map <String , Boolean > tableSchemaHasDefaults = new ConcurrentHashMap <>();
134+ private final Map <String , TableSchema > tableSchemaCache = new ConcurrentHashMap <>();
143135
144- private final ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy ;
136+ private final Map < String , Boolean > tableSchemaHasDefaults = new ConcurrentHashMap <>() ;
145137
146138 // Server context
147139 private String serverVersion ;
@@ -162,8 +154,7 @@ private Client(Set<String> endpoints, Map<String,String> configuration, boolean
162154 this .metricsRegistry = metricsRegistry ;
163155
164156 // Serialization
165- this .serializers = new ConcurrentHashMap <>();
166- this .deserializers = new ConcurrentHashMap <>();
157+ this .pojoSerDe = new POJOSerDe (columnToMethodMatchingStrategy );
167158
168159 // Operation Execution
169160 boolean isAsyncEnabled = MapUtils .getFlag (this .configuration , ClientConfigProperties .ASYNC_OPERATIONS .getKey (), false );
@@ -175,9 +166,6 @@ private Client(Set<String> endpoints, Map<String,String> configuration, boolean
175166 this .sharedOperationExecutor = sharedOperationExecutor ;
176167 }
177168
178- this .columnToMethodMatchingStrategy = columnToMethodMatchingStrategy ;
179-
180-
181169 // Transport
182170 ImmutableList .Builder <Endpoint > tmpEndpoints = ImmutableList .builder ();
183171 boolean initSslContext = false ;
@@ -1243,7 +1231,6 @@ public boolean ping(long timeout) {
12431231 * @param schema - correlating table schema
12441232 */
12451233 public synchronized void register (Class <?> clazz , TableSchema schema ) {
1246- LOG .debug ("Registering POJO: {}" , clazz .getName ());
12471234 String schemaKey ;
12481235 if (schema .getTableName () != null && schema .getQuery () == null ) {
12491236 schemaKey = schema .getTableName ();
@@ -1253,55 +1240,9 @@ public synchronized void register(Class<?> clazz, TableSchema schema) {
12531240 throw new IllegalArgumentException ("Table schema has both query and table name set. Only one is allowed." );
12541241 }
12551242 tableSchemaCache .put (schemaKey , schema );
1243+ tableSchemaHasDefaults .put (schemaKey , schema .hasDefaults ());
12561244
1257- ColumnToMethodMatchingStrategy matchingStrategy = columnToMethodMatchingStrategy ;
1258-
1259- //Create a new POJOSerializer with static .serialize(object, columns) methods
1260- Map <String , Method > classGetters = new HashMap <>();
1261- Map <String , Method > classSetters = new HashMap <>();
1262- for (Method method : clazz .getMethods ()) {//Clean up the method names
1263- if (matchingStrategy .isGetter (method .getName ())) {
1264- String methodName = matchingStrategy .normalizeMethodName (method .getName ());
1265- classGetters .put (methodName , method );
1266- } else if (matchingStrategy .isSetter (method .getName ())) {
1267- String methodName = matchingStrategy .normalizeMethodName (method .getName ());
1268- classSetters .put (methodName , method );
1269- }
1270- }
1271-
1272- Map <String , POJOSerializer > schemaSerializers = new HashMap <>();
1273- Map <String , POJOSetter > schemaDeserializers = new ConcurrentHashMap <>();
1274- boolean defaultsSupport = schema .hasDefaults ();
1275- tableSchemaHasDefaults .put (schemaKey , defaultsSupport );
1276- for (ClickHouseColumn column : schema .getColumns ()) {
1277- String propertyName = columnToMethodMatchingStrategy .normalizeColumnName (column .getColumnName ());
1278- Method getterMethod = classGetters .get (propertyName );
1279- if (getterMethod != null ) {
1280- schemaSerializers .put (column .getColumnName (), (obj , stream ) -> {
1281- Object value = getterMethod .invoke (obj );
1282-
1283- if (RowBinaryFormatSerializer .writeValuePreamble (stream , defaultsSupport , column , value )) {
1284- SerializerUtils .serializeData (stream , value , column );
1285- }
1286- });
1287- } else {
1288- LOG .warn ("No getter method found for column: {}" , propertyName );
1289- }
1290-
1291- // Deserialization stuff
1292- Method setterMethod = classSetters .get (propertyName );
1293- if (setterMethod != null ) {
1294- schemaDeserializers .put (column .getColumnName (), SerializerUtils .compilePOJOSetter (setterMethod , column ));
1295- } else {
1296- LOG .warn ("No setter method found for column: {}" , propertyName );
1297- }
1298- }
1299-
1300- Map <String , Map <String , POJOSerializer >> classSerializers = serializers .computeIfAbsent (clazz , k -> new HashMap <>());
1301- Map <String , Map <String , POJOSetter >> classDeserializers = deserializers .computeIfAbsent (clazz , k -> new HashMap <>());
1302-
1303- classSerializers .put (schemaKey , schemaSerializers );
1304- classDeserializers .put (schemaKey , schemaDeserializers );
1245+ pojoSerDe .registerClass (clazz , schema );
13051246 }
13061247
13071248 /**
@@ -1367,14 +1308,14 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
13671308 throw new IllegalArgumentException ("Table schema not found for table: " + tableName + ". Did you forget to register it?" );
13681309 }
13691310 //Lookup the Serializer for the POJO
1370- Map <String , POJOSerializer > classSerializers = serializers . getOrDefault (data .get (0 ).getClass (), Collections . emptyMap ())
1371- . getOrDefault ( tableName , Collections . emptyMap () );
1372- List <POJOSerializer > serializersForTable = new ArrayList <>();
1311+ Map <String , POJOFieldSerializer > classSerializers = pojoSerDe . getFieldSerializers (data .get (0 ).getClass (),
1312+ tableSchema );
1313+ List <POJOFieldSerializer > serializersForTable = new ArrayList <>();
13731314 for (ClickHouseColumn column : tableSchema .getColumns ()) {
13741315 if (column .hasDefault () && column .getDefaultValue () != ClickHouseColumn .DefaultValue .DEFAULT ) {
13751316 continue ;
13761317 }
1377- POJOSerializer serializer = classSerializers .get (column .getColumnName ());
1318+ POJOFieldSerializer serializer = classSerializers .get (column .getColumnName ());
13781319 if (serializer == null ) {
13791320 throw new IllegalArgumentException ("No serializer found for column '" + column .getColumnName () + "'. Did you forget to register it?" );
13801321 }
@@ -1405,7 +1346,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
14051346 out .write (" \n " .getBytes ());
14061347 for (Object obj : data ) {
14071348
1408- for (POJOSerializer serializer : serializersForTable ) {
1349+ for (POJOFieldSerializer serializer : serializersForTable ) {
14091350 try {
14101351 serializer .serialize (obj , out );
14111352 } catch (InvocationTargetException | IllegalAccessException | IOException e ) {
@@ -1918,9 +1859,7 @@ public <T> List<T> queryAll(String sqlQuery, Class<T> clazz, TableSchema schema)
19181859 */
19191860 @ SuppressWarnings ("unchecked" )
19201861 public <T > List <T > queryAll (String sqlQuery , Class <T > clazz , TableSchema schema , Supplier <T > allocator ) {
1921- Map <String , POJOSetter > classDeserializers = deserializers .getOrDefault (clazz ,
1922- Collections .emptyMap ()).getOrDefault (schema .getTableName () == null ?
1923- schema .getQuery () : schema .getTableName (), Collections .emptyMap ());
1862+ Map <String , POJOFieldDeserializer > classDeserializers = pojoSerDe .getFieldDeserializers (clazz , schema );
19241863
19251864 if (classDeserializers .isEmpty ()) {
19261865 throw new IllegalArgumentException ("No deserializers found for the query and class '" + clazz + "'. Did you forget to register it?" );
0 commit comments