diff --git a/opcua/common/sqlite3_backend.py b/opcua/common/sqlite3_backend.py new file mode 100644 index 000000000..b2375cde7 --- /dev/null +++ b/opcua/common/sqlite3_backend.py @@ -0,0 +1,110 @@ + +import time +import sqlite3 +import threading +from multiprocessing import Lock + +class SQLite3Backend(object): + CHECKP_INTERVAL = 90 # [sec] WAL checkpoint + + def __init__(self, sqlFile = None, readonly=True): + assert(isinstance(sqlFile, str)) + assert(isinstance(readonly, bool)) + self._sqlFile = sqlFile # Path to database file. + self._readonly = bool(readonly) + self._lock = Lock() # Database lock. + self._conn = {} # Database connection. + self._lastCheckP = int(0) # Epoch of last checkpoint. + + def __enter__(self): + self._lastCheckP = time.time() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self._db_disconnect() + + def __str__(self): + return self._sqlFile + + @property + def readonly(self): + return self._readonly + + # PUBLIC METHODS + def execute_read(self, dbCmd = None, params = (), CB = None): + with self._lock: + c = self._getConn().cursor() + for row in c.execute(dbCmd, params): + CB(row) + + def execute_write(self, dbCmd = None, params = ()): + with self._lock: + c = self._getConn().cursor() + c.execute(dbCmd, params) + + def commit(self): + with self._lock: + self._getConn().commit() + self._wal_throttled() + + def wal_checkpoint(self): + """ + Store checkpoint: forces database modifications to be persistent. + Automatically done when sqlite cache runs over the 1000 pages threshold. + IMPORTANT: slow operation, manual syncs are only useful for sporadic + transactions that you really want to survive a power loss. + """ + self._lastCheckP = time.time() + c = self._getConn().cursor() + c.execute('PRAGMA wal_checkpoint') + + # PRIVATE METHODS + def _wal_throttled(self): + # commits still require a wal_checkpoint to become persistent. + if abs(time.time() - self._lastCheckP) < self.CHECKP_INTERVAL: + return + self.wal_checkpoint() + + def _db_connect(self): + CID = SQLite3Backend._getCID() + # PARSE_DECLTYPES is active so certain data types (such as datetime) will not be BLOBs + assert CID not in self._conn + self._conn[CID] = sqlite3.connect( + self._sqlFile, + detect_types = sqlite3.PARSE_DECLTYPES, + check_same_thread = False + ) + c = self._getConn().cursor() + if self.readonly is True: + c.execute('PRAGMA query_only=1') + else: + c.execute('PRAGMA journal_mode=WAL') + c.execute('PRAGMA synchronous=NORMAL') + + def _db_disconnect(self): + # Commit, checkpoint. + if self.readonly is False: + with self._lock: + self._getConn().commit() + self.wal_checkpoint() + # Close all connections to database. + for CID in self._conn: + self._conn[CID].close() + # Remove all items from dict. 
+ self._conn.clear() + + def _getConn(self): + if self._lock.acquire(False) is True: + self._lock.release() + raise Exception('Forgot to lock?') + # sqlite3 multithreading: http://beets.io/blog/sqlite-nightmare.html + CID = SQLite3Backend._getCID() + try: + return self._conn[CID] + except KeyError: + self._db_connect() + return self._conn[CID] + + @staticmethod + def _getCID(): + return threading.current_thread().ident diff --git a/opcua/server/address_space_sqlite.py b/opcua/server/address_space_sqlite.py index 4c4a3135a..dbf6b1f70 100644 --- a/opcua/server/address_space_sqlite.py +++ b/opcua/server/address_space_sqlite.py @@ -1,12 +1,81 @@ -import sqlite3 +import time import datetime +from struct import pack from opcua import ua from opcua.ua.uatypes import NumericNodeId, NodeIdType from opcua.common.utils import Buffer from opcua.server.address_space import NodeData, AddressSpace, AttributeValue +class ReadOnlyException(Exception): + pass + + +class MonitoredAttribute(AttributeValue): + + def __init__(self, attr, onchange_cb): + self._value = attr.value + self.value_callback = attr.value_callback + self.datachange_callbacks = attr.datachange_callbacks + self.onchange_cb = onchange_cb + + @property + def value(self): + return self._value + + @value.setter + def value(self, newVal): + self._value = newVal + self.onchange_cb() + +class MonitoredNode(object): + + def __init__(self, aspace, ndata): + self._aspace = aspace + self._nodeid = AddressSpaceSQLite._nodeid_to_numeric(ndata.nodeid) + + @property + def aspace(self): + return self._aspace + + @property + def nodeid(self): + return self._nodeid + + +class MonitoredAttributeDict(MonitoredNode, dict): + + def __init__(self, aspace, ndata): + MonitoredNode.__init__(self, aspace, ndata) + for attrId, attr in ndata.attributes.items(): + self[attrId] = attr + + def __setitem__(self, attrId, attr): + def onchange_cb(): + self.aspace._insert_attribute_threadsafe(self.nodeid, attrId, self[attrId]) + mAttr = MonitoredAttribute(attr, onchange_cb) + dict.__setitem__(self, attrId, mAttr) + mAttr.onchange_cb() + + def __delitem__(self, attrId): + raise NotImplementedError + + +class MonitoredReferenceList(MonitoredNode, list): + + def __init__(self, aspace, ndata): + MonitoredNode.__init__(self, aspace, ndata) + list.__init__(self, ndata.references) + + def append(self, ref): + list.append(self, ref) + self._aspace._insert_reference_threadsafe(self.nodeid, ref) + + def remove(self, ref): + raise NotImplementedError + + class AddressSpaceSQLite(AddressSpace): """ Load the standard address space nodes from a SQLite database. 
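The new SQLite3Backend above hands each thread its own sqlite3 connection (keyed by thread ident), serializes statements behind a lock, and throttles WAL checkpoints to once per CHECKP_INTERVAL seconds, so callers only deal with its small public API. A minimal usage sketch; the file name "demo.sql" and table "Demo" are illustrative, not part of this change:

from opcua.common.sqlite3_backend import SQLite3Backend

with SQLite3Backend(sqlFile='demo.sql', readonly=False) as backend:
    # execute_write/execute_read take the backend lock and reuse this thread's connection.
    backend.execute_write('CREATE TABLE IF NOT EXISTS "Demo" (k TEXT PRIMARY KEY, v INTEGER)')
    backend.execute_write('INSERT OR REPLACE INTO "Demo" VALUES (?, ?)', params=('answer', 42))
    backend.commit()  # commits, then checkpoints the WAL at most once per CHECKP_INTERVAL

    rows = []
    backend.execute_read('SELECT * FROM "Demo" WHERE k = ?', params=('answer',), CB=rows.append)
    # rows now holds [('answer', 42)]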
@@ -14,47 +83,67 @@ class AddressSpaceSQLite(AddressSpace): """ ATTR_TABLE_NAME = 'Attributes' REFS_TABLE_NAME = 'ReferenceDescription' + CUR_TIME_NODEID = NumericNodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime, 0) - def __init__(self, cache=None, sqlFile=None): + def __init__(self, backend, cache=None): super(AddressSpaceSQLite, self).__init__(cache) - assert(isinstance(sqlFile, str)) - self._sqlFile = sqlFile - self._conn = None + self._backend = backend def __enter__(self): super(AddressSpaceSQLite, self).__enter__() - assert(self._conn is None) - self._conn = sqlite3.connect(self._sqlFile, \ - detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False) + AddressSpaceSQLite._create_attr_table(self.backend) + AddressSpaceSQLite._create_refs_table(self.backend) return self - def __exit__(self, exc_type, exc_value, traceback): - if self._conn: - self._conn.close() - self._conn = None - super(AddressSpaceSQLite, self).__exit__(exc_type, exc_value, traceback) + def __str__(self): + return str(self.backend) + + @property + def backend(self): + return self._backend + + @property + def readonly(self): + return self.backend.readonly def __getitem__(self, nodeid): with self._lock: - (nodeData, fromDisk) = self._getitem_sqlite(nodeid) + (nodeData, fromDisk) = self._getitem_backend(nodeid) return nodeData - def _getitem_sqlite(self, nodeid): + def _getitem_backend(self, nodeid): try: - if not hasattr(self._cache, '_getitem_sqlite'): + if not hasattr(self._cache, '_getitem_backend'): (nodeData, fromDisk) = (self._cache.__getitem__(nodeid), False) else: - (nodeData, fromDisk) = self._cache._getitem_sqlite(nodeid) + (nodeData, fromDisk) = self._cache._getitem_backend(nodeid) if fromDisk: - self._read_nodedata(self._conn, nodeid, nodeData) + AddressSpaceSQLite._read_nodedata(self.backend, nodeid, nodeData) + if not (self.readonly is True or hasattr(nodeData.attributes, 'aspace')): + self._monitor_nodedata(nodeData) except KeyError: (nodeData, fromDisk) = (NodeData(nodeid), True) - AddressSpaceSQLite._read_nodedata(self._conn, nodeid, nodeData) + AddressSpaceSQLite._read_nodedata(self.backend, nodeid, nodeData) if len(nodeData.attributes) is 0: raise + elif self.readonly is False: + self._monitor_nodedata(nodeData) self._cache[nodeid] = nodeData return (nodeData, fromDisk) + def _monitor_nodedata(self, ndata): + if self.readonly is True: + raise ReadOnlyException(ndata.nodeid) + elif hasattr(ndata.attributes, 'aspace') and ndata.attributes.aspace is not self: + other = str(ndata.attributes.aspace) + raise Exception('Node {:s} is monitored by {:s}'.format(str(ndata.nodeid), other)) + elif hasattr(ndata.references, 'aspace') and ndata.references.aspace is not self: + other = str(ndata.attributes.aspace) + raise Exception('Node {:s} is monitored by {:s}'.format(str(ndata.nodeid), other)) + else: + ndata.attributes = MonitoredAttributeDict(self, ndata) + ndata.references = MonitoredReferenceList(self, ndata) + def get(self, nodeid, value=None): try: return self[nodeid] @@ -64,10 +153,31 @@ def get(self, nodeid, value=None): def __contains__(self, nodeid): return self.get(nodeid) is not None - def __setitem__(self, key, value): - # TODO the item in the database it is not updated - super(AddressSpaceSQLite, self).__setitem__(key, value) - + def __setitem__(self, nodeid, ndata): + self._cache.__setitem__(nodeid, ndata) + if self.readonly is True: + return + with self._lock: + self._setitem_backend(nodeid, ndata) + + def _setitem_backend(self, nodeid, ndata): + if not hasattr(ndata.attributes, 'aspace'): 
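+            # The node has never been monitored: wrap its attributes and references in the monitored containers so this write, and later changes, reach the backend.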
+ self._monitor_nodedata(ndata) + + if ndata.attributes.aspace is self: + self._write_nodedata(ndata) + self.backend.commit() + + @staticmethod + def _nodeid_to_numeric(nodeid): + assert(isinstance(nodeid, ua.uatypes.NodeId)) + # For database lookups, map TwoByte and FourByte onto NumericNodeId. + if nodeid.NodeIdType == NodeIdType.Numeric: + return nodeid + if nodeid.NodeIdType in (NodeIdType.TwoByte, NodeIdType.FourByte): + return NumericNodeId(nodeid.Identifier, nodeid.NamespaceIndex) + else: + raise Exception('NodeIdType {:d} is not supported for backend lookups.'.format(nodeid.NodeIdType)) def keys(self): raise Exception("dict.keys() is not supported for performance. Use iterator.") @@ -86,110 +196,123 @@ def __len__(self): def dump(self, namespaceidx=AddressSpace.DEFAULT_USER_NAMESPACE_INDEX): """ - Dump address space into an sqlite database; note that server must be stopped for this method to work + Dump address space into a database; note that server must be stopped for this method to work Note 1: DO NOT DUMP AN ADDRESS SPACE RESTORED FROM DATABASE, ONLY CACHED NODES WILL GET DUMPED! Note 2: If a NodeData instance holds a reference to a method call, it is not preserved. Note 3: numeric nodeid's are required for database searches. """ - # 1. Create tables. - AddressSpaceSQLite._create_attr_table(self._conn) - AddressSpaceSQLite._create_refs_table(self._conn) - with self._lock: - # 2. Populate. - for nodeid, ndata in self._cache.items(): - if nodeid.NamespaceIndex != namespaceidx: - for ref in ndata.references: - if ref.NodeId.NamespaceIndex == namespaceidx: - keyNodeId = AddressSpaceSQLite._nodeid_to_key(ndata.nodeid) - AddressSpaceSQLite._insert_reference(self._conn, keyNodeId, ref) - print('INTER_NAMESPACE REF {:s}/{:d}->{:d}'.format(str(nodeid.Identifier), nodeid.NamespaceIndex, ref.NodeId.NamespaceIndex)) - continue - assert(nodeid == ndata.nodeid) - assert(isinstance(ndata, NodeData)) - AddressSpaceSQLite._write_nodedata(self._conn, ndata) + self._dump(namespaceidx) + self.backend.commit() + print("Export to {:s} completed".format(str(self.backend))) - # 3. Integrity checks. - for nodeid, ndata in self._cache.items(): - if nodeid.NamespaceIndex != namespaceidx: + def _dump(self, namespaceidx=AddressSpace.DEFAULT_USER_NAMESPACE_INDEX): + # 1. Create tables. + AddressSpaceSQLite._create_attr_table(self.backend, drop=True) + AddressSpaceSQLite._create_refs_table(self.backend, drop=True) + + # 2. Populate. + a = time.time() + for nodeid, ndata in self._cache.items(): + assert(nodeid == ndata.nodeid) + assert(isinstance(ndata, NodeData)) + if nodeid.NamespaceIndex == namespaceidx: + self._write_nodedata(ndata) + continue + # inter-namespace references. + for ref in ndata.references: + if ref.NodeId.NamespaceIndex != namespaceidx: continue - ndata2 = NodeData(nodeid) - AddressSpaceSQLite._read_nodedata(self._conn, nodeid, ndata2) - AddressSpaceSQLite._cmp_nodedata(ndata, ndata2) - - self._conn.commit() - print("Export to {:s} completed".format(self._sqlFile)) + numNodeId = AddressSpaceSQLite._nodeid_to_numeric(ndata.nodeid) + self._insert_reference(numNodeId, ref) + print(time.time() - a) + + # 3. Integrity checks. 
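+        # Re-read every node in the dumped namespace from the backend and compare it against the cached original with _cmp_nodedata.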
+ for nodeid, ndata in self._cache.items(): + if nodeid.NamespaceIndex != namespaceidx: + continue + ndata2 = NodeData(nodeid) + AddressSpaceSQLite._read_nodedata(self.backend, nodeid, ndata2) + AddressSpaceSQLite._cmp_nodedata(ndata, ndata2) # Write NodeData to database - @staticmethod - def _write_nodedata(conn, ndata): - assert(isinstance(ndata.nodeid, ua.uatypes.NodeId)) - keyNodeId = AddressSpaceSQLite._nodeid_to_key(ndata.nodeid) + def _write_nodedata(self, ndata): + numNodeId = AddressSpaceSQLite._nodeid_to_numeric(ndata.nodeid) + self._write_attributes(numNodeId, ndata) + self._write_references(numNodeId, ndata) - # Add attributes to database + def _write_attributes(self, nodeid, ndata): + assert(nodeid.NodeIdType == NodeIdType.Numeric) assert(isinstance(ndata.attributes, dict)) for attrId, attr in ndata.attributes.items(): - AddressSpaceSQLite._insert_attribute(conn, keyNodeId, attrId, attr) + AddressSpaceSQLite._insert_attribute(self.backend, nodeid, attrId, attr) - # Add references to database + def _write_references(self, nodeid, ndata): + assert(nodeid.NodeIdType == NodeIdType.Numeric) assert(isinstance(ndata.references, list)) for ref in ndata.references: - AddressSpaceSQLite._insert_reference(conn, keyNodeId, ref) - - @staticmethod - def _nodeid_to_key(nodeid): - # For database lookups, map TwoByte and FourByte onto NumericNodeId. - if nodeid.NodeIdType in (NodeIdType.TwoByte, NodeIdType.FourByte): - return NumericNodeId(nodeid.Identifier, nodeid.NamespaceIndex) - return nodeid + AddressSpaceSQLite._insert_reference(self.backend, nodeid, ref) # Read NodeData from database @staticmethod - def _read_nodedata(conn, nodeid, nodeData, attrTable=ATTR_TABLE_NAME, refsTable=REFS_TABLE_NAME): - - _c_read = conn.cursor() - + def _read_nodedata(backend, nodeid, ndata): # Search key = numeric nodeid in opc-ua binary format - keyNodeId = AddressSpaceSQLite._nodeid_to_key(nodeid) - hexNodeId = ua.ua_binary.nodeid_to_binary(keyNodeId).hex() + numNodeId = AddressSpaceSQLite._nodeid_to_numeric(nodeid) + hexNodeId = ua.ua_binary.nodeid_to_binary(numNodeId).hex() - cmd1 = 'SELECT * FROM "{tn}" WHERE NodeId = x\'{h}\''.format(tn=attrTable, h=hexNodeId) - for row in _c_read.execute(cmd1): + AddressSpaceSQLite._read_attributes(backend, hexNodeId, ndata) + AddressSpaceSQLite._read_references(backend, hexNodeId, ndata) + + @staticmethod + def _read_attributes(backend, hexNodeId, ndata, attrTable=ATTR_TABLE_NAME): + cmd = 'SELECT * FROM "{tn}" WHERE NodeId = x\'{h}\''.format(tn=attrTable, h=hexNodeId) + def CB(row): (attrId, attr) = AddressSpaceSQLite._read_attribute_row(row) - nodeData.attributes[attrId] = attr + ndata.attributes[attrId] = attr + backend.execute_read(cmd, CB=CB) - cmd2 = 'SELECT * FROM "{tn}" WHERE NodeId = x\'{h}\''.format(tn=refsTable, h=hexNodeId) - referred_nodeids = [r.NodeId for r in nodeData.references] - for row in _c_read.execute(cmd2): + @staticmethod + def _read_references(backend, hexNodeId, ndata, refsTable=REFS_TABLE_NAME): + cmd = 'SELECT * FROM "{tn}" WHERE NodeId = x\'{h}\''.format(tn=refsTable, h=hexNodeId) + def CB(row): ref = AddressSpaceSQLite._read_reference_row(row) - #try: - # idx = referred_nodeids.index(ref.NodeId) - # print('DUPLICATE {:s}'.format(str(ref.NodeId))) - # nodeData.references[idx] = ref - #except ValueError: - nodeData.references.append(ref) + ndata.references.append(ref) + backend.execute_read(cmd, CB=CB) # Read and write from attribute table @staticmethod - def _create_attr_table(conn, table=ATTR_TABLE_NAME): + def 
_create_attr_table(backend, table=ATTR_TABLE_NAME, drop=False): ATTR_TABLE_COLS = [ - '_Id INTEGER PRIMARY KEY NOT NULL', # 0 - 'NodeId BLOB', # 1 - 'AttributeId INTEGER', # 2 - 'ServerTimestamp TIMESTAMP', # 3 - 'ServerPicoseconds INTEGER', # 4 - 'SourceTimestamp TIMESTAMP', # 5 - 'SourcePicoseconds INTEGER', # 6 - 'StatusCode INTEGER', # 7 - 'Variant BLOB', # 8 + '_Id BLOB PRIMARY KEY NOT NULL', # 0 + 'NodeId BLOB', # 1 + 'AttributeId INTEGER', # 2 + 'ServerTimestamp TIMESTAMP', # 3 + 'ServerPicoseconds INTEGER', # 4 + 'SourceTimestamp TIMESTAMP', # 5 + 'SourcePicoseconds INTEGER', # 6 + 'StatusCode INTEGER', # 7 + 'Variant BLOB', # 8 + 'Description STRING', # 9 ] - _c_new = conn.cursor() - _c_new.execute('DROP TABLE IF EXISTS "{tn}"'.format(tn=table)) - _c_new.execute('CREATE TABLE "{tn}" ({c})'.format(tn=table, c=', '.join(ATTR_TABLE_COLS))) + if drop is True: + dropCmd = 'DROP TABLE IF EXISTS "{tn}"'.format(tn=table) + backend.execute_write(dropCmd) + cmd = 'CREATE TABLE IF NOT EXISTS "{tn}" ({c})'.format(tn=table, c=', '.join(ATTR_TABLE_COLS)) + backend.execute_write(cmd) + + def _insert_attribute_threadsafe(self, nodeid, attrId, attr, table=ATTR_TABLE_NAME): + with self._lock: + if nodeid == AddressSpaceSQLite.CUR_TIME_NODEID: + pass # Prevents SD-card wear: don't write the time. + else: + AddressSpaceSQLite._insert_attribute(self.backend, nodeid, attrId, attr, table) + # CurrentTime-node updates result in commits at COMMIT_INTERVAL sec. + # Commits without previous actual transactions don't touch the file. + self.backend.commit() @staticmethod - def _insert_attribute(conn, nodeid, attrId, attr, table=ATTR_TABLE_NAME): - assert(isinstance(nodeid, ua.uatypes.NodeId)) + def _insert_attribute(backend, nodeid, attrId, attr, table=ATTR_TABLE_NAME): + assert(nodeid.NodeIdType == NodeIdType.Numeric) assert(isinstance(attrId, ua.AttributeIds)) assert(isinstance(attr, AttributeValue)) # Callback methods are not supported. 
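Changing _Id from an autoincrement INTEGER to a BLOB primary key is what lets the INSERT OR REPLACE statements below behave as idempotent upserts: each (NodeId, AttributeId) pair owns exactly one row, so repeated writes from a monitored attribute overwrite rather than accumulate. A sketch of the key layout; the helper name and example node are illustrative only:

from struct import pack
from opcua import ua
from opcua.ua.uatypes import NumericNodeId

def attr_primary_key(nodeid, attr_id):
    # Binary-encoded nodeid followed by one byte for the attribute id,
    # mirroring the primaryKey built inside _insert_attribute.
    return ua.ua_binary.nodeid_to_binary(nodeid) + pack('>B', int(attr_id))

key = attr_primary_key(NumericNodeId(ua.ObjectIds.Server_ServerStatus, 0), ua.AttributeIds.Value)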
@@ -211,18 +334,23 @@ def _insert_attribute(conn, nodeid, attrId, attr, table=ATTR_TABLE_NAME): assert(isinstance(attr.value.StatusCode, ua.uatypes.StatusCode)) assert(isinstance(attr.value.Value, ua.uatypes.Variant)) - _c_sub = conn.cursor() - _c_sub.execute('INSERT INTO "{tn}" VALUES (NULL{q})'.format(tn=table, q=', ?'*8), - ( sqlite3.Binary(ua.ua_binary.nodeid_to_binary(nodeid)), - int(attrId), - attr.value.ServerTimestamp, - None if attr.value.ServerPicoseconds is None else int(attr.value.ServerPicoseconds), - attr.value.SourceTimestamp, - None if attr.value.SourcePicoseconds is None else int(attr.value.SourcePicoseconds), - int(attr.value.StatusCode.value), - sqlite3.Binary(ua.ua_binary.variant_to_binary(attr.value.Value)) - ) + binNodeId = ua.ua_binary.nodeid_to_binary(nodeid) + primaryKey = binNodeId + pack(">B", int(attrId)) + + cmd = 'INSERT OR REPLACE INTO "{tn}" VALUES ({q})'.format(tn=table, q=', '.join('?'*10)) + params = ( + memoryview(primaryKey), + memoryview(binNodeId), + int(attrId), + attr.value.ServerTimestamp, + None if attr.value.ServerPicoseconds is None else int(attr.value.ServerPicoseconds), + attr.value.SourceTimestamp, + None if attr.value.SourcePicoseconds is None else int(attr.value.SourcePicoseconds), + int(attr.value.StatusCode.value), + memoryview(ua.ua_binary.variant_to_binary(attr.value.Value)), + str(nodeid) ) + backend.execute_write(cmd, params=params) @staticmethod def _read_attribute_row(row): @@ -243,13 +371,13 @@ def _read_attribute_row(row): # Read and write from references table @staticmethod - def _create_refs_table(conn, table='ReferenceDescription'): + def _create_refs_table(backend, table='ReferenceDescription', drop=False): REFS_TABLE_COLS = [ - '_Id INTEGER PRIMARY KEY NOT NULL', # 0 + '_Id BLOB PRIMARY KEY NOT NULL', # 0 'NodeId BLOB', # 1 = the nodeid of this ReferenceDescription 'ReferenceTypeId BLOB', # 2 'IsForward INTEGER', # 3 - 'ReferenceNodeId BLOB', # 4 = referred nodeid of ReferenceDescription + 'ReferredNodeId BLOB', # 4 = referred nodeid of ReferenceDescription 'BrowseName_NamespaceIndex INTEGER', # 5 'BrowseName_Name TEXT', # 6 'DisplayName_Text TEXT', # 7 @@ -257,14 +385,23 @@ def _create_refs_table(conn, table='ReferenceDescription'): 'DisplayName_Encoding INTEGER', # 9 'NodeClass INTEGER', # 10 'TypeDefinition BLOB', # 11 + 'Description STRING' # 12 ] - _c_new = conn.cursor() - _c_new.execute('DROP TABLE IF EXISTS "{tn}"'.format(tn=table)) - _c_new.execute('CREATE TABLE "{tn}" ({c})'.format(tn=table, c=', '.join(REFS_TABLE_COLS))) + if drop is True: + dropCmd = 'DROP TABLE IF EXISTS "{tn}"'.format(tn=table) + backend.execute_write(dropCmd) + cmd = 'CREATE TABLE IF NOT EXISTS "{tn}" ({c})'.format(tn=table, c=', '.join(REFS_TABLE_COLS)) + backend.execute_write(cmd) + + def _insert_reference_threadsafe(self, nodeid, ref, table=REFS_TABLE_NAME): + with self._lock: + AddressSpaceSQLite._insert_reference(self.backend, nodeid, ref, table) + self.backend.commit() @staticmethod - def _insert_reference(conn, nodeid, ref, table=REFS_TABLE_NAME): - assert(isinstance(nodeid, ua.uatypes.NodeId)) + def _insert_reference(backend, nodeid, ref, table=REFS_TABLE_NAME): + # NumericNodeId is required for searching. 
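+        # Callers are expected to map TwoByte/FourByte ids onto NumericNodeId via _nodeid_to_numeric before reaching this point.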
+ assert(nodeid.NodeIdType == NodeIdType.Numeric) assert(isinstance(ref, ua.uaprotocol_auto.ReferenceDescription)) assert(isinstance(ref.ReferenceTypeId, ua.uatypes.NodeId)) assert(isinstance(ref.IsForward, bool)) @@ -282,21 +419,27 @@ def _insert_reference(conn, nodeid, ref, table=REFS_TABLE_NAME): assert(isinstance(ref.NodeClass, (int, ua.uaprotocol_auto.NodeClass))) assert(isinstance(ref.TypeDefinition, ua.uatypes.NodeId)) - _c_sub = conn.cursor() - _c_sub.execute('INSERT INTO "{tn}" VALUES (NULL{q})'.format(tn=table, q=', ?'*11), - ( sqlite3.Binary(ua.ua_binary.nodeid_to_binary(nodeid)), - sqlite3.Binary(ua.ua_binary.nodeid_to_binary(ref.ReferenceTypeId)), - int(bool(ref.IsForward)), - sqlite3.Binary(ua.ua_binary.nodeid_to_binary(ref.NodeId)), - int(ref.BrowseName.NamespaceIndex), - None if ref.BrowseName.Name is None else str(ref.BrowseName.Name), - None if ref.DisplayName.Text is None else str(ref.DisplayName.Text), - None if ref.DisplayName.Locale is None else str(ref.DisplayName.Locale), - int(ref.DisplayName.Encoding), - int(ref.NodeClass), - sqlite3.Binary(ua.ua_binary.nodeid_to_binary(ref.TypeDefinition)), - ) + binNodeId = ua.ua_binary.nodeid_to_binary(nodeid) # Our own nodeid + refNodeId = ua.ua_binary.nodeid_to_binary(ref.NodeId) # Referred nodeid + primaryKey = binNodeId + refNodeId + pack(">B", int(ref.IsForward)) + + cmd = 'INSERT OR REPLACE INTO "{tn}" VALUES ({q})'.format(tn=table, q=', '.join('?'*13)) + params = ( + memoryview(primaryKey), + memoryview(binNodeId), + memoryview(ua.ua_binary.nodeid_to_binary(ref.ReferenceTypeId)), + int(bool(ref.IsForward)), + memoryview(refNodeId), + int(ref.BrowseName.NamespaceIndex), + None if ref.BrowseName.Name is None else str(ref.BrowseName.Name), + None if ref.DisplayName.Text is None else str(ref.DisplayName.Text), + None if ref.DisplayName.Locale is None else str(ref.DisplayName.Locale), + int(ref.DisplayName.Encoding), + int(ref.NodeClass), + memoryview(ua.ua_binary.nodeid_to_binary(ref.TypeDefinition)), + str(nodeid) ) + backend.execute_write(cmd, params=params) @staticmethod def _read_reference_row(row): diff --git a/opcua/server/standard_address_space/standard_address_space.sql b/opcua/server/standard_address_space/standard_address_space.sql index 81372a676..434da3bb6 100644 Binary files a/opcua/server/standard_address_space/standard_address_space.sql and b/opcua/server/standard_address_space/standard_address_space.sql differ diff --git a/opcua/ua/ua_binary.py b/opcua/ua/ua_binary.py index b01a720cb..6207666b3 100644 --- a/opcua/ua/ua_binary.py +++ b/opcua/ua/ua_binary.py @@ -148,7 +148,8 @@ def unpack_array(self, data, length): return None if length == 0: return () - return struct.unpack(self._fmt.format(length), data.read(self.size * length)) + fmt = bytes(self._fmt.format(length), 'utf-8') + return struct.unpack(fmt, data.read(self.size * length)) class Primitives1(object): diff --git a/schemas/generate_address_space.py b/schemas/generate_address_space.py index 8aea00e46..4dc35788e 100644 --- a/schemas/generate_address_space.py +++ b/schemas/generate_address_space.py @@ -328,7 +328,8 @@ def save_aspace_to_disk(): path = os.path.join("..", "opcua", "server", "standard_address_space", "standard_address_space.sql") print("Saving standard address space to:", path) from opcua.server.address_space_sqlite import AddressSpaceSQLite - with AddressSpaceSQLite(cache=aspace, sqlFile=path) as aspace_sql: + from opcua.common.sqlite3_backend import SQLite3Backend + with SQLite3Backend(sqlFile=path, readonly=False) as backend, 
AddressSpaceSQLite(backend, cache=aspace) as aspace_sql: aspace_sql.dump(namespaceidx=0) if __name__ == "__main__":
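For completeness, reopening a dumped database read-only follows the same pattern as the dump above. A sketch of that path; the file location is an assumption and the node looked up is just an example:

from opcua import ua
from opcua.ua.uatypes import NumericNodeId
from opcua.common.sqlite3_backend import SQLite3Backend
from opcua.server.address_space_sqlite import AddressSpaceSQLite

path = "standard_address_space.sql"  # assumed location of the dumped file
with SQLite3Backend(sqlFile=path, readonly=True) as backend, \
        AddressSpaceSQLite(backend) as aspace:
    # Nodes are read lazily from SQLite on first access and kept in the cache;
    # readonly=True sets PRAGMA query_only, so nothing is monitored or written back.
    ndata = aspace[NumericNodeId(ua.ObjectIds.Server, 0)]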