Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument view #40

Open
wants to merge 24 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ mpirun -n <N> python run_backend_reducer.py
```
where `<N>` is the desired number of MPI ranks. It *must* match the number of ranks used for the backend-live-listener.

**Visualizer:**
**Visualizers for instrumentview and histogram:**
```sh
python run_instrumentvisualizer.py
python run_visualizer.py
```

Expand Down
6 changes: 6 additions & 0 deletions backend/MPIDataSplit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@


def determine_data_split(index, num_processes):
    """Return the MPI rank responsible for *index*.

    Indices are distributed round-robin across the available ranks, so the
    owning rank is simply the index modulo the number of processes.
    """
    return index % num_processes

4 changes: 3 additions & 1 deletion backend/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from backend_event_listener import BackendEventListener
from backend_mantid_reducer import BackendMantidReducer
from result_publisher import ResultPublisher
from histogram_publisher import HistogramPublisher
from instrumentview_publisher import InstrumentViewPublisher
from spectraview_publisher import SpectraViewPublisher
from zmq_queue import ZMQQueueServer
from zmq_queue import ZMQQueueClient
11 changes: 6 additions & 5 deletions backend/backend_event_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from logger import log
from backend_worker import BackendWorker

import MPIDataSplit

# master connects to the main event stream and distributes it to all
# packets are put in a queue
Expand Down Expand Up @@ -60,10 +60,10 @@ def _distribute_stream(self, what, data):
split = []
for i in range(self._comm.size):
split.append([])
for i in data:
detector_id = int(i[0])
target = detector_id % self._comm.size
split[target].append(i)
for i in data:
detector_id = int(i[0])
target = MPIDataSplit.determine_data_split(detector_id, self._comm.size)
split[target].append(i)
else:
split = None
what = self._comm.scatter([what]*self._comm.size, root=0)
Expand All @@ -72,3 +72,4 @@ def _distribute_stream(self, what, data):
return what, data
else:
return what, numpy.array(data)

20 changes: 19 additions & 1 deletion backend/backend_mantid_reducer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from mantid_filter_transition import MantidFilterTransition
from mantid_rebin_transition import MantidRebinTransition
from gather_histogram_transition import GatherHistogramTransition
from gather_spectra_transition import GatherSpectraTransition


class BackendMantidReducer(BackendWorker):
Expand All @@ -30,6 +31,10 @@ def __init__(self, data_queue_in):
self._rebin_transition = MantidRebinTransition(self._filter_transition)
self._gather_histogram_transition = GatherHistogramTransition(self._rebin_transition)
self._gather_histogram_transition.accumulate_data = True
self._rebin_for_instrumentview_transition = MantidRebinTransition(self._create_workspace_from_events_transition)
self._rebin_for_instrumentview_transition.set_bin_parameters('0,5,10') #set to single bin
self._gather_spectra_transition = GatherSpectraTransition(self._create_workspace_from_events_transition)
self._gather_spectra_transition.set_spectra_id('1')

def _process_command(self, command):
setattr(self, command[0], command[1])
Expand Down Expand Up @@ -76,8 +81,11 @@ def get_bin_boundaries(self):
def get_bin_values(self):
return self._rebin_transition.get_checkpoint()[-1].data.readY(0)

def get_spectra_id(self):
    # Expose the currently selected spectrum (workspace index) from the
    # gather-spectra transition. NOTE(review): stored and returned as a
    # string, not an int — see the transition's default of '1'.
    return self._gather_spectra_transition._spectra_id

def get_parameter_dict(self):
return {'bin_parameters':'str', 'filter_interval_parameters':'str', 'filter_pulses':'bool'}
return {'bin_parameters':'str', 'filter_interval_parameters':'str', 'filter_pulses':'bool', 'spectra_id':'str'}

@property
def bin_parameters(self):
Expand All @@ -89,6 +97,16 @@ def bin_parameters(self, parameters):
self._rebin_transition.set_bin_parameters(parameters)
self._lock.release()

@property
def spectra_id(self):
    # Currently selected spectrum (workspace index) for the spectra view,
    # stored as a string on the gather-spectra transition.
    return self._gather_spectra_transition._spectra_id

@spectra_id.setter
def spectra_id(self, spectra):
    # Serialize against the reducer's processing loop: set_spectra_id
    # triggers a re-run of the transition, which must not race with an
    # in-flight reduction step.
    self._lock.acquire()
    self._gather_spectra_transition.set_spectra_id(spectra)
    self._lock.release()

@property
def filter_pulses(self):
return self._filter_pulses
Expand Down
36 changes: 36 additions & 0 deletions backend/gather_spectra_transition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import numpy
from mpi4py import MPI
import random
from transition import Transition
from histogram_checkpoint import HistogramCheckpoint
import MPIDataSplit

class GatherSpectraTransition(Transition):
    """Collect a single spectrum from whichever MPI rank owns it onto rank 0.

    The owning rank (per MPIDataSplit's round-robin split of workspace
    indices) sends the spectrum's X and Y arrays to rank 0 as one flat
    array; rank 0 splits them apart and returns (x, y, index). All other
    ranks return None.
    """

    def __init__(self, parent):
        self._comm = MPI.COMM_WORLD
        # Default spectrum selection, stored as a string; '1' is also the
        # default configured by the backend reducer.
        self._spectra_id = '1'
        super(GatherSpectraTransition, self).__init__(parents=[parent])

    def _do_transition(self, data):
        # _spectra_id is a string; interpret it as a workspace index.
        index = int(self._spectra_id)
        # Rank that owns this index under the round-robin data split.
        target = MPIDataSplit.determine_data_split(index, self._comm.size)
        data = data[0].data
        x = data.readX(index)
        y = data.readY(index)
        if self._comm.Get_rank() == target:
            # Owner ships X and Y concatenated into one flat array to rank 0.
            # The immediate Wait() makes the isend effectively blocking; kept
            # as-is because rank 0 may also be the target (self-send).
            packet = numpy.concatenate((x, y))
            requestXY = self._comm.isend(packet, dest=0)
            requestXY.Wait()
        if self._comm.Get_rank() == 0:
            packet = self._comm.recv(source=target)
            # Undo the concatenation: first half is X, second half is Y.
            # NOTE(review): array_split halves the packet, which assumes
            # len(x) == len(y) — confirm for histogram workspaces, where X
            # typically has one extra bin boundary.
            x,y = numpy.array_split(packet, 2)
            return x, y, index
        # Non-root, non-returning ranks implicitly return None.



    def set_spectra_id(self, spectra_id):
        # Accept a new spectrum id (string) and re-run the transition so the
        # newly selected spectrum is gathered.
        self._spectra_id = spectra_id
        self.trigger_rerun()

    def _create_checkpoint(self):
        # Reuses the histogram checkpoint type to hold the gathered spectrum.
        return HistogramCheckpoint()
64 changes: 64 additions & 0 deletions backend/histogram_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import time
import numpy
import zmq

from logger import log
import ports
from result_publisher import ResultPublisher


class HistogramPublisher(ResultPublisher):
    """Publish accumulated histogram checkpoints over a ZMQ PUB socket.

    Periodically polls the event listener's gather-histogram transition and
    broadcasts each available checkpoint as a two-part message: a JSON
    header followed by a flat numpy payload (bin boundaries concatenated
    with bin values). Sends a 'clear' command whenever the number of
    histograms changes so subscribers can reset their state.
    """

    def __init__(self, eventListener):
        # Bug fix: the original called super(ResultPublisher, self), which
        # skips ResultPublisher.__init__ and jumps straight to Controllable,
        # then duplicated the attribute setup by hand. Delegate to the
        # direct parent, which initializes eventListener, _update_rate,
        # socket and _last_count.
        super(HistogramPublisher, self).__init__(eventListener)

    def run(self):
        # Bug fix: the log line said "Starting ResultPublisher" (copy-paste
        # from the parent class).
        log.info("Starting HistogramPublisher")
        self.connect()

        self._publish_clear()
        while True:
            # Hoist the checkpoint list: it was fetched once per loop pass
            # and again for every index inside the inner loop.
            checkpoints = self.eventListener._gather_histogram_transition.get_checkpoint()
            count = len(checkpoints)
            if count != self._last_count:
                # Histogram count changed — tell subscribers to reset.
                self._publish_clear()
                self._last_count = count
            for i in range(count):
                if checkpoints[i]:
                    self._publish(i)
            time.sleep(self.update_rate)

    def connect(self):
        """Bind the PUB socket to the configured result-stream port."""
        context = zmq.Context()
        self.socket = context.socket(zmq.PUB)
        uri = 'tcp://*:{0:d}'.format(ports.result_stream)
        self.socket.bind(uri)
        log.info('Bound to ' + uri)

    def _create_header(self, command, index):
        # Minimal JSON envelope identifying the message type and histogram.
        return { 'command':command, 'index':index }

    def _publish_clear(self):
        header = self._create_header('clear', None)
        self.socket.send_json(header)

    def _publish(self, index):
        # Payload layout: first the bin boundaries, then the bin values,
        # concatenated into one flat array (removed leftover debug print).
        boundaries, values = self.eventListener._gather_histogram_transition.get_checkpoint()[index].data
        packet = numpy.concatenate((boundaries, values))
        header = self._create_header('graphData', index)
        self.socket.send_json(header, flags=zmq.SNDMORE)
        self.socket.send(packet)

    def get_parameter_dict(self):
        # Parameters exposed to the remote-control layer.
        return {'update_rate':'float'}

    @property
    def update_rate(self):
        # Seconds to sleep between publish cycles.
        return self._update_rate

    @update_rate.setter
    def update_rate(self, update_rate):
        self._update_rate = update_rate
65 changes: 65 additions & 0 deletions backend/instrumentview_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import time
import numpy
import zmq
import mantid.simpleapi as simpleapi
from logger import log
import ports
from controllable import Controllable


class InstrumentViewPublisher(Controllable):
    """Publish per-detector spectra for the instrument view over ZMQ PUB.

    Periodically polls the event listener's instrument-view rebin
    transition; when a checkpoint is available, clones the workspace and
    broadcasts every histogram's (X, Y, E) arrays concatenated into one
    flat numpy payload, preceded by a JSON header.
    """

    def __init__(self, eventListener):
        super(InstrumentViewPublisher, self).__init__(type(self).__name__)
        self.eventListener = eventListener
        self._update_rate = 1.0  # seconds between publish cycles
        self.socket = None
        self._last_count = 0

    def run(self):
        log.info("Starting InstrumentViewPublisher")
        self.connect()

        self._publish_clear()
        while True:
            if self.eventListener._rebin_for_instrumentview_transition.get_checkpoint():
                self._publish()
            time.sleep(self.update_rate)

    def connect(self):
        """Bind the PUB socket to the instrument-view result-stream port."""
        context = zmq.Context()
        self.socket = context.socket(zmq.PUB)
        uri = 'tcp://*:{0:d}'.format(ports.instrumentview_result_stream)
        self.socket.bind(uri)
        log.info('Bound to ' + uri)

    def _create_header(self, command, index):
        # Minimal JSON envelope identifying the message type.
        return { 'command':command, 'index':index }

    def _publish_clear(self):
        header = self._create_header('clear', None)
        self.socket.send_json(header)

    def _publish(self):
        # Clone so the checkpoint workspace cannot change underneath us
        # while we read it.
        currentws = simpleapi.CloneWorkspace(
            self.eventListener._rebin_for_instrumentview_transition.get_checkpoint().data)
        # Collect each histogram's X, Y and E arrays and concatenate ONCE at
        # the end. The original re-concatenated a growing array on every
        # iteration (accidentally quadratic) and kept an unused
        # 'concatIndex' list; it also sent a plain Python list when there
        # were zero histograms.
        parts = []
        for i in range(currentws.getNumberHistograms()):
            parts.append(currentws.readX(i))
            parts.append(currentws.readY(i))
            parts.append(currentws.readE(i))
        if parts:
            concatPacket = numpy.concatenate(parts)
        else:
            concatPacket = numpy.array([])
        header = self._create_header('instrumentData', 1)
        self.socket.send_json(header, flags=zmq.SNDMORE)
        self.socket.send(concatPacket)

    def get_parameter_dict(self):
        # Parameters exposed to the remote-control layer.
        return {'update_rate':'float'}

    @property
    def update_rate(self):
        # Seconds to sleep between publish cycles.
        return self._update_rate

    @update_rate.setter
    def update_rate(self, update_rate):
        self._update_rate = update_rate
55 changes: 19 additions & 36 deletions backend/result_publisher.py
Original file line number Diff line number Diff line change
@@ -1,63 +1,46 @@
import time
import numpy
import zmq

from logger import log
import ports
from controllable import Controllable


class ResultPublisher(Controllable):
def __init__(self, eventListener):
super(ResultPublisher, self).__init__(type(self).__name__)
self.eventListener = eventListener
self._update_rate = 1.0
self.socket = None
self._last_count = 0
super(ResultPublisher, self).__init__(type(self).__name__)
self.eventListener = eventListener
self._update_rate = 1.0
self.socket = None
self._last_count = 0
self.ports = None

def run(self):
log.info("Starting ResultPublisher")
self.connect()

self._publish_clear()
while True:
count = len(self.eventListener._gather_histogram_transition.get_checkpoint())
if count != self._last_count:
self._publish_clear()
self._last_count = count
for i in range(count):
if self.eventListener._gather_histogram_transition.get_checkpoint()[i]:
self._publish(i)
time.sleep(self.update_rate)
pass

def connect(self):
context = zmq.Context()
self.socket = context.socket(zmq.PUB)
uri = 'tcp://*:{0:d}'.format(ports.result_stream)
self.socket.bind(uri)
log.info('Bound to ' + uri)
context = zmq.Context()
self.socket = context.socket(zmq.PUB)
uri = 'tcp://*:{0:d}'.format(self.ports)
self.socket.bind(uri)
log.info('Bound to ' + uri)

def _create_header(self, command, index):
return { 'command':command, 'index':index }
def _create_header(self,command,index):
return {'command':command, 'index':index}

def _publish_clear(self):
header = self._create_header('clear', None)
self.socket.send_json(header)
header = self._create_header('clear', None)
self.socket.send_json(header)

def _publish(self, index):
boundaries, values = self.eventListener._gather_histogram_transition.get_checkpoint()[index].data
packet = numpy.concatenate((boundaries, values))
header = self._create_header('data', index)
self.socket.send_json(header, flags=zmq.SNDMORE)
self.socket.send(packet)
pass

def get_parameter_dict(self):
return {'update_rate':'float'}
return {'update_rate':'float'}

@property
def update_rate(self):
return self._update_rate

@update_rate.setter
def update_rate(self, update_rate):
self._update_rate = update_rate
self._update_rate = update_rate
Loading