Skip to content

Commit b9d7988

Browse files
author
Adam Eberlin
committed
Rough STOMP adapter + associated configuration/test changes.
1 parent 73350c7 commit b9d7988

File tree

9 files changed

+284
-39
lines changed

9 files changed

+284
-39
lines changed

.travis.yml

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,37 @@
11
language: ruby
22

3+
bundler_args: --with-development-dependencies --jobs=3 --retry=3 --verbose
4+
5+
cache:
6+
bundler: true
7+
8+
matrix:
9+
fast_finish: true
10+
311
rvm:
412
- 1.9.3
513
- 2.0.0
6-
- 2.1.0
14+
- 2.1.8
15+
- 2.2.4
716

817
gemfile:
918
- gemfiles/Gemfile.rails-3.x
1019
- gemfiles/Gemfile.rails-4.x
1120

21+
# Set up for STOMP tests
22+
services:
23+
- rabbitmq
24+
25+
before_install:
26+
- gem update bundler
27+
28+
install:
29+
- sudo rabbitmq-plugins enable rabbitmq_web_stomp
30+
- sudo service rabbitmq-server restart
31+
1232
# Set up and start Faye Server before tests are run
1333
before_script:
34+
- bundle install
1435
- cp test/travis/sync.yml config/sync.yml
1536
- cp test/travis/sync.ru sync.ru
1637
- thin start -R sync.ru -p 9292 -t 1 2>&1 > test.log &

app/assets/javascripts/sync.coffee

Lines changed: 97 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,16 @@ $ = jQuery
66
readyQueue: []
77

88
init: ->
9+
return unless SyncConfig? && Sync[SyncConfig.adapter]
10+
@adapter ||= new Sync[SyncConfig.adapter]
11+
912
$ =>
10-
return unless SyncConfig? && Sync[SyncConfig.adapter]
11-
@adapter ||= new Sync[SyncConfig.adapter]
12-
return if @isReady() || !@adapter.available()
13-
@ready = true
14-
@connect()
15-
@flushReadyQueue()
16-
@bindUnsubscribe()
13+
onReadyInterval = setInterval (=>
14+
return if @isReady() || !@adapter.available()
15+
clearInterval(onReadyInterval)
16+
@ready = true
17+
@connect()
18+
), 250
1719

1820

1921
# Handle Turbolinks teardown, unsubscribe from all channels before transition
@@ -30,15 +32,27 @@ $ = jQuery
3032

3133
onConnectFailure: (error) -> #noop
3234

33-
connect: -> @adapter.connect()
35+
connect: ->
36+
@adapter.connect()
37+
38+
onConnectInterval = setInterval (=>
39+
return unless @isConnected()
40+
clearInterval(onConnectInterval)
41+
@adapter.onConnect()
42+
), 250
3443

3544
isConnected: -> @adapter.isConnected()
3645

37-
onReady: (callback) ->
38-
if @isReady()
39-
callback()
40-
else
41-
@readyQueue.push callback
46+
onDebug: (message) ->
47+
window?.console?.log(message)
48+
49+
onReady: (callbacks) ->
50+
callbackOrAddToQueue = (callback) =>
51+
return callback() if @isReady()
52+
@readyQueue.push(callback)
53+
54+
return callbackOrAddToQueue(callbacks) unless callbacks.constructor == Array
55+
callbackOrAddToQueue(callback) for callback in callbacks
4256

4357

4458
flushReadyQueue: ->
@@ -89,14 +103,23 @@ class Sync.Adapter
89103
return
90104

91105
subscribe: (channel, callback) ->
92-
@unsubscribeChannel(channel)
93-
subscription = new Sync[SyncConfig.adapter].Subscription(@client, channel, callback)
94-
@subscriptions.push(subscription)
95-
subscription
106+
onReadyCallback = (channel, callback) =>
107+
@unsubscribeChannel(channel)
108+
subscription = new Sync[SyncConfig.adapter].Subscription(@client, channel, callback)
109+
@subscriptions.push(subscription)
110+
subscription
111+
112+
if @isConnected()
113+
onReadyCallback(channel, callback)
114+
else
115+
Sync.onReady(onReadyCallback)
96116

