-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathservice.go
More file actions
418 lines (388 loc) · 18.3 KB
/
Copy pathservice.go
File metadata and controls
418 lines (388 loc) · 18.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
package wservice
import (
"bufio"
"database/sql"
"errors"
"flag"
"fmt"
"log"
"os"
"strconv"
"strings"
"time"
"github.com/jmoiron/sqlx"
// importing as blank for side-effects puposes only (init)
_ "github.com/lib/pq"
)
// This is where the core business logic resides (the service layer of the gokit onion) and on top of it we will be layering other functionalities that go-kit helps with
// WalletService is the inteface to be used from outside the package that provides operations on accounts.
// GetTable is a method that can be used to get either the Accounts or Transfers table from the Postgres db, basically fetching
// the information about accounts or about all registered transfes. It takes an input sting as the name of the table (Accounts or Tansfers) and return a slice of strings containing
// all the entries in the specific table and a error (nil if the method ran successfully)
// DoTransfer is the method that actually implements the wallet's fund transfer functionality from one account to another. It takes 3 input strings (the source account,
// the destination account and the transferred amount) and returns a status string (like "successful") and an error.
type WalletService interface {
GetTable(string) ([]string, error)
DoTransfer(string, string, string) (string, error)
}
// sqlDBTx is a type that defines the necessary information to establish a Postgres
// database connection and what tables to access (structure of the DB)
type sqlDBTx struct {
sqlDriver string
sqlHost string
sqlPort string
sqlUser string
sqlPassword string
sqlDbName string
sslmode string
accountsTable string
transfersTable string
}
func parseArgs() (string, int) {
var fileName string
// Parse the postrgres configuration file name and path. if not deifned the default is "postgresql.cfg" from /cmd
flag.StringVar(&fileName, "file", "./postgresql.cfg", "Path of postgresql config file to be parsed.")
var portNumber int
// Parse the port number that the server uses to listen and serve. If none is defined the default is 8080
flag.IntVar(&portNumber, "port", 8080, "Port on which the server will listen and serve.")
flag.Parse()
return fileName, portNumber
}
func getDbConfig(fileName string) (WalletService, error) {
// Open Postgres configuration file
file, err := os.Open(fileName)
// If there is an error return a suggestive error message
if err != nil {
sError := "There was a problem opening file " + fileName + " "
var ErrReadFile = errors.New(sError)
cErr := errors.New(ErrReadFile.Error() + err.Error())
var d = sqlDBTx{}
return d, cErr
}
// Defering the file closure to make sure it will eventually be closed
defer file.Close()
// declaring a sqlDBTx stuct that will hold the Postgres connection info
var configStruct sqlDBTx
// Read file and split each line on the " : " separator and then split the string to the right of
// the separtor by another spearator (",") and keep the string to the left of separator
cSlice := []string{}
scanner := bufio.NewScanner(file)
counter := 0
for scanner.Scan() {
item := scanner.Text()
if strings.Index(item, " : ") == -1 {
// Check if there is actually a " : " delimiter on the line, if not then there is a formating issue
var d = sqlDBTx{}
var ErrFormat = errors.New("err: postgres config file error, no ' : ' delimiter found")
return d, ErrFormat
}
if strings.LastIndex(item, " : ") != strings.Index(item, " : ") {
// Check if there are more than one " : " delimiter on the line, if so then there is a formating issue
var d = sqlDBTx{}
var ErrFormat = errors.New("err: postgres config file error, too many ' : ' delimiters found")
return d, ErrFormat
}
s := strings.Split(item, " : ")
v := strings.Split(s[1], ",")
cSlice = append(cSlice, v[0])
counter++
}
// If there are more or less than 9 lines in the config file, then there is a fomatting issue
if counter != 9 {
var d = sqlDBTx{}
var ErrFormat = errors.New("err: postgres config file error, the number of lines in the config file is not the expected one (9)")
return d, ErrFormat
}
// In case of error return the message to the outer function
if err := scanner.Err(); err != nil {
var d = sqlDBTx{}
return d, err
}
// Assign the gathered values to the configStruct struct of type sqlDBTx
configStruct = sqlDBTx{
sqlDriver: cSlice[0],
sqlHost: cSlice[1],
sqlPort: cSlice[2],
sqlUser: cSlice[3],
sqlPassword: cSlice[4],
sqlDbName: cSlice[5],
sslmode: cSlice[6],
accountsTable: cSlice[7],
transfersTable: cSlice[8],
}
return configStruct, nil
}
// NewService exported to be accessible from outside the package (from main)
// NewService is necessary because we need the ability to create a sqlDBTx stuct from outside the package (like from main)
func NewService() (WalletService, int, error) {
// Call function to parse cli arguments
fileName, portNumber := parseArgs()
configStruct, err := getDbConfig(fileName)
// Return the sqlDBTx struct that holds the Postgres db configuration parameters and the Listen and Serve port number
return configStruct, portNumber, err
}
// GetTable is a sqlDBTx type method and its purpose is to fetch the information contained in one of the 2 tables
// of the DB (one that keeps track of transfers and one that keeps track of the information in the wallet accounts)
// GetTable is also one of core functionalities of the Wallet service and has its own go-kit endpoint
func (s sqlDBTx) GetTable(t string) ([]string, error) {
// Based on the information contained on a sqlDBTx struct created with the "NewService" function a DB connection string is defined and a connection is opened
connectionString := "host=" + s.sqlHost + " port=" + s.sqlPort + " user=" + s.sqlUser + " password=" + s.sqlPassword + " dbname=" + s.sqlDbName + " sslmode=" + s.sslmode
db, err := sqlx.Open(s.sqlDriver, connectionString)
// If any error, return it to parent function
if err != nil {
return nil, err
}
// Make sure we actually close the connction once we're done
defer db.Close()
// While the transaction we are about to execute is not committed we will retry it until successful
var isCommitted = false
var results []string
for ok := true; ok; ok = !isCommitted {
// Start a transaction against the Postgres db
// If at anypoint between the "begin" and "commit" there is any kind of issue all changes to the db will be reverted
tx, err := db.Begin()
// If we get an error return a descriptive message and roll back the transaction in the "defer" section
if err != nil {
var ErrStartTx = errors.New("err: error beginning transaction in postgres")
cErr := errors.New(ErrStartTx.Error() + err.Error())
return nil, cErr
}
defer tx.Rollback()
// Set the transaction ISOLATION LEVEL to "Serializable" to allow for multiple instances of the server to run transactions against the same Postgres db
_, err = tx.Exec(`set transaction isolation level serializable`)
//_, err = tx.Exec(`set transaction isolation level repeatable read`) // <=== SET ISOLATION LEVEL
// If an error occurs return it to the parent function
if err != nil {
return nil, err
}
// Set a table lock so we exclude any type of conflicts that could generate data corruption
_, err = tx.Exec("LOCK TABLE Accounts IN SHARE ROW EXCLUSIVE MODE;") // <=== Lock table
// If an error occurs we retry the transaction
if err != nil {
//log.Println(err, "...continuing...")
continue
}
var txString string
if t == s.accountsTable {
// If we are trying to access the table that keeps information about accounts run the following query
txString = "SELECT * FROM " + s.accountsTable + " ORDER BY AccountID;"
} else {
// If, instead we are trying to access the table that keeps information about fund transfers run the following query
txString = "SELECT * FROM " + s.transfersTable + ";"
}
// Get the query result
rows, err := tx.Query(txString)
// If there is an error when sending the query
if err != nil {
// If the error message is indicative of a db collision retry the transaction in a new iteration
if strings.Contains(err.Error(), "could not serialize access due to") {
continue
}
// If the error is that the table has now rows
if err == sql.ErrNoRows {
// And if the table has information about accounts
if t == s.accountsTable {
// Return an appropriate error to the outer function
var ErrAcc = errors.New("err: there are no defined accounts")
cErr := errors.New(ErrAcc.Error() + err.Error())
return nil, cErr
}
// Or, if the table has information about fund transfers return an appropriate error to the outer function
var ErrAcc = errors.New("err: there are no defined transfers")
cErr := errors.New(ErrAcc.Error() + err.Error())
return nil, cErr
}
// If we got a different error return
var ErrUnexp = errors.New("err: Unexpected error occurred")
cErr := errors.New(ErrUnexp.Error() + err.Error())
return nil, cErr
}
// For each row returned in the query results
for rows.Next() {
if t == s.accountsTable {
// If the table has information about accounts get the account ID, balance, currency and the initial balance in a slice of strings
var accountID string
var balance float64
var currency string
var initialBalance float64
if err := rows.Scan(&accountID, &balance, ¤cy, &initialBalance); err != nil {
log.Fatal(err)
}
sBalance := fmt.Sprintf("%f", balance)
sIBalance := fmt.Sprintf("%f", initialBalance)
rString := "Account: " + accountID + " Balance = " + sBalance + " " + currency + " Initial Balance = " + sIBalance
results = append(results, rString)
} else {
// If the table has information about func transfers get the payment ID, source account, destination account, currency and timestamp in a slice of strings
var paymentID int
var fromAccount string
var toAccount string
var amount float64
var currency string
var time string
if err := rows.Scan(&paymentID, &fromAccount, &toAccount, &amount, ¤cy, &time); err != nil {
log.Fatal(err)
}
sPayment := fmt.Sprintf("%d", paymentID)
sAmount := fmt.Sprintf("%f", amount)
rString := "Transfer #" + sPayment + " from: " + fromAccount + " to: " + toAccount + " in the amount of " + sAmount + " " + currency + " at " + time
results = append(results, rString)
}
}
// If we've gotten this far without any errors we can commit our transaction, break out of the transaction loop as the transaction was successful
// and return the slice with all the information parsed from the query's result and a nil error to the parent function
tx.Commit()
isCommitted = true
}
if t == s.transfersTable && len(results) == 0 {
results = append(results, "No submitted transfers yet, this is not an error.")
}
results = append(results, "Success.")
return results, nil
}
// DoTransfer is a sqlDBTx type method that is responsible for the actual fund transfer transaction from one account to another
// DoTransfer takes in 3 arguments: the source account, the destination account and the transferred amount and returns a confirmation string and an empty error
// GetTable is also one of core functionalities of the Wallet service and has its own go-kit endpoint
func (s sqlDBTx) DoTransfer(fromAccount string, toAccount string, transferAmount string) (string, error) {
// Based on the information contained on a sqlDBTx struct created with the "NewService" function a DB connection string is defined and a connection is opened
connectionString := "host=" + s.sqlHost + " port=" + s.sqlPort + " user=" + s.sqlUser + " password=" + s.sqlPassword + " dbname=" + s.sqlDbName + " sslmode=" + s.sslmode
db, err := sqlx.Open(s.sqlDriver, connectionString)
// If any error, return it to parent function
if err != nil {
log.Println("err", err)
return "error", err
}
// check if the source account and destination account are the same and return an error before any transactions happen as we do not support transactions of this type
if fromAccount == toAccount {
var ErrSameAcc = errors.New("the source account is the same as the destination account. ")
//log.Println("err", ErrSameAcc)
return "error", ErrSameAcc
}
// Make sure we actually close the connction once we're done
defer db.Close()
// While the transaction we are about to execute is not committed we will retry it until successful
var isCommitted = false
for ok := true; ok; ok = !isCommitted {
// Start a transaction against the Postgres db
tx, err := db.Begin()
// If we get an error return a descriptive message and roll back the transaction in the "defer" section, then restart transaction
// If at anypoint between the "begin" and "commit" there is an issue all changes to the db will be reverted
if err != nil {
var ErrStartTx = errors.New("err: error beginning transaction in postgres")
cErr := errors.New(ErrStartTx.Error() + err.Error())
return "error", cErr
}
defer tx.Rollback()
// Set the transaction ISOLATION LEVEL to "Serializable" to allow for multiple instances of the server to run transactions against the same Postgres db
_, err = tx.Exec(`set transaction isolation level serializable`)
//_, err = tx.Exec(`set transaction isolation level repeatable read`) // <=== SET ISOLATION LEVEL
// If an error occurs return it to the parent function and restart the transaction
if err != nil {
return "error", err
}
// // Set a table lock so we exclude any type of conflicts that could generate data corruption
_, err = tx.Exec("LOCK TABLE Accounts IN SHARE ROW EXCLUSIVE MODE;") // <=== Lock table
// If an error occurs we retry the transaction
if err != nil {
log.Println(err, "...continuing...")
continue
}
// Fetch the balance and source account currency
var sBalance string
var sCurrency string
txString := "SELECT Balance , Currency FROM " + s.accountsTable + " WHERE AccountID ='" + fromAccount + "';"
err = tx.QueryRow(txString).Scan(&sBalance, &sCurrency)
// Return error messages if the query finds that the indicated source account does not return any results
if err != nil {
if err == sql.ErrNoRows {
var ErrNoSource = errors.New("The source account does not exist")
return "error", ErrNoSource
}
// Otherwise return a relevant error message
var ErrUnexpect = errors.New("err: unexpected error")
cErr := errors.New(ErrUnexpect.Error() + err.Error())
return "error", cErr
}
// If there is an issue with reading the balance return an appropriate error
fBalance, err := strconv.ParseFloat(sBalance, 64)
if err != nil {
var ErrParse = errors.New("Error parsing blance")
return "error", ErrParse
}
// If there is an issue with determining the transferred amout return an appropriate error
fAmount, err := strconv.ParseFloat(transferAmount, 64)
if err != nil {
var ErrParse = errors.New("err: error beginning transaction in postgres")
cErr := errors.New(ErrParse.Error() + err.Error())
return "error", cErr
}
// If the balance is insuficcient to allow the indicated amount transfer return an appropriate message
if fBalance < fAmount {
var ErrBalance = errors.New("Balance insuficient for transaction")
return "error", ErrBalance
}
// Fetch currency of the destination account
var dCurrency string
txString = "SELECT Currency FROM " + s.accountsTable + " WHERE AccountID ='" + toAccount + "';"
err = tx.QueryRow(txString).Scan(&dCurrency)
// if there is an error while fetching the currency retun an appropriate error
if err != nil {
if err == sql.ErrNoRows {
var ErrNoSource = errors.New("The destination account does not exist")
return "error", ErrNoSource
}
return "error", err
}
// If the source account currency is not the same as the destination account currency, then the transfer is not allowed
if dCurrency != sCurrency {
var ErrMissmatch = errors.New("Not same currency in transaction source and destination")
return "error", ErrMissmatch
}
// Make query to implement in the Account table the subtraction of the transfer amount from the source account
txString = "UPDATE " + s.accountsTable + " SET balance = balance - " + transferAmount + " WHERE accountid = '" + fromAccount + "';"
_, err = tx.Exec(txString)
// In case of failures return appropriate error messages
if err != nil {
if strings.Contains(err.Error(), "could not serialize access due to") {
log.Println(err, "...continuing...")
continue
} else {
if strings.Contains(err.Error(), "new row for relation \"accounts\" violates check constraint") {
var ErrParse = errors.New("err: Please check available balance before making transactions. ")
return "error", ErrParse
}
return "error", err
}
}
// Make query to implement in the Account table the addition of the transfer amount to the destination account
txString = "UPDATE " + s.accountsTable + " SET balance = balance + " + transferAmount + " WHERE accountid= '" + toAccount + "';"
_, err = tx.Exec(txString)
// In case of failures if the error message is indicative of a db collision retry the transaction in a new iteration
if err != nil {
if strings.Contains(err.Error(), "could not serialize access due to") {
continue
}
// otherwise return the error message to the outer function
return "error", err
}
t0 := time.Now().Format(time.RFC3339)
// Insert into the table responsible for tracking transactions the information about this particular transfer:
// Transaction ID, Source account, Destination Account, Amount transferred, Currency of amount transferred and Timestamp of transaction
txString = "INSERT INTO " + s.transfersTable + " (transid, From_Account, To_Account, Amount, Currency, TTime) VALUES( nextval('Payment_counter'), '" + fromAccount + "', '" + toAccount + "', '" + transferAmount + "', '" + sCurrency + "', '" + t0 + "' );"
_, err = tx.Exec(txString)
if err != nil {
// In case of a db write conflict retry the transaction in a new iteration
if strings.Contains(err.Error(), "could not serialize access due to") {
continue
} else {
// otherwise return the error to the outer function
return "error", err
}
}
// If we've gotten this far without any errors we can commit our transaction, break out of the transaction loop as the transaction was successful
// and return an appropriate status mesage with a nil error
tx.Commit()
isCommitted = true
}
return "success", nil
}