Skip to content

Commit ac9e5fd

Browse files
author
Adam Eberlin
committed
Rough STOMP adapter + associated configuration/test changes.
1 parent 9b6276f commit ac9e5fd

File tree

9 files changed

+281
-34
lines changed

9 files changed

+281
-34
lines changed

.travis.yml

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,37 @@
11
language: ruby
22

3+
bundler_args: --with-development-dependencies --jobs=3 --retry=3 --verbose
4+
5+
cache:
6+
bundler: true
7+
8+
matrix:
9+
fast_finish: true
10+
311
rvm:
412
- 1.9.3
513
- 2.0.0
6-
- 2.1.0
14+
- 2.1.8
15+
- 2.2.4
716

817
gemfile:
918
- gemfiles/Gemfile.rails-3.x
1019
- gemfiles/Gemfile.rails-4.x
1120

21+
# Set up for STOMP tests
22+
services:
23+
- rabbitmq
24+
25+
before_install:
26+
- gem update bundler
27+
28+
install:
29+
- sudo rabbitmq-plugins enable rabbitmq_web_stomp
30+
- sudo service rabbitmq-server restart
31+
1232
# Set up and start Faye Server before tests are run
1333
before_script:
34+
- bundle install
1435
- cp test/travis/sync.yml config/sync.yml
1536
- cp test/travis/sync.ru sync.ru
1637
- thin start -R sync.ru -p 9292 -t 1 2>&1 > test.log &

app/assets/javascripts/sync.coffee.erb

Lines changed: 103 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,26 @@ $ = jQuery
44

55
ready: false
66
readyQueue: []
7-
FAYE_HOST: "<%= Sync.server %>"
8-
PUSHER_API_KEY: "<%= Sync.api_key %>"
7+
8+
SERVER: "<%= Sync.server %>"
9+
API_KEY: "<%= Sync.api_key %>"
10+
AUTH_TOKEN: "<%= Sync.auth_token %>"
911
CLIENT_ADAPTER: "<%= Sync.adapter %>"
10-
ENCRYPTION_FLAG: "<%= Sync.encryption_flag %>"
12+
ENCRYPTION_FLAG: <%= Sync.encryption_flag %>
13+
WEBSOCKET: "<%= Sync.websocket %>"
14+
DEBUG_FLAG: <%= Sync.debug_flag %>
1115

1216
init: ->
1317
return unless Sync[@CLIENT_ADAPTER]
1418
@adapter = new Sync[@CLIENT_ADAPTER]
19+
1520
$ =>
16-
return if @isReady() || [email protected]()
17-
@ready = true
18-
@connect()
19-
@flushReadyQueue()
20-
@bindUnsubscribe()
21+
onReadyInterval = setInterval (=>
22+
return if @isReady() || [email protected]()
23+
clearInterval(onReadyInterval)
24+
@ready = true
25+
@connect()
26+
), 250
2127

2228

2329
# Handle Turbolinks teardown, unsubscribe from all channels before transition
@@ -33,15 +39,27 @@ $ = jQuery
3339

3440
onConnectFailure: (error) -> #noop
3541

36-
connect: -> @adapter.connect()
42+
connect: ->
43+
@adapter.connect()
44+
45+
onConnectInterval = setInterval (=>
46+
return unless @isConnected()
47+
clearInterval(onConnectInterval)
48+
@adapter.onConnect()
49+
), 250
3750

3851
isConnected: -> @adapter.isConnected()
3952

40-
onReady: (callback) ->
41-
if @isReady()
42-
callback()
43-
else
44-
@readyQueue.push callback
53+
onDebug: (message) ->
54+
window?.console?.log(message)
55+
56+
onReady: (callbacks) ->
57+
callbackOrAddToQueue = (callback) =>
58+
return callback() if @isReady()
59+
@readyQueue.push(callback)
60+
61+
return callbackOrAddToQueue(callbacks) unless callbacks.constructor == Array
62+
callbackOrAddToQueue(callback) for callback in callbacks
4563

4664

4765
flushReadyQueue: ->
@@ -92,21 +110,30 @@ class Sync.Adapter
92110
return
93111

