Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 29 additions & 52 deletions client/elasticClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ import (
"fmt"
"io"
"net/http"
"strings"

"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/multiversx/mx-chain-es-indexer-go/data"
"github.com/multiversx/mx-chain-es-indexer-go/process/dataindexer"
logger "github.com/multiversx/mx-chain-logger-go"
)
Expand Down Expand Up @@ -76,6 +74,26 @@ func (ec *elasticClient) CheckAndCreatePolicy(policyName string, policy *bytes.B
return ec.createPolicy(policyName, policy)
}

// SetWriteIndexTrue will set the provided index as write index
func (ec *elasticClient) SetWriteIndexTrue(alias string, index string) error {
body := fmt.Sprintf(`{"actions" : [ { "add" : { "index" : "%s", "alias" : "%s", "is_write_index" : true } }]}`, index, alias)
res, err := ec.client.Indices.UpdateAliases(
bytes.NewBuffer([]byte(body)),
)
if err != nil {
return err
}

defer closeBody(res)

if res.IsError() {
return fmt.Errorf("%s", res.String())
}

return nil

}

// CheckAndCreateIndex creates a new index if it does not already exist
func (ec *elasticClient) CheckAndCreateIndex(indexName string) error {
if ec.indexExists(indexName) {
Expand Down Expand Up @@ -225,36 +243,18 @@ func (ec *elasticClient) indexExists(index string) bool {

// PolicyExists checks if a policy was already created
func (ec *elasticClient) PolicyExists(policy string) bool {
policyRoute := fmt.Sprintf(
"%s/%s/ism/policies/%s",
ec.elasticBaseUrl,
kibanaPluginPath,
policy,
res, err := ec.client.ILM.GetLifecycle(
ec.client.ILM.GetLifecycle.WithPolicy(policy),
)

req := newRequest(http.MethodGet, policyRoute, nil)
res, err := ec.client.Transport.Perform(req)
if err != nil {
log.Warn("elasticClient.PolicyExists",
"error performing request", err.Error())
log.Warn("elasticClient.PolicyExists", "error", err.Error())
return false
}

response := &esapi.Response{
StatusCode: res.StatusCode,
Body: res.Body,
Header: res.Header,
}

existsRes := &data.Response{}
err = parseResponse(response, existsRes, kibanaResponseErrorHandler)
if err != nil {
log.Warn("elasticClient.PolicyExists",
"error returned by kibana api", err.Error())
return false
if res.StatusCode == http.StatusOK {
return true
}

return existsRes.Status == http.StatusConflict
return false
}

// AliasExists checks if an index alias already exists
Expand Down Expand Up @@ -293,38 +293,15 @@ func (ec *elasticClient) createIndex(index string) error {

// CreatePolicy creates a new policy for elastic indexes. Policies define rollover parameters
func (ec *elasticClient) createPolicy(policyName string, policy *bytes.Buffer) error {
policyRoute := fmt.Sprintf(
"%s/_opendistro/_ism/policies/%s",
ec.elasticBaseUrl,
res, err := ec.client.ILM.PutLifecycle(
policyName,
ec.client.ILM.PutLifecycle.WithBody(policy),
)

req := newRequest(http.MethodPut, policyRoute, policy)
req.Header[headerContentType] = headerContentTypeJSON
req.Header[headerXSRF] = []string{"false"}
res, err := ec.client.Transport.Perform(req)
if err != nil {
return err
}

response := &esapi.Response{
StatusCode: res.StatusCode,
Body: res.Body,
Header: res.Header,
}

existsRes := &data.Response{}
err = parseResponse(response, existsRes, kibanaResponseErrorHandler)
if err != nil {
return err
}

errStr := fmt.Sprintf("%v", existsRes.Error)
if existsRes.Status == http.StatusConflict && !strings.Contains(errStr, errPolicyAlreadyExists) {
return dataindexer.ErrCouldNotCreatePolicy
}

return nil
return parseResponse(res, nil, elasticDefaultErrorResponseHandler)
}

// CreateIndexTemplate creates an elasticsearch index template
Expand Down
6 changes: 6 additions & 0 deletions cmd/elasticindexer/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@
"receipts", "scresults", "accountsesdt", "accountsesdthistory", "epochinfo", "scdeploys", "tokens", "tags",
"logs", "delegators", "operations", "esdts", "values", "events", "executionresults"
]
use-templates-from-files = true
[config.policies]
enabled = true
indices-with-policy = ["rating", "transactions", "blocks", "validators", "miniblocks", "rounds", "accounts",
"accountshistory", "receipts", "scresults", "accountsesdt", "accountsesdthistory", "epochinfo",
"scdeploys", "tokens", "tags", "logs", "delegators", "operations", "esdts", "events", "executionresults"]
[config.address-converter]
length = 32
type = "bech32"
Expand Down
46 changes: 46 additions & 0 deletions cmd/elasticindexer/config/indices/accounts.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
{
"index_patterns": [
"accounts-*"
],
"template": {
"settings": {
"index.lifecycle.name": "accounts-policy",
"index.lifecycle.rollover_alias": "accounts",
"number_of_replicas": 1,
"number_of_shards": 5
},
"mappings": {
"properties": {
"address": {
"type": "keyword"
},
"balance": {
"type": "keyword"
},
"balanceNum": {
"type": "double"
},
"nonce": {
"type": "double"
},
"shardID": {
"type": "long"
},
"timestamp": {
"format": "epoch_second",
"type": "date"
},
"timestampMs": {
"type": "date",
"format": "epoch_millis"
},
"totalBalanceWithStake": {
"type": "keyword"
},
"totalBalanceWithStakeNum": {
"type": "double"
}
}
}
}
}
91 changes: 91 additions & 0 deletions cmd/elasticindexer/config/indices/accountsesdt.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
{
"index_patterns": [
"accountsesdt-*"
],
"template": {
"settings": {
"index.lifecycle.name": "accountsesdt-policy",
"index.lifecycle.rollover_alias": "accountsesdt",
"number_of_replicas": 1,
"number_of_shards": 5
},
"mappings": {
"properties": {
"address": {
"type": "keyword"
},
"balance": {
"type": "keyword"
},
"balanceNum": {
"type": "double"
},
"currentOwner": {
"type": "keyword"
},
"data": {
"properties": {
"attributes": {
"index": "false",
"type": "keyword"
},
"creator": {
"type": "keyword"
},
"hash": {
"index": "false",
"type": "keyword"
},
"metadata": {
"index": "false",
"type": "keyword"
},
"name": {
"type": "keyword"
},
"nonEmptyURIs": {
"type": "boolean"
},
"royalties": {
"index": "false",
"type": "long"
},
"tags": {
"type": "text"
},
"uris": {
"type": "text"
}
},
"type": "nested"
},
"identifier": {
"type": "text"
},
"properties": {
"type": "keyword"
},
"shardID": {
"type": "long"
},
"timestamp": {
"format": "epoch_second",
"type": "date"
},
"timestampMs": {
"type": "date",
"format": "epoch_millis"
},
"token": {
"type": "keyword"
},
"tokenNonce": {
"type": "double"
},
"type": {
"type": "keyword"
}
}
}
}
}
49 changes: 49 additions & 0 deletions cmd/elasticindexer/config/indices/accountsesdthistory.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
{
"index_patterns": [
"accountsesdthistory-*"
],
"template": {
"settings": {
"index.lifecycle.name": "accountsesdthistory-policy",
"index.lifecycle.rollover_alias": "accountsesdthistory",
"number_of_replicas": 1,
"number_of_shards": 5
},
"mappings": {
"properties": {
"address": {
"type": "keyword"
},
"balance": {
"type": "keyword"
},
"identifier": {
"type": "text"
},
"isSender": {
"type": "boolean"
},
"isSmartContract": {
"type": "boolean"
},
"shardID": {
"type": "long"
},
"timestamp": {
"format": "epoch_second",
"type": "date"
},
"timestampMs": {
"type": "date",
"format": "epoch_millis"
},
"token": {
"type": "keyword"
},
"tokenNonce": {
"type": "double"
}
}
}
}
}
40 changes: 40 additions & 0 deletions cmd/elasticindexer/config/indices/accountshistory.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
{
"index_patterns": [
"accountshistory-*"
],
"template": {
"settings": {
"index.lifecycle.name": "accountshistory-policy",
"index.lifecycle.rollover_alias": "accountshistory",
"number_of_replicas": 1,
"number_of_shards": 5
},
"mappings": {
"properties": {
"address": {
"type": "keyword"
},
"balance": {
"type": "keyword"
},
"isSender": {
"type": "boolean"
},
"isSmartContract": {
"type": "boolean"
},
"shardID": {
"type": "long"
},
"timestamp": {
"format": "epoch_second",
"type": "date"
},
"timestampMs": {
"type": "date",
"format": "epoch_millis"
}
}
}
}
}
Loading
Loading