Skip to content

Commit d144a49

Browse files
author
Adam Eberlin
committed
Rough STOMP adapter + associated configuration/test changes.
1 parent 9b6276f commit d144a49

File tree

9 files changed

+270
-33
lines changed

9 files changed

+270
-33
lines changed

.travis.yml

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,37 @@
11
language: ruby
22

3+
bundler_args: --with-development-dependencies --jobs=3 --retry=3 --verbose
4+
5+
cache:
6+
bundler: true
7+
8+
matrix:
9+
fast_finish: true
10+
311
rvm:
412
- 1.9.3
513
- 2.0.0
6-
- 2.1.0
14+
- 2.1.8
15+
- 2.2.4
716

817
gemfile:
918
- gemfiles/Gemfile.rails-3.x
1019
- gemfiles/Gemfile.rails-4.x
1120

21+
# Set up for STOMP tests
22+
services:
23+
- rabbitmq
24+
25+
before_install:
26+
- gem update bundler
27+
28+
install:
29+
- sudo rabbitmq-plugins enable rabbitmq_web_stomp
30+
- sudo service rabbitmq-server restart
31+
1232
# Set up and start Faye Server before tests are run
1333
before_script:
34+
- bundle install
1435
- cp test/travis/sync.yml config/sync.yml
1536
- cp test/travis/sync.ru sync.ru
1637
- thin start -R sync.ru -p 9292 -t 1 2>&1 > test.log &

app/assets/javascripts/sync.coffee.erb

Lines changed: 97 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,26 @@ $ = jQuery
44

55
ready: false
66
readyQueue: []
7-
FAYE_HOST: "<%= Sync.server %>"
8-
PUSHER_API_KEY: "<%= Sync.api_key %>"
7+
8+
SERVER: "<%= Sync.server %>"
9+
API_KEY: "<%= Sync.api_key %>"
10+
AUTH_TOKEN: "<%= Sync.auth_token %>"
911
CLIENT_ADAPTER: "<%= Sync.adapter %>"
1012
ENCRYPTION_FLAG: "<%= Sync.encryption_flag %>"
13+
WEBSOCKET: "<%= Sync.websocket %>"
14+
DEBUG: <%= defined?(::Rails) && !::Rails.env.production? ? 'true' : 'false' %>
1115

1216
init: ->
1317
return unless Sync[@CLIENT_ADAPTER]
1418
@adapter = new Sync[@CLIENT_ADAPTER]
19+
1520
$ =>
16-
return if @isReady() || [email protected]()
17-
@ready = true
18-
@connect()
19-
@flushReadyQueue()
20-
@bindUnsubscribe()
21+
onReadyInterval = setInterval (=>
22+
return if @isReady() || [email protected]()
23+
clearInterval(onReadyInterval)
24+
@ready = true
25+
@connect()
26+
), 250
2127

2228

2329
# Handle Turbolinks teardown, unsubscribe from all channels before transition
@@ -33,15 +39,24 @@ $ = jQuery
3339

3440
onConnectFailure: (error) -> #noop
3541

36-
connect: -> @adapter.connect()
42+
connect: ->
43+
@adapter.connect()
44+
45+
onConnectInterval = setInterval (=>
46+
return unless @isConnected()
47+
clearInterval(onConnectInterval)
48+
@adapter.onConnect()
49+
), 250
3750

3851
isConnected: -> @adapter.isConnected()
3952

40-
onReady: (callback) ->
41-
if @isReady()
42-
callback()
43-
else
44-
@readyQueue.push callback
53+
onReady: (callbacks) ->
54+
callbackOrAddToQueue = (callback) =>
55+
return callback() if @isReady()
56+
@readyQueue.push(callback)
57+
58+
return callbackOrAddToQueue(callbacks) unless callbacks.constructor == Array
59+
callbackOrAddToQueue(callback) for callback in callbacks
4560

4661