117+
onConnect: ->
118+
Sync.flushReadyQueue()
119+
Sync.bindUnsubscribe()
97120

98-
class Sync.Faye extends Sync.Adapter
99121

122+
class Sync.Faye extends Sync.Adapter
100123
subscriptions: []
101124

102125
available: ->
@@ -126,8 +149,8 @@ class Sync.Pusher extends Sync.Adapter
126149
!!window.Pusher
127150

128151
connect: ->
129-
opts =
130-
encrypted: SyncConfig.pusher_encrypted
152+
# Pusher doesn't properly transform native boolean flags in options.
153+
opts = { encrypted: SyncConfig.encryption_flag.toString() }
131154

132155
opts.wsHost = SyncConfig.pusher_ws_host if SyncConfig.pusher_ws_host
133156
opts.wsPort = SyncConfig.pusher_ws_port if SyncConfig.pusher_ws_port
@@ -137,12 +160,6 @@ class Sync.Pusher extends Sync.Adapter
137160

138161
isConnected: -> @client?.connection.state is "connected"
139162

140-
subscribe: (channel, callback) ->
141-
@unsubscribeChannel(channel)
142-
subscription = new Sync.Pusher.Subscription(@client, channel, callback)
143-
@subscriptions.push(subscription)
144-
subscription
145-
146163

147164
class Sync.Pusher.Subscription
148165
constructor: (@client, channel, callback) ->
@@ -154,6 +171,60 @@ class Sync.Pusher.Subscription
154171
cancel: ->
155172
@client.unsubscribe(@channel) if @client.channel(@channel)?
156173

174+
class Sync.Stomp extends Sync.Adapter
175+
176+
subscriptions: []
177+
178+
client: null
179+
socket: null
180+
181+
headers: {
182+
login: SyncConfig.api_key,
183+
passcode: SyncConfig.auth_token
184+
}
185+
186+
available: ->
187+
!!window.Stomp
188+
189+
connect: ->
190+
@socket = new window.SockJS(SyncConfig.websocket)
191+
@client = window.Stomp.over(@socket)
192+
193+
# SockJS does not support heart-beat: disable heart-beats
194+
@client.heartbeat.outgoing = 0
195+
@client.heartbeat.incoming = 0
196+
@client.debug = Sync.onDebug if SyncConfig.debug_flag
197+
198+
@client.connect(
199+
@headers['login'],
200+
@headers['passcode'],
201+
@onConnect,
202+
@onError,
203+
'/'
204+
)
205+
206+
onError: (error) ->
207+
console.log({ error: error })
208+
209+
isConnected: ->
210+
try
211+
@client.connected
212+
catch error
213+
false
214+
215+
216+
class Sync.Stomp.Subscription
217+
constructor: (client, channel, callback) ->
218+
@channel = channel
219+
@client = client
220+
# @subscription = @client.subscribe(channel, callback)
221+
@subscription = @client.subscribe channel, ( (e) ->
222+
callback(JSON.parse(e.body))
223+
)
224+
225+
cancel: ->
226+
@subscription.unsubscribe()
227+
157228

158229
class Sync.View
159230

lib/generators/sync/templates/sync.yml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,17 @@ development:
1515
# adapter: "Pusher"
1616
# async: true
1717

18+
# Stomp
19+
# development:
20+
# server: 'stomp://YOUR_STOMP_SERVER_LOGIN:YOUR_STOMP_SERVER_PASSCODE@HOST:STOMP_PORT'
21+
# api_key: 'YOUR_STOMP_CLIENT_LOGIN'
22+
# auth_token: 'YOUR_STOMP_CLIENT_PASSCODE'
23+
# adapter: 'Stomp'
24+
# encryption: false
25+
# async: true
26+
# websocket: 'http://HOST:WEBSOCKET_PORT/stomp'
27+
# destination: '/topic/sync'
28+
1829
# Disabled
1930
# development:
2031
# adapter: false

