Skip to content

Commit 09e6498

Browse files
committed
pickle shelffile -> standard_address_space.sql
Refactoring the shelffile from a monolithic blob to sqlite3 db. Advantages over pickle: - Fast - Small (25% of fill_address_space code) - Not dependent on Python2/3 version - Supports transactional read/(write TODO) - Supports real-time persistence of user address-space (TODO) - Strong typed: only INTEGER, TEXT and opc-ua to_binary() BLOB - Drop-in replacement for memory-based AddressSpace Built-in integrity check on generate_address_space.py dump.
1 parent 82ba92c commit 09e6498

File tree

6 files changed

+356
-153
lines changed

6 files changed

+356
-153
lines changed

opcua/common/utils.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import functools
1010
import threading
1111
from socket import error as SocketError
12+
from collections import MutableMapping
1213

1314
try:
1415
import asyncio
@@ -206,4 +207,53 @@ def run_until_complete(self, coro):
206207
return future.result()
207208

208209

210+
class ThreadSafeDict(MutableMapping):
209211

212+
def __init__(self, cache=None):
213+
self._lock = threading.RLock() # FIXME: should use multiple reader, one writter pattern
214+
if cache is None:
215+
self._cache = {}
216+
else:
217+
assert(isinstance(cache, (dict, ThreadSafeDict)))
218+
self._cache = cache
219+
220+
def __enter__(self):
221+
return self
222+
223+
def __exit__(self, exc_type, exc_value, traceback):
224+
self._cache = None
225+
226+
def __getitem__(self, key):
227+
with self._lock:
228+
return self._cache.__getitem__(key)
229+
230+
def get(self, key, value=None):
231+
with self._lock:
232+
return self._cache.get(key, value)
233+
234+
def __setitem__(self, key, value):
235+
with self._lock:
236+
return self._cache.__setitem__(key, value)
237+
238+
def __contains__(self, key):
239+
with self._lock:
240+
return self._cache.__contains__(key)
241+
242+
def __delitem__(self, key):
243+
with self._lock:
244+
del self._cache[key]
245+
246+
def __iter__(self):
247+
with self._lock:
248+
return iter(self._cache.keys())
249+
250+
def __len__(self):
251+
return len(self._cache)
252+
253+
def keys(self):
254+
with self._lock:
255+
return self._cache.keys()
256+
257+
def empty(self):
258+
with self._lock:
259+
self._cache = {}

opcua/server/address_space.py

Lines changed: 13 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,9 @@
1-
from threading import RLock
21
import logging
32
from datetime import datetime
4-
import collections
5-
import shelve
6-
try:
7-
import cPickle as pickle
8-
except:
9-
import pickle
103

114
from opcua import ua
125
from opcua.server.user_manager import UserManager
6+
from opcua.common.utils import ThreadSafeDict
137

148

159
class AttributeValue(object):
@@ -478,51 +472,30 @@ def _call(self, method):
478472
return res
479473

480474

481-
class AddressSpace(object):
475+
class AddressSpace(ThreadSafeDict):
482476

483477
"""
484478
The address space object stores all the nodes of the OPC-UA server
485479
and helper methods.
486480
The methods are thread safe
487481
"""
488482

489-
def __init__(self):
483+
def __init__(self, cache=None):
484+
super(AddressSpace, self).__init__(cache)
490485
self.logger = logging.getLogger(__name__)
491-
self._nodes = {}
492-
self._lock = RLock() # FIXME: should use multiple reader, one writter pattern
493486
self._datachange_callback_counter = 200
494487
self._handle_to_attribute_map = {}
495488
self._default_idx = 2
496489
self._nodeid_counter = {0: 20000, 1: 2000}
497490