94112
subscribe: (channel, callback) ->
95-
@unsubscribeChannel(channel)
96-
subscription = new Sync[Sync.CLIENT_ADAPTER].Subscription(@client, channel, callback)
97-
@subscriptions.push(subscription)
98-
subscription
113+
onReadyCallback = (channel, callback) =>
114+
@unsubscribeChannel(channel)
115+
subscription = new Sync[Sync.CLIENT_ADAPTER].Subscription(@client, channel, callback)
116+
@subscriptions.push(subscription)
117+
subscription
118+
119+
if @isConnected()
120+
onReadyCallback(channel, callback)
121+
else
122+
Sync.onReady(onReadyCallback)
99123

124+
onConnect: ->
125+
Sync.flushReadyQueue()
126+
Sync.bindUnsubscribe()
100127

101-
class Sync.Faye extends Sync.Adapter
102128

129+
class Sync.Faye extends Sync.Adapter
103130
subscriptions: []
104131

105132
available: ->
106133
!!window.Faye
107134

108135
connect: ->
109-
@client = new window.Faye.Client(Sync.FAYE_HOST)
136+
@client = new window.Faye.Client(Sync.SERVER)
110137

111138
isConnected: -> @client?.getState() is "CONNECTED"
112139

@@ -129,16 +156,11 @@ class Sync.Pusher extends Sync.Adapter
129156
!!window.Pusher
130157

131158
connect: ->
132-
@client = new window.Pusher(Sync.PUSHER_API_KEY, encrypted: Sync.ENCRYPTION_FLAG)
159+
# Pusher doesn't properly transform native boolean flags in options.
160+
@client = new window.Pusher(Sync.API_KEY, encrypted: Sync.ENCRYPTION_FLAG.toString())
133161

134162
isConnected: -> @client?.connection.state is "connected"
135163

136-
subscribe: (channel, callback) ->
137-
@unsubscribeChannel(channel)
138-
subscription = new Sync.Pusher.Subscription(@client, channel, callback)
139-
@subscriptions.push(subscription)
140-
subscription
141-
142164

143165
class Sync.Pusher.Subscription
144166
constructor: (@client, channel, callback) ->
@@ -150,6 +172,60 @@ class Sync.Pusher.Subscription
150172
cancel: ->
151173
@client.unsubscribe(@channel) if @client.channel(@channel)?
152174

175+
class Sync.Stomp extends Sync.Adapter
176+
177+
subscriptions: []
178+
179+
client: null
180+
socket: null
181+
182+
headers: {
183+
login: Sync.API_KEY,
184+
passcode: Sync.AUTH_TOKEN
185+
}
186+
187+
available: ->
188+
!!window.Stomp
189+
190+
connect: ->
191+
@socket = new window.SockJS(Sync.WEBSOCKET)
192+
@client = window.Stomp.over(@socket)
193+
194+
# SockJS does not support heart-beat: disable heart-beats
195+
@client.heartbeat.outgoing = 0
196+
@client.heartbeat.incoming = 0
197+
@client.debug = Sync.onDebug if Sync.DEBUG_FLAG
198+
199+
@client.connect(
200+
@headers['login'],
201+
@headers['passcode'],
202+
@onConnect,
203+
@onError,
204+
'/'
205+
)
206+
207+
onError: (error) ->
208+
console.log({ error: error })
209+
210+
isConnected: ->
211+
try
212+
@client.connected
213+
catch error
214+
false
215+
216+
217+
class Sync.Stomp.Subscription
218+
constructor: (client, channel, callback) ->
219+
@channel = channel
220+
@client = client
221+
# @subscription = @client.subscribe(channel, callback)
222+
@subscription = @client.subscribe channel, ( (e) ->
223+
callback(JSON.parse(e.body))
224+
)
225+
226+
cancel: ->
227+
@subscription.unsubscribe()
228+
153229

154230
class Sync.View
155231

lib/generators/sync/templates/sync.yml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,17 @@ development:
1616
# encryption: true
1717
# async: true
1818

