Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions bootstrap-app/src/cmr/bootstrap/data/rebalance_util.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
(ns cmr.bootstrap.data.rebalance-util
"Utilities for helping with rebalancing a collection."
(:require
[clojurewerkz.elastisch.query :as q]
[cmr.bootstrap.embedded-system-helper :as helper]
[cmr.elastic-utils.es-helper :as es-helper]
[cmr.elastic-utils.config :as es-config]
Expand All @@ -15,8 +14,8 @@
(defn es-query-for-collection-concept-id
"Returns an elasticsearch query to find granules in the collection."
[concept-id]
{:bool {:must (q/match-all)
:filter (q/term :collection-concept-id concept-id)}})
{:bool {:must {:match_all {}}
:filter {:term {:collection-concept-id concept-id}}}})

(defn- granule-count-for-collection
"Gets the granule count for the collection in the elastic index."
Expand Down
2 changes: 1 addition & 1 deletion elastic-utils-lib/project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
:dependencies [[cheshire
:exclusions [com.fasterxml.jackson.core/jackson-core]]
[clj-http]
[clojurewerkz/elastisch "5.0.0-beta1"]
[commons-codec/commons-codec "1.11"]
[commons-io "2.18.0"]
[nasa-cmr/cmr-common-lib "0.1.1-SNAPSHOT"]
[nasa-cmr/cmr-transmit-lib "0.1.0-SNAPSHOT"]
[org.clojure/clojure]
[ring/ring-jetty-adapter "1.14.2"]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are you not going to have to do some merge with your other change for this?


;; net.jpountz.lz4 and org.lz4 is no longer supported and at.yawk.lz4 is a drop in
;; replacement for it. Exclude these libraries to prevent conflicts.
Expand Down
38 changes: 23 additions & 15 deletions elastic-utils-lib/src/cmr/elastic_utils/connect.clj
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
(ns cmr.elastic-utils.connect
"Provide functions to invoke elasticsearch"
(:require
[cheshire.core :as json]
[clj-http.client :as client]
[clj-http.conn-mgr :as conn-mgr]
[clojurewerkz.elastisch.rest :as esr]
[clojurewerkz.elastisch.rest.admin :as admin]
[cmr.common.api.web-server :as web-server]
[cmr.common.log :as log :refer (info)]
[cmr.common.services.errors :as errors]
[cmr.common.services.health-helper :as hh]
[cmr.common.util :as util]))

(defrecord Connection [uri http-opts])

(def ELASTIC_CONNECTION_TIMOUT
"The number of milliseconds to wait before timing out a connection attempt to elasticsearch.
Currently set to 5 minutes."
Expand Down Expand Up @@ -41,7 +43,7 @@

(info (format "Connecting to single ES on %s %d using retry-handler %s"
host port retry-handler))
(esr/connect (str "http://" host ":" port) http-options)))
(->Connection (str "http://" host ":" port) http-options)))

(defn try-connect
[config]
Expand All @@ -51,24 +53,30 @@
(errors/internal-error!
(format "Unable to connect to elasticsearch at: %s. with %s" config e)))))

(defn- get-elastic-health
"Returns the elastic health by calling elasticsearch cluster health api"
([conn]
(get-elastic-health conn {:wait_for_status "yellow"
:timeout (str (hh/health-check-timeout-seconds) "s")}))
([conn params]
(try
(let [url (str (:uri conn) "/_cluster/health")
response (client/get url (merge (:http-opts conn)
{:query-params params
:accept :json}))]
(json/decode (:body response) true))
(catch Exception e
(format "Unable to get elasticsearch cluster health, caught exception: %s"
(.getMessage e))))))

