Skip to content

Commit 98c6ebc

Browse files
mrakitinTST workflow account
authored andcommitted
WIP changes from 6/30/25 pair coding
1 parent db3aec1 commit 98c6ebc

File tree

3 files changed

+23
-49
lines changed

3 files changed

+23
-49
lines changed

src/bluesky/callbacks/nohup.out

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Connecting...
2+
Receiving on port ipc:///tmp/input.sock:5555; publishing to port ipc:///tmp/output.sock:5555.
3+
Use Ctrl+C to exit.

src/bluesky/callbacks/zmq.py

Lines changed: 13 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,6 @@ class Bluesky0MQDecodeError(Exception):
2828
...
2929

3030

31-
class Protocols(Enum):
32-
TCP = "tcp"
33-
IPC = "ipc"
34-
35-
3631
class Publisher:
3732
"""
3833
A callback that publishes documents to a 0MQ proxy.
@@ -65,7 +60,7 @@ class Publisher:
6560
>>> RE.subscribe(publisher)
6661
"""
6762

68-
def __init__(self, address, *, prefix=b"", RE=None, zmq=None, serializer=pickle.dumps, protocol=Protocols.TCP):
63+
def __init__(self, address, *, prefix=b"", RE=None, zmq=None, serializer=pickle.dumps):
6964
if RE is not None:
7065
warnings.warn( # noqa: B028
7166
"The RE argument to Publisher is deprecated and "
@@ -81,18 +76,13 @@ def __init__(self, address, *, prefix=b"", RE=None, zmq=None, serializer=pickle.
8176
if zmq is None:
8277
import zmq
8378

84-
if isinstance(address, tuple):
85-
url = f"{protocol.value}://{address[0]}:{address[1]}"
86-
else:
87-
url = f"{protocol.value}://address"
88-
89-
self.address = url
79+
self.address = address
9080
self.RE = RE
9181

9282
self._prefix = bytes(prefix)
9383
self._context = zmq.Context()
9484
self._socket = self._context.socket(zmq.PUB)
95-
self._socket.connect(url)
85+
self._socket.connect(address)
9686
if RE:
9787
self._subscription_token = RE.subscribe(self)
9888
self._serializer = serializer
@@ -157,40 +147,28 @@ class Proxy:
157147
>>> proxy.start() # runs until interrupted
158148
"""
159149

160-
def __init__(self, in_port=None, out_port=None, *, zmq=None, protocol=Protocols.TCP):
150+
def __init__(self, in_address=None, out_address=None, *, zmq=None):
161151
if zmq is None:
162152
import zmq
163153
self.zmq = zmq
164154
self.closed = False
165-
self.input_path = None
166-
self.output_path = None
167155
try:
168156
context = zmq.Context(1)
169157
# Socket facing clients
170158
frontend = context.socket(zmq.SUB)
171-
if protocol is Protocols.TCP:
172-
if in_port is None:
173-
in_port = frontend.bind_to_random_port(f"{protocol.value}://*")
174-
else:
175-
frontend.bind(f"{protocol.value}://*:{in_port}")
176-
elif protocol is Protocols.IPC:
177-
if in_port is None:
178-
in_port = f"/tmp/{uuid.uuid4()}"
179-
frontend.bind(f"{protocol.value}://{in_port}")
159+
if in_address is None:
160+
in_port = frontend.bind_to_random_port(f"tcp://*")
161+
else:
162+
in_port = frontend.bind(in_address)
180163

181164
frontend.setsockopt_string(zmq.SUBSCRIBE, "")
182165

183166
# Socket facing services
184167
backend = context.socket(zmq.PUB)
185-
if protocol is Protocols.TCP:
186-
if out_port is None:
187-
out_port = backend.bind_to_random_port(f"{protocol.value}://*")
188-
else:
189-
backend.bind(f"{protocol.value}://*:{out_port}")
190-
elif protocol is Protocols.IPC:
191-
if out_port is None:
192-
out_port = f"/tmp/{uuid.uuid4()}"
193-
frontend.bind(f"{protocol.value}://{out_port}")
168+
if out_address is None:
169+
out_port = backend.bind_to_random_port(f"tcp://*")
170+
else:
171+
out_port = backend.bind(out_address)
194172

195173
except BaseException:
196174
# Clean up whichever components we have defined so far.
@@ -271,7 +249,6 @@ def __init__(
271249
zmq_asyncio=None,
272250
deserializer=pickle.loads,
273251
strict=False,
274-
protocol=Protocols.TCP,
275252
):
276253
if isinstance(prefix, str):
277254
raise ValueError("prefix must be bytes, not string")
@@ -282,12 +259,8 @@ def __init__(
282259
import zmq
283260
if zmq_asyncio is None:
284261
import zmq.asyncio as zmq_asyncio
285-
if isinstance(address, tuple):
286-
url = f"{protocol.value}://{address[0]}:{address[1]}"
287-
else:
288-
url = f"{protocol.value}://address"
289262
self._deserializer = deserializer
290-
self.address = url
263+
self.address = address
291264

292265
if loop is None:
293266
loop = asyncio.new_event_loop()

src/bluesky/commandline/zmq_proxy.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,15 @@
77
logger = logging.getLogger("bluesky")
88

99

10-
def start_dispatcher(host, port, logfile=None):
10+
def start_dispatcher(out_address, logfile=None):
1111
"""The dispatcher function
1212
Parameters
1313
----------
1414
logfile : string
1515
string come from user command. ex --logfile=temp.log
1616
logfile will be "temp.log". logfile could be empty.
1717
"""
18-
dispatcher = RemoteDispatcher((host, port))
18+
dispatcher = RemoteDispatcher(out_address)
1919
if logfile is not None:
2020
raise ValueError(
2121
"Parameter 'logfile' is deprecated and will be removed in future releases. "
@@ -41,8 +41,8 @@ def log_writer(name, doc):
4141
def main():
4242
DESC = "Start a 0MQ proxy for publishing bluesky documents over a network."
4343
parser = argparse.ArgumentParser(description=DESC)
44-
parser.add_argument("in_port", type=int, nargs=1, help="port that RunEngines should broadcast to")
45-
parser.add_argument("out_port", type=int, nargs=1, help="port that subscribers should subscribe to")
44+
parser.add_argument("--in-address", help="port that RunEngines should broadcast to")
45+
parser.add_argument("--out-address", help="port that subscribers should subscribe to")
4646
parser.add_argument(
4747
"--verbose",
4848
"-v",
@@ -51,8 +51,6 @@ def main():
5151
)
5252
parser.add_argument("--logfile", type=str, help="Redirect logging output to a file on disk.")
5353
args = parser.parse_args()
54-
in_port = args.in_port[0]
55-
out_port = args.out_port[0]
5654

5755
if args.verbose:
5856
from bluesky.log import config_bluesky_logging
@@ -64,11 +62,11 @@ def main():
6462
else:
6563
config_bluesky_logging(level=level)
6664
# Set daemon to kill all threads upon IPython exit
67-
threading.Thread(target=start_dispatcher, args=("localhost", out_port), daemon=True).start()
65+
threading.Thread(target=start_dispatcher, args=(args.out_address), daemon=True).start()
6866

6967
print("Connecting...")
70-
proxy = Proxy(in_port, out_port)
71-
print("Receiving on port %d; publishing to port %d." % (in_port, out_port))
68+
proxy = Proxy(args.in_address, args.out_address)
69+
print("Receiving on address %s; publishing to address %s." % (args.in_address, args.out_address))
7270
print("Use Ctrl+C to exit.")
7371
try:
7472
proxy.start()

0 commit comments

Comments
 (0)