|
6 | 6 | [clojure.string :as string] |
7 | 7 | [cmr.common.services.errors :as errors] |
8 | 8 | [cmr.elastic-utils.config :as es-config] |
| 9 | + [cmr.elastic-utils.es-util :as es-util] |
9 | 10 | [cmr.transmit.config :as t-config])) |
10 | 11 |
|
11 | | -(defn- parse-safely |
12 | | - "Parses the json body from the response safely" |
13 | | - [body] |
14 | | - (when body |
15 | | - (if (string? body) |
16 | | - (json/decode body true) |
17 | | - body))) |
18 | | - |
19 | | -(defn- decode-response |
20 | | - "Decodes the response body from the given response" |
21 | | - [response] |
22 | | - (-> response |
23 | | - :body |
24 | | - parse-safely)) |
25 | | - |
26 | | -(defn- join-names |
27 | | - "Joins names together with a comma" |
28 | | - [names] |
29 | | - (if (sequential? names) |
30 | | - (string/join "," names) |
31 | | - names)) |
32 | | - |
33 | | -(defn- url-with-path |
34 | | - "Returns the url with the given path" |
35 | | - [conn & path-parts] |
36 | | - (let [path (->> path-parts |
37 | | - (map join-names) |
38 | | - (filter identity) |
39 | | - (string/join "/"))] |
40 | | - (str (:uri conn) "/" path))) |
41 | | - |
42 | 12 | (defn search |
43 | 13 | "Performs a search query across one or more indexes" |
44 | 14 | [conn index _mapping-type opts] |
45 | 15 | (let [qk [:search_type :scroll :routing :preference :ignore_unavailable] |
46 | 16 | qp (merge {:track_total_hits true} |
47 | 17 | (select-keys opts qk)) |
48 | 18 | body (apply dissoc opts qk) |
49 | | - url (url-with-path conn index "_search")] |
| 19 | + url (es-util/url-with-path conn index "_search")] |
50 | 20 | (let [response (http/post url |
51 | 21 | (merge (:http-opts conn) |
52 | 22 | {:content-type :json |
|
56 | 26 | :throw-exceptions false})) |
57 | 27 | status (:status response)] |
58 | 28 | (if (some #{status} [200 201]) |
59 | | - (decode-response response) |
| 29 | + (es-util/decode-response response) |
60 | 30 | (throw (ex-info (str "Search failed with status " status) |
61 | 31 | {:status status :body (:body response)})))))) |
62 | 32 |
|
63 | 33 | (defn count-query |
64 | 34 | "Performs a count query over one or more indexes" |
65 | 35 | [conn index _mapping-type query] |
66 | | - (let [url (url-with-path conn index "_count") |
| 36 | + (let [url (es-util/url-with-path conn index "_count") |
67 | 37 | body (if (get query :query) |
68 | 38 | query |
69 | 39 | {:query query})] |
|
75 | 45 | :throw-exceptions false})) |
76 | 46 | status (:status response)] |
77 | 47 | (if (some #{status} [200 201]) |
78 | | - (decode-response response) |
| 48 | + (es-util/decode-response response) |
79 | 49 | (throw (ex-info (str "Count failed with status " status) |
80 | 50 | {:status status :body (:body response)})))))) |
81 | 51 |
|
82 | 52 | (defn scroll |
83 | 53 | "Performs a scroll query, fetching the next page of results from a query given a scroll id" |
84 | 54 | [conn scroll-id opts] |
85 | | - (let [url (url-with-path conn "_search" "scroll") |
| 55 | + (let [url (es-util/url-with-path conn "_search" "scroll") |
86 | 56 | body (merge {:scroll_id scroll-id} |
87 | 57 | (select-keys opts [:scroll]))] |
88 | | - (decode-response |
| 58 | + (es-util/decode-response |
89 | 59 | (http/post url |
90 | 60 | (merge (:http-opts conn) |
91 | 61 | {:content-type :json |
|
97 | 67 | ([conn index mapping-type id] |
98 | 68 | (doc-get conn index mapping-type id nil)) |
99 | 69 | ([conn index _mapping-type id opts] |
100 | | - (let [url (url-with-path conn index "_doc" id) |
| 70 | + (let [url (es-util/url-with-path conn index "_doc" id) |
101 | 71 | response (http/get url |
102 | 72 | (merge (:http-opts conn) |
103 | 73 | {:query-params opts |
104 | 74 | :accept :json |
105 | 75 | :throw-exceptions false})) |
106 | 76 | status (:status response)] |
107 | 77 | (when-not (= 404 status) |
108 | | - (decode-response response))))) |
| 78 | + (es-util/decode-response response))))) |
109 | 79 |
|
110 | 80 | (defn put |
111 | 81 | "Creates or updates a document in the search index, using the provided document id" |
112 | 82 | ([conn index mapping-type id document] |
113 | 83 | (put conn index mapping-type id document nil)) |
114 | 84 | ([conn index _mapping-type id document opts] |
115 | | - (let [url (url-with-path conn index "_doc" id)] |
116 | | - (decode-response |
| 85 | + (let [url (es-util/url-with-path conn index "_doc" id)] |
| 86 | + (es-util/decode-response |
117 | 87 | (http/put url |
118 | 88 | (merge (:http-opts conn) |
119 | 89 | {:content-type :json |
|
127 | 97 | ([conn index mapping-type id] |
128 | 98 | (delete conn index mapping-type id nil)) |
129 | 99 | ([conn index _mapping-type id opts] |
130 | | - (let [url (url-with-path conn index "_doc" id)] |
131 | | - (decode-response |
| 100 | + (let [url (es-util/url-with-path conn index "_doc" id)] |
| 101 | + (es-util/decode-response |
132 | 102 | (http/delete url |
133 | 103 | (merge (:http-opts conn) |
134 | 104 | {:content-type :json |
|
142 | 112 | otherwise specifying a string suffices." |
143 | 113 | [conn index _mapping-type query] |
144 | 114 | (let [admin-token (es-config/elastic-admin-token) |
145 | | - url (url-with-path conn index "_delete_by_query") |
| 115 | + url (es-util/url-with-path conn index "_delete_by_query") |
146 | 116 | response (http/post url |
147 | 117 | (merge (:http-opts conn) |
148 | 118 | {:headers {"Authorization" admin-token |
|
153 | 123 | :throw-exceptions false})) |
154 | 124 | status (:status response)] |
155 | 125 | (if (#{200 201} status) |
156 | | - (decode-response response) |
| 126 | + (es-util/decode-response response) |
157 | 127 | (throw (ex-info (str "Delete by query failed with status " status) |
158 | 128 | {:status status :body (:body response)}))))) |
159 | 129 |
|
160 | 130 | (defn delete-index |
161 | 131 | "Deletes an index from the elastic store" |
162 | 132 | [conn index] |
163 | | - (let [url (url-with-path conn index)] |
164 | | - (decode-response |
| 133 | + (let [url (es-util/url-with-path conn index)] |
| 134 | + (es-util/decode-response |
165 | 135 | (http/delete url |
166 | 136 | (merge (:http-opts conn) |
167 | 137 | {:accept :json}))))) |
|
171 | 141 | ([conn operations] (bulk conn operations nil)) |
172 | 142 | ([conn operations params] |
173 | 143 | (when (not-empty operations) |
174 | | - (let [url (url-with-path conn "_bulk")] |
175 | | - (decode-response |
| 144 | + (let [url (es-util/url-with-path conn "_bulk")] |
| 145 | + (es-util/decode-response |
176 | 146 | (http/post url |
177 | 147 | (merge (:http-opts conn) |
178 | 148 | {:body (-> (map json/encode operations) |
|
187 | 157 | (defn clear-scroll |
188 | 158 | "Performs a clear scroll call for the given scroll id" |
189 | 159 | [conn scroll-id] |
190 | | - (let [url (url-with-path conn "_search" "scroll")] |
191 | | - (decode-response |
| 160 | + (let [url (es-util/url-with-path conn "_search" "scroll")] |
| 161 | + (es-util/decode-response |
192 | 162 | (http/delete url |
193 | 163 | (merge (:http-opts conn) |
194 | 164 | {:content-type :json |
|
202 | 172 | (let [body {"source" {:index source-index} |
203 | 173 | "dest" {:index target-index |
204 | 174 | :version_type "external_gte"}} |
205 | | - url (str (url-with-path conn "_reindex") "?wait_for_completion=false")] |
206 | | - (decode-response |
| 175 | + url (str (es-util/url-with-path conn "_reindex") "?wait_for_completion=false")] |
| 176 | + (es-util/decode-response |
207 | 177 | (http/post url |
208 | 178 | (merge (:http-opts conn) |
209 | 179 | {:body (json/encode body) |
|
215 | 185 | Returns a map that captures the complete status and if there were any failures." |
216 | 186 | [conn index reindex-task-id] |
217 | 187 | (try |
218 | | - (let [url (url-with-path conn "_tasks" reindex-task-id) |
219 | | - resp (decode-response |
| 188 | + (let [url (es-util/url-with-path conn "_tasks" reindex-task-id) |
| 189 | + resp (es-util/decode-response |
220 | 190 | (http/get url |
221 | 191 | (merge (:http-opts conn) |
222 | 192 | {:accept :json}))) |
|
0 commit comments