(defn wait-for-healthy-elastic
"Waits for the elasticsearch cluster health to reach yellow. Pass in a elasticsearch store that
has a :conn key with the elastisch connection"
[elastic-store]
(when (:timed_out (admin/cluster-health
(:conn elastic-store) {:wait_for_status "yellow" :timeout "3s"}))
(when (:timed_out (get-elastic-health (:conn elastic-store)
{:wait_for_status "yellow" :timeout "3s"}))
(errors/internal-error! "Timed out waiting for elasticsearch to reach a healthy state")))
Comment on lines +62 to 78
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Don’t swallow cluster health failures during startup.

Lines 68-70 return a string on request failure, and Lines 76-78 only inspect :timed_out. If the health call fails entirely, wait-for-healthy-elastic will treat that as success and continue with an unavailable cluster.

Suggested fix
 (defn wait-for-healthy-elastic
   "Waits for the elasticsearch cluster health to reach yellow. Pass in a elasticsearch store that
   has a :conn key with the elastisch connection"
   [elastic-store]
-  (when (:timed_out (get-elastic-health (:conn elastic-store)
-                                        {:wait_for_status "yellow" :timeout "3s"}))
-    (errors/internal-error! "Timed out waiting for elasticsearch to reach a healthy state")))
+  (let [health (get-elastic-health (:conn elastic-store)
+                                   {:wait_for_status "yellow" :timeout "3s"})]
+    (cond
+      (string? health)
+      (errors/internal-error! health)
+
+      (:timed_out health)
+      (errors/internal-error! "Timed out waiting for elasticsearch to reach a healthy state"))))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@elastic-utils-lib/src/cmr/elastic_utils/connect.clj` around lines 62 - 78,
get-elastic-health currently swallows exceptions and returns a string on failure
which makes wait-for-healthy-elastic (which only checks :timed_out) assume
success; change the catch in get-elastic-health to propagate a structured error
(e.g. call errors/internal-error! with a descriptive message and the exception
details or rethrow the exception) so callers always get a failure instead of a
plain string, and ensure wait-for-healthy-elastic (which calls
get-elastic-health) only treats a successful map response as valid (i.e.
validate the response is a map and contains :timed_out) and raise an
internal-error! when the health call fails or returns an unexpected type.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code rabbit found an existing issue, not something that you created. Do we care about this? Is this just used for testing or really connecting upon startup? I see, this is called from reset. Not sure that we really care here.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find all usages of wait-for-healthy-elastic
rg -n "wait-for-healthy-elastic" --type clj -C3

Length of output: 28


🏁 Script executed:

#!/bin/bash
# Find all usages of wait-for-healthy-elastic
rg -n "wait-for-healthy-elastic" -C3

Length of output: 2271


🏁 Script executed:

#!/bin/bash
# Check the index_util.clj context around the call
rg -n "wait-for-healthy-elastic" -A5 -B10 elastic-utils-lib/src/cmr/elastic_utils/index_util.clj

# Also check dev-system control.clj broader context
rg -n "reset" -C5 dev-system/src/cmr/dev_system/control.clj | head -60

Length of output: 3391


@eereiter, good question — it's actually called in both places:

  1. Dev/test pathdev-system/control.clj Lines 122–123, inside the POST /reset handler used during integration testing. You're right that this is relatively low-risk: if ES is unreachable the tests will fail regardless, and an incorrect 200 from /reset would be obvious.

  2. Production/startup pathelastic-utils-lib/src/cmr/elastic_utils/index_util.clj Line 181, called after creating a new index during index initialization. This one is more meaningful — if the health check silently swallows an exception here, the code continues to set up aliases and refresh the index even if ES is in a bad state at that moment.

So the bug does touch production-relevant code via index_util.clj. Whether it's worth fixing now depends on how likely a health-check failure is in practice right after a successful create call. If the team is comfortable with the current behavior (pre-existing risk), it can be deferred. But if you want a quick guard, the suggested fix in my earlier comment would cover both call sites cleanly.

Happy to leave this as a follow-up issue if you'd prefer not to address it in this PR.


(defn- get-elastic-health
"Returns the elastic health by calling elasticsearch cluster health api"
[conn]
(try
(admin/cluster-health conn {:wait_for_status "yellow"
:timeout (str (hh/health-check-timeout-seconds) "s")})
(catch Exception e
(format "Unable to get elasticsearch cluster health, caught exception: %s"
(.getMessage e)))))

(defn health-fn
"Returns the health state of elasticsearch."
[context elastic-key-in-context]
Expand Down
178 changes: 95 additions & 83 deletions elastic-utils-lib/src/cmr/elastic_utils/es_helper.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,9 @@
[cheshire.core :as json]
[clj-http.client :as http]
[clojure.string :as string]
[clojurewerkz.elastisch.rest :as rest]
[clojurewerkz.elastisch.rest.document :as doc]
[clojurewerkz.elastisch.rest.response :refer [not-found?]]
[clojurewerkz.elastisch.rest.utils :refer [join-names]]
[cmr.common.log :refer [info]]
[cmr.common.services.errors :as errors]
[cmr.elastic-utils.config :as es-config]
[cmr.elastic-utils.es-util :as es-util]
[cmr.transmit.config :as t-config]))

(defn search
Expand All @@ -20,164 +16,180 @@
qp (merge {:track_total_hits true}
(select-keys opts qk))
body (apply dissoc opts qk)
url (format "%s/%s/_search" (:uri conn) (join-names index))]
url (es-util/url-with-path conn index "_search")]
(let [response (http/post url
(merge (.http-opts conn)
(merge (:http-opts conn)
{:content-type :json
:body (json/generate-string body)
:query-params qp
:accept :json
:throw-exceptions false}))
status (:status response)]
(if (some #{status} [200 201])
(rest/parse-safely (:body response))
(es-util/decode-response response)
(throw (ex-info (str "Search failed with status " status)
{:status status :body (:body response)}))))))

(defn count-query
"Performs a count query over one or more indexes"
[conn index _mapping-type query]
(let [url (format "%s/%s/_count" (:uri conn) (join-names index))
(let [url (es-util/url-with-path conn index "_count")
body (if (get query :query)
query
{:query query})]
(let [response (http/post url
(merge (.http-opts conn)
(merge (:http-opts conn)
{:content-type :json
:body (json/generate-string body)
:accept :json
:throw-exceptions false}))
status (:status response)]
(if (some #{status} [200 201])
(rest/parse-safely (:body response))
(es-util/decode-response response)
(throw (ex-info (str "Count failed with status " status)
{:status status :body (:body response)}))))))

(defn scroll
"Performs a scroll query, fetching the next page of results from a query given a scroll id"
[conn scroll-id opts]
(doc/scroll conn scroll-id opts))
(let [url (es-util/url-with-path conn "_search" "scroll")
body (merge {:scroll_id scroll-id}
(select-keys opts [:scroll]))]
(es-util/decode-response
(http/post url
(merge (:http-opts conn)
{:content-type :json
:body (json/generate-string body)
:accept :json})))))
Comment on lines 52 to +63
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Missing error handling for scroll requests.

Unlike search, count-query, and delete-by-query, this function lacks :throw-exceptions false and status checking. If ES returns a non-2xx status (e.g., 404 for expired scroll), clj-http will throw an exception rather than returning a structured error response. This inconsistency may cause different error behavior than callers expect.

Proposed fix to add consistent error handling
 (defn scroll
   "Performs a scroll query, fetching the next page of results from a query given a scroll id"
   [conn scroll-id opts]
   (let [url (url-with-path conn "_search" "scroll")
         body (merge {:scroll_id scroll-id}
-                    (select-keys opts [:scroll]))]
-    (decode-response
-     (http/post url
-                (merge (:http-opts conn)
-                       {:content-type :json
-                        :body (json/generate-string body)
-                        :accept :json})))))
+                    (select-keys opts [:scroll]))
+        response (http/post url
+                            (merge (:http-opts conn)
+                                   {:content-type :json
+                                    :body (json/generate-string body)
+                                    :accept :json
+                                    :throw-exceptions false}))
+        status (:status response)]
+    (if (some #{status} [200 201])
+      (decode-response response)
+      (throw (ex-info (str "Scroll failed with status " status)
+                      {:status status :body (:body response)})))))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@elastic-utils-lib/src/cmr/elastic_utils/es_helper.clj` around lines 82 - 93,
The scroll function currently posts without disabling clj-http exceptions and
without checking status; update the http/post call in scroll to merge
{:throw-exceptions false} into the request options (same pattern used in
search/count-query/delete-by-query), capture the returned response map (e.g.,
resp), check the :status for non-2xx cases and return a structured error
response rather than letting clj-http throw, and pass that response into
decode-response so callers get consistent behavior; reference the scroll
function and the http/post call and mirror the status handling used by
search/count-query/delete-by-query.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will make the change

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jmaeng72, sounds good! Once the change is in, the scroll function will have consistent error handling aligned with search, count-query, and delete-by-query. The proposed fix in the review comment should serve as a reference when you make the update.


