diff --git a/backends/rapidpro/backend.go b/backends/rapidpro/backend.go index bcf89c00a..0c26f1e54 100644 --- a/backends/rapidpro/backend.go +++ b/backends/rapidpro/backend.go @@ -516,7 +516,7 @@ func (b *backend) OnSendComplete(ctx context.Context, msg courier.MsgOut, status // if message was successfully sent, and we have a session timeout, update it wasSuccess := status.Status() == courier.MsgStatusWired || status.Status() == courier.MsgStatusSent || status.Status() == courier.MsgStatusDelivered || status.Status() == courier.MsgStatusRead if wasSuccess && dbMsg.SessionTimeout_ != 0 { - if err := updateSessionTimeout(ctx, b, dbMsg.SessionID_, *dbMsg.SessionModifiedOn_, dbMsg.SessionTimeout_); err != nil { + if err := b.insertTimeoutFire(ctx, dbMsg); err != nil { slog.Error("unable to update session timeout", "error", err, "session_id", dbMsg.SessionID_) } } diff --git a/backends/rapidpro/backend_test.go b/backends/rapidpro/backend_test.go index cd7e6262b..77330c883 100644 --- a/backends/rapidpro/backend_test.go +++ b/backends/rapidpro/backend_test.go @@ -26,6 +26,7 @@ import ( "github.com/nyaruka/courier/queue" "github.com/nyaruka/courier/test" "github.com/nyaruka/courier/utils/clogs" + "github.com/nyaruka/gocommon/dates" "github.com/nyaruka/gocommon/dbutil/assertdb" "github.com/nyaruka/gocommon/httpx" "github.com/nyaruka/gocommon/i18n" @@ -1337,15 +1338,46 @@ func (ts *BackendTestSuite) TestChannelEvent() { func (ts *BackendTestSuite) TestSessionTimeout() { ctx := context.Background() - // parse from an iso date - t, err := time.Parse("2006-01-02 15:04:05.000000-07", "2018-12-04 11:52:20.900234-08") + dates.SetNowFunc(dates.NewSequentialNow(time.Date(2025, 1, 28, 20, 43, 34, 157379218, time.UTC), time.Second)) + defer dates.SetNowFunc(time.Now) + + msgJSON := `{ + "uuid": "54c893b9-b026-44fc-a490-50aed0361c3f", + "id": 204, + "org_id": 1, + "text": "Test message 21", + "contact_id": 100, + "contact_urn_id": 14, + "channel_uuid": "f3ad3eb6-d00d-4dc3-92e9-9f34f32940ba", + "urn": "telegram:3527065", + "created_on": "2017-07-21T19:22:23.242757Z", + "high_priority": true, + "session_id": 12345, + "session_timeout": 3600, + "session_modified_on": "2025-01-28T20:43:34.157379218Z" + }` + + msg := &Msg{} + jsonx.MustUnmarshal([]byte(msgJSON), msg) + + err := ts.b.insertTimeoutFire(ctx, msg) ts.NoError(err) - err = updateSessionTimeout(ctx, ts.b, SessionID(1), t, 300) + assertdb.Query(ts.T(), ts.b.db, `SELECT org_id, contact_id, fire_type, scope, extra->>'session_id' AS session_id, extra->>'session_modified_on' AS session_modified_on FROM contacts_contactfire`). + Columns(map[string]any{ + "org_id": int64(1), + "contact_id": int64(100), + "fire_type": "T", + "scope": "", + "session_id": "12345", + "session_modified_on": "2025-01-28T20:43:34.157379218Z", + }) + + // if there's a conflict (e.g. in this case trying to add same timeout again), it should be ignored + err = ts.b.insertTimeoutFire(ctx, msg) ts.NoError(err) - // make sure that took - assertdb.Query(ts.T(), ts.b.db, `SELECT count(*) from flows_flowsession WHERE timeout_on > NOW()`).Returns(1) + assertdb.Query(ts.T(), ts.b.db, `SELECT count(*) FROM contacts_contactfire`).Returns(1) } func (ts *BackendTestSuite) TestMailroomEvents() { diff --git a/backends/rapidpro/schema.sql b/backends/rapidpro/schema.sql index b7885ae7e..1e0e99d62 100644 --- a/backends/rapidpro/schema.sql +++ b/backends/rapidpro/schema.sql @@ -56,6 +56,18 @@ CREATE TABLE contacts_contacturn ( UNIQUE (org_id, identity) ); +DROP TABLE IF EXISTS contacts_contactfire CASCADE; +CREATE TABLE IF NOT EXISTS contacts_contactfire ( + id serial primary key, + org_id integer NOT NULL, + contact_id integer references contacts_contact(id) on delete cascade, + fire_type character varying(1) NOT NULL, + scope character varying(128) NOT NULL, + extra jsonb, + fire_on timestamp with time zone NOT NULL, + UNIQUE (contact_id, fire_type, scope) +); + DROP TABLE IF EXISTS msgs_optin CASCADE; CREATE TABLE msgs_optin ( id serial primary key, @@ -128,15 +140,6 @@ CREATE TABLE channels_channelevent ( log_uuids uuid[] ); -DROP TABLE IF EXISTS flows_flowsession CASCADE; -CREATE TABLE flows_flowsession ( - id serial primary key, - status character varying(1) NOT NULL, - modified_on timestamp with time zone NOT NULL, - timeout_on timestamp with time zone NULL, - wait_started_on timestamp with time zone -); - DROP TABLE IF EXISTS msgs_media CASCADE; CREATE TABLE IF NOT EXISTS msgs_media ( id serial primary key, diff --git a/backends/rapidpro/sessions.go b/backends/rapidpro/sessions.go deleted file mode 100644 index 92a7ae1ae..000000000 --- a/backends/rapidpro/sessions.go +++ /dev/null @@ -1,26 +0,0 @@ -package rapidpro - -import ( - "context" - "time" -) - -// SessionID is our type for RapidPro session ids -type SessionID int64 - -const updateSessionTimeoutSQL = ` - UPDATE - flows_flowsession - SET - timeout_on = NOW() + $3 * interval '1 second' - WHERE - id = $1 AND - extract(epoch from modified_on) = extract(epoch from $2::timestamptz) AND - status = 'W' -` - -// updateSessionTimeout updates the timeout_on value on a db session if our session's wait on hasn't changed -func updateSessionTimeout(ctx context.Context, b *backend, sessionID SessionID, sessionModifiedOn time.Time, timeoutSeconds int) error { - _, err := b.db.ExecContext(ctx, updateSessionTimeoutSQL, sessionID, sessionModifiedOn.In(time.UTC), timeoutSeconds) - return err -} diff --git a/backends/rapidpro/testdata.sql b/backends/rapidpro/testdata.sql index 3c9e3b44b..71f57f0d3 100644 --- a/backends/rapidpro/testdata.sql +++ b/backends/rapidpro/testdata.sql @@ -66,8 +66,3 @@ INSERT INTO msgs_media("id", "uuid", "org_id", "content_type", "url", "path", "s INSERT INTO msgs_media("id", "uuid", "org_id", "content_type", "url", "path", "size", "duration", "width", "height", "original_id") VALUES(102, '514c552c-e585-40e2-938a-fe9450172da8', 1, 'audio/mp4', 'http://nyaruka.s3.com/orgs/1/media/514c/514c552c-e585-40e2-938a-fe9450172da8/test.m4a', '/orgs/1/media/514c/514c552c-e585-40e2-938a-fe9450172da8/test.m4a', 114, 500, 0, 0, 101); -/** Simple session */ -DELETE from flows_flowsession; -INSERT INTO flows_flowsession("id", "status", "modified_on", "wait_started_on") - VALUES(1, 'W', '2018-12-04 11:52:20.900234-08', '2018-12-04 11:52:20.900123-08'), - (2, 'C', '2018-12-04 11:52:20.900456-08', '2018-12-04 11:52:20.900345-08'); diff --git a/backends/rapidpro/timeouts.go b/backends/rapidpro/timeouts.go new file mode 100644 index 000000000..f3fbb25b2 --- /dev/null +++ b/backends/rapidpro/timeouts.go @@ -0,0 +1,30 @@ +package rapidpro + +import ( + "context" + "fmt" + "time" + + "github.com/nyaruka/gocommon/dates" + "github.com/nyaruka/gocommon/jsonx" +) + +// SessionID is our type for RapidPro session ids +type SessionID int64 + +const sqlInsertTimeoutFire = ` +INSERT INTO contacts_contactfire(org_id, contact_id, fire_type, scope, extra, fire_on) + VALUES($1, $2, 'T', '', $3, $4) +ON CONFLICT DO NOTHING` + +// insertTimeoutFire inserts a timeout fire for the session associated with the given msg +func (b *backend) insertTimeoutFire(ctx context.Context, m *Msg) error { + extra := map[string]any{"session_id": m.SessionID_, "session_modified_on": m.SessionModifiedOn_} + timeoutOn := dates.Now().Add(time.Duration(m.SessionTimeout_) * time.Second) + + _, err := b.db.ExecContext(ctx, sqlInsertTimeoutFire, m.OrgID_, m.ContactID_, jsonx.MustMarshal(extra), timeoutOn) + if err != nil { + return fmt.Errorf("error inserting session timeout contact fire for session #%d: %w", m.SessionID_, err) + } + return nil +}