Skip to content

Commit bf76fe6

Browse files
authored
Merge pull request #831 from nyaruka/msg_session
Start using new session field on queued messages
2 parents 622261d + 9a50a5b commit bf76fe6

File tree

9 files changed

+43
-19
lines changed

9 files changed

+43
-19
lines changed

Diff for: backends/rapidpro/backend.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,7 @@ func (b *backend) OnSendComplete(ctx context.Context, msg courier.MsgOut, status
515515

516516
// if message was successfully sent, and we have a session timeout, update it
517517
wasSuccess := status.Status() == courier.MsgStatusWired || status.Status() == courier.MsgStatusSent || status.Status() == courier.MsgStatusDelivered || status.Status() == courier.MsgStatusRead
518-
if wasSuccess && dbMsg.SessionTimeout_ != 0 {
518+
if wasSuccess && dbMsg.Session_ != nil && dbMsg.Session_.Timeout > 0 {
519519
if err := b.insertTimeoutFire(ctx, dbMsg); err != nil {
520520
slog.Error("unable to update session timeout", "error", err, "session_id", dbMsg.SessionID_)
521521
}

Diff for: backends/rapidpro/backend_test.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -1352,6 +1352,12 @@ func (ts *BackendTestSuite) TestSessionTimeout() {
13521352
"urn": "telegram:3527065",
13531353
"created_on": "2017-07-21T19:22:23.242757Z",
13541354
"high_priority": true,
1355+
"session": {
1356+
"uuid": "79c1dbc6-4200-4333-b17a-1f996273a4cb",
1357+
"status": "W",
1358+
"sprint_uuid": "0897c392-8b08-43c4-b9d9-e75d332a2c58",
1359+
"timeout": 3600
1360+
},
13551361
"session_id": 12345,
13561362
"session_timeout": 3600,
13571363
"session_modified_on": "2025-01-28T20:43:34.157379218Z"
@@ -1363,12 +1369,14 @@ func (ts *BackendTestSuite) TestSessionTimeout() {
13631369
err := ts.b.insertTimeoutFire(ctx, msg)
13641370
ts.NoError(err)
13651371

1366-
assertdb.Query(ts.T(), ts.b.db, `SELECT org_id, contact_id, fire_type, scope, extra->>'session_id' AS session_id, extra->>'session_modified_on' AS session_modified_on FROM contacts_contactfire`).
1372+
assertdb.Query(ts.T(), ts.b.db, `SELECT org_id, contact_id, fire_type, scope, session_uuid::text, sprint_uuid::text, extra->>'session_id' AS session_id, extra->>'session_modified_on' AS session_modified_on FROM contacts_contactfire`).
13671373
Columns(map[string]any{
13681374
"org_id": int64(1),
13691375
"contact_id": int64(100),
13701376
"fire_type": "T",
13711377
"scope": "",
1378+
"session_uuid": "79c1dbc6-4200-4333-b17a-1f996273a4cb",
1379+
"sprint_uuid": "0897c392-8b08-43c4-b9d9-e75d332a2c58",
13721380
"session_id": "12345",
13731381
"session_modified_on": "2025-01-28T20:43:34.157379218Z",
13741382
})

Diff for: backends/rapidpro/msg.go

+6-7
Original file line numberDiff line numberDiff line change
@@ -85,18 +85,17 @@ type Msg struct {
8585
UserID_ courier.UserID `json:"user_id"`
8686
Origin_ courier.MsgOrigin `json:"origin"`
8787
ContactLastSeenOn_ *time.Time `json:"contact_last_seen_on"`
88-
89-
// extra fields used to allow courier to update a session's timeout to *after* the message has been sent
90-
SessionID_ SessionID `json:"session_id"`
91-
SessionTimeout_ int `json:"session_timeout"`
92-
SessionStatus_ string `json:"session_status"`
93-
SessionModifiedOn_ *time.Time `json:"session_modified_on"`
88+
Session_ *courier.Session `json:"session"`
9489

9590
ContactName_ string `json:"contact_name"`
9691
URNAuthTokens_ map[string]string `json:"auth_tokens"`
9792
channel *Channel
9893
workerToken queue.WorkerToken
9994
alreadyWritten bool
95+
96+
// deprecated
97+
SessionID_ SessionID `json:"session_id"`
98+
SessionModifiedOn_ *time.Time `json:"session_modified_on"`
10099
}
101100

102101
// newMsg creates a new DBMsg object with the passed in parameters
@@ -162,7 +161,7 @@ func (m *Msg) IsResend() bool { return m.IsResend_ }
162161
func (m *Msg) Flow() *courier.FlowReference { return m.Flow_ }
163162
func (m *Msg) OptIn() *courier.OptInReference { return m.OptIn_ }
164163
func (m *Msg) UserID() courier.UserID { return m.UserID_ }
165-
func (m *Msg) SessionStatus() string { return m.SessionStatus_ }
164+
func (m *Msg) Session() *courier.Session { return m.Session_ }
166165
func (m *Msg) HighPriority() bool { return m.HighPriority_ }
167166

168167
// incoming specific

Diff for: backends/rapidpro/schema.sql

+3-1
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,10 @@ CREATE TABLE IF NOT EXISTS contacts_contactfire (
6363
contact_id integer references contacts_contact(id) on delete cascade,
6464
fire_type character varying(1) NOT NULL,
6565
scope character varying(128) NOT NULL,
66-
extra jsonb,
6766
fire_on timestamp with time zone NOT NULL,
67+
session_uuid uuid,
68+
sprint_uuid uuid,
69+
extra jsonb,
6870
UNIQUE (contact_id, fire_type, scope)
6971
);
7072

Diff for: backends/rapidpro/timeouts.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,16 @@ import (
1313
type SessionID int64
1414

1515
const sqlInsertTimeoutFire = `
16-
INSERT INTO contacts_contactfire(org_id, contact_id, fire_type, scope, extra, fire_on)
17-
VALUES($1, $2, 'T', '', $3, $4)
16+
INSERT INTO contacts_contactfire(org_id, contact_id, fire_type, scope, fire_on, session_uuid, sprint_uuid, extra)
17+
VALUES($1, $2, 'T', '', $3, $4, $5, $6)
1818
ON CONFLICT DO NOTHING`
1919

2020
// insertTimeoutFire inserts a timeout fire for the session associated with the given msg
2121
func (b *backend) insertTimeoutFire(ctx context.Context, m *Msg) error {
2222
extra := map[string]any{"session_id": m.SessionID_, "session_modified_on": m.SessionModifiedOn_}
23-
timeoutOn := dates.Now().Add(time.Duration(m.SessionTimeout_) * time.Second)
23+
timeoutOn := dates.Now().Add(time.Duration(m.Session_.Timeout) * time.Second)
2424

25-
_, err := b.db.ExecContext(ctx, sqlInsertTimeoutFire, m.OrgID_, m.ContactID_, jsonx.MustMarshal(extra), timeoutOn)
25+
_, err := b.db.ExecContext(ctx, sqlInsertTimeoutFire, m.OrgID_, m.ContactID_, timeoutOn, m.Session_.UUID, m.Session_.SprintUUID, jsonx.MustMarshal(extra))
2626
if err != nil {
2727
return fmt.Errorf("error inserting session timeout contact fire for session #%d: %w", m.SessionID_, err)
2828
}

Diff for: handlers/external/handler.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,10 @@ func (h *handler) Send(ctx context.Context, msg courier.MsgOut, res *courier.Sen
298298
"from": channel.Address(),
299299
"from_no_plus": strings.TrimPrefix(channel.Address(), "+"),
300300
"channel": string(channel.UUID()),
301-
"session_status": msg.SessionStatus(),
301+
"session_status": "",
302+
}
303+
if msg.Session() != nil {
304+
form["session_status"] = msg.Session().Status
302305
}
303306

304307
useNationalStr := channel.ConfigForKey(courier.ConfigUseNational, false)

Diff for: handlers/firebase/handler.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,9 @@ func (h *handler) sendWithAPIKey(msg courier.MsgOut, res *courier.SendResult, cl
199199
payload.Data.Title = title
200200
payload.Data.Message = part
201201
payload.Data.MessageID = int64(msg.ID())
202-
payload.Data.SessionStatus = msg.SessionStatus()
202+
if msg.Session() != nil {
203+
payload.Data.SessionStatus = msg.Session().Status
204+
}
203205

204206
// include any quick replies on the last piece we send
205207
if i == len(msgParts)-1 {
@@ -281,7 +283,9 @@ func (h *handler) sendWithCredsJSON(msg courier.MsgOut, res *courier.SendResult,
281283
payload.Message.Data.Title = title
282284
payload.Message.Data.Message = part
283285
payload.Message.Data.MessageID = msg.ID().String()
284-
payload.Message.Data.SessionStatus = msg.SessionStatus()
286+
if msg.Session() != nil {
287+
payload.Message.Data.SessionStatus = msg.Session().Status
288+
}
285289

286290
if i == len(msgParts)-1 {
287291
if msg.QuickReplies() != nil {

Diff for: msg.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,13 @@ type Templating struct {
7474
ExternalID string `json:"external_id"`
7575
}
7676

77+
type Session struct {
78+
UUID string `json:"uuid"`
79+
Status string `json:"status"`
80+
SprintUUID string `json:"sprint_uuid"`
81+
Timeout int `json:"timeout"`
82+
}
83+
7784
//-----------------------------------------------------------------------------
7885
// Msg interface
7986
//-----------------------------------------------------------------------------
@@ -110,8 +117,8 @@ type MsgOut interface {
110117
Flow() *FlowReference
111118
OptIn() *OptInReference
112119
UserID() UserID
113-
SessionStatus() string
114120
HighPriority() bool
121+
Session() *Session
115122
}
116123

117124
// MsgIn is our interface to represent an incoming

Diff for: test/msg.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ type MockMsg struct {
3131
metadata json.RawMessage
3232
alreadyWritten bool
3333
isResend bool
34+
session *courier.Session
3435

3536
flow *courier.FlowReference
3637
optIn *courier.OptInReference
@@ -75,7 +76,7 @@ func (m *MockMsg) IsResend() bool { return m.isResend }
7576
func (m *MockMsg) Flow() *courier.FlowReference { return m.flow }
7677
func (m *MockMsg) OptIn() *courier.OptInReference { return m.optIn }
7778
func (m *MockMsg) UserID() courier.UserID { return m.userID }
78-
func (m *MockMsg) SessionStatus() string { return "" }
79+
func (m *MockMsg) Session() *courier.Session { return m.session }
7980
func (m *MockMsg) HighPriority() bool { return m.highPriority }
8081

8182
// incoming specific

0 commit comments

Comments
 (0)