-
Notifications
You must be signed in to change notification settings - Fork 71
/
Copy pathtask.go
91 lines (76 loc) · 2.54 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package rapidpro
import (
"fmt"
"time"
"github.com/gomodule/redigo/redis"
"github.com/nyaruka/courier"
"github.com/nyaruka/gocommon/jsonx"
)
func queueMsgHandling(rc redis.Conn, c *Contact, m *Msg) error {
channel := m.Channel().(*Channel)
body := map[string]any{
"channel_id": channel.ID_,
"msg_id": m.ID_,
"msg_uuid": m.UUID(),
"msg_external_id": m.ExternalID(),
"urn": m.URN().String(),
"urn_id": m.ContactURNID_,
"text": m.Text(),
"attachments": m.Attachments(),
"new_contact": c.IsNew_,
}
return queueMailroomTask(rc, "msg_received", m.OrgID_, m.ContactID_, body)
}
func queueEventHandling(rc redis.Conn, c *Contact, e *ChannelEvent) error {
body := map[string]any{
"event_id": e.ID_,
"event_type": e.EventType_,
"urn_id": e.ContactURNID_,
"channel_id": e.ChannelID_,
"extra": e.Extra(),
"new_contact": c.IsNew_,
"occurred_on": e.OccurredOn_,
"created_on": e.CreatedOn_,
}
if e.OptInID_ != 0 {
body["optin_id"] = e.OptInID_
}
return queueMailroomTask(rc, "event_received", e.OrgID_, e.ContactID_, body)
}
func queueMsgDeleted(rc redis.Conn, ch *Channel, msgID courier.MsgID, contactID ContactID) error {
return queueMailroomTask(rc, "msg_deleted", ch.OrgID_, contactID, map[string]any{"msg_id": msgID})
}
// queueMailroomTask queues the passed in task to mailroom. Mailroom processes both messages and
// channel event tasks through the same ordered queue.
func queueMailroomTask(rc redis.Conn, taskType string, orgID OrgID, contactID ContactID, body map[string]any) (err error) {
// create our event task
eventJSON := jsonx.MustMarshal(mrTask{
Type: taskType,
Task: body,
QueuedOn: time.Now(),
})
// create our org task
contactJSON := jsonx.MustMarshal(mrTask{
Type: "handle_contact_event",
Task: mrContactTask{ContactID: contactID},
QueuedOn: time.Now(),
})
now := time.Now().UTC()
epochFloat := float64(now.UnixNano()) / float64(time.Second)
// we do all our queueing in a transaction
contactQueue := fmt.Sprintf("c:%d:%d", orgID, contactID)
rc.Send("MULTI")
rc.Send("RPUSH", contactQueue, eventJSON)
rc.Send("ZADD", fmt.Sprintf("tasks:handler:%d", orgID), fmt.Sprintf("%.5f", epochFloat-10000000), contactJSON)
rc.Send("ZINCRBY", "tasks:handler:active", 0, orgID)
_, err = rc.Do("EXEC")
return err
}
type mrContactTask struct {
ContactID ContactID `json:"contact_id"`
}
type mrTask struct {
Type string `json:"type"`
Task any `json:"task"`
QueuedOn time.Time `json:"queued_on"`
}