30
30
import com .hivemq .adapter .sdk .api .state .ProtocolAdapterState ;
31
31
import com .hivemq .adapter .sdk .api .tag .Tag ;
32
32
import com .hivemq .edge .adapters .postgresql .config .PostgreSQLAdapterConfig ;
33
+ import com .hivemq .edge .adapters .postgresql .config .PostgreSQLAdapterTag ;
33
34
import com .hivemq .edge .adapters .postgresql .config .PostgreSQLAdapterTagDefinition ;
34
35
import org .jetbrains .annotations .NotNull ;
35
36
import org .slf4j .Logger ;
36
37
import org .slf4j .LoggerFactory ;
37
38
39
+ import java .sql .Connection ;
38
40
import java .sql .PreparedStatement ;
39
41
import java .sql .ResultSet ;
40
42
import java .sql .ResultSetMetaData ;
41
- import java .sql .SQLException ;
42
43
import java .util .ArrayList ;
43
- import java .util .HashMap ;
44
44
import java .util .List ;
45
- import java .util .Optional ;
46
45
47
46
48
47
public class PostgreSQLPollingProtocolAdapter implements PollingProtocolAdapter {
@@ -56,12 +55,7 @@ public class PostgreSQLPollingProtocolAdapter implements PollingProtocolAdapter
56
55
private final @ NotNull ProtocolAdapterState protocolAdapterState ;
57
56
private final @ NotNull String adapterId ;
58
57
private final @ NotNull List <Tag > tags ;
59
- private final @ NotNull String compiledUri ;
60
- private final @ NotNull String username ;
61
- private final @ NotNull String password ;
62
- private final @ NotNull DatabaseConnection databaseConnection = new DatabaseConnection ();
63
- private final @ NotNull List <PollingContext > pollingContexts ;
64
- private final @ NotNull HashMap <String , PreparedStatement > tagNameToPreparedStatement = new HashMap <>();
58
+ private final @ NotNull DatabaseConnection databaseConnection ;
65
59
66
60
public PostgreSQLPollingProtocolAdapter (
67
61
final @ NotNull ProtocolAdapterInformation adapterInformation ,
@@ -71,13 +65,14 @@ public PostgreSQLPollingProtocolAdapter(
71
65
this .adapterConfig = input .getConfig ();
72
66
this .protocolAdapterState = input .getProtocolAdapterState ();
73
67
this .tags = input .getTags ();
74
- this . compiledUri = String .format ("jdbc:postgresql://%s:%s/%s" ,
68
+ final String compiledUri = String .format ("jdbc:postgresql://%s:%s/%s" ,
75
69
adapterConfig .getServer (),
76
70
adapterConfig .getPort (),
77
71
adapterConfig .getDatabase ());
78
- this .username = adapterConfig .getUsername ();
79
- this .password = adapterConfig .getPassword ();
80
- this .pollingContexts = input .getPollingContexts ();
72
+ this .databaseConnection = new DatabaseConnection (compiledUri ,
73
+ adapterConfig .getUsername (),
74
+ adapterConfig .getPassword (),
75
+ adapterConfig .getConnectionTimeout ());
81
76
}
82
77
83
78
@ Override
@@ -94,14 +89,7 @@ public void start(
94
89
output .failStart (e , null );
95
90
return ;
96
91
}
97
- databaseConnection .connect (compiledUri , username , password );
98
-
99
- try {
100
- preparePreparedStatements (output );
101
- } catch (final SQLException e ) {
102
- output .failStart (e , null );
103
- return ;
104
- }
92
+ databaseConnection .connect ();
105
93
106
94
try {
107
95
log .debug ("Starting connection to the database instance" );
@@ -119,26 +107,6 @@ public void start(
119
107
}
120
108
}
121
109
122
- private void preparePreparedStatements (final @ NotNull ProtocolAdapterStartOutput output ) throws SQLException {
123
- for (final PollingContext pollingContext : pollingContexts ) {
124
- final Optional <Tag > optDomainTag =
125
- tags .stream ().filter (tag -> tag .getName ().equals (pollingContext .getTagName ())).findFirst ();
126
- if (optDomainTag .isPresent ()) {
127
- final PostgreSQLAdapterTagDefinition definition =
128
- (PostgreSQLAdapterTagDefinition ) optDomainTag .get ().getDefinition ();
129
- final PreparedStatement preparedStatement =
130
- databaseConnection .getConnection ().prepareStatement (definition .getQuery ());
131
- tagNameToPreparedStatement .put (pollingContext .getTagName (), preparedStatement );
132
- } else {
133
- output .failStart (new IllegalStateException (
134
- "Polling for PostgreSQL protocol adapter failed because the used tag '" +
135
- pollingContext .getTagName () +
136
- "' was not found. For the polling to work the tag must be created via REST API or the UI." ),
137
- null );
138
- }
139
- }
140
- }
141
-
142
110
@ Override
143
111
public void stop (
144
112
final @ NotNull ProtocolAdapterStopInput protocolAdapterStopInput ,
@@ -164,49 +132,53 @@ public void poll(final @NotNull PollingInput pollingInput, final @NotNull Pollin
164
132
tags .stream ()
165
133
.filter (tag -> tag .getName ().equals (pollingContext .getTagName ()))
166
134
.findFirst ()
167
- .ifPresentOrElse (def -> loadDataFromDB (pollingOutput , def ),
135
+ .ifPresentOrElse (tag -> loadDataFromDB (pollingOutput , ( PostgreSQLAdapterTag ) tag ),
168
136
() -> pollingOutput .fail ("Polling for PostgreSQL protocol adapter failed because the used tag '" +
169
137
pollingInput .getPollingContext ().getTagName () +
170
138
"' was not found. For the polling to work the tag must be created via REST API or the UI." ));
171
139
pollingOutput .finish ();
172
140
}
173
141
174
- private void loadDataFromDB (final @ NotNull PollingOutput pollingOutput , final @ NotNull Tag tag ) {
142
+ private void loadDataFromDB (final @ NotNull PollingOutput output , final @ NotNull PostgreSQLAdapterTag tag ) {
175
143
try {
176
144
log .debug ("Getting tag definition" );
177
145
/* Get the tag definition (Query, RowLimit and Split Lines)*/
178
- final PostgreSQLAdapterTagDefinition definition = ( PostgreSQLAdapterTagDefinition ) tag .getDefinition ();
146
+ final PostgreSQLAdapterTagDefinition definition = tag .getDefinition ();
179
147
180
148
/* Execute query and handle result */
181
- final PreparedStatement preparedStatement = tagNameToPreparedStatement .get (tag .getName ());
182
- final ResultSet result = preparedStatement .executeQuery ();
183
- assert result != null ;
184
- final ArrayList <ObjectNode > resultObject = new ArrayList <>();
185
- final ResultSetMetaData resultSetMD = result .getMetaData ();
186
- while (result .next ()) {
187
- final int numColumns = resultSetMD .getColumnCount ();
188
- final ObjectNode node = OBJECT_MAPPER .createObjectNode ();
189
- for (int i = 1 ; i <= numColumns ; i ++) {
190
- final String column_name = resultSetMD .getColumnName (i );
191
- node .put (column_name , result .getString (column_name ));
149
+ try (final Connection connection = databaseConnection .getConnection ()) {
150
+ final PreparedStatement preparedStatement = connection .prepareStatement (tag .getDefinition ().getQuery ());
151
+ final ResultSet result = preparedStatement .executeQuery ();
152
+ assert result != null ;
153
+ final ArrayList <ObjectNode > resultObject = new ArrayList <>();
154
+ final ResultSetMetaData resultSetMD = result .getMetaData ();
155
+ while (result .next ()) {
156
+ final int numColumns = resultSetMD .getColumnCount ();
157
+ final ObjectNode node = OBJECT_MAPPER .createObjectNode ();
158
+ for (int i = 1 ; i <= numColumns ; i ++) {
159
+ final String column_name = resultSetMD .getColumnName (i );
160
+ node .put (column_name , result .getString (column_name ));
161
+ }
162
+
163
+ /* Publish datapoint with a single line if split is required */
164
+ if (definition .getSpiltLinesInIndividualMessages ()) {
165
+ log .debug ("Splitting lines in multiple messages" );
166
+ output .addDataPoint ("queryResult" , node );
167
+ } else {
168
+ resultObject .add (node );
169
+ }
192
170
}
193
171
194
- /* Publish datapoint with a single line if split is required */
195
- if (definition .getSpiltLinesInIndividualMessages ()) {
196
- log .debug ("Splitting lines in multiple messages" );
197
- pollingOutput .addDataPoint ("queryResult" , node );
198
- } else {
199
- resultObject .add (node );
172
+ /* Publish datapoint with all lines if no split is required */
173
+ if (!definition .getSpiltLinesInIndividualMessages ()) {
174
+ log .debug ("Publishing all lines in a single message" );
175
+ output .addDataPoint ("queryResult" , resultObject );
200
176
}
201
- }
202
-
203
- /* Publish datapoint with all lines if no split is required */
204
- if (!definition .getSpiltLinesInIndividualMessages ()) {
205
- log .debug ("Publishing all lines in a single message" );
206
- pollingOutput .addDataPoint ("queryResult" , resultObject );
177
+ } catch (final Exception e ) {
178
+ output .fail (e , null );
207
179
}
208
180
} catch (final Exception e ) {
209
- pollingOutput .fail (e , null );
181
+ output .fail (e , null );
210
182
}
211
183
}
212
184
0 commit comments