1
1
# EventLoop support
2
2
# Crystal Update for zmq sockets
3
- module Crystal::EventLoop
4
- def self.create_fd_write_event (sock : ZMQ ::Socket , edge_triggered : Bool = false )
5
- flags = LibEvent2 ::EventFlags ::Write
6
- flags |= LibEvent2 ::EventFlags ::Persist | LibEvent2 ::EventFlags ::ET # we always do this as ZMQ is edge triggers
7
- event = @@eb .new_event(sock.fd, flags, sock) do |_ , sflags , data |
8
- sock_ref = data.as(ZMQ ::Socket )
9
- zmq_events = sock_ref.events
10
- is_writable = zmq_events & ZMQ ::POLLOUT
11
- if is_writable && sflags.includes?(LibEvent2 ::EventFlags ::Write )
12
- sock_ref.resume_write
13
- elsif sflags.includes?(LibEvent2 ::EventFlags ::Timeout )
14
- sock_ref.resume_write(timed_out: true )
15
- end
3
+ class Crystal::LibEvent::EventLoop
4
+ def create_fd_write_event (sock : ZMQ ::Socket , edge_triggered : Bool = false )
5
+ flags = LibEvent2 ::EventFlags ::Write
6
+ flags |= LibEvent2 ::EventFlags ::Persist | LibEvent2 ::EventFlags ::ET # we always do this as ZMQ is edge triggers
7
+ event = event_base.new_event(sock.fd, flags, sock) do |_ , sflags , data |
8
+ sock_ref = data.as(ZMQ ::Socket )
9
+ zmq_events = sock_ref.events
10
+ is_writable = zmq_events & ZMQ ::POLLOUT
11
+ if is_writable && sflags.includes?(LibEvent2 ::EventFlags ::Write )
12
+ sock_ref.resume_write
13
+ elsif sflags.includes?(LibEvent2 ::EventFlags ::Timeout )
14
+ sock_ref.resume_write(timed_out: true )
16
15
end
17
- event
16
+ end
17
+ event
18
18
end
19
- def self.create_fd_read_event (sock : ZMQ :: Socket , edge_triggered : Bool = false )
20
- flags = LibEvent2 :: EventFlags :: Read
21
- flags | = LibEvent2 ::EventFlags ::Persist | LibEvent2 :: EventFlags :: ET # we always do this as ZMQ is edge triggers
22
- event = @@eb .new_event(sock.fd, flags, sock) do | _ , sflags , data |
23
- sock_ref = data.as( ZMQ :: Socket )
24
- zmq_events = sock_ref.events
25
- is_readable = zmq_events & ZMQ :: POLLIN
26
- if is_readable && sflags.includes?( LibEvent2 :: EventFlags :: Read )
27
- sock_ref.resume_read
28
- elsif sflags.includes?( LibEvent2 :: EventFlags :: Timeout )
29
- sock_ref.resume_read( timed_out: true )
30
- end
19
+
20
+ def create_fd_read_event (sock : ZMQ :: Socket , edge_triggered : Bool = false )
21
+ flags = LibEvent2 ::EventFlags ::Read
22
+ flags |= LibEvent2 :: EventFlags :: Persist | LibEvent2 :: EventFlags :: ET # we always do this as ZMQ is edge triggers
23
+ event = event_base.new_event(sock.fd, flags, sock) do | _ , sflags , data |
24
+ sock_ref = data.as( ZMQ :: Socket )
25
+ zmq_events = sock_ref.events
26
+ is_readable = zmq_events & ZMQ :: POLLIN
27
+ if is_readable && sflags.includes?( LibEvent2 :: EventFlags :: Read )
28
+ sock_ref.resume_read
29
+ elsif sflags.includes?( LibEvent2 :: EventFlags :: Timeout )
30
+ sock_ref.resume_read( timed_out: true )
31
31
end
32
- event
32
+ end
33
+ event
33
34
end
34
- def self.stop_loop
35
- @@eb .loop_break
35
+
36
+ def stop_loop
37
+ event_base.loop_break
36
38
end
37
39
end
38
40
@@ -43,8 +45,8 @@ module ZMQ
43
45
getter name : String
44
46
getter? closed
45
47
46
- @read_event = Crystal ::ThreadLocalValue (Crystal ::Event ).new
47
- @write_event = Crystal ::ThreadLocalValue (Crystal ::Event ).new
48
+ @read_event = Crystal ::ThreadLocalValue (Crystal ::EventLoop :: Event ).new
49
+ @write_event = Crystal ::ThreadLocalValue (Crystal ::EventLoop :: Event ).new
48
50
49
51
def self.create (context : Context , type : Int32 , message_type = Message ) : self
50
52
new context, type , message_type
@@ -80,13 +82,13 @@ module ZMQ
80
82
81
83
# libevent support
82
84
private def add_read_event (timeout = @read_timeout )
83
- event = @read_event .get { Crystal :: EventLoop . create_fd_read_event(self ,true ) }
85
+ event = @read_event .get { Thread .current.scheduler. @event_loop . create_fd_read_event(self , true ) }
84
86
event.add timeout
85
87
nil
86
88
end
87
89
88
90
private def add_write_event (timeout = @write_timeout )
89
- event = @write_event .get { Crystal :: EventLoop . create_fd_write_event(self ,true ) }
91
+ event = @write_event .get { Thread .current.scheduler. @event_loop . create_fd_write_event(self , true ) }
90
92
event.add timeout
91
93
nil
92
94
end
@@ -137,11 +139,11 @@ module ZMQ
137
139
message = @message_type .new
138
140
rc = LibZMQ .msg_recv(message.address, @socket , flags | ZMQ ::DONTWAIT )
139
141
if rc == -1
140
- if Util .errno == Errno ::EAGAIN .to_i
141
- wait_readable
142
- else
143
- raise Util .error_string
144
- end
142
+ if Util .errno == Errno ::EAGAIN .to_i
143
+ wait_readable
144
+ else
145
+ raise Util .error_string
146
+ end
145
147
else
146
148
return message
147
149
end
@@ -159,7 +161,7 @@ module ZMQ
159
161
def receive_strings (flags = 0 )
160
162
receive_messages(flags).map do |msg |
161
163
str = msg.to_s
162
- msg.close()
164
+ msg.close
163
165
str
164
166
end
165
167
end
@@ -182,16 +184,16 @@ module ZMQ
182
184
message = @message_type .new
183
185
rc = LibZMQ .msg_recv(message.address, @socket , flags)
184
186
if Util .resultcode_ok?(rc)
185
- messages << message
186
- return messages unless more_parts?
187
+ messages << message
188
+ return messages unless more_parts?
187
189
else
188
190
message.close
189
191
messages.map(& .close)
190
192
return messages.clear
191
193
end
192
194
end
193
195
else
194
- return messages
196
+ return messages
195
197
end
196
198
end
197
199
end
@@ -295,10 +297,12 @@ module ZMQ
295
297
def finalize
296
298
close
297
299
end
300
+
298
301
# file descriptor
299
302
def fd
300
303
get_socket_option(ZMQ ::FD ).as(Int32 )
301
304
end
305
+
302
306
# event list
303
307
def events
304
308
get_socket_option(ZMQ ::EVENTS ).as(Int32 )
0 commit comments