Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions brubeck/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@ class Connection(object):
response is necessary.
"""

def __init__(self, incoming=None, outgoing=None):
def __init__(self, incoming=None, outgoing=None, logger=None):
"""The base `__init__()` function configures a unique ID and assigns
the incoming and outgoing mechanisms to a name.

`in_sock` and `out_sock` feel like misnomers at this time but they are
preserved for a transition period.

logger may be a zeromq PUB socket.
"""
self.sender_id = uuid4().hex
self.in_sock = incoming
self.out_sock = outgoing
if logging:
self.logger = logger

def _unsupported(self, name):
"""Simple function that raises an exception.
Expand Down Expand Up @@ -130,7 +134,7 @@ class Mongrel2Connection(Connection):
"""
MAX_IDENTS = 100

def __init__(self, pull_addr, pub_addr):
def __init__(self, pull_addr, pub_addr, logger=None):
"""sender_id = uuid.uuid4() or anything unique
pull_addr = pull socket used for incoming messages
pub_addr = publish socket used for outgoing messages
Expand All @@ -144,7 +148,7 @@ def __init__(self, pull_addr, pub_addr):
in_sock = ctx.socket(zmq.PULL)
out_sock = ctx.socket(zmq.PUB)

super(Mongrel2Connection, self).__init__(in_sock, out_sock)
super(Mongrel2Connection, self).__init__(in_sock, out_sock, logger=logger)
self.in_addr = pull_addr
self.out_addr = pub_addr

Expand All @@ -171,6 +175,8 @@ def recv(self):
zeromq socket and return whatever is found.
"""
zmq_msg = self.in_sock.recv()
if self.logger:
self.logger.send(zmq_msg)
return zmq_msg

def recv_forever_ever(self, application):
Expand Down
5 changes: 3 additions & 2 deletions demos/m2reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
import zmq

ctx = zmq.Context()
s = ctx.socket(zmq.PULL)
s.connect("ipc://127.0.0.1:9999")
s = ctx.socket(zmq.SUB)
s.connect("tcp://127.0.0.1:9955")
s.setsockopt(zmq.SUBSCRIBE, "")

while True:
msg = s.recv()
Expand Down