import opclabs_quickopc
from OpcLabs.EasyOpc.UA.Discovery import *
from OpcLabs.EasyOpc.UA.Gds import *
from OpcLabs.EasyOpc.UA.Engine import *
from OpcLabs.EasyOpc.UA import *
from OpcLabs.EasyOpc.UA.Application import *
from OpcLabs.EasyOpc.UA.Application.Extensions import *
from OpcLabs.EasyOpc.UA.Extensions import *
from OpcLabs.EasyOpc.UA.OperationModel import *
from OpcLabs.EasyOpc.UA import EasyUAClient, UAEndpointDescriptor, UANodeDescriptor, EasyUAClientCore
from OpcLabs.EasyOpc.UA.Engine import UAEngineException
from OpcLabs.EasyOpc.UA import UAServiceException
def setup_client(opc_server_url):
    """Create an ``EasyUAClient`` and mark ``opc_server_url`` as a trusted endpoint.

    The URL is appended to ``TrustedEndpointUrlStrings`` of the certificate
    acceptance policy reached through the client's
    ``SharedParameters.EngineParameters``.

    Args:
        opc_server_url: Endpoint URL string of the OPC UA server.

    Returns:
        The newly created ``EasyUAClient`` instance.

    Raises:
        Exception: Any error raised while updating the acceptance policy is
            reported through ``handleError`` and then re-raised unchanged.
    """
    client = EasyUAClient()
    try:
        acceptance_policy = client.SharedParameters.EngineParameters.CertificateAcceptancePolicy
        trusted_urls = acceptance_policy.TrustedEndpointUrlStrings
        # NOTE(review): the URL is added on every call; if SharedParameters is
        # shared between client instances this list presumably accumulates
        # duplicates -- confirm against the QuickOPC documentation.
        trusted_urls.Add(opc_server_url)
    except Exception as ex:
        handleError(f'Client Setup Error for {opc_server_url}', ex)
        raise
    return client
def readOpcUA(tags):
    """Issue one ``ReadMultiple`` call per OPC UA server referenced by ``tags``.

    ``tags`` is passed to ``arrange_tags_by_url``; the mapping it returns is
    iterated as ``opc_server_url -> server_info``.  From each ``server_info``
    this function reads the ``"OpcServerUserName"`` entry and the ``'tags'``
    list, whose items must provide a ``'TagAddress'`` node id.

    Args:
        tags: Tag configuration in whatever form ``arrange_tags_by_url``
            accepts (its schema is not visible from this function).

    Returns:
        The ``results`` list.
        NOTE(review): nothing in this function appends to ``results`` and the
        value returned by ``ReadMultiple`` is never inspected, so the list is
        always empty as written -- confirm whether result processing is
        missing.

    Errors are passed to ``handleError`` instead of being raised.  As long as
    ``handleError`` returns normally, a failure on one server does not stop
    the remaining servers from being processed.
    """
    results = []
    try:
        EasyUAApplication.Instance.ApplicationParameters.ApplicationManifest.ApplicationName = \
            'PlantSharpEdgeGateway'
        arranged_tags = arrange_tags_by_url(tags)
        for opc_server_url, server_info in arranged_tags.items():
            # Reset for every server: the handler at the bottom of this
            # iteration formats ``user_name``, and a failure before the
            # assignment below would otherwise hit an unbound name (first
            # server) or report the previous server's user.
            user_name = None
            try:
                ci_print(f"Processing OPC UA server: {opc_server_url}")
                user_name = server_info.get("OpcServerUserName")
                # NOTE(review): hard-coded placeholder credential; presumably
                # meant to come from configuration -- confirm.
                password = 'xxxxxxx'

                # Endpoint of the server to read from, with the user-name
                # identity attached.
                endpoint_descriptor = UAEndpointDescriptorExtension.WithUserNameIdentity(
                    UAEndpointDescriptor(opc_server_url), user_name, password)
                nodes = [UANodeDescriptor(tag['TagAddress']) for tag in server_info['tags']]

                # NOTE(review): this timestamp is not used anywhere below;
                # kept because it is presumably meant to stamp the read
                # results.  ``datetime`` and ``tzlocal`` must be importable
                # at module level for this line to succeed.
                read_time = str(datetime.now(tzlocal.get_localzone()))

                client = setup_client(opc_server_url)
                try:
                    attributeDataResultArray = IEasyUAClientExtension.ReadMultiple(
                        client, endpoint_descriptor, nodes)
                except UAEngineException as ua_ex:
                    handleError(f'OPC-UA Engine Error for {opc_server_url} - {user_name}', ua_ex)
                except UAServiceException as service_ex:
                    handleError(f'OPC-UA Service Error for {opc_server_url} - {user_name}', service_ex)
                except Exception as ex:
                    handleError(f'General Error during ReadMultiple for {opc_server_url} - {user_name}', ex)
            except Exception as ex:
                handleError(f'Error processing server {opc_server_url} - {user_name}', ex)
    except Exception as ex:
        handleError('Error in readOpcUA function', ex)
    return results