(defn doc-get
"Fetches and returns a document by id or `nil` if it does not exist."
([conn index mapping-type id]
(doc-get conn index mapping-type id nil))
([conn index _mapping-type id opts]
(let [result (if (empty? opts)
(rest/get conn (rest/record-url conn index "_doc" id))
(rest/get conn (rest/record-url conn index "_doc" id) {:query-params opts}))]
(when-not (not-found? result)
result))))
(let [url (es-util/url-with-path conn index "_doc" id)
response (http/get url
(merge (:http-opts conn)
{:query-params opts
:accept :json
:throw-exceptions false}))
status (:status response)]
(when-not (= 404 status)
(es-util/decode-response response)))))

(defn put
"Creates or updates a document in the search index, using the provided document id"
([conn index mapping-type id document]
(put conn index mapping-type id document nil))
([conn index _mapping-type id document opts]
(rest/put conn (rest/record-url conn index "_doc" id)
{:content-type :json
:body document
:query-params opts})))
(let [url (es-util/url-with-path conn index "_doc" id)]
(es-util/decode-response
(http/put url
(merge (:http-opts conn)
{:content-type :json
:body (if (string? document) document (json/generate-string document))
:query-params opts
:accept :json
:throw-exceptions false}))))))

(defn delete
"Deletes document from the index."
([conn index mapping-type id]
(delete conn index mapping-type id nil))
([conn index _mapping-type id opts]
(-> (rest/record-url conn index "_doc" id)
(http/delete (merge {:throw-exceptions false}
(.http-opts conn)
{:content-type :json :query-params opts}
{:accept :json}))
(:body)
(rest/parse-safely))))
(let [url (es-util/url-with-path conn index "_doc" id)]
(es-util/decode-response
(http/delete url
(merge (:http-opts conn)
{:content-type :json
:query-params opts
:accept :json
:throw-exceptions false}))))))

