Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@
}

func (d *Daemon) calculateLag(ctx context.Context, ix indexers.Indexer) (time.Duration, error) {
esLastModified, err := ix.GetESLastModified(ix.Name())
esLastModified, err := ix.GetESLastModified(ctx, ix.Name())

Check warning on line 144 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L144

Added line #L144 was not covered by tests
if err != nil {
return 0, fmt.Errorf("error getting ES last modified: %w", err)
}
Expand Down
16 changes: 8 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,24 @@ require (
github.com/getsentry/sentry-go v0.31.1
github.com/lib/pq v1.10.9
github.com/nyaruka/ezconf v0.3.0
github.com/nyaruka/gocommon v1.60.5
github.com/nyaruka/gocommon v1.61.1
github.com/samber/slog-multi v1.4.0
github.com/samber/slog-sentry/v2 v2.9.3
github.com/stretchr/testify v1.10.0
)

require (
github.com/aws/aws-sdk-go-v2 v1.36.3 // indirect
github.com/aws/aws-sdk-go-v2/config v1.29.9 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.62 // indirect
github.com/aws/aws-sdk-go-v2/config v1.29.12 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.65 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.25.1 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.29.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.25.2 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.33.17 // indirect
github.com/aws/smithy-go v1.22.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand All @@ -36,15 +36,15 @@ require (
github.com/naoina/go-stringutil v0.1.0 // indirect
github.com/naoina/toml v0.1.1 // indirect
github.com/nyaruka/null/v2 v2.0.3 // indirect
github.com/nyaruka/phonenumbers v1.5.0 // indirect
github.com/nyaruka/phonenumbers v1.6.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/samber/lo v1.49.1 // indirect
github.com/samber/slog-common v0.18.1 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
golang.org/x/exp v0.0.0-20250305212735-054e65f0b394 // indirect
golang.org/x/net v0.37.0 // indirect
golang.org/x/net v0.38.0 // indirect
golang.org/x/sys v0.31.0 // indirect
golang.org/x/text v0.23.0 // indirect
google.golang.org/protobuf v1.36.5 // indirect
google.golang.org/protobuf v1.36.6 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
32 changes: 16 additions & 16 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
github.com/aws/aws-sdk-go-v2 v1.36.3 h1:mJoei2CxPutQVxaATCzDUjcZEjVRdpsiiXi2o38yqWM=
github.com/aws/aws-sdk-go-v2 v1.36.3/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg=
github.com/aws/aws-sdk-go-v2/config v1.29.9 h1:Kg+fAYNaJeGXp1vmjtidss8O2uXIsXwaRqsQJKXVr+0=
github.com/aws/aws-sdk-go-v2/config v1.29.9/go.mod h1:oU3jj2O53kgOU4TXq/yipt6ryiooYjlkqqVaZk7gY/U=
github.com/aws/aws-sdk-go-v2/credentials v1.17.62 h1:fvtQY3zFzYJ9CfixuAQ96IxDrBajbBWGqjNTCa79ocU=
github.com/aws/aws-sdk-go-v2/credentials v1.17.62/go.mod h1:ElETBxIQqcxej++Cs8GyPBbgMys5DgQPTwo7cUPDKt8=
github.com/aws/aws-sdk-go-v2/config v1.29.12 h1:Y/2a+jLPrPbHpFkpAAYkVEtJmxORlXoo5k2g1fa2sUo=
github.com/aws/aws-sdk-go-v2/config v1.29.12/go.mod h1:xse1YTjmORlb/6fhkWi8qJh3cvZi4JoVNhc+NbJt4kI=
github.com/aws/aws-sdk-go-v2/credentials v1.17.65 h1:q+nV2yYegofO/SUXruT+pn4KxkxmaQ++1B/QedcKBFM=
github.com/aws/aws-sdk-go-v2/credentials v1.17.65/go.mod h1:4zyjAuGOdikpNYiSGpsGz8hLGmUzlY8pc8r9QQ/RXYQ=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 h1:x793wxmUWVDhshP8WW2mlnXuFrO4cOd3HLBroh1paFw=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30/go.mod h1:Jpne2tDnYiFascUEs2AWHJL9Yp7A5ZVy3TNyxaAjD6M=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 h1:ZK5jHhnrioRkUNOc+hOgQKlUL5JeC3S6JgLxtQ+Rm0Q=
Expand All @@ -18,10 +18,10 @@ github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 h1:eAh2A4b
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3/go.mod h1:0yKJC/kb8sAnmlYa6Zs3QVYqaC8ug2AbnNChv5Ox3uA=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 h1:dM9/92u2F1JbDaGooxTq18wmmFzbJRfXfVfy96/1CXM=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15/go.mod h1:SwFBy2vjtA0vZbjjaFtfN045boopadnoVPhu4Fv66vY=
github.com/aws/aws-sdk-go-v2/service/sso v1.25.1 h1:8JdC7Gr9NROg1Rusk25IcZeTO59zLxsKgE0gkh5O6h0=
github.com/aws/aws-sdk-go-v2/service/sso v1.25.1/go.mod h1:qs4a9T5EMLl/Cajiw2TcbNt2UNo/Hqlyp+GiuG4CFDI=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.29.1 h1:KwuLovgQPcdjNMfFt9OhUd9a2OwcOKhxfvF4glTzLuA=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.29.1/go.mod h1:MlYRNmYu/fGPoxBQVvBYr9nyr948aY/WLUvwBMBJubs=
github.com/aws/aws-sdk-go-v2/service/sso v1.25.2 h1:pdgODsAhGo4dvzC3JAG5Ce0PX8kWXrTZGx+jxADD+5E=
github.com/aws/aws-sdk-go-v2/service/sso v1.25.2/go.mod h1:qs4a9T5EMLl/Cajiw2TcbNt2UNo/Hqlyp+GiuG4CFDI=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.0 h1:90uX0veLKcdHVfvxhkWUQSCi5VabtwMLFutYiRke4oo=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.0/go.mod h1:MlYRNmYu/fGPoxBQVvBYr9nyr948aY/WLUvwBMBJubs=
github.com/aws/aws-sdk-go-v2/service/sts v1.33.17 h1:PZV5W8yk4OtH1JAuhV2PXwwO9v5G5Aoj+eMCn4T+1Kc=
github.com/aws/aws-sdk-go-v2/service/sts v1.33.17/go.mod h1:cQnB8CUnxbMU82JvlqjKR2HBOm3fe9pWorWBza6MBJ4=
github.com/aws/smithy-go v1.22.3 h1:Z//5NuZCSW6R4PhQ93hShNbyBbn8BWCmCVCt+Q8Io5k=
Expand Down Expand Up @@ -58,12 +58,12 @@ github.com/naoina/toml v0.1.1 h1:PT/lllxVVN0gzzSqSlHEmP8MJB4MY2U7STGxiouV4X8=
github.com/naoina/toml v0.1.1/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E=
github.com/nyaruka/ezconf v0.3.0 h1:kGvJqVN8AHowb4HdaHAviJ0Z3yI5Pyekp1WqibFEaGk=
github.com/nyaruka/ezconf v0.3.0/go.mod h1:89GUW6EPRNLIxT7lC4LWnjWTgZeQwRoX7lBmc8ralAU=
github.com/nyaruka/gocommon v1.60.5 h1:V10rosGzVArRspilfbi65TyHBZzjLQbwmaBeicr2Drw=
github.com/nyaruka/gocommon v1.60.5/go.mod h1:kFJuOq8COneV7ssfK6xgCMJ8gP8fQifLQnNXBnE4YL0=
github.com/nyaruka/gocommon v1.61.1 h1:qulMx0jHDWoDagCJbVs7CMLZA33jfa5kYutlXR66pHM=
github.com/nyaruka/gocommon v1.61.1/go.mod h1:HcwpCzwt8XK7SxmGHYRF1R9viOAvlU7VtXTGuEmJx/8=
github.com/nyaruka/null/v2 v2.0.3 h1:rdmMRQyVzrOF3Jff/gpU/7BDR9mQX0lcLl4yImsA3kw=
github.com/nyaruka/null/v2 v2.0.3/go.mod h1:OCVeCkCXwrg5/qE6RU0c1oUVZBy+ZDrT+xYg1XSaIWA=
github.com/nyaruka/phonenumbers v1.5.0 h1:0M+Gd9zl53QC4Nl5z1Yj1O/zPk2XXBUwR/vlzdXSJv4=
github.com/nyaruka/phonenumbers v1.5.0/go.mod h1:gv+CtldaFz+G3vHHnasBSirAi3O2XLqZzVWz4V1pl2E=
github.com/nyaruka/phonenumbers v1.6.0 h1:r9ax45fFg+YLUs2X4bNXm5RAxWl00hYjFgNlv32vtHk=
github.com/nyaruka/phonenumbers v1.6.0/go.mod h1:7gjs+Lchqm49adhAKB5cdcng5ZXgt6x7Jgvi0ZorUtU=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand All @@ -88,14 +88,14 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/exp v0.0.0-20250305212735-054e65f0b394 h1:nDVHiLt8aIbd/VzvPWN6kSOPE7+F/fNFDSXLVYkE/Iw=
golang.org/x/exp v0.0.0-20250305212735-054e65f0b394/go.mod h1:sIifuuw/Yco/y6yb6+bDNfyeQ/MdPUy/hKEMYQV17cM=
golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c=
golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
Expand Down
31 changes: 16 additions & 15 deletions indexers/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Indexer interface {
Index(rt *runtime.Runtime, rebuild, cleanup bool) (string, error)
Stats() Stats

GetESLastModified(index string) (time.Time, error)
GetESLastModified(ctx context.Context, index string) (time.Time, error)
GetDBLastModified(ctx context.Context, db *sql.DB) (time.Time, error)
}

Expand Down Expand Up @@ -98,9 +98,9 @@ func (i *baseIndexer) recordActivity(indexed, deleted int, elapsed time.Duration
type infoResponse map[string]interface{}

// FindIndexes finds all our physical indexes
func (i *baseIndexer) FindIndexes() []string {
func (i *baseIndexer) FindIndexes(ctx context.Context) []string {
response := infoResponse{}
_, err := utils.MakeJSONRequest(http.MethodGet, fmt.Sprintf("%s/%s", i.elasticURL, i.name), nil, &response)
_, err := utils.MakeJSONRequest(ctx, http.MethodGet, fmt.Sprintf("%s/%s", i.elasticURL, i.name), nil, &response)
indexes := make([]string, 0)

// error could mean a variety of things, but we'll figure that out later
Expand Down Expand Up @@ -128,7 +128,7 @@ func (i *baseIndexer) FindIndexes() []string {
// that index to `contacts`.
//
// If the day-specific name already exists, we append a .1 or .2 to the name.
func (i *baseIndexer) createNewIndex(def *IndexDefinition) (string, error) {
func (i *baseIndexer) createNewIndex(ctx context.Context, def *IndexDefinition) (string, error) {
// create our day-specific name
index := fmt.Sprintf("%s_%s", i.name, time.Now().Format("2006_01_02"))
idx := 0
Expand All @@ -152,7 +152,7 @@ func (i *baseIndexer) createNewIndex(def *IndexDefinition) (string, error) {
// create the new index
settings := jsonx.MustMarshal(def)

_, err := utils.MakeJSONRequest(http.MethodPut, fmt.Sprintf("%s/%s", i.elasticURL, index), settings, nil)
_, err := utils.MakeJSONRequest(ctx, http.MethodPut, fmt.Sprintf("%s/%s", i.elasticURL, index), settings, nil)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -185,11 +185,11 @@ type removeAliasCommand struct {
}

// maps this indexer's alias to the new physical index, removing existing aliases if they exist
func (i *baseIndexer) updateAlias(newIndex string) error {
func (i *baseIndexer) updateAlias(ctx context.Context, newIndex string) error {
commands := make([]interface{}, 0)

// find existing physical indexes
existing := i.FindIndexes()
existing := i.FindIndexes(ctx)
for _, idx := range existing {
remove := removeAliasCommand{}
remove.Remove.Alias = i.name
Expand All @@ -207,7 +207,7 @@ func (i *baseIndexer) updateAlias(newIndex string) error {

aliasJSON := jsonx.MustMarshal(aliasCommand{Actions: commands})

_, err := utils.MakeJSONRequest(http.MethodPost, fmt.Sprintf("%s/_aliases", i.elasticURL), aliasJSON, nil)
_, err := utils.MakeJSONRequest(ctx, http.MethodPost, fmt.Sprintf("%s/_aliases", i.elasticURL), aliasJSON, nil)

i.log().Info("updated alias", "index", newIndex)

Expand All @@ -222,9 +222,9 @@ type healthResponse struct {
}

// removes all indexes that are older than the currently active index
func (i *baseIndexer) cleanupIndexes() error {
func (i *baseIndexer) cleanupIndexes(ctx context.Context) error {
// find our current indexes
currents := i.FindIndexes()
currents := i.FindIndexes(ctx)

// no current indexes? this a noop
if len(currents) == 0 {
Expand All @@ -233,7 +233,7 @@ func (i *baseIndexer) cleanupIndexes() error {

// find all the current indexes
healthResponse := healthResponse{}
_, err := utils.MakeJSONRequest(http.MethodGet, fmt.Sprintf("%s/%s", i.elasticURL, "_cluster/health?level=indices"), nil, &healthResponse)
_, err := utils.MakeJSONRequest(ctx, http.MethodGet, fmt.Sprintf("%s/%s", i.elasticURL, "_cluster/health?level=indices"), nil, &healthResponse)
if err != nil {
return err
}
Expand All @@ -242,7 +242,7 @@ func (i *baseIndexer) cleanupIndexes() error {
for key := range healthResponse.Indices {
if strings.HasPrefix(key, i.name) && strings.Compare(key, currents[0]) < 0 {
slog.Info("removing old index", "index", key)
_, err = utils.MakeJSONRequest(http.MethodDelete, fmt.Sprintf("%s/%s", i.elasticURL, key), nil, nil)
_, err = utils.MakeJSONRequest(ctx, http.MethodDelete, fmt.Sprintf("%s/%s", i.elasticURL, key), nil, nil)
if err != nil {
return err
}
Expand All @@ -268,11 +268,11 @@ type indexResponse struct {
}

// indexes the batch of contacts
func (i *baseIndexer) indexBatch(index string, batch []byte) (int, int, int, error) {
func (i *baseIndexer) indexBatch(ctx context.Context, index string, batch []byte) (int, int, int, error) {
response := indexResponse{}
indexURL := fmt.Sprintf("%s/%s/_bulk", i.elasticURL, index)

_, err := utils.MakeJSONRequest(http.MethodPut, indexURL, batch, &response)
_, err := utils.MakeJSONRequest(ctx, http.MethodPut, indexURL, batch, &response)
if err != nil {
return 0, 0, 0, err
}
Expand Down Expand Up @@ -324,12 +324,13 @@ type queryResponse struct {
}

// GetESLastModified queries a concrete index and finds the last modified document, returning its modified time
func (i *baseIndexer) GetESLastModified(index string) (time.Time, error) {
func (i *baseIndexer) GetESLastModified(ctx context.Context, index string) (time.Time, error) {
lastModified := time.Time{}

// get the newest document on our index
queryResponse := &queryResponse{}
_, err := utils.MakeJSONRequest(
ctx,
http.MethodPost,
fmt.Sprintf("%s/%s/_search", i.elasticURL, index),
[]byte(`{ "sort": [{ "modified_on_mu": "desc" }], "_source": {"includes": ["modified_on", "id"]}, "size": 1, "track_total_hits": false}`),
Expand Down
3 changes: 2 additions & 1 deletion indexers/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package indexers_test

import (
"bytes"
"context"
"database/sql"
"io"
"log/slog"
Expand Down Expand Up @@ -90,7 +91,7 @@ func elasticRequest(t *testing.T, cfg *runtime.Config, method, path string, data
if data != nil {
body = bytes.NewReader(jsonx.MustMarshal(data))
}
req, err := httpx.NewRequest(method, cfg.ElasticURL+path, body, map[string]string{"Content-Type": "application/json"})
req, err := httpx.NewRequest(context.Background(), method, cfg.ElasticURL+path, body, map[string]string{"Content-Type": "application/json"})
require.NoError(t, err)

trace, err := httpx.DoTrace(http.DefaultClient, req, nil, nil, -1)
Expand Down
12 changes: 6 additions & 6 deletions indexers/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (i *ContactIndexer) Index(rt *runtime.Runtime, rebuild, cleanup bool) (stri
var err error

// find our physical index
physicalIndexes := i.FindIndexes()
physicalIndexes := i.FindIndexes(ctx)

physicalIndex := ""
if len(physicalIndexes) > 0 {
Expand All @@ -49,15 +49,15 @@ func (i *ContactIndexer) Index(rt *runtime.Runtime, rebuild, cleanup bool) (stri

// doesn't exist or we are rebuilding, create it
if physicalIndex == "" || rebuild {
physicalIndex, err = i.createNewIndex(i.definition)
physicalIndex, err = i.createNewIndex(ctx, i.definition)
if err != nil {
return "", fmt.Errorf("error creating new index: %w", err)
}
i.log().Info("created new physical index", "index", physicalIndex)
remapAlias = true
}

lastModified, err := i.GetESLastModified(physicalIndex)
lastModified, err := i.GetESLastModified(ctx, physicalIndex)
if err != nil {
return "", fmt.Errorf("error finding last modified: %w", err)
}
Expand All @@ -72,7 +72,7 @@ func (i *ContactIndexer) Index(rt *runtime.Runtime, rebuild, cleanup bool) (stri

// if the index didn't previously exist or we are rebuilding, remap to our alias
if remapAlias {
err := i.updateAlias(physicalIndex)
err := i.updateAlias(ctx, physicalIndex)
if err != nil {
return "", fmt.Errorf("error updating alias: %w", err)
}
Expand All @@ -81,7 +81,7 @@ func (i *ContactIndexer) Index(rt *runtime.Runtime, rebuild, cleanup bool) (stri

// cleanup our aliases if appropriate
if cleanup {
err := i.cleanupIndexes()
err := i.cleanupIndexes(ctx)
if err != nil {
return "", fmt.Errorf("error cleaning up old indexes: %w", err)
}
Expand Down Expand Up @@ -176,7 +176,7 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st

indexSubBatch := func(b *bytes.Buffer) error {
t := time.Now()
created, updated, deleted, err := i.indexBatch(index, b.Bytes())
created, updated, deleted, err := i.indexBatch(ctx, index, b.Bytes())
if err != nil {
return err
}
Expand Down
7 changes: 4 additions & 3 deletions indexers/contacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ var contactQueryTests = []struct {
}

func TestContacts(t *testing.T) {
ctx := context.Background()
rt := setup(t)

ix1 := indexers.NewContactIndexer(rt.Config.ElasticURL, rt.Config.ContactsIndex, 2, 1, 4)
Expand All @@ -175,7 +176,7 @@ func TestContacts(t *testing.T) {
assert.WithinDuration(t, time.Date(2017, 11, 10, 21, 11, 59, 890662000, time.UTC), dbModified, 0)

// error trying to get ES last modified on before index exists
_, err = ix1.GetESLastModified(rt.Config.ContactsIndex)
_, err = ix1.GetESLastModified(ctx, rt.Config.ContactsIndex)
assert.Error(t, err)

expectedIndexName := fmt.Sprintf("indexer_test_%s", time.Now().Format("2006_01_02"))
Expand All @@ -186,7 +187,7 @@ func TestContacts(t *testing.T) {

time.Sleep(1 * time.Second)

esModified, err := ix1.GetESLastModified(rt.Config.ContactsIndex)
esModified, err := ix1.GetESLastModified(ctx, rt.Config.ContactsIndex)
assert.NoError(t, err)
assert.WithinDuration(t, time.Date(2017, 11, 10, 21, 11, 59, 890662000, time.UTC), esModified, 0)

Expand All @@ -197,7 +198,7 @@ func TestContacts(t *testing.T) {
assertQuery(t, rt.Config, tc.query, tc.expected, "query mismatch for %s", tc.query)
}

lastModified, err := ix1.GetESLastModified(indexName)
lastModified, err := ix1.GetESLastModified(ctx, indexName)
assert.NoError(t, err)
assert.Equal(t, time.Date(2017, 11, 10, 21, 11, 59, 890662000, time.UTC), lastModified.In(time.UTC))

Expand Down
5 changes: 3 additions & 2 deletions utils/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package utils

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -50,10 +51,10 @@ func shouldRetry(request *http.Request, response *http.Response, withDelay time.
}

// MakeJSONRequest is a utility function to make a JSON request, optionally decoding the response into the passed in struct
func MakeJSONRequest(method string, url string, body []byte, dest any) (*http.Response, error) {
func MakeJSONRequest(ctx context.Context, method string, url string, body []byte, dest any) (*http.Response, error) {
l := slog.With("url", url, "method", method)

req, _ := httpx.NewRequest(method, url, bytes.NewReader(body), map[string]string{"Content-Type": "application/json"})
req, _ := httpx.NewRequest(ctx, method, url, bytes.NewReader(body), map[string]string{"Content-Type": "application/json"})
resp, err := httpx.Do(http.DefaultClient, req, retryConfig, nil)
if err != nil {
l.Error("error making request", "error", err)
Expand Down
5 changes: 4 additions & 1 deletion utils/http_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package utils_test

import (
"context"
"net/http"
"net/http/httptest"
"testing"
Expand All @@ -11,6 +12,8 @@ import (
)

func TestRetryServer(t *testing.T) {
ctx := context.Background()

responseCounter := 0
responses := []func(w http.ResponseWriter, r *http.Request){
func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -32,7 +35,7 @@ func TestRetryServer(t *testing.T) {
}))
defer ts.Close()

resp, err := utils.MakeJSONRequest("GET", ts.URL, nil, nil)
resp, err := utils.MakeJSONRequest(ctx, "GET", ts.URL, nil, nil)
assert.NoError(t, err)
assert.Equal(t, 200, resp.StatusCode)

Expand Down