Skip to content

Commit 36688ef

Browse files
authored
Use the common etcd client in etos-iut (#83)
1 parent d5f6724 commit 36688ef

File tree

4 files changed

+85
-53
lines changed

4 files changed

+85
-53
lines changed

cmd/iut/main.go

+4-11
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,14 @@ import (
2525
"time"
2626

2727
config "github.com/eiffel-community/etos-api/internal/configs/iut"
28+
"github.com/eiffel-community/etos-api/internal/database/etcd"
2829
"github.com/eiffel-community/etos-api/internal/logging"
2930
server "github.com/eiffel-community/etos-api/internal/server"
3031
"github.com/eiffel-community/etos-api/pkg/application"
3132
"github.com/eiffel-community/etos-api/pkg/iut/v1alpha1"
3233
"github.com/sirupsen/logrus"
3334
"github.com/snowzach/rotatefilehook"
3435
"go.elastic.co/ecslogrus"
35-
clientv3 "go.etcd.io/etcd/client/v3"
3636
)
3737

3838
// main sets up logging and starts up the webserver.
@@ -62,17 +62,10 @@ func main() {
6262
"user_log": false,
6363
})
6464

65-
// Database connection test
66-
cli, err := clientv3.New(clientv3.Config{
67-
Endpoints: []string{cfg.DatabaseURI()},
68-
DialTimeout: 5 * time.Second,
69-
})
70-
if err != nil {
71-
log.WithError(err).Fatal("failed to create etcd connection")
72-
}
73-
65+
iutEtcdTreePrefix := "/iut"
66+
db := etcd.New(cfg, logger, iutEtcdTreePrefix)
7467
log.Info("Loading v1alpha1 routes")
75-
v1alpha1App := v1alpha1.New(cfg, log, ctx, cli)
68+
v1alpha1App := v1alpha1.New(cfg, log, ctx, db)
7669
defer v1alpha1App.Close()
7770
router := application.New(v1alpha1App)
7871

internal/database/database.go

+4
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ import (
2222
"github.com/google/uuid"
2323
)
2424

