-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathDBService.py
More file actions
executable file
·208 lines (177 loc) · 8.35 KB
/
DBService.py
File metadata and controls
executable file
·208 lines (177 loc) · 8.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
__author__ = 'rodozov, Algirdas Beinaravicius'
"""
runDataToDB
Author: Algirdas Beinaravicius, modifications by Mircho Rodozov mrodozov@cern.ch
"""
from Singleton import Singleton
import json
import sqlalchemy
from CommandClasses import *
#from sqlalchemy.engine import reflection
import datetime
import multiprocessing as mp
class DBService(object):
    """Database access helper for the RPC noise tables, built on SQLAlchemy.

    The class declares ``Singleton`` as its metaclass (Python 2 syntax), so
    presumably a single shared instance exists per process -- confirm against
    the ``Singleton`` module.

    The connection string is derived solely from the constructor arguments;
    no credentials are embedded in this class.
    """
    __metaclass__ = Singleton

    def __init__(self, dbType='sqlite:///', host=None, port=None, user='', password=None, schema='', dbName='dbfiles/runData.db'):
        """Store the connection settings and create the shared engine.

        dbType   -- SQLAlchemy URL prefix, e.g. 'sqlite:///' or 'oracle://'.
        host     -- database host; only used together with ``port``.
        port     -- database port (string or int); only used with ``host``.
        user     -- database user; omitted from the URL when empty.
        password -- database password; omitted from the URL when empty/None.
        schema   -- schema name passed to every table definition/reflection.
        dbName   -- sqlite file path, or database/TNS name for other backends.
        """
        self.__dbType = dbType
        self.__schema = schema
        self.__host = host
        self.__user = user
        self.__password = password
        self.__dbName = dbName
        self.__supportedDBs = {'sqlite': ['sqlite://', 'sqlite:///'], 'oracle': ['oracle://']}
        self.lock = mp.Lock()
        usesSqlite = dbType in self.__supportedDBs['sqlite']
        if usesSqlite:
            print(dbType)
        self.__alchemyDBString = self._buildConnectionString(
            usesSqlite, dbType, host, port, user, password, dbName)
        self.__engine = sqlalchemy.create_engine(self.__alchemyDBString)

    @staticmethod
    def _buildConnectionString(usesSqlite, dbType, host, port, user, password, dbName):
        """Assemble the SQLAlchemy URL from the individual connection settings."""
        if usesSqlite:
            return dbType + dbName
        credentials = ''
        if user:
            credentials = user
            if password:
                credentials += ':' + password
            credentials += '@'
        if host and port:
            # str() so that an integer port does not break the concatenation
            return dbType + credentials + host + ':' + str(port) + '/' + dbName
        # without host/port, dbName is used directly after the credentials
        return dbType + credentials + dbName

    @staticmethod
    def _buildInsertionRows(data, orderedColumnNames, argsList):
        """Convert raw input lines into a list of {column name: value} dicts.

        Only positions listed in ``argsList`` are taken from each line.
        'rate_hz_cm2' is converted with float(), every other column with int().
        """
        rows = []
        for line in data:
            row = {}
            for argnum, columnName in enumerate(orderedColumnNames):
                if argnum not in argsList:
                    continue
                if columnName == 'rate_hz_cm2':
                    # oracle complains about a non numeric type here
                    row[columnName] = float(line[argnum])
                else:
                    row[columnName] = int(line[argnum])
            rows.append(row)
        return rows

    def createDBStrips(self):
        """Create the RPC_NOISE_STRIPS table in the configured schema."""
        metadata = sqlalchemy.MetaData()
        sqlalchemy.Table('RPC_NOISE_STRIPS', metadata,
                         sqlalchemy.Column('run_number', sqlalchemy.Integer),
                         sqlalchemy.Column('raw_id', sqlalchemy.Integer),
                         sqlalchemy.Column('channel_number', sqlalchemy.Integer),
                         sqlalchemy.Column('strip_number', sqlalchemy.Integer),
                         sqlalchemy.Column('is_dead', sqlalchemy.Integer),
                         sqlalchemy.Column('is_masked', sqlalchemy.Integer),
                         sqlalchemy.Column('rate_hz_cm2', sqlalchemy.Numeric(13, 7)),
                         schema=self.__schema)
        metadata.create_all(self.__engine)

    def createDBRolls(self):
        """Create the RPC_NOISE_ROLLS table in the configured schema."""
        metadata = sqlalchemy.MetaData()
        sqlalchemy.Table('RPC_NOISE_ROLLS', metadata,
                         sqlalchemy.Column('run_number', sqlalchemy.Integer),
                         sqlalchemy.Column('raw_id', sqlalchemy.Integer),
                         sqlalchemy.Column('dead_strips', sqlalchemy.Integer),
                         sqlalchemy.Column('masked_strips', sqlalchemy.Integer),
                         sqlalchemy.Column('strips_to_unmask', sqlalchemy.Integer),
                         sqlalchemy.Column('strips_to_mask', sqlalchemy.Integer),
                         sqlalchemy.Column('rate_hz_cm2', sqlalchemy.Numeric(13, 7)),
                         schema=self.__schema)
        metadata.create_all(self.__engine)

    def insertToDB(self, data, tableName, orderedColumnNames, argsList, rval=None):
        """Insert all lines of ``data`` into ``tableName`` in one transaction.

        data               -- iterable of indexable lines (one per row).
        tableName          -- name of an existing table, reflected on each call.
        orderedColumnNames -- column names, ordered like the fields of a line.
        argsList           -- positions of the fields that are to be inserted.
        rval               -- unused; kept so existing callers keep working.

        Returns True on success. Any error rolls the transaction back and is
        re-raised; the connection and engine are released in either case.
        """
        insertionList = self._buildInsertionRows(data, orderedColumnNames, argsList)
        metadata = sqlalchemy.MetaData()
        # A dedicated engine is created per call rather than reusing the shared
        # one -- presumably because this runs in separate worker processes;
        # TODO confirm.
        eng = sqlalchemy.create_engine(self.__alchemyDBString)
        try:
            table = sqlalchemy.Table(tableName, metadata, schema=self.__schema, autoload=True, autoload_with=eng)
            connection = eng.connect()
            try:
                transaction = connection.begin()
                try:
                    connection.execute(table.insert(), insertionList)
                    transaction.commit()
                except BaseException:
                    transaction.rollback()
                    raise
            finally:
                connection.close()
        finally:
            eng.dispose()
        return True

    def deleteRunFromDB(self, runNumber=None, tableName=None):
        """Delete every row of ``tableName`` whose run_number equals ``runNumber``."""
        metadata = sqlalchemy.MetaData()
        table = sqlalchemy.Table(tableName, metadata, schema=self.__schema, autoload=True, autoload_with=self.__engine)
        connection = self.__engine.connect()
        try:
            # the column is named 'run_number' in the table definitions above
            connection.execute(table.delete().where(table.c.run_number == runNumber))
        finally:
            connection.close()

    def deleteDataFromTable(self, tableName=None):
        """Delete all rows of ``tableName``."""
        metadata = sqlalchemy.MetaData()
        table = sqlalchemy.Table(tableName, metadata, schema=self.__schema, autoload=True, autoload_with=self.__engine)
        connection = self.__engine.connect()
        try:
            connection.execute(table.delete())
        finally:
            connection.close()

    def selectFromDB(self, runNumber=None, tableName=None):
        """Select the rows of ``tableName`` whose run_number equals ``runNumber``.

        Returns the result object produced by ``connection.execute``.
        """
        metadata = sqlalchemy.MetaData()
        table = sqlalchemy.Table(tableName, metadata, schema=self.__schema, autoload=True, autoload_with=self.__engine)
        connection = self.__engine.connect()
        try:
            result = connection.execute(table.select().where(table.c.run_number == runNumber))
        finally:
            # NOTE(review): the result is handed back after its connection has
            # been closed; whether rows can still be fetched afterwards depends
            # on the driver -- verify, or fetch the rows before closing.
            connection.close()
        return result

    def getConnection(self):
        """Return a new connection from the shared engine; the caller closes it."""
        return self.__engine.connect()
