2022年 11月 8日

Python与数据库的交互方法–pyDAL库

前言,如果能把这篇文章坚持看完,那关于pyDAL,以及python与数据库的交互就完全掌握了!!! 

1.安pyDAL库

pip install pyDAL

2.pyDAL库的强大之处在于,它可以和各种数据库进行交互(不限制于mysql),而且代码在不同数据库之间非常容易移植。

连接各个数据库的方法如下:(在如下表中,我们统一假设数据库登录的账户为username,登录密码为password,主机地址为(host address)localhost,登录后数据库的名字为test)。详细用法见如下链接。

web2py – The database abstraction layer

具体使用可以看下面的例子

SQLite sqlite://storage.sqlite
MySQL mysql://username:password@localhost/test?set_encoding=utf8mb4
PostgreSQL postgres://username:password@localhost/test
MSSQL (legacy) mssql://username:password@localhost/test
MSSQL (>=2005) mssql3://username:password@localhost/test
MSSQL (>=2012) mssql4://username:password@localhost/test
FireBird firebird://username:password@localhost/test
Oracle oracle://username/password@test
DB2 db2://username:password@test
Ingres ingres://username:password@localhost/test
Sybase sybase://username:password@localhost/test
Informix informix://username:password@test
Teradata teradata://DSN=dsn;UID=user;PWD=pass;DATABASE=test
Cubrid cubrid://username:password@localhost/test
SAPDB sapdb://username:password@localhost/test
IMAP imap://user:password@server:port
MongoDB mongodb://username:password@localhost/test
Google/SQL google:sql://project:instance/database
Google/NoSQL google:datastore
Google/NoSQL/NDB google:datastore+ndb

