Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/floatpane/matcha/daemonrpc"
"github.com/floatpane/matcha/fetcher"
"github.com/floatpane/matcha/notify"
"github.com/floatpane/matcha/sender"
)

const inboxFolder = "INBOX"
Expand Down Expand Up @@ -46,6 +47,15 @@ type Daemon struct {

shutdown chan struct{}
done chan struct{}

outbox map[string]*OutboxEntry
outboxMu sync.Mutex
}

type OutboxEntry struct {
ID string
Params daemonrpc.SendEmailParams
SendAt time.Time
}

// New creates a daemon with the given config.
Expand All @@ -59,6 +69,7 @@ func New(cfg *config.Config) *Daemon {
idleUpdates: idleUpdates,
shutdown: make(chan struct{}),
done: make(chan struct{}),
outbox: make(map[string]*OutboxEntry),
}

d.server = udsrpc.NewServer()
Expand Down Expand Up @@ -92,6 +103,8 @@ func (d *Daemon) registerHandlers() {
d.server.Handle(daemonrpc.MethodRefreshFolder, d.handleRefreshFolder)
d.server.Handle(daemonrpc.MethodSubscribe, d.handleSubscribe)
d.server.Handle(daemonrpc.MethodUnsubscribe, d.handleUnsubscribe)
d.server.Handle(daemonrpc.MethodQueueEmail, d.handleQueueEmail)
d.server.Handle(daemonrpc.MethodCancelEmail, d.handleCancelEmail)
}

// Run starts the daemon: creates providers, starts the socket listener,
Expand Down Expand Up @@ -157,6 +170,8 @@ func (d *Daemon) Run() error {
d.syncCancel = cancel
go d.backgroundSync(ctx)

go d.processOutbox(ctx)

// Serve client connections via the shared RPC server. Canceling serveCtx
// closes the listener and unblocks Serve.
serveCtx, serveCancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -506,3 +521,62 @@ func (d *Daemon) updateFolderCache(folderName, accountID string, newEmails []con
// Save merged cache
return config.SaveFolderEmailCache(folderName, merged)
}

func (d *Daemon) processOutbox(ctx context.Context) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
d.outboxMu.Lock()
for id, entry := range d.outbox {
if time.Now().After(entry.SendAt) {
delete(d.outbox, id)
go d.sendOutboxEntry(entry)
}
}
d.outboxMu.Unlock()
}
}
}

func (d *Daemon) sendOutboxEntry(entry *OutboxEntry) {
acct := d.getAccount(entry.Params.AccountID)
if acct == nil {
log.Printf("daemon: outbox send failed, no account for %s", entry.Params.AccountID)
return
}

rawMsg, err := sender.SendEmail(
acct,
entry.Params.To,
entry.Params.Cc,
entry.Params.Bcc,
entry.Params.Subject,
entry.Params.Body,
entry.Params.HTMLBody,
nil,
entry.Params.Attachments,
entry.Params.InReplyTo,
entry.Params.References,
entry.Params.SignSMIME,
entry.Params.EncryptSMIME,
entry.Params.SignPGP,
entry.Params.EncryptPGP,
)
if err != nil {
log.Printf("daemon: outbox send failed for %s: %v", entry.ID, err)
return
}

if acct.ServiceProvider != "gmail" {
if err := fetcher.AppendToSentMailbox(acct, rawMsg); err != nil {
log.Printf("daemon: append to sent failed for %s: %v", entry.ID, err)
}
}

log.Printf("daemon: outbox sent email %s", entry.ID)
}
44 changes: 44 additions & 0 deletions daemon/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/floatpane/matcha/daemonrpc"
"github.com/google/uuid"
)

// Per-handler timeouts. fetchTimeout covers reads against the upstream IMAP
Expand Down Expand Up @@ -338,3 +339,46 @@ func (d *Daemon) handleUnsubscribe(_ context.Context, conn *daemonrpc.Conn, para

return true, nil
}

func (d *Daemon) handleQueueEmail(_ context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) {
args, err := decodeParams[daemonrpc.QueueEmailParams](params)
if err != nil {
return nil, parseError(err)
}

id := uuid.New().String()
entry := &OutboxEntry{
ID: id,
Params: args.Email,
SendAt: time.Now().Add(time.Duration(args.DelaySeconds) * time.Second),
}

d.outboxMu.Lock()
d.outbox[id] = entry
d.outboxMu.Unlock()

log.Printf("daemon: queued email %s, sending in %ds", id, args.DelaySeconds)

return daemonrpc.QueueEmailResult{JobID: id}, nil
}

func (d *Daemon) handleCancelEmail(_ context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) {
args, err := decodeParams[daemonrpc.CancelEmailParams](params)
if err != nil {
return nil, parseError(err)
}

d.outboxMu.Lock()
_, exists := d.outbox[args.JobID]
if exists {
delete(d.outbox, args.JobID)
}
d.outboxMu.Unlock()

if !exists {
return nil, fmt.Errorf("job %s not found", args.JobID)
}

log.Printf("daemon: cancelled email %s", args.JobID)
return true, nil
}
75 changes: 75 additions & 0 deletions daemonclient/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package daemonclient