25+
type Deleter interface {
26+
Delete() error
27+
}
28+
2529
// Opener is the common interface for database clients
2630
type Opener interface {
2731
Open(context.Context, uuid.UUID) io.ReadWriter

internal/database/etcd/etcd.go

+30-13
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
// TODO: refactor the client so that it does not store data it fetched.
3333
// However, without it implementing the database.Opener interface would be more complex (methods readByte, read).
3434
type Etcd struct {
35+
database.Deleter
3536
cfg config.Config
3637
client *clientv3.Client
3738
ID uuid.UUID
@@ -74,6 +75,7 @@ func (etcd Etcd) Write(p []byte) (int, error) {
7475
return 0, errors.New("please create a new etcd client using Open")
7576
}
7677
key := fmt.Sprintf("%s/%s", etcd.treePrefix, etcd.ID.String())
78+
7779
_, err := etcd.client.Put(etcd.ctx, key, string(p))
7880
if err != nil {
7981
return 0, err
@@ -91,35 +93,50 @@ func (etcd *Etcd) readByte() byte {
9193
// Read reads data from etcd and returns p bytes to user
9294
func (etcd *Etcd) Read(p []byte) (n int, err error) {
9395
if etcd.ID == uuid.Nil {
94-
err = errors.New("please create a new etcd client using NewWithID")
95-
return n, err
96+
return 0, errors.New("please create a new etcd client using NewWithID")
9697
}
9798

9899
key := fmt.Sprintf("%s/%s", etcd.treePrefix, etcd.ID.String())
99100

100101
if !etcd.hasRead {
101102
resp, err := etcd.client.Get(etcd.ctx, key)
102103
if err != nil {
103-
return n, err
104+
return 0, err
104105
}
105106
if len(resp.Kvs) == 0 {
106-
return n, io.EOF
107+
return 0, io.EOF
107108
}
108109
etcd.data = resp.Kvs[0].Value
109110
etcd.hasRead = true
110111
}
111112

112113
if len(etcd.data) == 0 {
113-
return n, io.EOF
114+
return 0, io.EOF
114115
}
115-
if c := cap(p); c > 0 {
116-
for n < c {
117-
p[n] = etcd.readByte()
118-
n++
119-
if len(etcd.data) == 0 {
120-
return n, io.EOF
121-
}
122-
}
116+
117+
// Copy as much data as possible to p
118+
// The copy function copies the minimum of len(p) and len(etcd.data) bytes from etcd.data to p
119+
// It returns the number of bytes copied, which is stored in n
120+
n = copy(p, etcd.data)
121+
122+
// Update etcd.data to remove the portion of data that has already been copied to p
123+
// etcd.data[n:] creates a new slice that starts from the n-th byte to the end of the original slice
124+
// This effectively removes the first n bytes from etcd.data, ensuring that subsequent reads start from the correct position
125+
etcd.data = etcd.data[n:]
126+
127+
if n == 0 {
128+
return 0, io.EOF
123129
}
130+
124131
return n, nil
125132
}
133+
134+
// Delete deletes the current key from the database
135+
func (etcd Etcd) Delete() error {
136+
key := fmt.Sprintf("%s/%s", etcd.treePrefix, etcd.ID.String())
137+
_, err := etcd.client.Delete(etcd.ctx, key)
138+
if err != nil {
139+
return fmt.Errorf("Failed to delete key %s: %s", key, err.Error())
140+
}
141+
return nil
142+
}

pkg/iut/v1alpha1/v1alpha1.go

+47-29
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,17 @@ import (
1919
"context"
2020
"encoding/json"
2121
"fmt"
22+
"io"
2223
"net/http"
2324
"runtime"
2425
"sync"
2526
"time"
2627

2728
eiffelevents "github.com/eiffel-community/eiffelevents-sdk-go"
2829
config "github.com/eiffel-community/etos-api/internal/configs/iut"
30+
"github.com/eiffel-community/etos-api/internal/database"
2931
"github.com/eiffel-community/etos-api/pkg/application"
3032
packageurl "github.com/package-url/packageurl-go"
31-
clientv3 "go.etcd.io/etcd/client/v3"
3233

3334
"github.com/google/uuid"
3435
"github.com/julienschmidt/httprouter"
@@ -38,14 +39,14 @@ import (
3839
type V1Alpha1Application struct {
3940
logger *logrus.Entry
4041
cfg config.Config
41-
database *clientv3.Client
42+
database database.Opener
4243
wg *sync.WaitGroup
4344
}
4445

4546
type V1Alpha1Handler struct {
4647
logger *logrus.Entry
4748
cfg config.Config
48-
database *clientv3.Client
49+
database database.Opener
4950
wg *sync.WaitGroup
5051
}
5152

@@ -71,11 +72,11 @@ func (a *V1Alpha1Application) Close() {
7172
}
7273

7374
// New returns a new V1Alpha1Application object/struct
74-
func New(cfg config.Config, log *logrus.Entry, ctx context.Context, cli *clientv3.Client) application.Application {
75+
func New(cfg config.Config, log *logrus.Entry, ctx context.Context, db database.Opener) application.Application {
7576
return &V1Alpha1Application{
7677
logger: log,
7778
cfg: cfg,
78-
database: cli,
79+
database: db,
7980
wg: &sync.WaitGroup{},
8081
}
8182
}
@@ -120,25 +121,30 @@ type StatusRequest struct {
120121
Id uuid.UUID `json:"id"`
121122
}
122123

123-
type StopRequest struct {
124-
Id uuid.UUID `json:"id"`
125-
}
126-
127124
// Start creates a number of IUTs and stores them in the ETCD database returning a checkout ID.
128125
func (h V1Alpha1Handler) Start(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
126+
identifier, err := uuid.Parse(r.Header.Get("X-Etos-Id"))
127+
logger := h.logger.WithField("identifier", identifier).WithContext(r.Context())
128+
if err != nil {
129+
RespondWithError(w, http.StatusInternalServerError, err.Error())
130+
return
131+
}
129132
checkOutID := uuid.New()
130133

131134
w.Header().Set("X-Content-Type-Options", "nosniff")
132135
w.Header().Set("Content-Type", "application/json")
133136

134137
var startReq StartRequest
135138
if err := json.NewDecoder(r.Body).Decode(&startReq); err != nil {
139+
logger.Errorf("Failed to decode request body: %s", r.Body)
136140
RespondWithError(w, http.StatusBadRequest, err.Error())
137141
return
138142
}
139143
defer r.Body.Close()
140144
purl, err := packageurl.FromString(startReq.ArtifactIdentity)
145+
141146
if err != nil {
147+
logger.Errorf("Failed to create a purl struct from artifact identity: %s", startReq.ArtifactIdentity)
142148
RespondWithError(w, http.StatusBadRequest, err.Error())
143149
return
144150
}
@@ -149,74 +155,86 @@ func (h V1Alpha1Handler) Start(w http.ResponseWriter, r *http.Request, ps httpro
149155
}
150156
iuts, err := json.Marshal(purls)
151157
if err != nil {
158+
logger.Errorf("Failed to marshal purls: %s", purls)
152159
RespondWithError(w, http.StatusInternalServerError, err.Error())
153160
return
154161
}
155-
_, err = h.database.Put(r.Context(), fmt.Sprintf("/iut/%s", checkOutID.String()), string(iuts))
162+
client := h.database.Open(r.Context(), identifier)
163+
_, err = client.Write([]byte(string(iuts)))
156164
if err != nil {
165+
logger.Errorf("Failed to write to database: %s", string(iuts))
157166
RespondWithError(w, http.StatusInternalServerError, err.Error())
158167
return
159168
}
160169
startResp := StartResponse{Id: checkOutID}
170+
logger.Debugf("Start response: %s", startResp)
161171
w.WriteHeader(http.StatusOK)
162172
response, _ := json.Marshal(startResp)
163173
_, _ = w.Write(response)
164174
}
165175

166176
// Status creates a simple DONE Status response with IUTs.
167177
func (h V1Alpha1Handler) Status(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
168-
identifier := r.Header.Get("X-Etos-Id")
178+
identifier, err := uuid.Parse(r.Header.Get("X-Etos-Id"))
179+
if err != nil {
180+
RespondWithError(w, http.StatusInternalServerError, err.Error())
181+
}
169182
logger := h.logger.WithField("identifier", identifier).WithContext(r.Context())
170183

171184
id, err := uuid.Parse(r.URL.Query().Get("id"))
185+
client := h.database.Open(r.Context(), identifier)
186+
187+
data, err := io.ReadAll(client)
172188

173-
key := fmt.Sprintf("/iut/%s", id)
174-
dbResp, err := h.database.Get(r.Context(), key)
175189
if err != nil {
176-
logger.Errorf("Failed to look up status request id: %s", id)
177-
RespondWithError(w, http.StatusInternalServerError, err.Error())
178-
return
179-
}
180-
if len(dbResp.Kvs) == 0 {
181-
err = fmt.Errorf("No key found: %s", key)
190+
logger.Errorf("Failed to look up status request id: %s, %s", identifier, err.Error())
182191
RespondWithError(w, http.StatusInternalServerError, err.Error())
183192
return
184193
}
185194
statusResp := StatusResponse{
186195
Id: id,
187196
Status: "DONE",
188197
}
189-
if err = json.Unmarshal(dbResp.Kvs[0].Value, &statusResp.Iuts); err != nil {
198+
if err = json.Unmarshal(data, &statusResp.Iuts); err != nil {
199+
logger.Errorf("Failed to unmarshal data: %s", data)
190200
RespondWithError(w, http.StatusInternalServerError, err.Error())
191201
return
192202
}
193203
response, err := json.Marshal(statusResp)
194204
if err != nil {
205+
logger.Errorf("Failed to marshal status response: %s", statusResp)
195206
RespondWithError(w, http.StatusInternalServerError, err.Error())
196207
return
197208
}
209+
logger.Debugf("Status response: %s", statusResp)
198210
w.WriteHeader(http.StatusOK)
199211
_, _ = w.Write(response)
212+
200213
}
201214

202215
// Stop deletes the given IUTs from the database and returns an empty response.
203216
func (h V1Alpha1Handler) Stop(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
204-
identifier := r.Header.Get("X-Etos-Id")
217+
identifier, err := uuid.Parse(r.Header.Get("X-Etos-Id"))
218+
if err != nil {
219+
RespondWithError(w, http.StatusInternalServerError, err.Error())
220+
}
205221
logger := h.logger.WithField("identifier", identifier).WithContext(r.Context())
206222

207-
var stopReq StopRequest
208-
defer r.Body.Close()
209-
if err := json.NewDecoder(r.Body).Decode(&stopReq); err != nil {
210-
logger.Errorf("Bad delete request: %s", err.Error())
211-
RespondWithError(w, http.StatusBadRequest, err.Error())
212-
return
223+
client := h.database.Open(r.Context(), identifier)
224+
deleter, canDelete := client.(database.Deleter)
225+
if !canDelete {
226+
logger.Warning("The database does not support delete. Writing nil.")
227+
_, err = client.Write(nil)
228+
} else {
229+
err = deleter.Delete()
213230
}
214-
_, err := h.database.Delete(r.Context(), fmt.Sprintf("/iut/%s", stopReq.Id))
231+
215232
if err != nil {
216-
logger.Errorf("Etcd delete failed: %s", err.Error())
233+
logger.Errorf("Database delete failed: %s", err.Error())
217234
RespondWithError(w, http.StatusInternalServerError, err.Error())
218235
return
219236
}
237+
logger.Debugf("Stop request succeeded")
220238
w.WriteHeader(http.StatusNoContent)
221239
}
222240

0 commit comments

Comments
 (0)