if __name__ == "__main__":
    # Manual smoke test: read back one run from RPC_NOISE_ROLLS, then upload
    # the data files of another run through DBDataUpload.
    with open('resources/options_object.txt', 'r') as optobj:
        optionsObject = json.loads(optobj.read())

    # The password is read from a local file so that it is not kept in the
    # source; the file must exist for this script to run.
    with open('resources/dbpaswd') as dbpassf:
        dbpass = dbpassf.readline().strip()

    db_obj = DBService(dbType='oracle://', host='', port='', user='CMS_RPC_COND_W',
                       password=dbpass, schema='CMS_RPC_COND', dbName='cms_omds_lb')

    result = db_obj.selectFromDB(269136, 'RPC_NOISE_ROLLS')
    print(result)
    for row in result:
        # single-argument form prints the same text under the print statement
        print('raw id:  %s' % (row['raw_id'],))

    # Table creation / clearing is available through createDBRolls(),
    # createDBStrips() and deleteDataFromTable(<table name>).
    # NOTE: deleteDataFromTable('RPC_NOISE_STRIPS') was seen to block for an
    # unknown reason.

    dbup = DBDataUpload(args=optionsObject['dbdataupload'])
    dbup.options['filescheck'] = ['/rpctdata/CAF/run269565/database_new.txt', '/rpctdata/CAF/run269565/database_full.txt']
    dbup.options['run'] = '269565'
    dbup.processTask()