1
+ apiVersion : v1
2
+ kind : ConfigMap
3
+ metadata :
4
+ name : {{ .Release.Name }}-admin-nginx-config
5
+ namespace : {{ .Release.Namespace }}
6
+ labels : {{- include "graphscope-interactive.labels" . | nindent 4 }}
7
+ app.kubernetes.io/component : configmap
8
+ {{- if .Values.commonLabels }}
9
+ {{- include "graphscope-interactive.tplvalues.render" ( dict "value" .Values.commonLabels "context" $ ) | nindent 4 }}
10
+ {{- end }}
11
+ {{- if .Values.commonAnnotations }}
12
+ annotations : {{- include "graphscope-interactive.tplvalues.render" ( dict "value" .Values.commonAnnotations "context" $ ) | nindent 4 }}
13
+ {{- end }}
14
+ data :
15
+ nginx.conf : |
16
+ events {}
17
+ http {
18
+ resolver local=on valid=5s;
19
+ server {
20
+ {{- $adminPort := .Values.primary.service.adminPort | int }}
21
+ listen {{ $adminPort }};
22
+ client_body_buffer_size 10M;
23
+ client_max_body_size 10M;
24
+ location / {
25
+ {{- $primaryBaseName := include "graphscope-interactive.primary.fullname" . }}
26
+ {{- $secondaryBaseName := include "graphscope-interactive.secondary.fullname" . }}
27
+ {{- $replicaCount := .Values.secondary.replicaCount | int }}
28
+ {{- $primaryServiceName := printf "%s.%s.svc.%s" (include "graphscope-interactive.primary.fullname" .) .Release.Namespace .Values.clusterDomain }}
29
+ {{- $secondaryServiceName := printf "%s.%s.svc.%s" (include "graphscope-interactive.secondary.fullname" .) .Release.Namespace .Values.clusterDomain }}
30
+ {{- $port := .Values.secondary.service.adminPort | int }}
31
+ proxy_pass {{ printf "http://%s-0.%s:%d" $primaryBaseName $primaryServiceName $port | quote }};
32
+ content_by_lua_block {
33
+ function arrayToString(arr, separator)
34
+ separator = separator or ", " -- Default separator if not provided
35
+ return table.concat(arr, separator)
36
+ end
37
+ function send_request(http, full_uri, method, body_data, headers)
38
+ local httpc = http.new()
39
+ if method == "GET" or method == "DELETE" then
40
+ return httpc:request_uri(full_uri, {
41
+ method = method,
42
+ })
43
+ elseif method == "POST" or method == "PUT" then
44
+ return httpc:request_uri(full_uri, {
45
+ method = method,
46
+ body = body_data,
47
+ headers = headers
48
+ })
49
+ else
50
+ ngx.log(ngx.ERR, " not recognized method ", method)
51
+ end
52
+ end
53
+ local http = require "resty.http"
54
+ local res = {}
55
+ local status_codes = {}
56
+ local error_message = nil -- Initialize a variable to capture error messages
57
+
58
+ local urls = {
59
+ {{ printf "http://%s-0.%s:%d" $primaryBaseName $primaryServiceName $port | quote }};
60
+ {{- if eq $replicaCount 1 }}
61
+ {{ printf "http://%s-0.%s:%d" $secondaryBaseName $secondaryServiceName $port | quote }}
62
+ {{- else }}
63
+ {{- range $i := until (sub $replicaCount 1 | int ) }}
64
+ {{ printf "\"http://%s-%d.%s:%d\"," $secondaryBaseName $i $secondaryServiceName $port }}
65
+ {{- end }}
66
+ {{ printf "http://%s-%d.%s:%d" $secondaryBaseName (sub $replicaCount 1) $secondaryServiceName $port | quote }}
67
+ {{- end }}
68
+ }
69
+
70
+ local original_headers = ngx.req.get_headers()
71
+ local request_uri=ngx.var.request_uri
72
+ local method = ngx.req.get_method()
73
+
74
+ -- Create a table for modified headers
75
+ local backend_headers = {}
76
+
77
+ -- Copy the relevant headers, if needed, or modify them
78
+ for key, value in pairs(original_headers) do
79
+ -- You can filter headers if needed (e.g., skip "host" or "authorization")
80
+ if key ~= "Host" and key ~= "User-Agent" and key ~= "Content-Length" then
81
+ backend_headers[key] = value
82
+ end
83
+ end
84
+
85
+
86
+ ngx.req.read_body() -- Read the request body
87
+
88
+ -- resize status_codes to the number of replicas
89
+ for i = 1, #urls do
90
+ status_codes[i] = 0
91
+ res[i] = ""
92
+ end
93
+
94
+ local threads = {}
95
+ local body_data = ngx.req.get_body_data()
96
+ for index, backend in ipairs(urls) do
97
+ -- full_uri is backend + request_uri
98
+ local full_uri = backend .. request_uri
99
+ threads[index] = ngx.thread.spawn(function()
100
+ local response, err = send_request(http, full_uri, method, body_data, backend_headers)
101
+ local status_code = 0
102
+ if response ~= nil then
103
+ status_code = response.status
104
+ res[index] = response.body
105
+ if response.status < 200 or response.status >= 300 then
106
+ if not error_message then -- Capture the error message from the first failed request
107
+ error_message = response.body or "Failed request without a body."
108
+ end
109
+ end
110
+ else
111
+ status_code = 500
112
+ if err ~= nil then
113
+ ngx.log(ngx.ERR, "Failed to request: ", err)
114
+ if not error_message then -- Capture error when no response
115
+ error_message = "Error: " .. err
116
+ end
117
+ else
118
+ error_message = "Not found"
119
+ end
120
+ end
121
+ status_codes[index] = status_code
122
+ end)
123
+ end
124
+
125
+ for _, thread in ipairs(threads) do
126
+ coroutine.resume(thread)
127
+ end
128
+
129
+ for _, thread in ipairs(threads) do
130
+ ngx.thread.wait(thread)
131
+ end
132
+
133
+ local success = true
134
+ local final_status_code = 200
135
+ for i = 1, #urls do
136
+ if status_codes[i] < 200 or status_codes[i] >= 300 then
137
+ ngx.log(ngx.ERR, "Failed to request: ", urls[i], " with status code: ", status_codes[i], " and response: ", res[i], " index: ", i)
138
+ success = false
139
+ final_status_code = status_codes[i]
140
+ break
141
+ end
142
+ end
143
+
144
+ ngx.header.content_type = 'application/json'
145
+ if success then
146
+ ngx.status = final_status_code
147
+ ngx.log(ngx.INFO, "Success: ", arrayToString(res, ", "), " with status code: ", final_status_code)
148
+ ngx.say(res[1])
149
+ ngx.exit(final_status_code)
150
+ else
151
+ ngx.status = final_status_code
152
+ ngx.log(ngx.ERR, "Failed to request: ", error_message, " with status code: ", final_status_code)
153
+ ngx.say(error_message)
154
+ ngx.exit(final_status_code)
155
+ end
156
+ }
157
+ }
158
+ }
159
+ }
0 commit comments