Skip to content

Commit 17b89fe

Browse files
committed
Merge branch 'main' into real_elastic_tests_3
2 parents dc7984d + 062ec7f commit 17b89fe

18 files changed

+386
-216
lines changed

CHANGELOG.md

+5
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
v8.1.50 (2023-05-23)
2+
-------------------------
3+
* Remove support for passing URNs to flow/preview_start as that's not a thing we do
4+
* Make the name of the ES index for contacts configurable
5+
16
v8.1.49 (2023-05-18)
27
-------------------------
38
* Remove support for ticket assignment with a note

core/models/contacts.go

+58-11
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,10 @@ func LoadContact(ctx context.Context, db Queryer, oa *OrgAssets, id ContactID) (
255255
// LoadContacts loads a set of contacts for the passed in ids. Note that the order of the returned contacts
256256
// won't necessarily match the order of the ids.
257257
func LoadContacts(ctx context.Context, db Queryer, oa *OrgAssets, ids []ContactID) ([]*Contact, error) {
258+
if len(ids) == 0 {
259+
return nil, nil
260+
}
261+
258262
start := time.Now()
259263

260264
rows, err := db.QueryxContext(ctx, sqlSelectContact, pq.Array(ids), oa.OrgID())
@@ -386,6 +390,10 @@ func GetContactIDsFromReferences(ctx context.Context, db Queryer, orgID OrgID, r
386390

387391
// gets the contact IDs for the passed in org and set of UUIDs
388392
func getContactIDsFromUUIDs(ctx context.Context, db Queryer, orgID OrgID, uuids []flows.ContactUUID) ([]ContactID, error) {
393+
if len(uuids) == 0 {
394+
return nil, nil
395+
}
396+
389397
ids, err := queryContactIDs(ctx, db, `SELECT id FROM contacts_contact WHERE org_id = $1 AND uuid = ANY($2) AND is_active = TRUE`, orgID, pq.Array(uuids))
390398
if err != nil {
391399
return nil, errors.Wrapf(err, "error selecting contact ids by UUID")
@@ -647,34 +655,40 @@ func GetOrCreateContact(ctx context.Context, db QueryerWithTx, oa *OrgAssets, ur
647655
return contact, flowContact, created, nil
648656
}
649657

650-
// GetOrCreateContactIDsFromURNs will fetch or create the contacts for the passed in URNs, returning a map the same length as
651-
// the passed in URNs with the ids of the contacts.
652-
func GetOrCreateContactIDsFromURNs(ctx context.Context, db QueryerWithTx, oa *OrgAssets, urnz []urns.URN) (map[urns.URN]ContactID, error) {
658+
// GetOrCreateContactsFromURNs will fetch or create the contacts for the passed in URNs, returning a map of the fetched
659+
// contacts and another map of the created contacts.
660+
func GetOrCreateContactsFromURNs(ctx context.Context, db QueryerWithTx, oa *OrgAssets, urnz []urns.URN) (map[urns.URN]*Contact, map[urns.URN]*Contact, error) {
653661
// ensure all URNs are normalized
654662
for i, urn := range urnz {
655663
urnz[i] = urn.Normalize(string(oa.Env().DefaultCountry()))
656664
}
657665

658666
// find current owners of these URNs
659-
owners, err := contactIDsFromURNs(ctx, db, oa.OrgID(), urnz)
667+
owners, err := contactsFromURNs(ctx, db, oa, urnz)
660668
if err != nil {
661-
return nil, errors.Wrapf(err, "error looking up contacts for URNs")
669+
return nil, nil, errors.Wrap(err, "error looking up contacts for URNs")
662670
}
663671

672+
fetched := make(map[urns.URN]*Contact, len(urnz))
673+
created := make(map[urns.URN]*Contact, len(urnz))
674+
664675
// create any contacts that are missing
665-
for urn, contactID := range owners {
666-
if contactID == NilContactID {
676+
for urn, contact := range owners {
677+
if contact == nil {
667678
contact, _, _, err := GetOrCreateContact(ctx, db, oa, []urns.URN{urn}, NilChannelID)
668679
if err != nil {
669-
return nil, errors.Wrapf(err, "error creating contact")
680+
return nil, nil, errors.Wrapf(err, "error creating contact")
670681
}
671-
owners[urn] = contact.ID()
682+
created[urn] = contact
683+
} else {
684+
fetched[urn] = contact
672685
}
673686
}
674-
return owners, nil
687+
688+
return fetched, created, nil
675689
}
676690

677-
// looks up the contacts who own the given urns (which should be normalized by the caller) and returns that information as a map
691+
// looks up the contact IDs who own the given urns (which should be normalized by the caller) and returns that information as a map
678692
func contactIDsFromURNs(ctx context.Context, db Queryer, orgID OrgID, urnz []urns.URN) (map[urns.URN]ContactID, error) {
679693
identityToOriginal := make(map[urns.URN]urns.URN, len(urnz))
680694
identities := make([]urns.URN, len(urnz))
@@ -705,6 +719,39 @@ func contactIDsFromURNs(ctx context.Context, db Queryer, orgID OrgID, urnz []urn
705719
return owners, nil
706720
}
707721

722+
// like contactIDsFromURNs but fetches the contacts
723+
func contactsFromURNs(ctx context.Context, db Queryer, oa *OrgAssets, urnz []urns.URN) (map[urns.URN]*Contact, error) {
724+
ids, err := contactIDsFromURNs(ctx, db, oa.OrgID(), urnz)
725+
if err != nil {
726+
return nil, err
727+
}
728+
729+
// get the ids of the contacts that exist
730+
existingIDs := make([]ContactID, 0, len(ids))
731+
for _, id := range ids {
732+
if id != NilContactID {
733+
existingIDs = append(existingIDs, id)
734+
}
735+
}
736+
737+
fetched, err := LoadContacts(ctx, db, oa, existingIDs)
738+
if err != nil {
739+
return nil, errors.Wrap(err, "error loading contacts")
740+
}
741+
742+
// and transform those into a map by URN
743+
fetchedByID := make(map[ContactID]*Contact, len(fetched))
744+
for _, c := range fetched {
745+
fetchedByID[c.ID()] = c
746+
}
747+
byURN := make(map[urns.URN]*Contact, len(ids))
748+
for urn, id := range ids {
749+
byURN[urn] = fetchedByID[id]
750+
}
751+
752+
return byURN, nil
753+
}
754+
708755
func getOrCreateContact(ctx context.Context, db QueryerWithTx, orgID OrgID, urnz []urns.URN, channelID ChannelID) (ContactID, bool, error) {
709756
// find current owners of these URNs
710757
owners, err := contactIDsFromURNs(ctx, db, orgID, urnz)

core/models/contacts_test.go

+37-35
Original file line numberDiff line numberDiff line change
@@ -358,59 +358,61 @@ func TestGetOrCreateContactIDsFromURNs(t *testing.T) {
358358

359359
defer testsuite.Reset(testsuite.ResetData)
360360

361+
oa, err := models.GetOrgAssets(ctx, rt, testdata.Org1.ID)
362+
assert.NoError(t, err)
363+
361364
// add an orphaned URN
362365
testdata.InsertContactURN(rt, testdata.Org1, nil, urns.URN("telegram:200001"), 100)
363366

364-
contactIDSeq := models.ContactID(30000)
365-
newContact := func() models.ContactID { id := contactIDSeq; contactIDSeq++; return id }
366-
prevContact := func() models.ContactID { return contactIDSeq - 1 }
367-
368-
org, err := models.GetOrgAssets(ctx, rt, testdata.Org1.ID)
369-
assert.NoError(t, err)
367+
cathy, _ := testdata.Cathy.Load(rt, oa)
370368

371369
tcs := []struct {
372-
OrgID models.OrgID
373-
URNs []urns.URN
374-
ContactIDs map[urns.URN]models.ContactID
370+
orgID models.OrgID
371+
urns []urns.URN
372+
fetched map[urns.URN]*models.Contact
373+
created []urns.URN
375374
}{
376375
{
377-
testdata.Org1.ID,
378-
[]urns.URN{testdata.Cathy.URN},
379-
map[urns.URN]models.ContactID{testdata.Cathy.URN: testdata.Cathy.ID},
380-
},
381-
{
382-
testdata.Org1.ID,
383-
[]urns.URN{urns.URN(testdata.Cathy.URN.String() + "?foo=bar")},
384-
map[urns.URN]models.ContactID{urns.URN(testdata.Cathy.URN.String() + "?foo=bar"): testdata.Cathy.ID},
376+
orgID: testdata.Org1.ID,
377+
urns: []urns.URN{testdata.Cathy.URN},
378+
fetched: map[urns.URN]*models.Contact{
379+
testdata.Cathy.URN: cathy,
380+
},
381+
created: []urns.URN{},
385382
},
386383
{
387-
testdata.Org1.ID,
388-
[]urns.URN{testdata.Cathy.URN, urns.URN("telegram:100001")},
389-
map[urns.URN]models.ContactID{
390-
testdata.Cathy.URN: testdata.Cathy.ID,
391-
urns.URN("telegram:100001"): newContact(),
384+
orgID: testdata.Org1.ID,
385+
urns: []urns.URN{urns.URN(testdata.Cathy.URN.String() + "?foo=bar")},
386+
fetched: map[urns.URN]*models.Contact{
387+
urns.URN(testdata.Cathy.URN.String() + "?foo=bar"): cathy,
392388
},
389+
created: []urns.URN{},
393390
},
394391
{
395-
testdata.Org1.ID,
396-
[]urns.URN{urns.URN("telegram:100001")},
397-
map[urns.URN]models.ContactID{urns.URN("telegram:100001"): prevContact()},
392+
orgID: testdata.Org1.ID,
393+
urns: []urns.URN{testdata.Cathy.URN, urns.URN("telegram:100001")},
394+
fetched: map[urns.URN]*models.Contact{
395+
testdata.Cathy.URN: cathy,
396+
},
397+
created: []urns.URN{"telegram:100001"},
398398
},
399399
{
400-
testdata.Org1.ID,
401-
[]urns.URN{urns.URN("telegram:200001")},
402-
map[urns.URN]models.ContactID{urns.URN("telegram:200001"): newContact()}, // new contact assigned orphaned URN
400+
orgID: testdata.Org1.ID,
401+
urns: []urns.URN{urns.URN("telegram:200001")},
402+
fetched: map[urns.URN]*models.Contact{},
403+
created: []urns.URN{"telegram:200001"}, // new contact assigned orphaned URN
403404
},
404405
}
405406

406407
for i, tc := range tcs {
407-
ids, err := models.GetOrCreateContactIDsFromURNs(ctx, rt.DB, org, tc.URNs)
408+
fetched, created, err := models.GetOrCreateContactsFromURNs(ctx, rt.DB, oa, tc.urns)
408409
assert.NoError(t, err, "%d: error getting contact ids", i)
409-
assert.Equal(t, tc.ContactIDs, ids, "%d: mismatch in contact ids", i)
410+
assert.Equal(t, tc.fetched, fetched, "%d: fetched contacts mismatch", i)
411+
assert.Equal(t, tc.created, maps.Keys(created), "%d: created contacts mismatch", i)
410412
}
411413
}
412414

413-
func TestGetOrCreateContactIDsFromURNsRace(t *testing.T) {
415+
func TestGetOrCreateContactsFromURNsRace(t *testing.T) {
414416
ctx, rt := testsuite.Runtime()
415417

416418
defer testsuite.Reset(testsuite.ResetData)
@@ -427,13 +429,13 @@ func TestGetOrCreateContactIDsFromURNsRace(t *testing.T) {
427429
return nil
428430
})
429431

430-
var contacts [2]models.ContactID
432+
var contacts [2]*models.Contact
431433
var errs [2]error
432434

433435
test.RunConcurrently(2, func(i int) {
434-
var cmap map[urns.URN]models.ContactID
435-
cmap, errs[i] = models.GetOrCreateContactIDsFromURNs(ctx, mdb, oa, []urns.URN{urns.URN("telegram:100007")})
436-
contacts[i] = cmap[urns.URN("telegram:100007")]
436+
var created map[urns.URN]*models.Contact
437+
_, created, errs[i] = models.GetOrCreateContactsFromURNs(ctx, mdb, oa, []urns.URN{urns.URN("telegram:100007")})
438+
contacts[i] = created[urns.URN("telegram:100007")]
437439
})
438440

439441
require.NoError(t, errs[0])

core/models/starts.go

+3
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ type Exclusions struct {
5555
NotSeenSinceDays int `json:"not_seen_since_days"` // contacts who have not been seen for more than this number of days
5656
}
5757

58+
// NoExclusions is a constant for the empty value
59+
var NoExclusions = Exclusions{}
60+
5861
// Scan supports reading exclusion values from JSON in database
5962
func (e *Exclusions) Scan(value any) error {
6063
if value == nil {

core/msgio/courier.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ func QueueCourierMessages(rc redis.Conn, oa *models.OrgAssets, contactID models.
204204
if err != nil {
205205
return err
206206
}
207-
logrus.WithFields(logrus.Fields{"msgs": len(batch), "contact_id": contactID, "channel_uuid": channel.UUID(), "elapsed": time.Since(start)}).Info("msgs queued to courier")
207+
logrus.WithFields(logrus.Fields{"msgs": len(batch), "contact_id": contactID, "channel_uuid": channel.UUID(), "elapsed": time.Since(start)}).Debug("msgs queued to courier")
208208
}
209209
return nil
210210
}

core/search/groups_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func TestSmartGroups(t *testing.T) {
3030
testdata.Bob.ID,
3131
)
3232

33-
testsuite.ReindexElastic()
33+
testsuite.ReindexElastic(ctx)
3434

3535
oa, err := models.GetOrgAssetsWithRefresh(ctx, rt, testdata.Org1.ID, models.RefreshCampaigns|models.RefreshGroups)
3636
assert.NoError(t, err)

core/search/queries.go

+6-8
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"time"
55

66
"github.com/nyaruka/gocommon/dates"
7-
"github.com/nyaruka/gocommon/urns"
87
"github.com/nyaruka/goflow/contactql"
98
"github.com/nyaruka/goflow/envs"
109
"github.com/nyaruka/goflow/flows"
@@ -13,7 +12,7 @@ import (
1312
)
1413

1514
// BuildStartQuery builds a start query for the given flow and start options
16-
func BuildStartQuery(oa *models.OrgAssets, flow *models.Flow, groups []*models.Group, contactUUIDs []flows.ContactUUID, urnz []urns.URN, userQuery string, excs models.Exclusions) (string, error) {
15+
func BuildStartQuery(oa *models.OrgAssets, flow *models.Flow, groups []*models.Group, contactUUIDs []flows.ContactUUID, userQuery string, excs models.Exclusions, excGroups []*models.Group) (string, error) {
1716
var parsedQuery *contactql.ContactQuery
1817
var err error
1918

@@ -24,10 +23,10 @@ func BuildStartQuery(oa *models.OrgAssets, flow *models.Flow, groups []*models.G
2423
}
2524
}
2625

27-
return contactql.Stringify(buildStartQuery(oa.Env(), flow, groups, contactUUIDs, urnz, parsedQuery, excs)), nil
26+
return contactql.Stringify(buildStartQuery(oa.Env(), flow, groups, contactUUIDs, parsedQuery, excs, excGroups)), nil
2827
}
2928

30-
func buildStartQuery(env envs.Environment, flow *models.Flow, groups []*models.Group, contactUUIDs []flows.ContactUUID, urnz []urns.URN, userQuery *contactql.ContactQuery, excs models.Exclusions) contactql.QueryNode {
29+
func buildStartQuery(env envs.Environment, flow *models.Flow, groups []*models.Group, contactUUIDs []flows.ContactUUID, userQuery *contactql.ContactQuery, excs models.Exclusions, excGroups []*models.Group) contactql.QueryNode {
3130
inclusions := make([]contactql.QueryNode, 0, 10)
3231

3332
for _, group := range groups {
@@ -36,10 +35,6 @@ func buildStartQuery(env envs.Environment, flow *models.Flow, groups []*models.G
3635
for _, contactUUID := range contactUUIDs {
3736
inclusions = append(inclusions, contactql.NewCondition("uuid", contactql.PropertyTypeAttribute, contactql.OpEqual, string(contactUUID)))
3837
}
39-
for _, urn := range urnz {
40-
scheme, path, _, _ := urn.ToParts()
41-
inclusions = append(inclusions, contactql.NewCondition(scheme, contactql.PropertyTypeScheme, contactql.OpEqual, path))
42-
}
4338
if userQuery != nil {
4439
inclusions = append(inclusions, userQuery.Root())
4540
}
@@ -58,6 +53,9 @@ func buildStartQuery(env envs.Environment, flow *models.Flow, groups []*models.G
5853
seenSince := dates.Now().Add(-time.Hour * time.Duration(24*excs.NotSeenSinceDays))
5954
exclusions = append(exclusions, contactql.NewCondition("last_seen_on", contactql.PropertyTypeAttribute, contactql.OpGreaterThan, formatQueryDate(env, seenSince)))
6055
}
56+
for _, group := range excGroups {
57+
exclusions = append(exclusions, contactql.NewCondition("group", contactql.PropertyTypeAttribute, contactql.OpNotEqual, group.Name()))
58+
}
6159

6260
return contactql.NewBoolCombination(contactql.BoolOperatorAnd,
6361
contactql.NewBoolCombination(contactql.BoolOperatorOr, inclusions...),

core/search/queries_test.go

+11-13
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"time"
66

77
"github.com/nyaruka/gocommon/dates"
8-
"github.com/nyaruka/gocommon/urns"
98
"github.com/nyaruka/goflow/flows"
109
"github.com/nyaruka/mailroom/core/models"
1110
"github.com/nyaruka/mailroom/core/search"
@@ -29,32 +28,31 @@ func TestBuildStartQuery(t *testing.T) {
2928
testers := oa.GroupByID(testdata.TestersGroup.ID)
3029

3130
tcs := []struct {
32-
groups []*models.Group
33-
contactUUIDs []flows.ContactUUID
34-
urns []urns.URN
35-
userQuery string
36-
exclusions models.Exclusions
37-
expected string
38-
err string
31+
groups []*models.Group
32+
contactUUIDs []flows.ContactUUID
33+
userQuery string
34+
exclusions models.Exclusions
35+
excludeGroups []*models.Group
36+
expected string
37+
err string
3938
}{
4039
{
4140
groups: []*models.Group{doctors, testers},
4241
contactUUIDs: []flows.ContactUUID{testdata.Cathy.UUID, testdata.George.UUID},
43-
urns: []urns.URN{"tel:+1234567890", "telegram:9876543210"},
4442
exclusions: models.Exclusions{},
45-
expected: `group = "Doctors" OR group = "Testers" OR uuid = "6393abc0-283d-4c9b-a1b3-641a035c34bf" OR uuid = "8d024bcd-f473-4719-a00a-bd0bb1190135" OR tel = "+1234567890" OR telegram = 9876543210`,
43+
expected: `group = "Doctors" OR group = "Testers" OR uuid = "6393abc0-283d-4c9b-a1b3-641a035c34bf" OR uuid = "8d024bcd-f473-4719-a00a-bd0bb1190135"`,
4644
},
4745
{
4846
groups: []*models.Group{doctors},
4947
contactUUIDs: []flows.ContactUUID{testdata.Cathy.UUID},
50-
urns: []urns.URN{"tel:+1234567890"},
5148
exclusions: models.Exclusions{
5249
NonActive: true,
5350
InAFlow: true,
5451
StartedPreviously: true,
5552
NotSeenSinceDays: 90,
5653
},
57-
expected: `(group = "Doctors" OR uuid = "6393abc0-283d-4c9b-a1b3-641a035c34bf" OR tel = "+1234567890") AND status = "active" AND flow = "" AND history != "Favorites" AND last_seen_on > "20-01-2022"`,
54+
excludeGroups: []*models.Group{testers},
55+
expected: `(group = "Doctors" OR uuid = "6393abc0-283d-4c9b-a1b3-641a035c34bf") AND status = "active" AND flow = "" AND history != "Favorites" AND last_seen_on > "20-01-2022" AND group != "Testers"`,
5856
},
5957
{
6058
contactUUIDs: []flows.ContactUUID{testdata.Cathy.UUID},
@@ -111,7 +109,7 @@ func TestBuildStartQuery(t *testing.T) {
111109
}
112110

113111
for _, tc := range tcs {
114-
actual, err := search.BuildStartQuery(oa, flow, tc.groups, tc.contactUUIDs, tc.urns, tc.userQuery, tc.exclusions)
112+
actual, err := search.BuildStartQuery(oa, flow, tc.groups, tc.contactUUIDs, tc.userQuery, tc.exclusions, tc.excludeGroups)
115113
if tc.err != "" {
116114
assert.Equal(t, "", actual)
117115
assert.EqualError(t, err, tc.err)

0 commit comments

Comments
 (0)