(defn delete-by-query
"Performs a delete-by-query operation over one or more indexes and types.
Multiple indexes and types can be specified by passing in a seq of strings,
otherwise specifying a string suffices."
[conn index _mapping-type query]
(let [admin-token (es-config/elastic-admin-token)
delete-url (rest/delete-by-query-url
conn
(join-names index))
response (http/post delete-url
{:headers {"Authorization" admin-token
"Confirm-delete-action" "true"
:client-id t-config/cmr-client-id}
:content-type :json
:body (json/generate-string {:query query})
:throw-exceptions false})
status (:status response)]
(if (#{200 201} status)
(rest/parse-safely (:body response))
(throw (ex-info (str "Delete by query failed with status " status)
{:status status :body (:body response)})))))
(let [admin-token (es-config/elastic-admin-token)
url (es-util/url-with-path conn index "_delete_by_query")
response (http/post url
(merge (:http-opts conn)
{:headers {"Authorization" admin-token
"Confirm-delete-action" "true"
:client-id t-config/cmr-client-id}
:content-type :json
:body (json/generate-string {:query query})
:throw-exceptions false}))
status (:status response)]
(if (#{200 201} status)
(es-util/decode-response response)
(throw (ex-info (str "Delete by query failed with status " status)
{:status status :body (:body response)})))))

(defn delete-index
"Deletes an index from the elastic store"
[conn index]
(rest/delete conn
(rest/url-with-path conn index)))
(let [url (es-util/url-with-path conn index)]
(es-util/decode-response
(http/delete url
(merge (:http-opts conn)
{:accept :json})))))

(defn bulk
"Performs a bulk operation"
([conn operations] (bulk conn operations nil))
([conn operations params]
(when (not-empty operations)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(when (seq ...

(rest/post-string conn (rest/bulk-url conn)
{:body (-> (map json/encode operations)
(interleave (repeat "\n"))
(string/join))
:content-type "application/x-ndjson"
:query-params params}))))
(let [url (es-util/url-with-path conn "_bulk")]
(es-util/decode-response
(http/post url
(merge (:http-opts conn)
{:body (-> (map json/encode operations)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you explain in a comment what we are doing with the \n and why es needs it?

(interleave (repeat "\n"))
(string/join)
(str "\n"))
:content-type "application/x-ndjson"
:query-params params
:accept :json
:throw-exceptions false})))))))

(defn clear-scroll
"Performs a clear scroll call for the given scroll id"
[conn scroll-id]
(rest/delete conn (rest/scroll-url conn) {:content-type :json
:body {:scroll_id scroll-id}}))
(let [url (es-util/url-with-path conn "_search" "scroll")]
(es-util/decode-response
(http/delete url
(merge (:http-opts conn)
{:content-type :json
:body (json/generate-string {:scroll_id scroll-id})
:accept :json
:throw-exceptions false})))))

(defn migrate-index
"Copies the contents of one index into another. Used for resharding."
[conn source-index target-index]
(let [body {"source" {:index source-index}
"dest" {:index target-index
:version_type "external_gte"}}
url (str (rest/url-with-path conn "_reindex") "?wait_for_completion=false")]
(rest/post-string conn url
{:body (json/encode body)
:content-type "application/json"})))
url (str (es-util/url-with-path conn "_reindex") "?wait_for_completion=false")]
(es-util/decode-response
(http/post url
(merge (:http-opts conn)
{:body (json/encode body)
:content-type "application/json"
:accept :json})))))

(defn get-reindex-task-status
"Get the reindex task status and if there were any failures if the task is considered COMPLETE.
Returns a map that captures the complete status and if there were any failures.
Example map return:
{
:complete true
:failures [{
:index: \"new_index\",
:type: \"_doc\",
:id\": \"doc_id\",
:cause: {
:type: \"exception\",
:reason: \"failure reason\"
:caused_by: {
:type: \"number_format_exception\",
:reason: \"cause reason\"
}
}
:status: 400
}]
:error {:type \"type\"
:reason \"reason\"
:caused_by {}
}
}"
Returns a map that captures the complete status and if there were any failures."
[conn index reindex-task-id]
(try
(let [url (rest/url-with-path conn "_tasks" reindex-task-id)
resp (rest/get conn url)
(let [url (es-util/url-with-path conn "_tasks" reindex-task-id)
resp (es-util/decode-response
(http/get url
(merge (:http-opts conn)
{:accept :json})))
completed (:completed resp)
failures (get-in resp [:response :failures])
task-error (:error resp)
Expand Down
Loading
Loading