-
Notifications
You must be signed in to change notification settings - Fork 75
/
Copy pathmqtt.go
147 lines (132 loc) · 5.64 KB
/
mqtt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package main
import (
"encoding/json"
"fmt"
"log"
"net/url"
"strings"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/probonopd/go-appimage/internal/helpers"
)
func connect(clientId string, uri *url.URL) mqtt.Client {
opts := createClientOptions(clientId, uri)
client := mqtt.NewClient(opts)
token := client.Connect()
for !token.WaitTimeout(3 * time.Second) {
}
if err := token.Error(); err != nil {
helpers.PrintError("MQTT", err) // We land here in "horror network" situations, so this must not be fatal
}
return client
}
func createClientOptions(clientId string, uri *url.URL) *mqtt.ClientOptions {
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s", uri.Host))
opts.SetUsername(uri.User.Username())
password, _ := uri.User.Password()
opts.SetPassword(password)
opts.SetClientID(clientId)
return opts
}
// UnSubscribeMQTT unubscribe from receiving update notifications for updateinformation
// TODO: Keep track of what we have already subscribed, and remove from that list
func UnSubscribeMQTT(client mqtt.Client, updateinformation string) {
queryEscapedUpdateInformation := url.QueryEscape(updateinformation)
if queryEscapedUpdateInformation == "" {
return
}
client.Unsubscribe(queryEscapedUpdateInformation)
}
// SubscribeMQTT subscribes to receive update notifications for updateinformation
// TODO: Keep track of what we have already subscribed, and don't subscribe again
func SubscribeMQTT(client mqtt.Client, updateinformation string) {
if helpers.SliceContains(subscribedMQTTTopics, updateinformation) {
// We have already subscribed to this; so nothing to do here
return
}
// Need to do this immediately here, otherwise it comes too late
subscribedMQTTTopics = helpers.AppendIfMissing(subscribedMQTTTopics, updateinformation)
time.Sleep(time.Second * 10) // We get retained messages immediately when we subscribe;
// at this point our AppImage may not be integrated yet...
// Also it's better user experience not to be bombarded with updates immediately at startup.
// 10 seconds should be plenty of time.
queryEscapedUpdateInformation := url.QueryEscape(updateinformation)
if queryEscapedUpdateInformation == "" {
return
}
topic := helpers.MQTTNamespace + "/" + queryEscapedUpdateInformation + "/#"
if *verbosePtr {
log.Println("mqtt: Waiting for messages on topic", helpers.MQTTNamespace+"/"+queryEscapedUpdateInformation+"/version")
} else {
log.Println("Subscribing to updates for", updateinformation)
}
client.Subscribe(topic, 0, func(_ mqtt.Client, msg mqtt.Message) {
// log.Printf("* [%s] %s\n", msg.Topic(), string(msg.Payload()))
// log.Println(topic)
short := strings.Replace(msg.Topic(), helpers.MQTTNamespace+"/", "", -1)
parts := strings.Split(short, "/")
log.Println("mqtt: received:", parts)
if len(parts) < 2 {
return
}
if parts[1] == "version" {
// version := string(msg.Payload())
// Decode incoming JSON
var data helpers.PubSubData
err := json.Unmarshal(msg.Payload(), &data)
if err != nil {
helpers.PrintError("mqtt unmarshal", err)
}
version := data.Version
if version == "" {
return
}
queryEscapedUpdateInformation := parts[0]
log.Println("mqtt:", queryEscapedUpdateInformation, "reports version", version)
unescapedui, _ := url.QueryUnescape(queryEscapedUpdateInformation)
if unescapedui == thisai.updateinformation {
log.Println("++++++++++++++++++++++++++++++++++++++++++++++++++")
log.Println("+ Update available for this AppImage.")
log.Println("+ Something special should happen here: Selfupdate")
log.Println("+ To be imlpemented.")
log.Println("++++++++++++++++++++++++++++++++++++++++++++++++++")
sendDesktopNotification("Update available", "An update for the AppImage daemon is available; I could update myself now...", 0)
}
mostRecent := FindMostRecentAppImageWithMatchingUpdateInformation(unescapedui)
ai, _ := NewAppImage(mostRecent)
fstime := ai.ModTime()
log.Println("mqtt:", updateinformation, "reports version", version, "with FSTime", data.FSTime.Unix(), "- we have", mostRecent, "with FSTime", fstime.Unix())
// FIXME: Only notify if the version is newer than what we already have.
// More precisely, if the AppImage being offered is *different* from the one we already have
// even despite version numbers being the same.
// Blocked by https://github.com/AppImage/AppImageSpec/issues/29,
// in the meantime we are using "-fstime" from unsquashfs to
// check whether two AppImages are "different". Note that we are
// not using this to determine whether which one is newer,
// since we don't trust that timestamp enough.
// We just determine what is the newest AppImage on the local system
// and if that one is deemed "different" from what was received over PubPub,
// then we assume we should offer to update.
// This mechanism should be more robust against wrong timestamps.
if fstime.Unix() != data.FSTime.Unix() {
ui, err := helpers.NewUpdateInformationFromString(updateinformation)
if err != nil {
helpers.PrintError("mqtt: NewUpdateInformationFromString:", err)
} else {
msg, err := helpers.GetCommitMessageForLatestCommit(ui)
// changelog_url, _ := helpers.GetReleaseURL(ui)
if err != nil {
helpers.PrintError("mqtt: GetCommitMessageForLatestCommit:", err)
} else {
// The following could not be tested yet
go sendUpdateDesktopNotification(ai, version, msg)
//sendDesktopNotification("Update available for "+ai.niceName, "It can be updated to version "+version+". \n"+msg, 120000)
}
}
} else {
log.Println("mqtt: Not taking action on", ai.Name, "because FStime is identical")
}
}
})
}