|
64 | 64 | ;; ---------------------------------------------------------------- |
65 | 65 | (defmethod sql-jdbc.conn/connection-details->spec :arrow-flight-sql |
66 | 66 | [_ details] |
67 | | - (let [{:keys [host port token username user password useEncryption disableCertificateVerification] |
| 67 | + (let [{:keys [host port token username user password catalog useEncryption disableCertificateVerification] |
68 | 68 | :or {useEncryption true |
69 | 69 | disableCertificateVerification false}} details |
70 | 70 | ;; prefer :username (manifest) but fall back to :user |
|
84 | 84 | :else []) |
85 | 85 |
|
86 | 86 | ;; Build query params |
87 | | - params (-> [] |
88 | | - (into auth-qps) |
89 | | - (conj (str "useEncryption=" (boolean useEncryption))) |
90 | | - (conj (str "disableCertificateVerification=" (boolean disableCertificateVerification)))) |
| 87 | + params (cond-> [] |
| 88 | + true (into auth-qps) |
| 89 | + true (conj (str "useEncryption=" (boolean useEncryption))) |
| 90 | + true (conj (str "disableCertificateVerification=" (boolean disableCertificateVerification))) |
| 91 | + ;; Add catalog if specified |
| 92 | + (and (string? catalog) (not (str/blank? catalog))) |
| 93 | + (conj (str "catalog=" (codec/url-encode catalog)))) |
91 | 94 | qp (str/join "&" params) |
92 | 95 |
|
93 | 96 | ;; Full JDBC URL |
94 | 97 | full-url (str "jdbc:arrow-flight-sql://" |
95 | 98 | (or host "localhost") ":" |
96 | 99 | (or port 443) |
97 | 100 | (when-not (str/blank? qp) (str "?" qp)))] |
| 101 | + (log/debug "Arrow Flight SQL connection URL:" full-url) |
98 | 102 | (let [scheme "jdbc:arrow-flight-sql:" |
99 | 103 | subname (subs full-url (count scheme))] |
100 | 104 | {:classname "org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver" |
|
115 | 119 | ;; Executes a simple "SELECT 1" query to verify connectivity. |
116 | 120 | (defmethod driver/can-connect? :arrow-flight-sql |
117 | 121 | [driver details] |
118 | | - (try |
119 | | - (sql-jdbc.conn/with-connection-spec-for-testing-connection [spec [driver details]] |
120 | | - (jdbc/query spec "SELECT 1")) |
121 | | - true |
122 | | - (catch Exception e |
123 | | - (log/error e "Flight SQL connection test failed.") |
124 | | - false))) |
| 122 | + (let [query-succeeded? (atom false)] |
| 123 | + (try |
| 124 | + (let [spec (sql-jdbc.conn/connection-details->spec driver details)] |
| 125 | + (log/info "Testing connection with spec, attempting to get connection...") |
| 126 | + (try |
| 127 | + (with-open [conn (jdbc/get-connection spec)] |
| 128 | + (log/info "Connection obtained, executing test query...") |
| 129 | + (let [result (jdbc/query {:connection conn} ["SELECT 1"])] |
| 130 | + (log/info "Test query successful:" result) |
| 131 | + (reset! query-succeeded? true) |
| 132 | + true)) |
| 133 | + (catch java.sql.SQLException e |
| 134 | + ;; If the query succeeded but closing failed (known issue with Flight SQL + catalog param), |
| 135 | + ;; treat as successful connection |
| 136 | + (if @query-succeeded? |
| 137 | + (do |
| 138 | + (log/warn "Connection close failed but query succeeded, treating as success:" (.getMessage e)) |
| 139 | + true) |
| 140 | + (throw e))))) |
| 141 | + (catch Exception e |
| 142 | + (log/error e "Flight SQL connection test failed.") |
| 143 | + false)))) |
| 144 | + |
125 | 145 |
|
126 | 146 | ;; ---------------------------------------------------------------- |
127 | 147 | ;; Map raw database types to Metabase base types. |
|
183 | 203 | ;; Custom Schema Sync Implementations |
184 | 204 | ;; ---------------------------------------------------------------- |
185 | 205 |
|
| 206 | +;; Helper function to safely close connections when catalog parameter is used |
| 207 | +(defn- safely-close-connection [conn] |
| 208 | + (try |
| 209 | + (.close conn) |
| 210 | + (catch java.sql.SQLException e |
| 211 | + (when-not (re-find #"Channel shutdown" (.getMessage e)) |
| 212 | + (throw e)) |
| 213 | + ;; Log but ignore "Channel shutdown" errors during close |
| 214 | + (log/debug "Ignoring connection close error (known Flight SQL issue with catalog param):" (.getMessage e))))) |
| 215 | + |
| 216 | + |
186 | 217 | ;; List tables by querying information_schema.tables. |
187 | 218 | (defmethod driver/describe-database :arrow-flight-sql |
188 | 219 | [driver database] |
189 | | - (let [spec (sql-jdbc.conn/connection-details->spec :arrow-flight-sql (:details database))] |
190 | | - (with-open [conn (jdbc/get-connection spec)] |
| 220 | + (let [spec (sql-jdbc.conn/connection-details->spec :arrow-flight-sql (:details database)) |
| 221 | + conn (jdbc/get-connection spec)] |
| 222 | + (try |
191 | 223 | (let [rows (jdbc/query {:connection conn} |
192 | | - ["SELECT table_name, table_schema FROM information_schema.tables"] |
| 224 | + ["SELECT table_name, table_schema FROM information_schema.tables WHERE table_catalog = CURRENT_CATALOG()"] |
193 | 225 | {:identifiers str/lower-case}) |
194 | 226 | formatted (->> rows |
195 | 227 | (filter #(not= (str/lower-case (:table_schema %)) "information_schema")) |
196 | 228 | (map (fn [row] |
197 | 229 | {:name (:table_name row) |
198 | 230 | :schema (:table_schema row)})))] |
199 | | - {:tables (into #{} formatted)})))) ;; Return a set of formatted table information |
| 231 | + {:tables (into #{} formatted)}) |
| 232 | + (finally |
| 233 | + (safely-close-connection conn))))) |
200 | 234 |
|
201 | 235 | ;; ---------------------------------------------------------------- |
202 | 236 | ;; Describe a specific table by executing a DESCRIBE query. |
203 | 237 | (defmethod driver/describe-table :arrow-flight-sql |
204 | 238 | [_ driver database {:keys [name schema]}] |
205 | | - (let [spec (sql-jdbc.conn/connection-details->spec :arrow-flight-sql (:details database))] |
206 | | - (with-open [conn (jdbc/get-connection spec)] |
| 239 | + (let [spec (sql-jdbc.conn/connection-details->spec :arrow-flight-sql (:details database)) |
| 240 | + conn (jdbc/get-connection spec)] |
| 241 | + (try |
207 | 242 | (let [query (format "DESCRIBE \"%s\".\"%s\"" schema name) ;; Build the DESCRIBE query using schema and table name |
208 | 243 | results (jdbc/query {:connection conn} [query] {:identifiers str/lower-case}) |
209 | 244 | fields (mapv (fn [{:keys [column_name data_type is_nullable]}] |
|
221 | 256 | (log/info "Parsed fields:" fields) |
222 | 257 | {:name name |
223 | 258 | :schema schema |
224 | | - :fields fields})))) |
| 259 | + :fields fields}) |
| 260 | + (finally |
| 261 | + (safely-close-connection conn))))) |
225 | 262 |
|
226 | 263 | ;; ---------------------------------------------------------------- |
227 | 264 | ;; Define a method to describe table foreign keys. |
|
237 | 274 | (defmethod sql-jdbc.sync/describe-fields-sql :arrow-flight-sql |
238 | 275 | [driver & {:keys [schema-names table-names details]}] |
239 | 276 | (let [base-condition [:>= [:inline 1] [:inline 1]] |
| 277 | + catalog-condition [:= :table_catalog [:raw "CURRENT_CATALOG()"]] |
240 | 278 | schema-condition (when (seq schema-names) |
241 | 279 | [:in [:lower :table_schema] |
242 | 280 | (mapv (fn [s] [:inline (str/lower-case s)]) schema-names)]) |
243 | 281 | table-condition (when (seq table-names) |
244 | 282 | [:in [:lower :table_name] |
245 | 283 | (mapv (fn [t] [:inline (str/lower-case t)]) table-names)]) |
246 | | - where-clause (cond-> [base-condition] |
| 284 | + where-clause (cond-> [base-condition catalog-condition] |
247 | 285 | schema-condition (conj schema-condition) |
248 | 286 | table-condition (conj table-condition))] |
249 | 287 | (sql/format |
|
0 commit comments