4762
flushReadyQueue: ->
@@ -92,21 +107,30 @@ class Sync.Adapter
92107
return
93108

94109
subscribe: (channel, callback) ->
95-
@unsubscribeChannel(channel)
96-
subscription = new Sync[Sync.CLIENT_ADAPTER].Subscription(@client, channel, callback)
97-
@subscriptions.push(subscription)
98-
subscription
110+
onReadyCallback = (channel, callback) =>
111+
@unsubscribeChannel(channel)
112+
subscription = new Sync[Sync.CLIENT_ADAPTER].Subscription(@client, channel, callback)
113+
@subscriptions.push(subscription)
114+
subscription
115+
116+
if @isConnected()
117+
onReadyCallback(channel, callback)
118+
else
119+
Sync.onReady(onReadyCallback)
99120

121+
onConnect: ->
122+
Sync.flushReadyQueue()
123+
Sync.bindUnsubscribe()
100124

101-
class Sync.Faye extends Sync.Adapter
102125

126+
class Sync.Faye extends Sync.Adapter
103127
subscriptions: []
104128

105129
available: ->
106130
!!window.Faye
107131

108132
connect: ->
109-
@client = new window.Faye.Client(Sync.FAYE_HOST)
133+
@client = new window.Faye.Client(Sync.SERVER)
110134

111135
isConnected: -> @client?.getState() is "CONNECTED"
112136

@@ -129,16 +153,10 @@ class Sync.Pusher extends Sync.Adapter
129153
!!window.Pusher
130154

131155
connect: ->
132-
@client = new window.Pusher(Sync.PUSHER_API_KEY, encrypted: Sync.ENCRYPTION_FLAG)
156+
@client = new window.Pusher(Sync.API_KEY, encrypted: Sync.ENCRYPTION_FLAG)
133157

134158
isConnected: -> @client?.connection.state is "connected"
135159

136-
subscribe: (channel, callback) ->
137-
@unsubscribeChannel(channel)
138-
subscription = new Sync.Pusher.Subscription(@client, channel, callback)
139-
@subscriptions.push(subscription)
140-
subscription
141-
142160

143161
class Sync.Pusher.Subscription
144162
constructor: (@client, channel, callback) ->
@@ -150,6 +168,59 @@ class Sync.Pusher.Subscription
150168
cancel: ->
151169
@client.unsubscribe(@channel) if @client.channel(@channel)?
152170

171+
class Sync.Stomp extends Sync.Adapter
172+
173+
subscriptions: []
174+
175+
client: null
176+
socket: null
177+
178+
headers: {
179+
login: Sync.API_KEY,
180+
passcode: Sync.AUTH_TOKEN
181+
}
182+
183+
available: ->
184+
!!window.Stomp
185+
186+
connect: ->
187+
@socket = new SockJS(Sync.WEBSOCKET)
188+
@client = window.Stomp.over(self.socket)
189+
190+
# SockJS does not support heart-beat: disable heart-beats
191+
@client.heartbeat.outgoing = 0
192+
@client.heartbeat.incoming = 0
193+
194+
@client.connect(
195+
@headers['login'],
196+
@headers['passcode'],
197+
@onConnect,
198+
@onError,
199+
'/'
200+
)
201+
202+
onError: (error) ->
203+
console.log({ error: error })
204+
205+
isConnected: ->
206+
try
207+
@client.connected
208+
catch error
209+
false
210+
211+
212+
class Sync.Stomp.Subscription
213+
constructor: (client, channel, callback) ->
214+
@channel = channel
215+
@client = client
216+
# @subscription = @client.subscribe(channel, callback)
217+
@subscription = @client.subscribe channel, ( (e) ->
218+
callback(JSON.parse(e.body))
219+
)
220+
221+
cancel: ->
222+
@subscription.unsubscribe()
223+
153224

154225
class Sync.View
155226