19+
# Stomp
20+
# development:
21+
# server: 'stomp://YOUR_STOMP_SERVER_LOGIN:YOUR_STOMP_SERVER_PASSCODE@HOST:STOMP_PORT'
22+
# api_key: 'YOUR_STOMP_CLIENT_LOGIN'
23+
# auth_token: 'YOUR_STOMP_CLIENT_PASSCODE'
24+
# adapter: 'Stomp'
25+
# encryption: false
26+
# async: true
27+
# websocket: 'http://HOST:WEBSOCKET_PORT/stomp'
28+
# destination: '/topic/sync'
29+
1930
# Disabled
2031
# development:
2132
# adapter: false

lib/sync.rb

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
require 'sync/resource'
2828
require 'sync/clients/faye'
2929
require 'sync/clients/pusher'
30+
require 'sync/clients/stomp'
3031
require 'sync/clients/dummy'
3132
require 'sync/reactor'
3233
if defined? Rails
@@ -86,6 +87,16 @@ def server
8687
config[:server]
8788
end
8889

90+
def websocket
91+
config[:websocket]
92+
end
93+
94+
def destination
95+
config[:destination] ||= '/'
96+
return "/#{config[:destination]}" unless config[:destination][0] == '/'
97+
config[:destination]
98+
end
99+
89100
def adapter_javascript_url
90101
config[:adapter_javascript_url]
91102
end
@@ -110,6 +121,11 @@ def encryption_flag
110121
config[:encryption] || false
111122
end
112123

124+
def debug_flag
125+
return config[:debug] unless config.fetch(:debug, nil).nil?
126+
defined?(::Rails) && !::Rails.env.production?
127+
end
128+
113129
def reactor
114130
@reactor ||= Reactor.new
115131
end

lib/sync/clients/stomp.rb

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
module Sync
2+
module Clients
3+
class Stomp
4+
attr_reader :adapter
5+
6+
def setup
7+
require 'stomp'
8+
9+
@adapter ||= ::Stomp::Client.new(Sync.server)
10+
end
11+
12+
def batch_publish(*args)
13+
Message.batch_publish(@adapter, *args)
14+
end
15+
16+
def build_message(*args)
17+
Message.new(*args)
18+
end
19+
20+
# Public: Normalize channel to adapter supported format
21+
#
22+
# channel - The string channel name
23+
#
24+
# Returns The normalized channel prefixed with supported format for Stomp
25+
def normalize_channel(channel)
26+
return "#{Sync.destination}#{channel}" unless Sync.destination[-1] == '/'
27+
"#{Sync.destination}/#{channel}"
28+
end
29+
30+
31+
class Message
32+
33+
attr_accessor :channel, :client, :data
34+
35+
def self.batch_publish(client, messages)
36+
messages.each do |message|
37+
message.client = client
38+
message.publish
39+
end
40+
end
41+
42+
def initialize(channel, data)
43+
self.channel = channel
44+
self.data = data
45+
end
46+
47+
def publish
48+
if Sync.async?
49+
publish_asynchronous
50+
else
51+
publish_synchronous
52+
end
53+
end
54+
55+
def publish_synchronous
56+
client.publish(channel, data.to_json)
57+
end
58+
59+
def publish_asynchronous
60+
Sync.reactor.perform do
61+
client.publish(channel, data.to_json)
62+
end
63+
end
64+
end
65+
end
66+
end
67+
end

sync.gemspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ Gem::Specification.new do |s|
2323
s.add_development_dependency 'pry'
2424
s.add_development_dependency 'minitest', '< 5.0.0'
2525
s.add_development_dependency 'codeclimate-test-reporter'
26+
s.add_development_dependency 'stomp', '~> 1.3'
2627

2728
s.required_rubygems_version = ">= 1.3.4"
2829
end

test/fixtures/sync_stomp.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Stomp
2+
test:
3+
server: 'stomp://guest:guest@localhost:61613'
4+
api_key: 'guest'
5+
auth_token: 'guest'
6+
adapter: 'Stomp'
7+
encryption: false
8+
async: true
9+
websocket: 'http://localhost:15674/stomp'
10+
destination: '/topic/sync-'

0 commit comments

Comments
 (0)