import (
"context"
"fmt"
"log"
"os"
"os/exec"
Expand All @@ -10,6 +11,8 @@ import (
"github.com/floatpane/matcha/backend"
"github.com/floatpane/matcha/config"
"github.com/floatpane/matcha/daemonrpc"
"github.com/floatpane/matcha/fetcher"
"github.com/floatpane/matcha/sender"
)

// Service abstracts daemon-backed vs direct email operations.
Expand All @@ -24,6 +27,8 @@ type Service interface {
MoveEmails(accountID string, uids []uint32, src, dst string) error
MarkRead(accountID, folder string, uids []uint32) error
MarkUnread(accountID, folder string, uids []uint32) error
QueueEmail(accountID string, to, cc, bcc []string, subject, body, htmlBody string, attachments map[string][]byte, inReplyTo string, references []string, signSMIME, encryptSMIME, signPGP, encryptPGP bool, delaySeconds int) (string, error)
CancelEmail(jobID string) error
FetchFolders(accountID string) ([]backend.Folder, error)
RefreshFolder(accountID, folder string) error
Subscribe(accountID, folder string) error
Expand Down Expand Up @@ -171,6 +176,36 @@ func (s *daemonService) MarkUnread(accountID, folder string, uids []uint32) erro
}, nil)
}

func (s *daemonService) QueueEmail(accountID string, to, cc, bcc []string, subject, body, htmlBody string, attachments map[string][]byte, inReplyTo string, references []string, signSMIME, encryptSMIME, signPGP, encryptPGP bool, delaySeconds int) (string, error) {
var result daemonrpc.QueueEmailResult
err := s.client.Call(daemonrpc.MethodQueueEmail, daemonrpc.QueueEmailParams{
Email: daemonrpc.SendEmailParams{
AccountID: accountID,
To: to,
Cc: cc,
Bcc: bcc,
Subject: subject,
Body: body,
HTMLBody: htmlBody,
Attachments: attachments,
InReplyTo: inReplyTo,
References: references,
SignSMIME: signSMIME,
EncryptSMIME: encryptSMIME,
SignPGP: signPGP,
EncryptPGP: encryptPGP,
},
DelaySeconds: delaySeconds,
}, &result)
return result.JobID, err
}

func (s *daemonService) CancelEmail(jobID string) error {
return s.client.Call(daemonrpc.MethodCancelEmail, daemonrpc.CancelEmailParams{
JobID: jobID,
}, nil)
}

func (s *daemonService) FetchFolders(accountID string) ([]backend.Folder, error) {
var folders []backend.Folder
err := s.client.Call(daemonrpc.MethodFetchFolders, daemonrpc.FetchFoldersParams{
Expand Down Expand Up @@ -366,3 +401,43 @@ func (s *directService) Close() error {
close(s.events)
return nil
}

func (s *directService) QueueEmail(accountID string, to, cc, bcc []string, subject, body, htmlBody string, attachments map[string][]byte, inReplyTo string, references []string, signSMIME, encryptSMIME, signPGP, encryptPGP bool, _ int) (string, error) {
acct := s.cfg.GetAccountByID(accountID)
if acct == nil {
return "", fmt.Errorf("no account for %s", accountID)
}

rawMsg, err := sender.SendEmail(
acct,
to,
cc,
bcc,
subject,
body,
htmlBody,
nil,
attachments,
inReplyTo,
references,
signSMIME,
encryptSMIME,
signPGP,
encryptPGP,
)
if err != nil {
return "", err
}

if acct.ServiceProvider != "gmail" {
if err := fetcher.AppendToSentMailbox(acct, rawMsg); err != nil {
log.Printf("direct: append to sent failed: %v", err)
}
}

return "", nil
}

func (s *directService) CancelEmail(_ string) error {
return nil
}
15 changes: 15 additions & 0 deletions daemonrpc/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const (
MethodGetCachedEmails = "GetCachedEmails"
MethodGetCachedBody = "GetCachedBody"
MethodExportContacts = "ExportContacts"
MethodQueueEmail = "QueueEmail"
MethodCancelEmail = "CancelEmail"
)

// Event type names.
Expand Down Expand Up @@ -93,6 +95,19 @@ type FetchEmailBodyParams struct {
UID uint32 `json:"uid"`
}

type QueueEmailParams struct {
Email SendEmailParams `json:"email"`
DelaySeconds int `json:"delay_seconds"`
}

type QueueEmailResult struct {
JobID string `json:"job_id"`
}

type CancelEmailParams struct {
JobID string `json:"job_id"`
}

type FetchEmailBodyResult struct {
Body string `json:"body"`
BodyMIMEType string `json:"body_mime_type,omitempty"`
Expand Down
Loading
Loading