-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathgcloud-pubsub-ascoltatore.js
More file actions
104 lines (91 loc) · 2.98 KB
/
gcloud-pubsub-ascoltatore.js
File metadata and controls
104 lines (91 loc) · 2.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
var Ascoltatori = require("ascoltatori");
var defer = Ascoltatori.util.defer;
function Ascoltatore(gcloudConfig) {
this.pubsub = require("@google-cloud/pubsub")(gcloudConfig);
this.subscriptions = {};
}
// Extend AbstractAscoltatore!
Ascoltatore.prototype = Object.create(
Ascoltatori.AbstractAscoltatore.prototype
);
Ascoltatore.prototype.constructor = Ascoltatore;
Ascoltatore.prototype.transformResourceName = function(topic) {
return "a" + topic.replace(/[^a-zA-Z0-9\-_\.\~\+\%]/g, "-");
};
Ascoltatore.prototype.subscribe = function(topicName, callback, done) {
var self = this;
this.pubsub.topic(this.transformResourceName(topicName)).get(
{autoCreate: true}, function(err, topic, apiResponse) {
topic.subscribe(function(err, subscription) {
if (subscription) {
(self.subscriptions[topicName] = self.subscriptions[topicName] || [])
.push(subscription);
subscription.on("message", function(message) {
// Rework into UTF-8
var arr = message.data.value.data, str = "";
for (var i = 0; i < arr.length; i++) {
str += "%" + ("0" + arr[i].toString(16)).slice(-2);
}
str = decodeURIComponent(str);
callback(
message.data.topic,
str,
message.data.options
);
console.log(
"Received message", str,
"for topic", message.data.topic,
"and subscription", subscription.name
);
message.ack(function() {
console.log("Acknowledged message ID", message.id);
});
});
subscription.on("error", function(error) {
console.error(
"Error", "'" + error + "''", " subscribing to", subscription
);
});
defer(done);
console.log("Subscribed to", topicName);
} else {
console.error("Failed to create Subscription!");
}
});
}
);
};
Ascoltatore.prototype.publish = function(topicName, payload, options, done) {
this.pubsub.topic(this.transformResourceName(topicName)).get(
{autoCreate: true}, function(err, topic, apiResponse) {
topic.publish({
value: payload,
topic: topicName,
options: options
}, function(error) {
if (error) {
console.error(
"Error ", "'" + error + "'", " publishing to", topic
);
}
defer(done);
console.log("Published to", topicName);
});
}
);
};
Ascoltatore.prototype.unsubscribe = function(topicName, callback, done) {
this.subscriptions[topicName].forEach(function(item) {
console.log("Deleting subscription", item.name);
item.delete(function() {
defer(done);
});
});
};
Ascoltatore.prototype.close = function(done) {
console.warn(
"GcloudPubsubAscoltatore#close is unimplemented since PubSub",
"doesn't necessitate it"
);
};
module.exports = Ascoltatore;