-
Notifications
You must be signed in to change notification settings - Fork 101
Expand file tree
/
Copy pathconnect.clj
More file actions
96 lines (87 loc) · 4.4 KB
/
connect.clj
File metadata and controls
96 lines (87 loc) · 4.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
(ns cmr.elastic-utils.connect
"Provide functions to invoke elasticsearch"
(:require
[cheshire.core :as json]
[clj-http.client :as client]
[clj-http.conn-mgr :as conn-mgr]
[cmr.common.api.web-server :as web-server]
[cmr.common.log :as log :refer (info)]
[cmr.common.services.errors :as errors]
[cmr.common.services.health-helper :as hh]
[cmr.common.util :as util]))
;; Describes how to reach elasticsearch over HTTP.
;;   uri       - base URL of the elasticsearch server, e.g. "http://host:port"
;;   http-opts - map of clj-http options merged into requests made against uri
(defrecord Connection [uri http-opts])
(def ELASTIC_CONNECTION_TIMOUT
  "Timeout, in milliseconds, used for elasticsearch HTTP requests. It is applied as both the
  :socket-timeout and the :conn-timeout of the connection options.
  Currently set to 5 minutes."
  (let [five-minutes-in-seconds (* 5 60)]
    (* five-minutes-in-seconds util/second-as-milliseconds)))
(defn- connect-with-config
  "Builds a Connection record for the elasticsearch instance described by config.

  config is a map with the keys :host, :port and :retry-handler. The returned Connection holds
  the base URI (http://host:port) and the clj-http options to use for requests: a reusable
  connection manager, the retry handler, socket/connect timeouts and the elasticsearch
  compatibility headers."
  [config]
  (let [{:keys [host port retry-handler]} config
        ;; Read the thread limit once so the pool size and the per-route limit always agree.
        max-threads (web-server/MAX_THREADS)
        conn-manager (conn-mgr/make-reusable-conn-manager
                      {;; Maximum number of threads that will be used for connecting.
                       ;; Very important that this matches the maximum number of threads
                       ;; that will be running
                       :threads max-threads
                       ;; Maximum number of simultaneous connections per host
                       ;; There's usually one elasticsearch hostname and we always
                       ;; connect to the same host so it makes sense to make this
                       ;; larger.
                       :default-per-route max-threads
                       ;; This is the length of time in _seconds_ that a connection will
                       ;; be left open for reuse. The default is 5 seconds which is way
                       ;; too short.
                       :timeout 120})
        http-options {:conn-mgr conn-manager
                      :retry-handler retry-handler
                      :socket-timeout ELASTIC_CONNECTION_TIMOUT
                      :conn-timeout ELASTIC_CONNECTION_TIMOUT
                      :headers {"Accept" "application/vnd.elasticsearch+json; compatible-with=8"
                                "Content-Type" "application/vnd.elasticsearch+json; compatible-with=8"}}]
    ;; The port is formatted with %s rather than %d: %d throws when the port is supplied as a
    ;; string, which would abort the connection over a log message even though the URI below
    ;; is built with str and accepts either form. Integer ports print exactly as before.
    (info (format "Connecting to single ES on %s %s using retry-handler %s"
                  host port retry-handler))
    (->Connection (str "http://" host ":" port) http-options)))
(defn try-connect
  "Returns a Connection built from the given config map. Any Exception thrown while building
  it is reported through errors/internal-error! with a message containing the config and the
  exception."
  [config]
  (try
    (connect-with-config config)
    (catch Exception e
      (let [message (format "Unable to connect to elasticsearch at: %s. with %s" config e)]
        (errors/internal-error! message)))))
(defn- get-elastic-health
  "Calls the elasticsearch cluster health api (/_cluster/health) on the given connection.

  With only conn, waits for status yellow using the configured health check timeout. params,
  when given, is the map of query parameters sent with the request.

  Returns the response body decoded from JSON with keyword keys. If the request or the decoding
  throws, the exception is not propagated; an error message string is returned instead."
  ([conn]
   (let [default-params {:wait_for_status "yellow"
                         :timeout (str (hh/health-check-timeout-seconds) "s")}]
     (get-elastic-health conn default-params)))
  ([conn params]
   (try
     (let [request-opts (merge (:http-opts conn)
                               {:query-params params
                                :accept :json})]
       (-> (str (:uri conn) "/_cluster/health")
           (client/get request-opts)
           :body
           (json/decode true)))
     (catch Exception e
       (format "Unable to get elasticsearch cluster health, caught exception: %s"
               (.getMessage e))))))
(defn wait-for-healthy-elastic
  "Asks elasticsearch to wait up to 3 seconds for the cluster health to reach yellow and raises
  an internal error if the health response reports that the wait timed out. elastic-store is an
  elasticsearch store with a :conn key holding the connection."
  [elastic-store]
  (let [health (get-elastic-health (:conn elastic-store)
                                   {:wait_for_status "yellow" :timeout "3s"})]
    ;; NOTE(review): when the health request itself fails, get-elastic-health returns an error
    ;; string, so :timed_out is nil and no error is raised here - confirm this is intended.
    (when (:timed_out health)
      (errors/internal-error! "Timed out waiting for elasticsearch to reach a healthy state"))))
(defn health-fn
  "Returns the health state of elasticsearch for the connection found in the context under
  [:system elastic-key-in-context :conn]. The result is {:ok? true} when the cluster status is
  green or yellow; otherwise {:ok? false :problem <health detail>}."
  [context elastic-key-in-context]
  (let [health-detail (-> context
                          (get-in [:system elastic-key-in-context :conn])
                          get-elastic-health)]
    (if (contains? #{"green" "yellow"} (:status health-detail))
      {:ok? true}
      {:ok? false
       :problem health-detail})))
(defn health
  "Returns the elasticsearch health, running health-fn through hh/get-health with a timeout."
  [context elastic-key-in-context]
  (let [;; One second longer than the health check timeout, so that the get-elastic-health
        ;; operation gets the chance to time out first.
        timeout-ms (-> (hh/health-check-timeout-seconds)
                       inc
                       (* util/second-as-milliseconds))
        check-health (fn [] (health-fn context elastic-key-in-context))]
    (hh/get-health check-health timeout-ms)))