Skip to content

Commit d6d05cb

Browse files
authored
Merge pull request #6 from iluhinsky/feature/es
Added ElasticSearch support
2 parents f746c91 + f835d14 commit d6d05cb

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+6983
-464
lines changed

acronis-db-bench/events.go

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -327,13 +327,24 @@ func (e *EventBus) DoAlign() (bool, error) {
327327
* step #1 - get fresh events
328328
*/
329329

330-
var rows, err = tx.Search("acronis_db_bench_eventbus_events",
331-
"internal_id, topic_internal_id, event_type_internal_id, event_id, source, sequence, tenant_id, client_id, trace_parent, "+
332-
"subject_id, data_ref, data, data_base64, created_at",
333-
"",
334-
"internal_id",
335-
e.batchSize,
336-
false)
330+
var rows, err = tx.Query(fmt.Sprintf(`
331+
SELECT internal_id,
332+
topic_internal_id,
333+
event_type_internal_id,
334+
event_id,
335+
source,
336+
sequence,
337+
tenant_id,
338+
client_id,
339+
trace_parent,
340+
subject_id,
341+
data_ref,
342+
data,
343+
data_base64,
344+
created_at
345+
FROM acronis_db_bench_eventbus_events
346+
ORDER BY internal_id
347+
LIMIT %d;`, e.batchSize))
337348
if err != nil {
338349
return err
339350
}
@@ -501,13 +512,16 @@ func (e *EventBus) DoFetch() (bool, error) {
501512
return false, err
502513
}
503514

504-
var rows, err = sess.Search("acronis_db_bench_eventbus_stream s INNER JOIN acronis_db_bench_eventbus_data d ON s.int_id = d.int_id",
505-
"s.int_id, s.topic_id, d.type_id, s.seq, s.seq_time, d.data",
506-
"s.topic_id = $1 AND s.seq IS NOT NULL AND s.seq > $2",
507-
"s.seq",
508-
e.batchSize,
509-
false,
510-
t, cur64)
515+
var rows, err = sess.Query(fmt.Sprintf(`
516+
SELECT s.int_id, s.topic_id, d.type_id, s.seq, s.seq_time, d.data
517+
FROM acronis_db_bench_eventbus_stream s
518+
INNER JOIN acronis_db_bench_eventbus_data d ON s.int_id = d.int_id
519+
WHERE s.topic_id = %s
520+
AND s.seq IS NOT NULL
521+
AND s.seq > %d
522+
ORDER BY s.seq
523+
LIMIT %d;`,
524+
t, cur64, e.batchSize))
511525
if err != nil {
512526
return false, err
513527
}

acronis-db-bench/tables.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,7 @@ var TestTableTimeSeriesSQL = TestTable{
454454
CreateQueryPatchFuncs: []CreateQueryPatchFunc{
455455
func(table string, query string, dialect db.DialectName) (string, error) { //nolint:revive
456456
if dialect == db.CASSANDRA {
457-
query = strings.ReplaceAll(query, "{$bigint_autoinc_pk}", "{$bigint_autoinc}")
457+
query = strings.ReplaceAll(query, string(db.DataTypeBigIntAutoIncPK), string(db.DataTypeBigIntAutoInc))
458458
query = strings.ReplaceAll(query, "value int {$notnull}", `value int,
459459
PRIMARY KEY ((tenant_id, device_id, metric_id), id, ts)
460460
`)

acronis-db-bench/tenant_generator.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,7 @@ func (tc *TenantsCache) PopulateUuidsFromDB(database db.Database) {
453453

454454
var session = database.Session(database.Context(context.Background()))
455455

456-
var rows, err = session.Search(TableNameTenants, "uuid, id, kind, nesting_level", "", "", 0, false)
456+
var rows, err = session.Query(fmt.Sprintf("SELECT uuid, id, kind, nesting_level FROM %s", TableNameTenants))
457457
if err != nil {
458458
return
459459
}
@@ -477,7 +477,7 @@ func (tc *TenantsCache) PopulateUuidsFromDB(database db.Database) {
477477
}
478478

479479
var ctiRows db.Rows
480-
if ctiRows, err = session.Search(TableNameCtiEntities, "uuid", "", "", 0, false); err != nil {
480+
if ctiRows, err = session.Query(fmt.Sprintf("SELECT uuid FROM %s;", TableNameCtiEntities)); err != nil {
481481
return
482482
}
483483

@@ -520,7 +520,7 @@ func (tc *TenantsCache) CreateTenant(rw *benchmark.RandomizerWorker, tx db.Datab
520520
newTenantClosure := TenantClosureObj{ParentID: t.ID, ChildID: t.ID, ParentKind: t.Kind, Barrier: 0}
521521
tcToCreate = append(tcToCreate, newTenantClosure)
522522

523-
rows, err := tx.Search(TableNameTenantClosure, "parent_id, parent_kind, barrier", fmt.Sprintf("child_id = %d", t.ParentID), "", 0, false)
523+
rows, err := tx.Query(fmt.Sprintf("SELECT parent_id, parent_kind, barrier FROM %s WHERE child_id = %d", TableNameTenantClosure, t.ParentID))
524524
if err != nil {
525525
tc.logger.Log(benchmark.LogTrace, 0, fmt.Sprintf("error selecting from table %s: %v", TableNameTenantClosure, err))
526526
return "", err
@@ -599,7 +599,7 @@ func (tc *TenantsCache) createRandomCtiEntity(rw *benchmark.RandomizerWorker) (*
599599
func getMax(database db.Database, field string) int {
600600
var session = database.Session(database.Context(context.Background()))
601601

602-
var maxRows, err = session.Search(TableNameTenants, fmt.Sprintf("COALESCE(MAX(%s),0)", field), "", "", 0, false)
602+
var maxRows, err = session.Query(fmt.Sprintf("SELECT COALESCE(MAX(%s),0) FROM %s;", field, TableNameTenants))
603603
if err != nil {
604604
return 0
605605
}

acronis-db-bench/workers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ func testSelect(
176176
}
177177

178178
var session = c.database.Session(c.database.Context(context.Background()))
179-
var rows, err = session.Search(from, what, where, orderBy, batch, explain)
179+
var rows, err = session.SearchRaw(from, what, where, orderBy, batch, explain)
180180
if err != nil {
181181
b.Exit(err)
182182
}

db/consts.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,7 @@ import "strings"
44

55
type DialectName string
66

7-
const (
8-
SQLITE DialectName = "sqlite" // SQLITE is the SQLite driver name
9-
SQLITE3 DialectName = "sqlite3" // SQLITE3 is the SQLite driver name
10-
POSTGRES DialectName = "postgres" // POSTGRES is the PostgreSQL driver name
11-
MYSQL DialectName = "mysql" // MYSQL is the MySQL driver name
12-
MSSQL DialectName = "mssql" // MSSQL is the Microsoft SQL Server driver name
13-
CLICKHOUSE DialectName = "clickhouse" // CLICKHOUSE is the ClickHouse driver name
14-
CASSANDRA DialectName = "cassandra" // CASSANDRA is the Cassandra driver name
15-
)
7+
const ()
168

179
var (
1810
// SupportedDrivers is a string containing all supported drivers

db/db.go

Lines changed: 94 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,24 @@ import (
77
"time"
88
)
99

10+
// Supported dialect
11+
const (
12+
SQLITE DialectName = "sqlite" // SQLITE is the SQLite driver name
13+
SQLITE3 DialectName = "sqlite3" // SQLITE3 is the SQLite driver name
14+
POSTGRES DialectName = "postgres" // POSTGRES is the PostgreSQL driver name
15+
MYSQL DialectName = "mysql" // MYSQL is the MySQL driver name
16+
MSSQL DialectName = "mssql" // MSSQL is the Microsoft SQL Server driver name
17+
CLICKHOUSE DialectName = "clickhouse" // CLICKHOUSE is the ClickHouse driver name
18+
CASSANDRA DialectName = "cassandra" // CASSANDRA is the Cassandra driver name
19+
ELASTICSEARCH DialectName = "elasticsearch" // ELASTICSEARCH is the Elasticsearch driver name
20+
)
21+
22+
// Special conditions for searching
23+
const (
24+
SpecialConditionIsNull = "isnull()"
25+
SpecialConditionIsNotNull = "notnull()"
26+
)
27+
1028
// Connector is an interface for registering database connectors without knowing the specific connector implementations
1129
type Connector interface {
1230
ConnectionPool(cfg Config) (Database, error)
@@ -41,6 +59,9 @@ type Config struct {
4159
DryRun bool
4260
UseTruncate bool
4361

62+
TLSEnabled bool
63+
TLSCACert []byte
64+
4465
QueryLogger Logger
4566
ReadedRowsLogger Logger
4667
QueryTimeLogger Logger
@@ -93,10 +114,38 @@ type databaseQueryRegistrator interface {
93114
StatementExit(statement string, startTime time.Time, err error, showRowsAffected bool, result Result, format string, args []interface{}, rows Rows, dest []interface{})
94115
}
95116

117+
// Page is a struct for storing pagination information
118+
type Page struct {
119+
Limit int64
120+
Offset int64
121+
}
122+
123+
// SelectCtrl is a struct for storing select control information
124+
type SelectCtrl struct {
125+
Fields []string // empty means select count
126+
Where map[string][]string
127+
Order []string
128+
Page Page
129+
130+
OptimizeConditions bool
131+
}
132+
96133
// databaseSearcher is an interface for searching the database
97134
type databaseSearcher interface {
98-
Search(from string, what string, where string, orderBy string, limit int, explain bool, args ...interface{}) (Rows, error)
99-
Aggregate(from string, what string, where string, groupBy string, orderBy string, limit int, explain bool, args ...interface{}) (Rows, error)
135+
SearchRaw(from string, what string, where string, orderBy string, limit int, explain bool, args ...interface{}) (Rows, error)
136+
Search(tableName string, c *SelectCtrl) (Rows, error)
137+
}
138+
139+
// InsertStats is a struct for storing insert statistics
140+
type InsertStats struct {
141+
Successful int64
142+
Failed int64
143+
Total int64
144+
ExpectedSuccesses int64
145+
}
146+
147+
func (s *InsertStats) String() string {
148+
return fmt.Sprintf("successful: %d, failed: %d, total: %d", s.Successful, s.Failed, s.Total)
100149
}
101150

102151
// databaseInserter is an interface for inserting data into the database
@@ -149,14 +198,22 @@ type Session interface {
149198

150199
type TableRow struct {
151200
Name string
152-
Type string
201+
Type DataType
153202
NotNull bool
203+
Indexed bool // only for Elasticsearch
204+
}
205+
206+
type ResilienceSettings struct {
207+
NumberOfShards int
208+
NumberOfReplicas int
154209
}
155210

156211
type TableDefinition struct {
157212
TableRows []TableRow
158213
PrimaryKey []string
159214
Engine string
215+
Resilience ResilienceSettings
216+
LMPolicy string // only for Elasticsearch
160217
}
161218

162219
type IndexType string
@@ -235,9 +292,41 @@ type Database interface {
235292
Close() error
236293
}
237294

295+
type DataType string
296+
297+
const (
298+
DataTypeId DataType = "{$id}"
299+
DataTypeInt DataType = "{$int}"
300+
DataTypeString DataType = "{$string}"
301+
DataTypeString256 DataType = "{$string256}"
302+
DataTypeBigIntAutoIncPK DataType = "{$bigint_autoinc_pk}"
303+
DataTypeBigIntAutoInc DataType = "{$bigint_autoinc}"
304+
DataTypeAscii DataType = "{$ascii}"
305+
DataTypeUUID DataType = "{$uuid}"
306+
DataTypeVarCharUUID DataType = "{$varchar_uuid}"
307+
DataTypeLongBlob DataType = "{$longblob}"
308+
DataTypeHugeBlob DataType = "{$hugeblob}"
309+
DataTypeDateTime DataType = "{$datetime}"
310+
DataTypeDateTime6 DataType = "{$datetime6}"
311+
DataTypeTimestamp6 DataType = "{$timestamp6}"
312+
DataTypeCurrentTimeStamp6 DataType = "{$current_timestamp6}"
313+
DataTypeBinary20 DataType = "{$binary20}"
314+
DataTypeBinaryBlobType DataType = "{$binaryblobtype}"
315+
DataTypeBoolean DataType = "{$boolean}"
316+
DataTypeBooleanFalse DataType = "{$boolean_false}"
317+
DataTypeBooleanTrue DataType = "{$boolean_true}"
318+
DataTypeTinyInt DataType = "{$tinyint}"
319+
DataTypeLongText DataType = "{$longtext}"
320+
DataTypeUnique DataType = "{$unique}"
321+
DataTypeEngine DataType = "{$engine}"
322+
DataTypeNotNull DataType = "{$notnull}"
323+
DataTypeNull DataType = "{$null}"
324+
DataTypeTenantUUIDBoundID DataType = "{$tenant_uuid_bound_id}"
325+
)
326+
238327
// Dialect is an interface for database dialects
239328
type Dialect interface {
240-
GetType(id string) string
329+
GetType(id DataType) string
241330
}
242331

243332
// Recommendation is a struct for storing DB recommendation
@@ -267,6 +356,7 @@ func GetDatabases() []DBType {
267356
ret = append(ret, DBType{Driver: CLICKHOUSE, Symbol: "C", Name: "ClickHouse"})
268357
// "A" is used as the latest symbol of the "Cassandra" due to duplicate with ClickHouse "C"
269358
ret = append(ret, DBType{Driver: CASSANDRA, Symbol: "A", Name: "Cassandra"})
359+
ret = append(ret, DBType{Driver: ELASTICSEARCH, Symbol: "E", Name: "Elasticsearch"})
270360

271361
return ret
272362
}

0 commit comments

Comments
 (0)