diff --git a/README.md b/README.md index 9326757..b92a973 100644 --- a/README.md +++ b/README.md @@ -52,13 +52,13 @@ Creates a new incoming message from the client: } ``` -### `ack_msg` +### `ack_chat` -Acknowledges receipt an outgoing message to the client: +Acknowledges receipt an outgoing chat message to the client: ```json { - "type": "ack_msg", + "type": "ack_chat", "msg_id": 46363452 } ``` @@ -96,7 +96,6 @@ A chat session for a new contact has been successfully started: ```json { "type": "chat_started", - "time": "2024-05-01T17:15:30.123456Z", "chat_id": "65vbbDAQCdPdEWlEhDGy4utO" } ``` @@ -108,34 +107,37 @@ A chat session for an existing contact has been successfully resumed: ```json { "type": "chat_resumed", - "time": "2024-05-01T17:15:30.123456Z", "chat_id": "65vbbDAQCdPdEWlEhDGy4utO", "email": "bob@nyaruka.com" } ``` -### `msg_out` +### `chat_out` -A new outgoing message has been created and should be displayed in the client: +A new outgoing chat event has been created and should be displayed. Thus far `msg_out` is the only type sent. ```json { - "type": "msg_out", - "time": "2024-05-01T17:15:30.123456Z", - "msg_id": 34634, - "text": "Thanks for contacting us!", - "origin": "flow" + "type": "chat_out", + "msg_out": { + "id": 34634, + "text": "Thanks for contacting us!", + "origin": "flow", + "time": "2024-05-01T17:15:30.123456Z" + } } ``` ```json { - "type": "msg_out", - "time": "2024-05-01T17:15:30.123456Z", - "msg_id": 34634, - "text": "How can we help?", - "origin": "chat", - "user": {"name": "Bob McTickets", "email": "bob@nyaruka.com"} + "type": "chat_out", + "msg_out": { + "id": 34634, + "text": "Thanks for contacting us!", + "origin": "chat", + "user": {"id": 234, "name": "Bob McTickets", "email": "bob@nyaruka.com", "avatar": "https://example.com/bob.jpg"}, + "time": "2024-05-01T17:15:30.123456Z" + } } ``` @@ -146,20 +148,22 @@ The client previously requested history with a `get_history` command: ```json { "type": "history", - "time": "2024-05-01T17:15:30.123456Z", "history": [ - { - "type": "msg_in", - "time": "2024-04-01T13:15:30.123456Z", - "msg_id": 34632, - "text": "I need help!" + { + "msg_in": { + "id": 34632, + "text": "I need help!", + "time": "2024-04-01T13:15:30.123456Z" + } }, { - "type": "msg_out", - "time": "2024-04-01T13:15:30.123456Z", - "msg_id": 34634, - "text": "Thanks for contacting us!", - "origin": "flow" + "msg_out": { + "id": 34634, + "text": "Thanks for contacting us!", + "origin": "chat", + "user": {"id": 234, "name": "Bob McTickets", "email": "bob@nyaruka.com", "avatar": "https://example.com/bob.jpg"}, + "time": "2024-04-01T13:15:30.123456Z" + } } ] } diff --git a/core/models/msg.go b/core/models/msg.go index 3e2ca93..17caec8 100644 --- a/core/models/msg.go +++ b/core/models/msg.go @@ -22,33 +22,34 @@ const ( MsgOriginTicket MsgOrigin = "ticket" MsgOriginChat MsgOrigin = "chat" - MsgStatusSent MsgStatus = "sent" - DirectionIn MsgDirection = "I" DirectionOut MsgDirection = "O" ) -type MsgOut struct { - ID MsgID `json:"id"` - ChannelUUID ChannelUUID `json:"channel_uuid"` - ChatID ChatID `json:"chat_id"` - Text string `json:"text"` - Attachments []string `json:"attachments,omitempty"` - Origin MsgOrigin `json:"origin"` - UserID UserID `json:"user_id,omitempty"` - Time time.Time `json:"time"` +type MsgIn struct { + ID MsgID `json:"id"` + Text string `json:"text"` + Time time.Time `json:"time"` } -func NewMsgOut(id MsgID, ch *Channel, chatID ChatID, text string, attachments []string, origin MsgOrigin, u *User, t time.Time) *MsgOut { - var userID UserID - if u != nil { - userID = u.ID - } +func NewMsgIn(id MsgID, text string, t time.Time) *MsgIn { + return &MsgIn{ID: id, Text: text, Time: t} +} - return &MsgOut{ID: id, ChannelUUID: ch.UUID, ChatID: chatID, Text: text, Origin: origin, UserID: userID, Time: t} +type MsgOut struct { + ID MsgID `json:"id"` + Text string `json:"text"` + Attachments []string `json:"attachments,omitempty"` + Origin MsgOrigin `json:"origin"` + User *User `json:"user,omitempty"` + Time time.Time `json:"time"` } -type Msg struct { +func NewMsgOut(id MsgID, text string, attachments []string, origin MsgOrigin, user *User, t time.Time) *MsgOut { + return &MsgOut{ID: id, Text: text, Attachments: attachments, Origin: origin, User: user, Time: t} +} + +type DBMsg struct { ID MsgID `json:"id"` Text string `json:"text"` Attachments []string `json:"attachments"` @@ -60,7 +61,32 @@ type Msg struct { CreatedOn time.Time `json:"created_on"` } -func (m *Msg) Origin() MsgOrigin { +func (m *DBMsg) ToMsgIn() *MsgIn { + if m.Direction != DirectionIn { + panic("can only be called on an inbound message") + } + + return NewMsgIn(m.ID, m.Text, m.CreatedOn) +} + +func (m *DBMsg) ToMsgOut(ctx context.Context, store Store) (*MsgOut, error) { + if m.Direction != DirectionOut { + panic("can only be called on an outbound message") + } + + var user *User + var err error + if m.CreatedByID != NilUserID { + user, err = store.GetUser(ctx, m.CreatedByID) + if err != nil { + return nil, fmt.Errorf("error fetching user: %w", err) + } + } + + return NewMsgOut(m.ID, m.Text, m.Attachments, m.origin(), user, m.CreatedOn), nil +} + +func (m *DBMsg) origin() MsgOrigin { if m.FlowID != NilFlowID { return MsgOriginFlow } else if m.BroadcastID != NilBroadcastID { @@ -80,17 +106,17 @@ SELECT row_to_json(r) FROM ( LIMIT $3 ) r` -func LoadContactMessages(ctx context.Context, rt *runtime.Runtime, contactID ContactID, before time.Time, limit int) ([]*Msg, error) { +func LoadContactMessages(ctx context.Context, rt *runtime.Runtime, contactID ContactID, before time.Time, limit int) ([]*DBMsg, error) { rows, err := rt.DB.QueryContext(ctx, sqlSelectContactMessages, contactID, before, limit) if err != nil { return nil, fmt.Errorf("error querying contact messages: %w", err) } defer rows.Close() - msgs := make([]*Msg, 0) + msgs := make([]*DBMsg, 0) for rows.Next() { - msg := &Msg{} + msg := &DBMsg{} if err := dbutil.ScanJSON(rows, msg); err != nil { return nil, fmt.Errorf("error scanning msg row: %w", err) } diff --git a/core/models/msg_test.go b/core/models/msg_test.go index fcf261c..8c80d64 100644 --- a/core/models/msg_test.go +++ b/core/models/msg_test.go @@ -7,6 +7,7 @@ import ( "github.com/nyaruka/chip/core/models" "github.com/nyaruka/chip/testsuite" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestLoadContactMessages(t *testing.T) { @@ -67,3 +68,36 @@ func TestLoadContactMessages(t *testing.T) { assert.NoError(t, err) assert.Len(t, msgs, 0) } + +func TestDMMsgToMsgInAndOut(t *testing.T) { + ctx, rt := testsuite.Runtime() + + defer testsuite.ResetDB() + + orgID := testsuite.InsertOrg(rt, "Nyaruka") + chanID := testsuite.InsertChannel(rt, "8291264a-4581-4d12-96e5-e9fcfa6e68d9", orgID, "CHP", "WebChat", "123", []string{"webchat"}) + bobID := testsuite.InsertContact(rt, orgID, "Bob") + bobURNID := testsuite.InsertURN(rt, orgID, bobID, "webchat:65vbbDAQCdPdEWlEhDGy4utO") + + msg1ID := testsuite.InsertIncomingMsg(rt, orgID, chanID, bobID, bobURNID, "Hello", time.Now()) + msg2ID := testsuite.InsertOutgoingMsg(rt, orgID, chanID, bobID, bobURNID, "There", time.Now()) + msgs, err := models.LoadContactMessages(ctx, rt, bobID, time.Now(), 10) + require.NoError(t, err) + msg1 := msgs[1] + msg2 := msgs[0] + + store := models.NewStore(rt) + store.Start() + defer store.Stop() + + msg1In := msg1.ToMsgIn() + assert.Equal(t, models.NewMsgIn(msg1ID, "Hello", msg1.CreatedOn), msg1In) + + msg2Out, err := msg2.ToMsgOut(ctx, store) + assert.NoError(t, err) + assert.Equal(t, models.NewMsgOut(msg2ID, "There", nil, "chat", nil, msg2.CreatedOn), msg2Out) + + // can't call ToMsgIn on an outbound message and vice versa + assert.Panics(t, func() { msg2.ToMsgIn() }) + assert.Panics(t, func() { msg1.ToMsgOut(ctx, store) }) +} diff --git a/core/models/user.go b/core/models/user.go index a50afd1..eb5c3e9 100644 --- a/core/models/user.go +++ b/core/models/user.go @@ -4,7 +4,6 @@ import ( "context" "database/sql" "fmt" - "strings" "github.com/nyaruka/chip/runtime" "github.com/nyaruka/gocommon/dbutil" @@ -16,25 +15,15 @@ type UserID null.Int const NilUserID = UserID(0) type User struct { - ID UserID `json:"id"` - Email string `json:"email"` - FirstName string `json:"first_name"` - LastName string `json:"last_name"` - Avatar string `json:"avatar"` -} - -func (u *User) Name() string { return strings.TrimSpace(u.FirstName + " " + u.LastName) } - -func (u *User) AvatarURL(cfg *runtime.Config) string { - if u.Avatar != "" { - return cfg.StorageURL + u.Avatar - } - return "" + ID UserID `json:"id"` + Email string `json:"email"` + Name string `json:"name"` + Avatar string `json:"avatar,omitempty"` } const sqlSelectUser = ` SELECT row_to_json(r) FROM ( - SELECT u.id, u.email, u.first_name, u.last_name, s.avatar + SELECT u.id, u.email, TRIM(CONCAT(u.first_name, ' ', u.last_name)) AS name, s.avatar FROM auth_user u INNER JOIN orgs_usersettings s ON s.user_id = u.id WHERE u.id = $1 AND u.is_active @@ -54,5 +43,10 @@ func LoadUser(ctx context.Context, rt *runtime.Runtime, id UserID) (*User, error if err := dbutil.ScanJSON(rows, u); err != nil { return nil, fmt.Errorf("error scanning user: %w", err) } + + if u.Avatar != "" { + u.Avatar = rt.Config.StorageURL + u.Avatar + } + return u, nil } diff --git a/core/models/user_test.go b/core/models/user_test.go index 920d230..84b331a 100644 --- a/core/models/user_test.go +++ b/core/models/user_test.go @@ -22,7 +22,6 @@ func TestLoadUser(t *testing.T) { assert.NoError(t, err) assert.Equal(t, bobID, u.ID) assert.Equal(t, "bob@nyaruka.com", u.Email) - assert.Equal(t, "Bob McFlows", u.Name()) - assert.Equal(t, "avatars/1234/1234567890.webp", u.Avatar) - assert.Equal(t, "http://localhost/media/avatars/1234/1234567890.webp", u.AvatarURL(rt.Config)) + assert.Equal(t, "Bob McFlows", u.Name) + assert.Equal(t, "http://localhost/media/avatars/1234/1234567890.webp", u.Avatar) } diff --git a/core/queue/lua/outbox_read_ready.lua b/core/queue/lua/outbox_read_ready.lua index d4b2530..0b37fce 100644 --- a/core/queue/lua/outbox_read_ready.lua +++ b/core/queue/lua/outbox_read_ready.lua @@ -1,15 +1,16 @@ -local queuesKey, readyKey, queuePrefix = KEYS[1], KEYS[2], ARGV[1] +local queuesKey, readyKey, keyBase = KEYS[1], KEYS[2], ARGV[1] -local chatIDs = redis.call("ZINTER", 2, queuesKey, readyKey) +local queueIDs = redis.call("ZINTER", 2, queuesKey, readyKey) -local msgs = {} +local result = {} -- pairs of queue IDs and items -for i, chatID in ipairs(chatIDs) do - local chatMsg = redis.call("LINDEX", queuePrefix .. chatID, 0) +for i, queueID in ipairs(queueIDs) do + local item = redis.call("LINDEX", keyBase .. ":queue:" .. queueID, 0) - msgs[i] = chatMsg + table.insert(result, queueID) + table.insert(result, item) - redis.call("SREM", readyKey, chatID) + redis.call("SREM", readyKey, queueID) end -return msgs \ No newline at end of file +return result \ No newline at end of file diff --git a/core/queue/lua/outbox_record_sent.lua b/core/queue/lua/outbox_record_sent.lua index 9ccd75d..5d745c0 100644 --- a/core/queue/lua/outbox_record_sent.lua +++ b/core/queue/lua/outbox_record_sent.lua @@ -1,4 +1,4 @@ -local queuesKey, queueKey, readyKey, chatID, msgID = KEYS[1], KEYS[2], KEYS[3], ARGV[1], tonumber(ARGV[2]) +local queuesKey, queueKey, readyKey, queueID, msgID = KEYS[1], KEYS[2], KEYS[3], ARGV[1], tonumber(ARGV[2]) local thisItem = redis.call("LINDEX", queueKey, 0) if thisItem == false then @@ -20,15 +20,15 @@ local hasMore = false if nextItem == false then -- nothing more in the queue for this chat so take it out of the master set - redis.call("ZREM", queuesKey, chatID) + redis.call("ZREM", queuesKey, queueID) else -- update the score of this queue to the timestamp of its new oldest message local msg = cjson.decode(nextItem) - redis.call("ZADD", queuesKey, msg["_ts"], chatID) + redis.call("ZADD", queuesKey, msg["_ts"], queueID) hasMore = true end -- put this chat back in the ready set -redis.call("SADD", readyKey, chatID) +redis.call("SADD", readyKey, queueID) return {"success", tostring(hasMore)} \ No newline at end of file diff --git a/core/queue/outbox.go b/core/queue/outbox.go index 45cff4f..6b11e62 100644 --- a/core/queue/outbox.go +++ b/core/queue/outbox.go @@ -3,6 +3,7 @@ package queue import ( _ "embed" "fmt" + "strings" "github.com/gomodule/redigo/redis" "github.com/nyaruka/chip/core/models" @@ -17,45 +18,62 @@ var outboxReadReadyScript = redis.NewScript(2, outboxReadReady) var outboxRecordSent string var outboxRecordSentScript = redis.NewScript(3, outboxRecordSent) +type Queue struct { + ChannelUUID models.ChannelUUID + ChatID models.ChatID +} + +func (q Queue) String() string { + return fmt.Sprintf("%s@%s", q.ChatID, q.ChannelUUID) +} + type Outbox struct { KeyBase string InstanceID string } -func (o *Outbox) SetReady(rc redis.Conn, chatID models.ChatID, ready bool) error { +func (o *Outbox) SetReady(rc redis.Conn, ch *models.Channel, chatID models.ChatID, ready bool) error { + queue := Queue{ch.UUID, chatID} + var err error if ready { - _, err = rc.Do("SADD", o.readyKey(), chatID) + _, err = rc.Do("SADD", o.readyKey(), queue.String()) } else { - _, err = rc.Do("SREM", o.readyKey(), chatID) + _, err = rc.Do("SREM", o.readyKey(), queue.String()) } return err } -func (o *Outbox) AddMessage(rc redis.Conn, m *models.MsgOut) error { +func (o *Outbox) AddMessage(rc redis.Conn, ch *models.Channel, chatID models.ChatID, m *models.MsgOut) error { + queue := Queue{ch.UUID, chatID} + rc.Send("MULTI") - rc.Send("RPUSH", o.queueKey(m.ChatID), o.encodeMsg(m)) - rc.Send("ZADD", o.queuesKey(), "NX", m.Time.UnixMilli(), m.ChatID) // update only if we're first message in queue + rc.Send("RPUSH", o.queueKey(queue), encodeMsg(m)) + rc.Send("ZADD", o.queuesKey(), "NX", m.Time.UnixMilli(), queue.String()) // update only if we're first message in queue _, err := rc.Do("EXEC") return err } -func (o *Outbox) ReadReady(rc redis.Conn) ([]*models.MsgOut, error) { - items, err := redis.ByteSlices(outboxReadReadyScript.Do(rc, o.queuesKey(), o.readyKey(), o.queueKey(""))) +func (o *Outbox) ReadReady(rc redis.Conn) (map[Queue]*models.MsgOut, error) { + pairs, err := redis.ByteSlices(outboxReadReadyScript.Do(rc, o.queuesKey(), o.readyKey(), o.KeyBase)) if err != nil && err != redis.ErrNil { return nil, err } - msgs := make([]*models.MsgOut, len(items)) - for i := range items { - msgs[i] = o.decodeMsg(items[i]) + ready := make(map[Queue]*models.MsgOut, len(pairs)/2) + for i := 0; i < len(pairs); i += 2 { + queue := string(pairs[i]) + item := pairs[i+1] + ready[decodeQueue(queue)] = decodeMsg(item) } - return msgs, nil + return ready, nil } -func (o *Outbox) RecordSent(rc redis.Conn, chatID models.ChatID, msgID models.MsgID) (bool, error) { - result, err := redis.Strings(outboxRecordSentScript.Do(rc, o.queuesKey(), o.queueKey(chatID), o.readyKey(), chatID, msgID)) +func (o *Outbox) RecordSent(rc redis.Conn, ch *models.Channel, chatID models.ChatID, msgID models.MsgID) (bool, error) { + queue := Queue{ch.UUID, chatID} + + result, err := redis.Strings(outboxRecordSentScript.Do(rc, o.queuesKey(), o.queueKey(queue), o.readyKey(), queue.String(), msgID)) if err != nil { return false, err } @@ -76,8 +94,8 @@ func (o *Outbox) queuesKey() string { return fmt.Sprintf("%s:queues", o.KeyBase) } -func (o *Outbox) queueKey(chatID models.ChatID) string { - return fmt.Sprintf("%s:queue:%s", o.KeyBase, chatID) +func (o *Outbox) queueKey(queue Queue) string { + return fmt.Sprintf("%s:queue:%s", o.KeyBase, queue) } type item struct { @@ -86,13 +104,18 @@ type item struct { TS int64 `json:"_ts"` } -func (o *Outbox) encodeMsg(m *models.MsgOut) []byte { +func encodeMsg(m *models.MsgOut) []byte { i := &item{MsgOut: m, TS: m.Time.UnixMilli()} return jsonx.MustMarshal(i) } -func (o *Outbox) decodeMsg(b []byte) *models.MsgOut { +func decodeMsg(b []byte) *models.MsgOut { m := &models.MsgOut{} jsonx.MustUnmarshal(b, m) return m } + +func decodeQueue(q string) Queue { + parts := strings.Split(q, "@") + return Queue{models.ChannelUUID(parts[1]), models.ChatID(parts[0])} +} diff --git a/core/queue/outbox_test.go b/core/queue/outbox_test.go index b366fde..a4a7853 100644 --- a/core/queue/outbox_test.go +++ b/core/queue/outbox_test.go @@ -1,7 +1,6 @@ package queue_test import ( - "fmt" "testing" "time" @@ -10,6 +9,7 @@ import ( "github.com/nyaruka/chip/testsuite" "github.com/nyaruka/redisx/assertredis" "github.com/stretchr/testify/assert" + "golang.org/x/exp/maps" ) func TestOutboxes(t *testing.T) { @@ -29,87 +29,87 @@ func TestOutboxes(t *testing.T) { defer rc.Close() // queue up some messages for 3 chat ids - err := o.AddMessage(rc, models.NewMsgOut(101, ch, "65vbbDAQCdPdEWlEhDGy4utO", "hi", nil, models.MsgOriginChat, bob, time.Date(2024, 1, 30, 12, 55, 0, 0, time.UTC))) + err := o.AddMessage(rc, ch, "65vbbDAQCdPdEWlEhDGy4utO", models.NewMsgOut(101, "hi", nil, models.MsgOriginChat, bob, time.Date(2024, 1, 30, 12, 55, 0, 0, time.UTC))) assert.NoError(t, err) - err = o.AddMessage(rc, models.NewMsgOut(102, ch, "65vbbDAQCdPdEWlEhDGy4utO", "how can I help", nil, models.MsgOriginChat, bob, time.Date(2024, 1, 30, 13, 1, 0, 0, time.UTC))) + err = o.AddMessage(rc, ch, "65vbbDAQCdPdEWlEhDGy4utO", models.NewMsgOut(102, "how can I help", nil, models.MsgOriginChat, bob, time.Date(2024, 1, 30, 13, 1, 0, 0, time.UTC))) assert.NoError(t, err) - err = o.AddMessage(rc, models.NewMsgOut(103, ch, "3xdF7KhyEiabBiCd3Cst3X28", "hola", nil, models.MsgOriginFlow, nil, time.Date(2024, 1, 30, 13, 32, 0, 0, time.UTC))) + err = o.AddMessage(rc, ch, "3xdF7KhyEiabBiCd3Cst3X28", models.NewMsgOut(103, "hola", nil, models.MsgOriginFlow, nil, time.Date(2024, 1, 30, 13, 32, 0, 0, time.UTC))) assert.NoError(t, err) - err = o.AddMessage(rc, models.NewMsgOut(104, ch, "65vbbDAQCdPdEWlEhDGy4utO", "ok", nil, models.MsgOriginChat, bob, time.Date(2024, 1, 30, 13, 5, 0, 0, time.UTC))) + err = o.AddMessage(rc, ch, "65vbbDAQCdPdEWlEhDGy4utO", models.NewMsgOut(104, "ok", nil, models.MsgOriginChat, bob, time.Date(2024, 1, 30, 13, 5, 0, 0, time.UTC))) assert.NoError(t, err) - err = o.AddMessage(rc, models.NewMsgOut(105, ch, "itlu4O6ZE4ZZc07Y5rHxcLoQ", "test", nil, models.MsgOriginFlow, nil, time.Date(2024, 1, 30, 13, 6, 0, 0, time.UTC))) + err = o.AddMessage(rc, ch, "itlu4O6ZE4ZZc07Y5rHxcLoQ", models.NewMsgOut(105, "test", nil, models.MsgOriginFlow, nil, time.Date(2024, 1, 30, 13, 6, 0, 0, time.UTC))) assert.NoError(t, err) - assertredis.LGetAll(t, rc, "chattest:queue:65vbbDAQCdPdEWlEhDGy4utO", []string{ - fmt.Sprintf(`{"id":101,"channel_uuid":"8291264a-4581-4d12-96e5-e9fcfa6e68d9","chat_id":"65vbbDAQCdPdEWlEhDGy4utO","text":"hi","origin":"chat","user_id":%d,"time":"2024-01-30T12:55:00Z","_ts":1706619300000}`, bob.ID), - fmt.Sprintf(`{"id":102,"channel_uuid":"8291264a-4581-4d12-96e5-e9fcfa6e68d9","chat_id":"65vbbDAQCdPdEWlEhDGy4utO","text":"how can I help","origin":"chat","user_id":%d,"time":"2024-01-30T13:01:00Z","_ts":1706619660000}`, bob.ID), - fmt.Sprintf(`{"id":104,"channel_uuid":"8291264a-4581-4d12-96e5-e9fcfa6e68d9","chat_id":"65vbbDAQCdPdEWlEhDGy4utO","text":"ok","origin":"chat","user_id":%d,"time":"2024-01-30T13:05:00Z","_ts":1706619900000}`, bob.ID), + assertredis.LGetAll(t, rc, "chattest:queue:65vbbDAQCdPdEWlEhDGy4utO@8291264a-4581-4d12-96e5-e9fcfa6e68d9", []string{ + `{"id":101,"text":"hi","origin":"chat","user":{"id":1,"email":"bob@nyaruka.com","name":"Bob McFlows"},"time":"2024-01-30T12:55:00Z","_ts":1706619300000}`, + `{"id":102,"text":"how can I help","origin":"chat","user":{"id":1,"email":"bob@nyaruka.com","name":"Bob McFlows"},"time":"2024-01-30T13:01:00Z","_ts":1706619660000}`, + `{"id":104,"text":"ok","origin":"chat","user":{"id":1,"email":"bob@nyaruka.com","name":"Bob McFlows"},"time":"2024-01-30T13:05:00Z","_ts":1706619900000}`, }) - assertredis.LGetAll(t, rc, "chattest:queue:3xdF7KhyEiabBiCd3Cst3X28", []string{ - `{"id":103,"channel_uuid":"8291264a-4581-4d12-96e5-e9fcfa6e68d9","chat_id":"3xdF7KhyEiabBiCd3Cst3X28","text":"hola","origin":"flow","time":"2024-01-30T13:32:00Z","_ts":1706621520000}`, + assertredis.LGetAll(t, rc, "chattest:queue:3xdF7KhyEiabBiCd3Cst3X28@8291264a-4581-4d12-96e5-e9fcfa6e68d9", []string{ + `{"id":103,"text":"hola","origin":"flow","time":"2024-01-30T13:32:00Z","_ts":1706621520000}`, }) - assertredis.LGetAll(t, rc, "chattest:queue:itlu4O6ZE4ZZc07Y5rHxcLoQ", []string{ - `{"id":105,"channel_uuid":"8291264a-4581-4d12-96e5-e9fcfa6e68d9","chat_id":"itlu4O6ZE4ZZc07Y5rHxcLoQ","text":"test","origin":"flow","time":"2024-01-30T13:06:00Z","_ts":1706619960000}`, + assertredis.LGetAll(t, rc, "chattest:queue:itlu4O6ZE4ZZc07Y5rHxcLoQ@8291264a-4581-4d12-96e5-e9fcfa6e68d9", []string{ + `{"id":105,"text":"test","origin":"flow","time":"2024-01-30T13:06:00Z","_ts":1706619960000}`, }) assertredis.ZGetAll(t, rc, "chattest:queues", map[string]float64{ - "65vbbDAQCdPdEWlEhDGy4utO": 1706619300000, - "3xdF7KhyEiabBiCd3Cst3X28": 1706621520000, - "itlu4O6ZE4ZZc07Y5rHxcLoQ": 1706619960000, + "65vbbDAQCdPdEWlEhDGy4utO@8291264a-4581-4d12-96e5-e9fcfa6e68d9": 1706619300000, + "3xdF7KhyEiabBiCd3Cst3X28@8291264a-4581-4d12-96e5-e9fcfa6e68d9": 1706621520000, + "itlu4O6ZE4ZZc07Y5rHxcLoQ@8291264a-4581-4d12-96e5-e9fcfa6e68d9": 1706619960000, }) // currently no chat ids are marked ready, so reading messages should give us nothing - msgs, err := o.ReadReady(rc) + ready, err := o.ReadReady(rc) assert.NoError(t, err) - assert.Len(t, msgs, 0) + assert.Len(t, ready, 0) // mark 2 chat ids as ready - err = o.SetReady(rc, "65vbbDAQCdPdEWlEhDGy4utO", true) + err = o.SetReady(rc, ch, "65vbbDAQCdPdEWlEhDGy4utO", true) assert.NoError(t, err) - err = o.SetReady(rc, "itlu4O6ZE4ZZc07Y5rHxcLoQ", true) + err = o.SetReady(rc, ch, "itlu4O6ZE4ZZc07Y5rHxcLoQ", true) assert.NoError(t, err) - assertredis.SMembers(t, rc, "chattest:ready:foo1", []string{"65vbbDAQCdPdEWlEhDGy4utO", "itlu4O6ZE4ZZc07Y5rHxcLoQ"}) + assertredis.SMembers(t, rc, "chattest:ready:foo1", []string{"65vbbDAQCdPdEWlEhDGy4utO@8291264a-4581-4d12-96e5-e9fcfa6e68d9", "itlu4O6ZE4ZZc07Y5rHxcLoQ@8291264a-4581-4d12-96e5-e9fcfa6e68d9"}) // reading should now give us their oldest messages - msgs, err = o.ReadReady(rc) + ready, err = o.ReadReady(rc) assert.NoError(t, err) - assert.Len(t, msgs, 2) - assert.Equal(t, models.MsgID(101), msgs[0].ID) - assert.Equal(t, models.MsgID(105), msgs[1].ID) + assert.ElementsMatch(t, []queue.Queue{{"8291264a-4581-4d12-96e5-e9fcfa6e68d9", "65vbbDAQCdPdEWlEhDGy4utO"}, {"8291264a-4581-4d12-96e5-e9fcfa6e68d9", "itlu4O6ZE4ZZc07Y5rHxcLoQ"}}, maps.Keys(ready)) + //assert.Equal(t, models.MsgID(101), msgs[0].ID) + //assert.Equal(t, models.MsgID(105), msgs[1].ID) // and remove them from the instance's ready set assertredis.SMembers(t, rc, "chattest:ready:foo1", []string{}) // nothing actual removed from any of the queues - assertredis.LLen(t, rc, "chattest:queue:65vbbDAQCdPdEWlEhDGy4utO", 3) - assertredis.LLen(t, rc, "chattest:queue:3xdF7KhyEiabBiCd3Cst3X28", 1) - assertredis.LLen(t, rc, "chattest:queue:itlu4O6ZE4ZZc07Y5rHxcLoQ", 1) + assertredis.LLen(t, rc, "chattest:queue:65vbbDAQCdPdEWlEhDGy4utO@8291264a-4581-4d12-96e5-e9fcfa6e68d9", 3) + assertredis.LLen(t, rc, "chattest:queue:3xdF7KhyEiabBiCd3Cst3X28@8291264a-4581-4d12-96e5-e9fcfa6e68d9", 1) + assertredis.LLen(t, rc, "chattest:queue:itlu4O6ZE4ZZc07Y5rHxcLoQ@8291264a-4581-4d12-96e5-e9fcfa6e68d9", 1) - hasMore, err := o.RecordSent(rc, "65vbbDAQCdPdEWlEhDGy4utO", 101) + hasMore, err := o.RecordSent(rc, ch, "65vbbDAQCdPdEWlEhDGy4utO", 101) assert.NoError(t, err) assert.True(t, hasMore) // msg should be removed from the queue for that chat, other chat queues should be unchanged - assertredis.LGetAll(t, rc, "chattest:queue:65vbbDAQCdPdEWlEhDGy4utO", []string{ - fmt.Sprintf(`{"id":102,"channel_uuid":"8291264a-4581-4d12-96e5-e9fcfa6e68d9","chat_id":"65vbbDAQCdPdEWlEhDGy4utO","text":"how can I help","origin":"chat","user_id":%d,"time":"2024-01-30T13:01:00Z","_ts":1706619660000}`, bob.ID), - fmt.Sprintf(`{"id":104,"channel_uuid":"8291264a-4581-4d12-96e5-e9fcfa6e68d9","chat_id":"65vbbDAQCdPdEWlEhDGy4utO","text":"ok","origin":"chat","user_id":%d,"time":"2024-01-30T13:05:00Z","_ts":1706619900000}`, bob.ID), + assertredis.LGetAll(t, rc, "chattest:queue:65vbbDAQCdPdEWlEhDGy4utO@8291264a-4581-4d12-96e5-e9fcfa6e68d9", []string{ + `{"id":102,"text":"how can I help","origin":"chat","user":{"id":1,"email":"bob@nyaruka.com","name":"Bob McFlows"},"time":"2024-01-30T13:01:00Z","_ts":1706619660000}`, + `{"id":104,"text":"ok","origin":"chat","user":{"id":1,"email":"bob@nyaruka.com","name":"Bob McFlows"},"time":"2024-01-30T13:05:00Z","_ts":1706619900000}`, }) - assertredis.LLen(t, rc, "chattest:queue:3xdF7KhyEiabBiCd3Cst3X28", 1) - assertredis.LLen(t, rc, "chattest:queue:itlu4O6ZE4ZZc07Y5rHxcLoQ", 1) + assertredis.LLen(t, rc, "chattest:queue:3xdF7KhyEiabBiCd3Cst3X28@8291264a-4581-4d12-96e5-e9fcfa6e68d9", 1) + assertredis.LLen(t, rc, "chattest:queue:itlu4O6ZE4ZZc07Y5rHxcLoQ@8291264a-4581-4d12-96e5-e9fcfa6e68d9", 1) assertredis.ZGetAll(t, rc, "chattest:queues", map[string]float64{ - "65vbbDAQCdPdEWlEhDGy4utO": 1706619660000, // updated to new oldest message - "3xdF7KhyEiabBiCd3Cst3X28": 1706621520000, - "itlu4O6ZE4ZZc07Y5rHxcLoQ": 1706619960000, + "65vbbDAQCdPdEWlEhDGy4utO@8291264a-4581-4d12-96e5-e9fcfa6e68d9": 1706619660000, // updated to new oldest message + "3xdF7KhyEiabBiCd3Cst3X28@8291264a-4581-4d12-96e5-e9fcfa6e68d9": 1706621520000, + "itlu4O6ZE4ZZc07Y5rHxcLoQ@8291264a-4581-4d12-96e5-e9fcfa6e68d9": 1706619960000, }) - // and chat ID should be back in the ready set - assertredis.SMembers(t, rc, "chattest:ready:foo1", []string{"65vbbDAQCdPdEWlEhDGy4utO"}) + // and queue ID should be back in the ready set + assertredis.SMembers(t, rc, "chattest:ready:foo1", []string{"65vbbDAQCdPdEWlEhDGy4utO@8291264a-4581-4d12-96e5-e9fcfa6e68d9"}) // try recording sent for a chat with an empty queue - _, err = o.RecordSent(rc, "A0UGLTWLLs59CrFzj6VpvMlG", 101) + _, err = o.RecordSent(rc, ch, "A0UGLTWLLs59CrFzj6VpvMlG", 101) assert.EqualError(t, err, "no messages in queue for chat A0UGLTWLLs59CrFzj6VpvMlG") // try recording sent with an incorrect message ID - _, err = o.RecordSent(rc, "65vbbDAQCdPdEWlEhDGy4utO", 999) + _, err = o.RecordSent(rc, ch, "65vbbDAQCdPdEWlEhDGy4utO", 999) assert.EqualError(t, err, "expected message id 999 in queue, found 102") } diff --git a/service.go b/service.go index bbfb962..f10abe5 100644 --- a/service.go +++ b/service.go @@ -102,7 +102,7 @@ func (s *Service) StartChat(ctx context.Context, ch *models.Channel, chatID mode } // mark chat as ready to send messages - if err := s.outbox.SetReady(rc, chatID, true); err != nil { + if err := s.outbox.SetReady(rc, ch, chatID, true); err != nil { return nil, false, fmt.Errorf("error setting chat ready: %w", err) } @@ -124,7 +124,7 @@ func (s *Service) ConfirmMsgOut(ctx context.Context, ch *models.Channel, contact // TODO send DLR to courier // mark chat as ready to send again - if err := s.outbox.SetReady(rc, contact.ChatID, true); err != nil { + if err := s.outbox.SetReady(rc, ch, contact.ChatID, true); err != nil { return fmt.Errorf("error setting chat ready: %w", err) } @@ -137,7 +137,7 @@ func (s *Service) CloseChat(ctx context.Context, ch *models.Channel, contact *mo defer rc.Close() // mark chat as no longer ready - if err := s.outbox.SetReady(rc, contact.ChatID, false); err != nil { + if err := s.outbox.SetReady(rc, ch, contact.ChatID, false); err != nil { return fmt.Errorf("error unsetting chat ready: %w", err) } @@ -145,11 +145,11 @@ func (s *Service) CloseChat(ctx context.Context, ch *models.Channel, contact *mo return nil } -func (s *Service) QueueMsgOut(ctx context.Context, ch *models.Channel, msg *models.MsgOut) error { +func (s *Service) QueueMsgOut(ctx context.Context, ch *models.Channel, contact *models.Contact, msg *models.MsgOut) error { rc := s.rt.RP.Get() defer rc.Close() - if err := s.outbox.AddMessage(rc, msg); err != nil { + if err := s.outbox.AddMessage(rc, ch, contact.ChatID, msg); err != nil { return fmt.Errorf("error queuing to outbox: %w", err) } @@ -175,33 +175,19 @@ func (s *Service) sender() { func (s *Service) send() { log := slog.With("comp", "service") - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) - defer cancel() - rc := s.rt.RP.Get() defer rc.Close() - msgs, err := s.outbox.ReadReady(rc) + ready, err := s.outbox.ReadReady(rc) if err != nil { log.Error("error reading outboxes", "error", err) return } - for _, msg := range msgs { - client := s.server.GetClient(msg.ChatID) + for outbox, msg := range ready { + client := s.server.GetClient(outbox.ChatID) if client != nil { - // TODO find logical place for this so that it can be shared with Client.onCommand - var user *events.User - if msg.UserID != models.NilUserID { - u, err := s.store.GetUser(ctx, msg.UserID) - if err != nil { - log.Error("error fetching user", "error", err) - } else { - user = events.NewUser(u.Name(), u.Email, u.AvatarURL(s.rt.Config)) - } - } - - client.Send(events.NewMsgOut(msg.Time, msg.ID, msg.Text, msg.Attachments, msg.Origin, user)) + client.Send(events.NewChatMsgOut(msg)) } } diff --git a/service_test.go b/service_test.go index 6151398..1b99d78 100644 --- a/service_test.go +++ b/service_test.go @@ -13,6 +13,13 @@ import ( "github.com/stretchr/testify/require" ) +func TestFoo(t *testing.T) { + testsuite.Runtime() + defer testsuite.ResetRedis() + defer testsuite.ResetDB() + +} + func TestService(t *testing.T) { ctx, rt := testsuite.Runtime() @@ -48,7 +55,7 @@ func TestService(t *testing.T) { assert.Equal(t, []string{"StartChat(8291264a-4581-4d12-96e5-e9fcfa6e68d9, itlu4O6ZE4ZZc07Y5rHxcLoQ)"}, mockCourier.Calls) // server should send a chat_started event back to the client - assert.JSONEq(t, `{"type":"chat_started","time":"2024-05-02T16:05:04Z","chat_id":"itlu4O6ZE4ZZc07Y5rHxcLoQ"}`, client.Read(t)) + assert.JSONEq(t, `{"type":"chat_started","chat_id":"itlu4O6ZE4ZZc07Y5rHxcLoQ"}`, client.Read(t)) client.Send(t, `{"type": "send_msg", "text": "hello"}`) @@ -69,21 +76,21 @@ func TestService(t *testing.T) { // server should send a history event back to the client assert.JSONEq(t, `{ "type": "history", - "time": "2024-05-02T16:05:06Z", "history": [ - {"type": "msg_in", "time": "2024-05-02T16:05:05Z", "msg_id":1, "text": "hello"} + {"msg_in": {"id":1, "text": "hello", "time": "2024-05-02T16:05:04Z"}} ] }`, client.Read(t)) // queue a message to be sent to the client - err = svc.QueueMsgOut(ctx, ch, models.NewMsgOut(123, ch, "itlu4O6ZE4ZZc07Y5rHxcLoQ", "welcome", nil, models.MsgOriginBroadcast, nil, dates.Now())) + err = svc.QueueMsgOut(ctx, ch, contact, models.NewMsgOut(123, "welcome", nil, models.MsgOriginBroadcast, nil, dates.Now())) assert.NoError(t, err) // and check it is sent to the client - assert.JSONEq(t, `{"type": "msg_out", "msg_id": 123, "text": "welcome", "origin": "broadcast", "time": "2024-05-02T16:05:07Z"}`, client.Read(t)) + assert.JSONEq(t, `{"type": "chat_out", "msg_out": {"id": 123, "text": "welcome", "origin": "broadcast", "time": "2024-05-02T16:05:05Z"}}`, client.Read(t)) // client acknowledges receipt of the message - client.Send(t, `{"type": "ack_msg", "msg_id": 123}`) + client.Send(t, `{"type": "ack_chat", "msg_id": 123}`) + time.Sleep(100 * time.Millisecond) client.Close(t) } diff --git a/web/client.go b/web/client.go index b9df1b8..b90cecf 100644 --- a/web/client.go +++ b/web/client.go @@ -10,7 +10,6 @@ import ( "github.com/nyaruka/chip/core/models" "github.com/nyaruka/chip/web/commands" "github.com/nyaruka/chip/web/events" - "github.com/nyaruka/gocommon/dates" "github.com/nyaruka/gocommon/httpx" "github.com/nyaruka/gocommon/jsonx" "github.com/nyaruka/gocommon/uuids" @@ -83,9 +82,9 @@ func (c *Client) onCommand(cmd commands.Command) error { c.contact = contact if isNew { - c.Send(events.NewChatStarted(dates.Now(), contact.ChatID)) + c.Send(events.NewChatStarted(contact.ChatID)) } else { - c.Send(events.NewChatResumed(dates.Now(), contact.ChatID, contact.Email)) + c.Send(events.NewChatResumed(contact.ChatID, contact.Email)) } case *commands.SendMsg: @@ -98,7 +97,7 @@ func (c *Client) onCommand(cmd commands.Command) error { return fmt.Errorf("error from service: %w", err) } - case *commands.AckMsg: + case *commands.AckChat: if c.contact == nil { log.Debug("chat not started, command ignored") return nil @@ -120,26 +119,21 @@ func (c *Client) onCommand(cmd commands.Command) error { } - history := make([]events.Event, len(msgs)) + history := make([]*events.HistoryItem, len(msgs)) for i, m := range msgs { if m.Direction == models.DirectionOut { - // TODO find logical place for this so that it can be shared with Service.send - var user *events.User - if m.CreatedByID != models.NilUserID { - u, err := c.server.service.Store().GetUser(ctx, m.CreatedByID) - if err != nil { - log.Error("error fetching user", "error", err) - } else { - user = events.NewUser(u.Name(), u.Email, u.AvatarURL(c.server.rt.Config)) - } + msgOut, err := m.ToMsgOut(ctx, c.server.service.Store()) + if err != nil { + return fmt.Errorf("error converting outbound message: %w", err) } - history[i] = events.NewMsgOut(m.CreatedOn, m.ID, m.Text, m.Attachments, m.Origin(), user) + + history[i] = &events.HistoryItem{MsgOut: msgOut} } else { - history[i] = events.NewMsgIn(m.CreatedOn, m.ID, m.Text) + history[i] = &events.HistoryItem{MsgIn: m.ToMsgIn()} } } - c.Send(events.NewHistory(dates.Now(), history)) + c.Send(events.NewHistory(history)) case *commands.SetEmail: if c.contact == nil { diff --git a/web/commands/ack_msg.go b/web/commands/ack_msg.go index 5292da8..bc4018e 100644 --- a/web/commands/ack_msg.go +++ b/web/commands/ack_msg.go @@ -3,12 +3,12 @@ package commands import "github.com/nyaruka/chip/core/models" func init() { - registerType(TypeAckMsg, func() Command { return &AckMsg{} }) + registerType(TypeAckChat, func() Command { return &AckChat{} }) } -const TypeAckMsg string = "ack_msg" +const TypeAckChat string = "ack_chat" -type AckMsg struct { +type AckChat struct { baseCommand MsgID models.MsgID `json:"msg_id" validate:"required"` diff --git a/web/events/base.go b/web/events/base.go index b8f79c2..b1570c9 100644 --- a/web/events/base.go +++ b/web/events/base.go @@ -1,26 +1,11 @@ package events -import "time" - type Event interface { Type() string - Time() time.Time } type baseEvent struct { - Type_ string `json:"type"` - Time_ time.Time `json:"time"` + Type_ string `json:"type"` } -func (e *baseEvent) Type() string { return e.Type_ } -func (e *baseEvent) Time() time.Time { return e.Time_ } - -type User struct { - Name string `json:"name"` - Email string `json:"email"` - Avatar string `json:"avatar"` -} - -func NewUser(name, email, avatar string) *User { - return &User{Name: name, Email: email, Avatar: avatar} -} +func (e *baseEvent) Type() string { return e.Type_ } diff --git a/web/events/chat_out.go b/web/events/chat_out.go new file mode 100644 index 0000000..d9c8384 --- /dev/null +++ b/web/events/chat_out.go @@ -0,0 +1,18 @@ +package events + +import "github.com/nyaruka/chip/core/models" + +const TypeChatOut string = "chat_out" + +type ChatOutEvent struct { + baseEvent + + MsgOut *models.MsgOut `json:"msg_out,omitempty"` +} + +func NewChatMsgOut(MsgOut *models.MsgOut) *ChatOutEvent { + return &ChatOutEvent{ + baseEvent: baseEvent{Type_: TypeChatOut}, + MsgOut: MsgOut, + } +} diff --git a/web/events/chat_resumed.go b/web/events/chat_resumed.go index 60e9904..9149a06 100644 --- a/web/events/chat_resumed.go +++ b/web/events/chat_resumed.go @@ -1,8 +1,6 @@ package events import ( - "time" - "github.com/nyaruka/chip/core/models" ) @@ -15,6 +13,6 @@ type ChatResumed struct { Email string `json:"email"` } -func NewChatResumed(t time.Time, chatID models.ChatID, email string) *ChatResumed { - return &ChatResumed{baseEvent: baseEvent{Type_: TypeChatResumed, Time_: t}, ChatID: chatID, Email: email} +func NewChatResumed(chatID models.ChatID, email string) *ChatResumed { + return &ChatResumed{baseEvent: baseEvent{Type_: TypeChatResumed}, ChatID: chatID, Email: email} } diff --git a/web/events/chat_started.go b/web/events/chat_started.go index 22ce175..ace0c71 100644 --- a/web/events/chat_started.go +++ b/web/events/chat_started.go @@ -1,8 +1,6 @@ package events import ( - "time" - "github.com/nyaruka/chip/core/models" ) @@ -14,6 +12,6 @@ type ChatStarted struct { ChatID models.ChatID `json:"chat_id"` } -func NewChatStarted(t time.Time, chatID models.ChatID) *ChatStarted { - return &ChatStarted{baseEvent: baseEvent{Type_: TypeChatStarted, Time_: t}, ChatID: chatID} +func NewChatStarted(chatID models.ChatID) *ChatStarted { + return &ChatStarted{baseEvent: baseEvent{Type_: TypeChatStarted}, ChatID: chatID} } diff --git a/web/events/history.go b/web/events/history.go index aa34d91..2ce0ca9 100644 --- a/web/events/history.go +++ b/web/events/history.go @@ -1,36 +1,20 @@ package events -import ( - "time" - - "github.com/nyaruka/chip/core/models" -) +import "github.com/nyaruka/chip/core/models" const TypeHistory string = "history" -const TypeMsgIn string = "msg_in" - -type MsgInEvent struct { - baseEvent - - MsgID models.MsgID `json:"msg_id"` - Text string `json:"text"` -} - -func NewMsgIn(t time.Time, id models.MsgID, text string) Event { - return &MsgInEvent{ - baseEvent: baseEvent{Type_: TypeMsgIn, Time_: t}, - MsgID: id, - Text: text, - } +type HistoryItem struct { + MsgIn *models.MsgIn `json:"msg_in,omitempty"` + MsgOut *models.MsgOut `json:"msg_out,omitempty"` } type HistoryEvent struct { baseEvent - History []Event `json:"history"` + History []*HistoryItem `json:"history"` } -func NewHistory(t time.Time, history []Event) *HistoryEvent { - return &HistoryEvent{baseEvent: baseEvent{Type_: TypeHistory, Time_: t}, History: history} +func NewHistory(history []*HistoryItem) *HistoryEvent { + return &HistoryEvent{baseEvent: baseEvent{Type_: TypeHistory}, History: history} } diff --git a/web/events/msg_out.go b/web/events/msg_out.go deleted file mode 100644 index 94f7244..0000000 --- a/web/events/msg_out.go +++ /dev/null @@ -1,30 +0,0 @@ -package events - -import ( - "time" - - "github.com/nyaruka/chip/core/models" -) - -const TypeMsgOut string = "msg_out" - -type MsgOutEvent struct { - baseEvent - - MsgID models.MsgID `json:"msg_id"` - Text string `json:"text"` - Attachments []string `json:"attachments,omitempty"` - Origin models.MsgOrigin `json:"origin"` - User *User `json:"user,omitempty"` -} - -func NewMsgOut(t time.Time, id models.MsgID, text string, attachments []string, origin models.MsgOrigin, user *User) *MsgOutEvent { - return &MsgOutEvent{ - baseEvent: baseEvent{Type_: TypeMsgOut, Time_: t}, - MsgID: id, - Text: text, - Attachments: attachments, - Origin: origin, - User: user, - } -} diff --git a/web/server.go b/web/server.go index 7d55b48..0627291 100644 --- a/web/server.go +++ b/web/server.go @@ -24,7 +24,7 @@ type Service interface { CreateMsgIn(context.Context, *models.Channel, *models.Contact, string) error ConfirmMsgOut(context.Context, *models.Channel, *models.Contact, models.MsgID) error CloseChat(context.Context, *models.Channel, *models.Contact) error - QueueMsgOut(context.Context, *models.Channel, *models.MsgOut) error + QueueMsgOut(context.Context, *models.Channel, *models.Contact, *models.MsgOut) error } type Server struct { @@ -161,8 +161,13 @@ func (s *Server) handleSend(ctx context.Context, r *http.Request, w http.Respons return } + contact, err := models.LoadContact(ctx, s.rt, ch.OrgID, payload.ChatID) + if err != nil { + writeErrorResponse(w, http.StatusBadRequest, fmt.Sprintf("error loading contact with chat id %s: %s", payload.ChatID, err)) + return + } + var user *models.User - var err error if payload.Msg.UserID != models.NilUserID { user, err = s.service.Store().GetUser(ctx, payload.Msg.UserID) if err != nil { @@ -171,7 +176,7 @@ func (s *Server) handleSend(ctx context.Context, r *http.Request, w http.Respons } } - err = s.service.QueueMsgOut(ctx, ch, models.NewMsgOut(payload.Msg.ID, ch, payload.ChatID, payload.Msg.Text, payload.Msg.Attachments, payload.Msg.Origin, user, time.Now())) + err = s.service.QueueMsgOut(ctx, ch, contact, models.NewMsgOut(payload.Msg.ID, payload.Msg.Text, payload.Msg.Attachments, payload.Msg.Origin, user, time.Now())) if err == nil { w.Write(jsonx.MustMarshal(map[string]any{"status": "queued"})) } else {