3.本文以MySQL为例子实现对数据库的交互:

        首先连接MySQL数据库,下图是用MySQL数据库的数据库交互平台,这些平台都大同小异,任意选一个下载安装即可。数据库的平台在主机上搭建一个server,然后其他电脑就可以通过client客户端访问了。我这里用的是HeidiSQL客户端:(如下可知,我的登录的账户为root,登录密码为*****,主机地址为192.168.0.45

 登录后数据库的名字为pqlongtermtest,然后我在下面建立了许多个table,

话不多说,直接上代码,代码里面的每一步都会具体解释。代码主要做的事情是建立数据库表格,然后当运行自动化代码上传数据到表格,并且能够用每个表格自动生成的主键id,互相引用,也即实现表与表之间的交互。

  1. import re #正则表达式库
  2. from datetime import datetime #时间库
  3. import numpy as np #数学计算库
  4. import pydal #我们需要的Python与数据库交互的万能库
  5. import requests #接口库,访问URL用的
  6. from robot.api import logger #便于直接logger打印信息
  7. from robot.libraries.BuiltIn import BuiltIn #这里BuiltIn是Robot Framework的交互库
  8. from selenium.webdriver.common.keys import Keys #selenium里面操作鼠标键盘的库
  9. from selenium.webdriver.remote.webelement import WebElement #selenium里面获取元素位置的库
  10. class PQDatabase(object):
  11. def __init__(self, username, password, host, dbname):
  12. self.conn = None #声明,用于连接MySQL
  13. self.deviceinfotab = 'device_information' #对应数据库里面新建的表
  14. self.test_run_history = 'test_run_history' #对应数据库里面另外新建的表
  15. self.measurandtab = 'measurement_test_result_history' #对应数据库里面另外另外新建的表
  16. self.open_database_connection(username, password, host, dbname)
  17. #将连接数据库函数放到__init__里面调用这个PQDatabase类就可以连接数据库
  18. #region Basic Operation
  19. '''
  20. 注意在前面使用例子中MySQL的连接方式为
  21. mysql://username:password@localhost/test?set_encoding=utf8mb4
  22. '''
  23. def open_database_connection(self, username, password, host, dbname):
  24. if self.conn is None:
  25. self.conn = pydal.DAL('mysql://{!s}:{!s}@{!s}/{!s}?set_encoding=utf8'
  26. .format(username, password, host, dbname), fake_migrate=True)
  27. '''
  28. 在上述连接中用到了fake_migrate=True,
  29. (详细解释也可以看上面的英文链接,在链接里面讲到:
  30. 当数据库中有两个重复行只是类型一个为string一个为datetime类型时是不能兼容的,
  31. 但fake_migrate=True可以重构使之不报错,仍然能正常连接上数据库);
  32. 大体作用是因为数据库的一些特性会导致链接出错,
  33. 用fake_migrate=True可以减少连接错误。
  34. '''
  35. def close_database_connection(self): # 断开连接
  36. self.conn.close()
  37. #endregion
  38. #region create Table
  39. '''
  40. 以下都是创建数据库中表格的方法,先开辟出一片田地Field,
  41. 相当于在数据库中建立了一张Excel表格用于后续数据库赋值存数据,
  42. 其中self.conn已经在__init__中调用的open_database_connection进行了赋值,
  43. 而pydal.DAL(...).define_table(para1,*para2)则是建表,
  44. pydal.DAL(...)也即self.conn;参数para1为所建表的名称,参数*para2则为所建立表的column
  45. '''
  46. def create_device_info_table(self):
  47. self.conn.define_table(
  48. self.deviceinfotab,
  49. pydal.Field('ip', type='string', length=50),
  50. pydal.Field('device_type', type='string', length=50),
  51. pydal.Field('serial_no', type='string', length=50),
  52. pydal.Field('mlfb', type='string', length=50),
  53. pydal.Field('mac', type='string', length=50),
  54. pydal.Field('hw_version', type='string', length=50),
  55. pydal.Field('sd_card_vendor', type='string', length=50),
  56. )
  57. def create_test_run_history_table(self):
  58. self.conn.define_table(
  59. self.test_run_history,
  60. pydal.Field('device_information_id', type='integer', length=50),
  61. pydal.Field('firmware_version', type='string', length=50),
  62. pydal.Field('bootloader_version', type='string', length=50),
  63. pydal.Field('parameter_set_version', type='string', length=50),
  64. pydal.Field('firmware_package_version', type='string', length=50),
  65. pydal.Field('test_start_time', type='datetime', length=50),
  66. pydal.Field('signal_equipment', type='string', length=50),
  67. pydal.Field('signal_id', type='string', length=50),
  68. pydal.Field('network_type', type='string', length=50),
  69. pydal.Field('u_din', type='double', length=50),
  70. pydal.Field('i_din', type='double', length=50),
  71. pydal.Field('f', type='string', length=50),
  72. pydal.Field('pt', type='string', length=50),
  73. pydal.Field('ct', type='string', length=50),
  74. pydal.Field('case_version', type='string', length=50),
  75. pydal.Field('uncertainty_version', type='string', length=50),
  76. pydal.Field('audit_status', type='string', length=50),
  77. pydal.Field('create_time', type='datetime', length=50),
  78. )
  79. def create_measurand_test_table(self):
  80. self.conn.define_table(
  81. self.measurandtab,
  82. pydal.Field('Test_Run_History_Id', type='string', length=11),
  83. pydal.Field('Case_Version', type='string', length=20),
  84. pydal.Field('Case_Id', type='string', length=20),
  85. pydal.Field('Uncertainty_Version', type='string', length=20),
  86. pydal.Field('Uncertainty_Id', type='string', length=20),
  87. pydal.Field('Measurand', type='string', length=20),
  88. pydal.Field('Expected_Value', type='string', length=20),
  89. pydal.Field('Allowed_Uncertainty', type='string', length=20),
  90. pydal.Field('Lower_Threshold', type='string', length=20),
  91. pydal.Field('Upper_Threshold', type='string', length=25),
  92. pydal.Field('Min_Value', type='string', length=25),
  93. pydal.Field('Max_Value', type='string', length=25),
  94. pydal.Field('Average_Value', type='string', length=25),
  95. pydal.Field('Actual_Uncertainty', type='string', length=20),
  96. pydal.Field('Test_Result', type='string', length=254),
  97. )
  98. #endregion
  99. #region store log
  100. '''
  101. 上面建立了表,这里就是往表的column里面添加元素,这里用self.conn[整张表名].bulk_insert(),
  102. '''
  103. def store_measurand_log_entries(self, data):
  104. try:
  105. for row in data:
  106. '''
  107. 这里data为一个以字典为元素的列表,row即为字典,
  108. 注意bulk_insert([{其中为key:value的模式,key即为上面建立的column,而value为row里面的value}])
  109. '''
  110. self.conn[self.measurandtab].bulk_insert([{
  111. 'Test_Run_History_Id': '{}'.format(row['Test_Run_History_Id']),
  112. 'Case_Version': '{}'.format(row['Case_Version']),
  113. 'Case_Id': '{}'.format(row['Case_Id']),
  114. 'Uncertainty_Version': '{}'.format(row['Uncertainty_Version']),
  115. 'Uncertainty_Id': '{}'.format(row['Uncertainty_Id']),
  116. 'Measurand': '{}'.format(row['Measurand']),
  117. 'Expected_Value': '{}'.format(row['Expected_Value']),
  118. 'Allowed_Uncertainty': '{}'.format(row['Allowed_Uncertainty']),
  119. 'Lower_Threshold': '{}'.format(row['Lower_Threshold']),
  120. 'Upper_Threshold': '{}'.format(row['Upper_Threshold']),
  121. 'Min_Value': '{}'.format(row['Min_Value']),
  122. 'Max_Value': '{}'.format(row['Max_Value']),
  123. 'Average_Value': '{}'.format(row['Average_Value']),
  124. 'Actual_Uncertainty': '{}'.format(row['Actual_Uncertainty']),
  125. 'Test_Result': '{}'.format(row['Test_Result']),
  126. }])
  127. self.conn.commit()
  128. '''
  129. insert完数值后要commit提交,因为数据库本身的原因insert, truncate, delete, and update都要commit才能真正提交,而create和drop则可以即时执行。
  130. '''
  131. except Exception as e:
  132. '''
  133. 接收错误赋给e
  134. '''
  135. self.conn.rollback()
  136. '''
  137. rollback的作用是回退到上个commit,上个commit之后的操作全部忽略。
  138. 这里放在except后,就是当前面commit出错后,清除这些commit。
  139. '''
  140. raise Exception('Insert measurand result failed: {!s}'.format(e)) #抛出错误e
  141. def store_device_information(self, data):
  142. try:
  143. self.conn[self.deviceinfotab].bulk_insert([{
  144. 'ip': '{}'.format(data['IP']),
  145. 'device_type': '{}'.format(data['DeviceType']),
  146. 'serial_no': '{}'.format(data['SerialNo']),
  147. 'mlfb': '{}'.format(data['MLFB']),
  148. 'mac': '{}'.format(data['MAC']),
  149. 'hw_version': '{}'.format(data['HWVersion']),
  150. 'sd_card_vendor': '{}'.format(data['SDCardVendor']),
  151. }])
  152. self.conn.commit()
  153. except Exception as e:
  154. self.conn.rollback()
  155. raise Exception('Insert device information failed: {!s}'.format(e))
  156. def store_test_run_history(self, data):
  157. try:
  158. for row in data:
  159. self.conn[self.test_run_history].bulk_insert([{
  160. 'device_information_id': '{}'.format(row['device_information_id']),
  161. 'firmware_version': '{}'.format(row['FirmwareVersion']),
  162. 'bootloader_version': '{}'.format(row['BootloaderVersion']),
  163. 'parameter_set_version': '{}'.format(row['ParameterSetVersion']),
  164. 'firmware_package_version': '{}'.format(row['FirmwarePackageVersion']),
  165. 'test_start_time': '{}'.format(row['test_start_time']),
  166. 'signal_equipment': '{}'.format(row['signalEquipment']),
  167. 'signal_id': '{}'.format(row['signalId']),
  168. 'network_type': '{}'.format(row['networkType']),
  169. 'u_din': '{}'.format(row['Udin']),
  170. 'i_din': '{}'.format(row['Idin']),
  171. 'f': '{}'.format(row['frequency']),
  172. 'pt': '{}'.format(row['pt']),
  173. 'ct': '{}'.format(row['ct']),
  174. 'case_version': '{}'.format(row['caseVersion']),
  175. 'uncertainty_version': '{}'.format(row['uncertaintyVersion']),
  176. }])
  177. self.conn.commit()
  178. test_run_history_id = self.get_latest_test_run_history_id_from_table()
  179. except Exception as e:
  180. self.conn.rollback()
  181. raise Exception('Insert device information failed: {!s}'.format(e))
  182. return test_run_history_id
  183. #endregion
  184. '''
  185. 如果可能的话,最好在表中包含一个id类型的字段作为主键
  186. (如果不指定这样的字段,DAL会自动包含一个名为“id”的字段)。
  187. 上面建立表后,表中会有一个自动生成的column,这个column为主键id,这个id的值是自动递增的。
  188. '''
  189. '''
  190. 如下函数是通过select进行筛选,它是用于选择表行,
  191. 用法是self.conn(定出筛选范围,相当于初步筛选).select(进一步筛选出表行),
  192. 在下面的初步筛选中用到了self.conn[self.deviceinfotab]['ip'] == data['IP'],这是在self.deviceinfotab表中找到'ip'的那一列,
  193. 然后这一栏中找到与输入的IP(入参)相等的那些行,
  194. 并且后面加了很多&,是一条一条逐列比对,直到找到所有符合这些特征的行;
  195. 最后将得到一个由字典作为元素的列表,然后用[0]索引出这个字典。
  196. 做这些的目的就是取出这行元素后,再从其中找到主键id的值,便于其他表格引用。
  197. '''
  198. def get_device_id_from_device_info_table(self, data):
  199. deviceInfo = self.conn((self.conn[self.deviceinfotab]['ip'] == data['IP'])
  200. & (self.conn[self.deviceinfotab]['device_type'] == data['DeviceType'])
  201. & (self.conn[self.deviceinfotab]['serial_no'] == data['SerialNo'])
  202. & (self.conn[self.deviceinfotab]['mlfb'] == data['MLFB'])
  203. & (self.conn[self.deviceinfotab]['mac'] == data['MAC'])
  204. & (self.conn[self.deviceinfotab]['hw_version'] == data['HWVersion']) & (self.conn[self.deviceinfotab]['sd_card_vendor']
  205. == data['SDCardVendor'])).select(self.conn[self.deviceinfotab].ALL)[0]
  206. return deviceInfo["id"]
  207. '''
  208. 下面的函数首先初步筛选出表格。
  209. 然后用select,其中第一个入参是全选整张表格,然后orderby是排序,加~是倒序排列,
  210. 因为id是自动递增的,所以倒序排列后取第一行就可以得到最新的一个id。
  211. limitby是限制出行(0,1)就是指第一行。
  212. 第一行被取出后,也是column做key,value则为第一行对应的值,组成一个字典;
  213. 这个字典成为列表的元素,而列表则作为整个self.conn().select()函数调用过程得到的结果。
  214. 然后取列表的第一个元素[0]就可以取出最新的一行,赋值给run,然后run["id"]取出最新的一行的id。
  215. '''
  216. def get_latest_test_run_history_id_from_table(self):
  217. run = self.conn(self.conn[self.test_run_history]).select(self.conn[self.test_run_history].ALL, orderby=~self.conn[self.test_run_history].id, limitby=(0, 1))[0]
  218. '''
  219. 这里将最新的一条记录从数据库取出,取出的这一列相当于是一个字典,
  220. 这个字典是以标题column为key,具体存放数据为value,然后这里的取法是先用self.conn[self.test_run_history])定出是哪一张表格,然后再在这张表格里面用select操作。
  221. '''
  222. return run["id"]
  223. '''
  224. 下面一个函数应该是最实用的一个函数,self.conn.executesql(数据库命令),
  225. 这里调用executesql,可以直接识别数据库的命令,那样只要知道数据库命令,调用这个函数就相当于对数据库直接做操作。
  226. 我在下面的数据库命令中主要是判断在整张表中是否有符合我入参的行,
  227. 而且注意使用format,因为如果把data['IP']等写到用单引号引用的数据库命令中,会导致Python无法识别,数据库更加无法识别。
  228. 而且注意,表格的名字直接用device_information,而不是self.conn[self.deviceinfotab],因为数据库也根本不能识别这个函数模式。
  229. '''
  230. def validate_device_information_exists(self, data):
  231. sql_order = 'SELECT * FROM device_information WHERE ip="{0}" AND device_type="{1}" AND serial_no="{2}" AND mlfb="{3}" AND mac="{4}" AND hw_version="{5}" AND sd_card_vendor="{6}"'.format(data['IP'], data['DeviceType'], data['SerialNo'], data['MLFB'], data['MAC'], data['HWVersion'], data['SDCardVendor'])
  232. Info = self.conn.executesql(sql_order)
  233. if Info:
  234. return True
  235. else:
  236. return False

总结:在Python中,做到一件逻辑复杂的事情可以将这个事情分成一个个小块,然后复杂的逻辑用if,else等实现即可 

知识点:

关于导入模块的方法,如上图中位于keywords文件夹中的configuration.py(记为K),这个文件的函数如果想调用enums文件夹中configuration.py文件(记为E),所在类下面的方法(函数),则可以用导入的方式。在下面图片中,我们可以看到,我想在K中调用E里面的类TSTimezone,则用命令如下:

from ..enums.configuration import TSTimezone #这是跨文件夹的调用

from ..utils import Utils

这种,是utils.py文件与文件夹keywords在同一层级目录下的导入方法。

from ._context import _Context

这种只有一个点,是_context.py与K,在同一文件层级下的导入方法。(以上)

如下代码是对上面pqdatabase.py中的PQDatabase类的调用,同样是写在pqdatabase.py文件中的类class _ComprehensiveKeywords(_Context):

  1. from .configuration import _ConfigurationKeywords
  2. from .device import _DeviceKeywords
  3. from .seleniumext import _SeleniumExtKeywords
  4. from .common import _CommonKeywords
  5. from .logging import LOGDataSource
  6. from .logging import _LoggingKeywords
  7. from .recording import RECMeasurementSource
  8. from .recording import RECQueryMethod
  9. from .recording import RECQueryType
  10. from .recording import RECRecordType
  11. from .recording import RECTrendOutputType
  12. from .recording import _RecordingKeywords
  13. '''
  14. 1.本文件与上面类class PQDatabase(object):在同一个文件中都属于pqdatabase.py(自定义)
  15. 2.关于上面一大片的from...import...;是从configuration.py;device.py等里面
  16. 导入这些.py文件其中的类,以达到在pqdatabase.py文件中调用device.py等里面的类下面的函数,
  17. 3.见下面引用的方法,用super()实现了在子类_ComprehensiveKeywords中调用父类_Context的
  18. __init__()的方法,然后将导入的有需要的类,全部赋值给self._*** 。
  19. 以下是_Context.py文件的内容:(在class _ComprehensiveKeywords(_Context):中的
  20. super().__init__(*args)相当于调用了这个父类__init__下面的所有方法。)
  21. from COMTRADERobot import COMTRADERobot
  22. from IEC61850Robot import IEC61850Robot
  23. from ModbusRobot import ModbusRobot
  24. from PSURobot import PSURobot
  25. from SeleniumLibrary import SeleniumLibrary
  26. from SeleniumLibrary.locators import ElementFinder
  27. class _Context(object):
  28. def __init__(self, sel, mbus, psu, iec, comt):
  29. self._sel = sel # type: SeleniumLibrary
  30. self._mbus = mbus # type: ModbusRobot
  31. self._psu = psu # type: PSURobot
  32. self._element_finder = ElementFinder(self._sel)
  33. self._iec = iec # type: IEC61850Robot
  34. self._comt = comt # type: COMTRADERobot
  35. '''
  36. class _ComprehensiveKeywords(_Context):
  37. def __init__(self, *args):
  38. super().__init__(*args)
  39. self._device = _DeviceKeywords(*args)
  40. self._seleniumext = _SeleniumExtKeywords(*args)
  41. self._logging = _LoggingKeywords(*args)
  42. self._common = _CommonKeywords(*args)
  43. self._recording = _RecordingKeywords(*args)
  44. self._configuration = _ConfigurationKeywords(*args)
  45. self._db = None
  46. #region store general information
  47. def get_sd_card_vendor(self):
  48. #这个函数不用管,是产生数据给下面存数据用的
  49. self._sel.go_to(self._device.get_device_debug_address('/sdcardstatistic'))
  50. self._sel.wait_until_page_contains_element('css:a[href=sdcardstatistic2]')
  51. statsText = self._sel.find_element('tag:pre').text
  52. sdCardVendor = re.search(r"^\s*sd card vendor:\s*(.*?)\s*$", statsText, re.I | re.M)[1]
  53. if sdCardVendor:
  54. return sdCardVendor
  55. else:
  56. raise ValueError('Can\'t get SD card type.')
  57. def get_generalinfo_list(self, ipAddress, getSDCardVendor=True):
  58. #这个函数不用管,是产生数据给下面存数据用的
  59. ipAddress = str(ipAddress)
  60. getSDCardVendor = robottypes.is_truthy(getSDCardVendor)
  61. self._sel.go_to(self._device.get_device_address('/InfoDeviceInfo.html'))
  62. self._sel.wait_until_page_contains_element('id:footer', timeout=None, error='Device information page not found')
  63. devinfoDic = {}
  64. tblRows = self._sel.find_elements('css:#workingArea > table > tbody > tr')
  65. regex = re.compile(r'[ ()]')
  66. for row in tblRows: # type: WebElement
  67. cells = self._element_finder.find('tag:td', first_only=False, required=False, parent=row)
  68. if cells:
  69. keys = regex.sub('', cells[0].text.lower())
  70. devinfoDic[keys] = cells[1].text
  71. if _DeviceKeywords.active_device().deviceType in [SupportedDevice.Q200]:
  72. if getSDCardVendor:
  73. sdCardVendor = self.get_sd_card_vendor()
  74. else:
  75. sdCardVendor = 'NULL'
  76. else:
  77. sdCardVendor = 'NULL'
  78. if _DeviceKeywords.active_device().deviceType in [SupportedDevice.Q200, SupportedDevice.Q100, SupportedDevice.P855, SupportedDevice.P850]:
  79. devinfoDic['bootloaderversion'] = 'NULL'
  80. if '?' in devinfoDic['localtime']:
  81. devinfoDic['localtime'] = devinfoDic['localtime'].replace('?', ':')
  82. devinfoDic['localtime'] = datetime.strptime(devinfoDic['localtime'][0:19], '%Y-%m-%d %H:%M:%S')
  83. generalInfoDic = {
  84. 'IP': ipAddress,
  85. 'DeviceType': devinfoDic['devicetype'],
  86. 'SerialNo': devinfoDic['serialnumber'],
  87. 'MLFB': devinfoDic['ordernumbermlfb'],
  88. 'MAC': devinfoDic['macaddress'],
  89. 'HWVersion': 'NULL',
  90. 'SDCardVendor': str(sdCardVendor),
  91. 'FirmwareVersion': devinfoDic['firmwareversion'],
  92. 'BootloaderVersion': devinfoDic['bootloaderversion'],
  93. 'ParameterSetVersion': devinfoDic['parametersetversion'],
  94. 'FirmwarePackageVersion': devinfoDic['firmwarepackageversion'],
  95. 'DeviceLocalTime': devinfoDic['localtime'],
  96. }
  97. logger.info('The device general info is {}'.format(generalInfoDic))
  98. return generalInfoDic
  99. '''
  100. 这里就开始往数据库里面存数据
  101. '''
  102. def store_device_information_into_database(self, username, password, host, dbname):
  103. generalInfoDic = self.get_generalinfo_list(_DeviceKeywords.active_device().ipAddress)
  104. #这里调用函数是为了得到它返回的一个字典,这个字典是待后面合并用的
  105. '''
  106. 关于连接数据库这里self._db已经在最开始有一个None初值。
  107. 以下函数调用,这里是连接数据库,直接调用了类PQDatabase,
  108. 然而在这个类被调用时,它的__init__函数下的内容就都会被调用,其中有self.open_database_connection,因为在__init__文件下,所以也被调用到,从而进行数据库连接。
  109. '''
  110. self._db = PQDatabase(username, password, host, dbname)
  111. #连接数据库,并且将类赋给了self._db
  112. try:
  113. self._db.create_device_info_table()#调用类PQDatabase下面的方法(函数)
  114. '''
  115. 首先创建device info表格,
  116. 然后判断符合条件的信息是否已经存入,没存入则进行存入,存入一条信息id就会自动加1;
  117. 如果已经存入,则跳过存入阶段,已经存入则id必然已经出现。
  118. 最后取出符合条件的id这个id是存入信息时自动生成的。
  119. 取出这个id被return出,其他表格可以将这个信息作为字典元素写入,
  120. 这样就可以建立其他表格与device info表格的关联,实现交互。
  121. '''
  122. if self._db.validate_device_information_exists(generalInfoDic) is False:
  123. self._db.store_device_information(generalInfoDic)
  124. device_id = self._db.get_device_id_from_device_info_table(generalInfoDic)
  125. return device_id
  126. finally:
  127. self._db.close_database_connection()
  128. '''
  129. 下面的函数是往test_run_history表格里面存数据,
  130. 这里面有一个self.test_run_history_id = self._db.store_test_run_history(info)
  131. 其中PQDatabase类中的store_test_run_history(info),是取出最新一条储存数据的id,
  132. 用self,是将其变成全局变量,便于其他函数(方法)直接调用这个属性。
  133. '''
  134. def store_test_run_information_into_database(self, username, password, host, dbname, caseVersion, uncertaintyVersion):
  135. self.test_run_history_id = None
  136. generalInfoDic = self.get_generalinfo_list(_DeviceKeywords.active_device().ipAddress)
  137. device_id = self.store_device_information_into_database(username, password, host, dbname)#获取device information表格里面的id
  138. configuration = self._configuration.get_ac_measurement_configuration()
  139. pt = '{}:{}'.format(configuration.primRatedVoltage, configuration.secRatedVoltage)
  140. ct = '{}:{}'.format(configuration.primRatedCurrent, configuration.secRatedCurrent)
  141. if configuration.networkType == ACMNetworkType.SINGLE_PHASE:
  142. renamed_networkType = "1P2W"
  143. elif configuration.networkType == ACMNetworkType.THREE_WIRE_THREE_PHASE_BALANCED:
  144. renamed_networkType = "3P3WB"
  145. elif configuration.networkType == ACMNetworkType.THREE_WIRE_THREE_PHASE_UNBALANCED_2L:
  146. renamed_networkType = "3P3W2I"
  147. elif configuration.networkType == ACMNetworkType.THREE_WIRE_THREE_PHASE_UNBALANCED_3L:
  148. renamed_networkType = "3P3W3I"
  149. elif configuration.networkType == ACMNetworkType.FOUR_WIRE_THREE_PHASE_BALANCED:
  150. renamed_networkType = "3P4WB"
  151. elif configuration.networkType == ACMNetworkType.FOUR_WIRE_THREE_PHASE_UNBALANCED:
  152. renamed_networkType = "3P4WU"
  153. extended = {
  154. 'signalEquipment': BuiltIn().get_variable_value('${SignalModel}'),
  155. 'signalId': BuiltIn().get_variable_value('${OmicronSerial}'),
  156. 'frequency': configuration.frequency.name,
  157. 'networkType': renamed_networkType,
  158. 'Udin': configuration.primNomVoltage,
  159. 'Idin': configuration.primRatedCurrent,
  160. 'pt': pt,
  161. 'ct': ct,
  162. 'device_information_id': device_id,
  163. '''
  164. 将device_id作为value给device_information_id,然后将这个key对应的value再赋值给
  165. store_test_run_history中bulk_insert里面的device_information_id,从而存入数据库。
  166. '''
  167. 'test_start_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
  168. 'caseVersion': caseVersion,
  169. 'uncertaintyVersion': uncertaintyVersion,
  170. }
  171. info = [{**generalInfoDic, **extended}]#注意如何拼接字典合成一个大字典
  172. #值得注意的是字典里的key:value可以多于store_test_run_history中bulk_insert所需
  173. self._db = PQDatabase(username, password, host, dbname)#连接数据库
  174. try:
  175. self._db.create_test_run_history_table()
  176. #创建一个test_run数据库表
  177. self.test_run_history_id = self._db.store_test_run_history(info)
  178. #存入数据到test_run数据库表中
  179. finally:
  180. self._db.close_database_connection()#最后关闭数据库
  181. #endregion
  182. #region store longterm test log
  183. #下面的这个函数相当于是建立一个字典,给数据库存
  184. #region store measurement test log
  185. def create_measurand_test_log(self, caseVersion, caseId, uncertaintyVersion, uncertaintyId, measurand, allowedUncertainty, expectedValue=None, measuredValues=None, actualUncertainty=None, errorOccured=None):
  186. caseVersion = str(caseVersion)
  187. caseId = str(caseId)
  188. uncertaintyVersion = str(uncertaintyVersion)
  189. uncertaintyId = str(uncertaintyId)
  190. measurand = str(measurand)
  191. allowedUncertainty = float(allowedUncertainty)
  192. if expectedValue is not None: # Used for IEC62586/IEC61557 basic measurand and power test
  193. expectedValue = round(float(expectedValue), 4)
  194. lower_threshold = round((expectedValue - allowedUncertainty), 4)
  195. upper_threshold = round((expectedValue + allowedUncertainty), 4)
  196. else:
  197. expectedValue = 'NULL'
  198. lower_threshold = 'NULL'
  199. upper_threshold = 'NULL'
  200. if measuredValues is not None: # Used for IEC62586/IEC61557 basic measurand and power test
  201. measuredValues = [*map(float, measuredValues)]
  202. measuredValues = list(measuredValues)
  203. measuredValues = sorted(measuredValues)
  204. logger.info("The measured values are {}".format(measuredValues))
  205. if np.isinf(measuredValues).any():
  206. minValue = 999999 #999999 is inf overflow, due to database can't handel overflow and invalid value
  207. maxValue = 999999
  208. averageValue = 999999
  209. elif np.isnan(measuredValues).any():
  210. minValue = 666666 #666666 is nan invalid
  211. maxValue = 666666
  212. averageValue = 666666
  213. else:
  214. logger.info("into")
  215. minValue = round(measuredValues[0], 4)
  216. maxValue = round(measuredValues[-1], 4)
  217. sumValue = 0
  218. for value in measuredValues:
  219. sumValue = sumValue + value
  220. averageValue = round(sumValue/len(measuredValues), 4)
  221. logger.info("max value is {}, min value is {}, average value is {}".format(maxValue, minValue, averageValue))
  222. else:
  223. minValue = 'NULL'
  224. maxValue = 'NULL'
  225. averageValue = 'NULL'
  226. if actualUncertainty is not None: #Used for IEC61557/IEC62053 energy test
  227. actualUncertainty = float(actualUncertainty)
  228. else:
  229. actualUncertainty = 'NULL'
  230. if errorOccured is False:
  231. testResult = 'Pass'
  232. else:
  233. testResult = 'Failed'
  234. measurand_log = {
  235. 'Case_Version': caseVersion,
  236. 'Case_Id': caseId,
  237. 'Uncertainty_Version': uncertaintyVersion,
  238. 'Uncertainty_Id': uncertaintyId,
  239. 'Measurand': measurand,
  240. 'Expected_Value': expectedValue,
  241. 'Allowed_Uncertainty': allowedUncertainty,
  242. 'Lower_Threshold': lower_threshold,
  243. 'Upper_Threshold': upper_threshold,
  244. 'Min_Value': minValue,
  245. 'Max_Value': maxValue,
  246. 'Average_Value': averageValue,
  247. 'Actual_Uncertainty': actualUncertainty,
  248. 'Test_Result': testResult,
  249. }
  250. return measurand_log
  251. def store_measurand_test_result_into_database(self, username, password, host, dbname, dataCluster):
  252. runId = self.test_run_history_id#上面函数中的全局变量在这里用到了
  253. infoList = []
  254. generalInfoDic = {
  255. 'Test_Run_History_Id': str(runId),
  256. }
  257. for data in dataCluster:
  258. info = {**generalInfoDic, **data}#合并成字典
  259. infoList.append(info)#加入到列表infoList中
  260. self._db = PQDatabase(username, password, host, dbname)
  261. try:
  262. self._db.create_measurand_test_table()
  263. self._db.store_measurand_log_entries(infoList)
  264. finally:
  265. self._db.close_database_connection()
  266. #endregion