lib/generators/sync/templates/sync.yml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,17 @@ development:
1616
# encryption: true
1717
# async: true
1818

19+
# Stomp
20+
# development:
21+
# server: 'stomp://YOUR_STOMP_SERVER_LOGIN:YOUR_STOMP_SERVER_PASSCODE@HOST:STOMP_PORT'
22+
# api_key: 'YOUR_STOMP_CLIENT_LOGIN'
23+
# auth_token: 'YOUR_STOMP_CLIENT_PASSCODE'
24+
# adapter: 'Stomp'
25+
# encryption: false
26+
# async: true
27+
# websocket: 'http://HOST:WEBSOCKET_PORT/stomp'
28+
# destination: '/topic/sync'
29+
1930
# Disabled
2031
# development:
2132
# adapter: false

lib/sync.rb

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
require 'sync/resource'
2828
require 'sync/clients/faye'
2929
require 'sync/clients/pusher'
30+
require 'sync/clients/stomp'
3031
require 'sync/clients/dummy'
3132
require 'sync/reactor'
3233
if defined? Rails
@@ -86,6 +87,16 @@ def server
8687
config[:server]
8788
end
8889

90+
def websocket
91+
config[:websocket]
92+
end
93+
94+
def destination
95+
config[:destination] ||= '/'
96+
return "/#{config[:destination]}" unless config[:destination][0] == '/'
97+
config[:destination]
98+
end
99+
89100
def adapter_javascript_url
90101
config[:adapter_javascript_url]
91102
end

lib/sync/clients/stomp.rb

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
module Sync
2+
module Clients
3+
class Stomp
4+
attr_reader :adapter
5+
6+
def setup
7+
require 'stomp'
8+
9+
@adapter ||= ::Stomp::Client.new(Sync.server)
10+
end
11+
12+
def batch_publish(*args)
13+
Message.batch_publish(@adapter, *args)
14+
end
15+
16+
def build_message(*args)
17+
Message.new(*args)
18+
end
19+
20+
# Public: Normalize channel to adapter supported format
21+
#
22+
# channel - The string channel name
23+
#
24+
# Returns The normalized channel prefixed with supported format for Stomp
25+
def normalize_channel(channel)
26+
return "#{Sync.destination}#{channel}" unless Sync.destination[-1] == '/'
27+
"#{Sync.destination}/#{channel}"
28+
end
29+
30+
31+
class Message
32+
33+
attr_accessor :channel, :client, :data
34+
35+
def self.batch_publish(client, messages)
36+
messages.each do |message|
37+
message.client = client
38+
message.publish
39+
end
40+
end
41+
42+
def initialize(channel, data)
43+
self.channel = channel
44+
self.data = data
45+
end
46+
47+
def publish
48+
if Sync.async?
49+
publish_asynchronous
50+
else
51+
publish_synchronous
52+
end
53+
end
54+
55+
def publish_synchronous
56+
raise NotImplementedError
57+
end
58+
59+
def publish_asynchronous
60+
Sync.reactor.perform do
61+
client.publish(channel, data.to_json)
62+
end
63+
end
64+
end
65+
end
66+
end
67+
end

sync.gemspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ Gem::Specification.new do |s|
2323
s.add_development_dependency 'pry'
2424
s.add_development_dependency 'minitest', '< 5.0.0'
2525
s.add_development_dependency 'codeclimate-test-reporter'
26+
s.add_development_dependency 'stomp', '~> 1.3'
2627

2728
s.required_rubygems_version = ">= 1.3.4"
2829
end

test/fixtures/sync_stomp.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Stomp
2+
test:
3+
server: 'stomp://guest:guest@localhost:61613'
4+
api_key: 'guest'
5+
auth_token: 'guest'
6+
adapter: 'Stomp'
7+
encryption: false
8+
async: true
9+
websocket: 'http://localhost:15674/stomp'
10+
destination: '/topic/sync-'

0 commit comments

Comments
 (0)