lib/sync.rb

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
require 'sync/resource'
2828
require 'sync/clients/faye'
2929
require 'sync/clients/pusher'
30+
require 'sync/clients/stomp'
3031
require 'sync/clients/dummy'
3132
require 'sync/reactor'
3233
if defined? Rails
@@ -46,13 +47,16 @@ def config
4647
def config_json
4748
@config_json ||= begin
4849
{
49-
server: server,
50+
adapter: adapter,
5051
api_key: api_key,
52+
auth_token: auth_token,
53+
debug_flag: debug_flag,
54+
encryption_flag: encryption_flag,
5155
pusher_ws_host: pusher_ws_host,
5256
pusher_ws_port: pusher_ws_port,
5357
pusher_wss_port: pusher_wss_port,
54-
pusher_encrypted: pusher_encrypted,
55-
adapter: adapter
58+
server: server,
59+
websocket: websocket
5660
}.reject { |k, v| v.nil? }.to_json
5761
end
5862
end
@@ -101,6 +105,16 @@ def server
101105
config[:server]
102106
end
103107

108+
def websocket
109+
config[:websocket]
110+
end
111+
112+
def destination
113+
config[:destination] ||= '/'
114+
return "/#{config[:destination]}" unless config[:destination][0] == '/'
115+
config[:destination]
116+
end
117+
104118
def adapter_javascript_url
105119
config[:adapter_javascript_url]
106120
end
@@ -145,14 +159,19 @@ def pusher_wss_port
145159
config[:pusher_wss_port]
146160
end
147161

148-
def pusher_encrypted
149-
if config[:pusher_encrypted].nil?
162+
def encryption_flag
163+
if config[:encryption].nil?
150164
true
151165
else
152-
config[:pusher_encrypted]
166+
config[:encryption]
153167
end
154168
end
155169

170+
def debug_flag
171+
return config[:debug] unless config.fetch(:debug, nil).nil?
172+
defined?(::Rails) && !::Rails.env.production?
173+
end
174+
156175
def reactor
157176
@reactor ||= Reactor.new
158177
end

lib/sync/clients/stomp.rb

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
module Sync
2+
module Clients
3+
class Stomp
4+
attr_reader :adapter
5+
6+
def setup
7+
require 'stomp'
8+
9+
@adapter ||= ::Stomp::Client.new(Sync.server)
10+
end
11+
12+
def batch_publish(*args)
13+
Message.batch_publish(@adapter, *args)
14+
end
15+
16+
def build_message(*args)
17+
Message.new(*args)
18+
end
19+
20+
# Public: Normalize channel to adapter supported format
21+
#
22+
# channel - The string channel name
23+
#
24+
# Returns The normalized channel prefixed with supported format for Stomp
25+
def normalize_channel(channel)
26+
return "#{Sync.destination}#{channel}" unless Sync.destination[-1] == '/'
27+
"#{Sync.destination}/#{channel}"
28+
end
29+
30+
31+
class Message
32+
33+
attr_accessor :channel, :client, :data
34+
35+
def self.batch_publish(client, messages)
36+
messages.each do |message|
37+
message.client = client
38+
message.publish
39+
end
40+
end
41+
42+
def initialize(channel, data)
43+
self.channel = channel
44+
self.data = data
45+
end
46+
47+
def publish
48+
if Sync.async?
49+
publish_asynchronous
50+
else
51+
publish_synchronous
52+
end
53+
end
54+
55+
def publish_synchronous
56+
client.publish(channel, data.to_json)
57+
end
58+
59+
def publish_asynchronous
60+
Sync.reactor.perform do
61+
client.publish(channel, data.to_json)
62+
end
63+
end
64+
end
65+
end
66+
end
67+
end

sync.gemspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ Gem::Specification.new do |s|
2323
s.add_development_dependency 'pry'
2424
s.add_development_dependency 'minitest', '< 5.0.0'
2525
s.add_development_dependency 'codeclimate-test-reporter'
26+
s.add_development_dependency 'stomp', '~> 1.3'
2627

2728
s.required_rubygems_version = ">= 1.3.4"
2829
end

test/fixtures/sync_stomp.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Stomp
2+
test:
3+
server: 'stomp://guest:guest@localhost:61613'
4+
api_key: 'guest'
5+
auth_token: 'guest'
6+
adapter: 'Stomp'
7+
encryption: false
8+
async: true
9+
websocket: 'http://localhost:15674/stomp'
10+
destination: '/topic/sync-'

0 commit comments

Comments
 (0)