498-
def __getitem__(self, nodeid):
499-
with self._lock:
500-
return self._nodes.__getitem__(nodeid)
501-
502-
def get(self, nodeid):
503-
with self._lock:
504-
return self._nodes.get(nodeid, None)
505-
506-
def __setitem__(self, nodeid, value):
507-
with self._lock:
508-
return self._nodes.__setitem__(nodeid, value)
509-
510-
def __contains__(self, nodeid):
511-
with self._lock:
512-
return self._nodes.__contains__(nodeid)
513-
514-
def __delitem__(self, nodeid):
515-
with self._lock:
516-
self._nodes.__delitem__(nodeid)
517-
518491
def generate_nodeid(self, idx=None):
519492
if idx is None:
520493
idx = self._default_idx
521494
if idx in self._nodeid_counter:
522495
self._nodeid_counter[idx] += 1
523496
else:
524497
# get the biggest identifier number from the existed nodes in address space
525-
identifier_list = sorted([nodeid.Identifier for nodeid in self._nodes.keys()
498+
identifier_list = sorted([nodeid.Identifier for nodeid in self.keys()
526499
if nodeid.NamespaceIndex == idx and nodeid.NodeIdType
527500
in (ua.NodeIdType.Numeric, ua.NodeIdType.TwoByte, ua.NodeIdType.FourByte)])
528501
if identifier_list:
@@ -532,111 +505,19 @@ def generate_nodeid(self, idx=None):
532505
nodeid = ua.NodeId(self._nodeid_counter[idx], idx)
533506
with self._lock: # OK since reentrant lock
534507
while True:
535-
if nodeid in self._nodes:
508+
if nodeid in self.keys():
536509
nodeid = self.generate_nodeid(idx)
537510
else:
538511
return nodeid
539512

540-
def keys(self):
541-
with self._lock:
542-
return self._nodes.keys()
543-
544-
def empty(self):
545-
"""
546-
Delete all nodes in address space
547-
"""
548-
with self._lock:
549-
self._nodes = {}
550-
551-
def dump(self, path):
552-
"""
553-
Dump address space as binary to file; note that server must be stopped for this method to work
554-
DO NOT DUMP AN ADDRESS SPACE WHICH IS USING A SHELF (load_aspace_shelf), ONLY CACHED NODES WILL GET DUMPED!
555-
"""
556-
# prepare nodes in address space for being serialized
557-
for nodeid, ndata in self._nodes.items():
558-
# if the node has a reference to a method call, remove it so the object can be serialized
559-
if ndata.call is not None:
560-
self._nodes[nodeid].call = None
561-
562-
with open(path, 'wb') as f:
563-
pickle.dump(self._nodes, f, pickle.HIGHEST_PROTOCOL)
564-
565-
def load(self, path):
566-
"""
567-
Load address space from a binary file, overwriting everything in the current address space
568-
"""
569-
with open(path, 'rb') as f:
570-
self._nodes = pickle.load(f)
571-
572-
def make_aspace_shelf(self, path):
573-
"""
574-
Make a shelf for containing the nodes from the standard address space; this is typically only done on first
575-
start of the server. Subsequent server starts will load the shelf, nodes are then moved to a cache
576-
by the LazyLoadingDict class when they are accessed. Saving data back to the shelf
577-
is currently NOT supported, it is only used for the default OPC UA standard address space
578-
579-
Note: Intended for slow devices, such as Raspberry Pi, to greatly improve start up time
580-
"""
581-
s = shelve.open(path, "n", protocol=pickle.HIGHEST_PROTOCOL)
582-
for nodeid, ndata in self._nodes.items():
583-
s[nodeid.to_string()] = ndata
584-
s.close()
585-
586-
def load_aspace_shelf(self, path):
587-
"""
588-
Load the standard address space nodes from a python shelve via LazyLoadingDict as needed.
589-
The dump() method can no longer be used if the address space is being loaded from a shelf
590-
591-
Note: Intended for slow devices, such as Raspberry Pi, to greatly improve start up time
592-
"""
593-
class LazyLoadingDict(collections.MutableMapping):
594-
"""
595-
Special dict that only loads nodes as they are accessed. If a node is accessed it gets copied from the
596-
shelve to the cache dict. All user nodes are saved in the cache ONLY. Saving data back to the shelf
597-
is currently NOT supported
598-
"""
599-
def __init__(self, source):
600-
self.source = source # python shelf
601-
self.cache = {} # internal dict
602-
603-
def __getitem__(self, key):
604-
# try to get the item (node) from the cache, if it isn't there get it from the shelf
605-
try:
606-
return self.cache[key]
607-
except KeyError:
608-
node = self.cache[key] = self.source[key.to_string()]
609-
return node
610-
611-
def __setitem__(self, key, value):
612-
# add a new item to the cache; if this item is in the shelf it is not updated
613-
self.cache[key] = value
614-
615-
def __contains__(self, key):
616-
return key in self.cache or key.to_string() in self.source
617-
618-
def __delitem__(self, key):
619-
# only deleting items from the cache is allowed
620-
del self.cache[key]
621-
622-
def __iter__(self):
623-
# only the cache can be iterated over
624-
return iter(self.cache.keys())
625-
626-
def __len__(self):
627-
# only returns the length of items in the cache, not unaccessed items in the shelf
628-
return len(self.cache)
629-
630-
self._nodes = LazyLoadingDict(shelve.open(path, "r"))
631-
632513
def get_attribute_value(self, nodeid, attr):
633514
with self._lock:
634515
self.logger.debug("get attr val: %s %s", nodeid, attr)
635-
if nodeid not in self._nodes:
516+
if nodeid not in self.keys():
636517
dv = ua.DataValue()
637518
dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
638519
return dv
639-
node = self._nodes[nodeid]
520+
node = self[nodeid]
640521
if attr not in node.attributes:
641522
dv = ua.DataValue()
642523
dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid)
@@ -649,7 +530,7 @@ def get_attribute_value(self, nodeid, attr):
649530
def set_attribute_value(self, nodeid, attr, value):
650531
with self._lock:
651532
self.logger.debug("set attr val: %s %s %s", nodeid, attr, value)
652-
node = self._nodes.get(nodeid, None)
533+
node = self.get(nodeid, None)
653534
if node is None:
654535
return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
655536
attval = node.attributes.get(attr, None)
@@ -673,9 +554,9 @@ def set_attribute_value(self, nodeid, attr, value):
673554
def add_datachange_callback(self, nodeid, attr, callback):
674555
with self._lock:
675556
self.logger.debug("set attr callback: %s %s %s", nodeid, attr, callback)
676-
if nodeid not in self._nodes:
557+
if nodeid not in self.keys():
677558
return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown), 0
678-
node = self._nodes[nodeid]
559+
node = self[nodeid]
679560
if attr not in node.attributes:
680561
return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid), 0
681562
attval = node.attributes[attr]
@@ -689,9 +570,9 @@ def delete_datachange_callback(self, handle):
689570
with self._lock:
690571
if handle in self._handle_to_attribute_map:
691572
nodeid, attr = self._handle_to_attribute_map.pop(handle)
692-
self._nodes[nodeid].attributes[attr].datachange_callbacks.pop(handle)
573+
self[nodeid].attributes[attr].datachange_callbacks.pop(handle)
693574

694575
def add_method_callback(self, methodid, callback):
695576
with self._lock:
696-
node = self._nodes[methodid]
577+
node = self[methodid]
697578
node.call = callback

0 commit comments

Comments
 (0)