Skip to content

Commit 7a4de88

Browse files
committed
Insert contact fire for session timeout
1 parent 34ecc58 commit 7a4de88

File tree

6 files changed

+74
-47
lines changed

6 files changed

+74
-47
lines changed

Diff for: backends/rapidpro/backend.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -516,7 +516,7 @@ func (b *backend) OnSendComplete(ctx context.Context, msg courier.MsgOut, status
516516
// if message was successfully sent, and we have a session timeout, update it
517517
wasSuccess := status.Status() == courier.MsgStatusWired || status.Status() == courier.MsgStatusSent || status.Status() == courier.MsgStatusDelivered || status.Status() == courier.MsgStatusRead
518518
if wasSuccess && dbMsg.SessionTimeout_ != 0 {
519-
if err := updateSessionTimeout(ctx, b, dbMsg.SessionID_, *dbMsg.SessionModifiedOn_, dbMsg.SessionTimeout_); err != nil {
519+
if err := b.insertTimeoutFire(ctx, dbMsg); err != nil {
520520
slog.Error("unable to update session timeout", "error", err, "session_id", dbMsg.SessionID_)
521521
}
522522
}

Diff for: backends/rapidpro/backend_test.go

+32-6
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/nyaruka/courier/queue"
2727
"github.com/nyaruka/courier/test"
2828
"github.com/nyaruka/courier/utils/clogs"
29+
"github.com/nyaruka/gocommon/dates"
2930
"github.com/nyaruka/gocommon/dbutil/assertdb"
3031
"github.com/nyaruka/gocommon/httpx"
3132
"github.com/nyaruka/gocommon/i18n"
@@ -1337,15 +1338,40 @@ func (ts *BackendTestSuite) TestChannelEvent() {
13371338
func (ts *BackendTestSuite) TestSessionTimeout() {
13381339
ctx := context.Background()
13391340

1340-
// parse from an iso date
1341-
t, err := time.Parse("2006-01-02 15:04:05.000000-07", "2018-12-04 11:52:20.900234-08")
1342-
ts.NoError(err)
1341+
dates.SetNowFunc(dates.NewSequentialNow(time.Date(2025, 1, 28, 20, 43, 34, 157379218, time.UTC), time.Second))
1342+
defer dates.SetNowFunc(time.Now)
1343+
1344+
msgJSON := `{
1345+
"uuid": "54c893b9-b026-44fc-a490-50aed0361c3f",
1346+
"id": 204,
1347+
"org_id": 1,
1348+
"text": "Test message 21",
1349+
"contact_id": 100,
1350+
"contact_urn_id": 14,
1351+
"channel_uuid": "f3ad3eb6-d00d-4dc3-92e9-9f34f32940ba",
1352+
"urn": "telegram:3527065",
1353+
"created_on": "2017-07-21T19:22:23.242757Z",
1354+
"high_priority": true,
1355+
"session_id": 12345,
1356+
"session_timeout": 3600,
1357+
"session_modified_on": "2025-01-28T20:43:34.157379218Z"
1358+
}`
1359+
1360+
msg := &Msg{}
1361+
jsonx.MustUnmarshal([]byte(msgJSON), msg)
13431362

1344-
err = updateSessionTimeout(ctx, ts.b, SessionID(1), t, 300)
1363+
err := ts.b.insertTimeoutFire(ctx, msg)
13451364
ts.NoError(err)
13461365

1347-
// make sure that took
1348-
assertdb.Query(ts.T(), ts.b.db, `SELECT count(*) from flows_flowsession WHERE timeout_on > NOW()`).Returns(1)
1366+
assertdb.Query(ts.T(), ts.b.db, `SELECT org_id, contact_id, fire_type, scope, extra->>'session_id' AS session_id, extra->>'session_modified_on' AS session_modified_on FROM contacts_contactfire`).
1367+
Columns(map[string]any{
1368+
"org_id": int64(1),
1369+
"contact_id": int64(100),
1370+
"fire_type": "T",
1371+
"scope": "",
1372+
"session_id": "12345",
1373+
"session_modified_on": "2025-01-28T20:43:34.157379218Z",
1374+
})
13491375
}
13501376

13511377
func (ts *BackendTestSuite) TestMailroomEvents() {

Diff for: backends/rapidpro/schema.sql

+11-9
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,17 @@ CREATE TABLE contacts_contacturn (
5656
UNIQUE (org_id, identity)
5757
);
5858

59+
DROP TABLE IF EXISTS contacts_contactfire CASCADE;
60+
CREATE TABLE IF NOT EXISTS contacts_contactfire (
61+
id serial primary key,
62+
org_id integer NOT NULL,
63+
contact_id integer references contacts_contact(id) on delete cascade,
64+
fire_type character varying(1) NOT NULL,
65+
scope character varying(128) NOT NULL,
66+
extra jsonb,
67+
fire_on timestamp with time zone NOT NULL
68+
);
69+
5970
DROP TABLE IF EXISTS msgs_optin CASCADE;
6071
CREATE TABLE msgs_optin (
6172
id serial primary key,
@@ -128,15 +139,6 @@ CREATE TABLE channels_channelevent (
128139
log_uuids uuid[]
129140
);
130141

131-
DROP TABLE IF EXISTS flows_flowsession CASCADE;
132-
CREATE TABLE flows_flowsession (
133-
id serial primary key,
134-
status character varying(1) NOT NULL,
135-
modified_on timestamp with time zone NOT NULL,
136-
timeout_on timestamp with time zone NULL,
137-
wait_started_on timestamp with time zone
138-
);
139-
140142
DROP TABLE IF EXISTS msgs_media CASCADE;
141143
CREATE TABLE IF NOT EXISTS msgs_media (
142144
id serial primary key,

Diff for: backends/rapidpro/sessions.go

-26
This file was deleted.

Diff for: backends/rapidpro/testdata.sql

-5
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,3 @@ INSERT INTO msgs_media("id", "uuid", "org_id", "content_type", "url", "path", "s
6666
INSERT INTO msgs_media("id", "uuid", "org_id", "content_type", "url", "path", "size", "duration", "width", "height", "original_id")
6767
VALUES(102, '514c552c-e585-40e2-938a-fe9450172da8', 1, 'audio/mp4', 'http://nyaruka.s3.com/orgs/1/media/514c/514c552c-e585-40e2-938a-fe9450172da8/test.m4a', '/orgs/1/media/514c/514c552c-e585-40e2-938a-fe9450172da8/test.m4a', 114, 500, 0, 0, 101);
6868

69-
/** Simple session */
70-
DELETE from flows_flowsession;
71-
INSERT INTO flows_flowsession("id", "status", "modified_on", "wait_started_on")
72-
VALUES(1, 'W', '2018-12-04 11:52:20.900234-08', '2018-12-04 11:52:20.900123-08'),
73-
(2, 'C', '2018-12-04 11:52:20.900456-08', '2018-12-04 11:52:20.900345-08');

Diff for: backends/rapidpro/timeouts.go

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package rapidpro
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"github.com/nyaruka/gocommon/dates"
9+
"github.com/nyaruka/gocommon/jsonx"
10+
)
11+
12+
// SessionID is our type for RapidPro session ids
13+
type SessionID int64
14+
15+
const sqlInsertTimeoutFire = `
16+
INSERT INTO contacts_contactfire(org_id, contact_id, fire_type, scope, extra, fire_on)
17+
VALUES($1, $2, 'T', '', $3, $4)
18+
ON CONFLICT DO NOTHING`
19+
20+
// insertTimeoutFire inserts a timeout fire for the session associated with the given msg
21+
func (b *backend) insertTimeoutFire(ctx context.Context, m *Msg) error {
22+
extra := map[string]any{"session_id": m.SessionID_, "session_modified_on": m.SessionModifiedOn_}
23+
timeoutOn := dates.Now().Add(time.Duration(m.SessionTimeout_) * time.Second)
24+
25+
_, err := b.db.ExecContext(ctx, sqlInsertTimeoutFire, m.OrgID_, m.ContactID_, jsonx.MustMarshal(extra), timeoutOn)
26+
if err != nil {
27+
return fmt.Errorf("error inserting session timeout contact fire for session #%d: %w", m.SessionID_, err)
28+
}
29+
return nil
30+
}

0 commit comments

Comments
 (0)