# run_uniqueidentifiers.py
# (forked from roomthily/ISO-19115-Distribution-Structures)

import json as js  # name conflict with sqla
import tempfile
import traceback  # only used by the commented-out debug line below
from optparse import OptionParser
from os import write, close, unlink

import sqlalchemy as sqla
from sqlalchemy import and_  # only used by the commented-out join query
from sqlalchemy import event
from sqlalchemy import exc
from sqlalchemy import select
from sqlalchemy.orm import sessionmaker

from mpp.models import Response
from mpp.models import UniqueIdentifier
from semproc.timed_command import TimedCmd

# TODO: some issue with an integer being too large for python.
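

# Batch driver (Python 2): pages through stored Response rows, shells
# out to unique_identifier_cli.py for each XML response, and writes the
# extracted identifiers back to the database as UniqueIdentifier rows.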
def main():
    op = OptionParser()
    op.add_option('--start', '-s', default=0, type="int")
    op.add_option('--end', '-e', default=100, type="int")
    op.add_option('--interval', '-i', default=100, type="int")

    options, arguments = op.parse_args()

    START = options.start
    TOTAL = options.end
    LIMIT = options.interval

    conf = 'big_rds.conf'
    cmd = 'python unique_identifier_cli.py -f {0} -u "{1}"'
    timeout = 120  # in seconds; more than 2 minutes seems like an eternity

    with open(conf, 'r') as f:
        config = js.loads(f.read())

    # our connection
    engine = sqla.create_engine(config.get('connection'), pool_timeout=360)
    Session = sessionmaker()
    Session.configure(bind=engine)
    session = Session()
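
    # pessimistic connection check (the "ping" recipe from the
    # SQLAlchemy docs): run a cheap SELECT 1 whenever the engine hands
    # out a connection; if that connection was invalidated by a
    # disconnect, the retry runs on a freshly established connection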
    @event.listens_for(engine, "engine_connect")
    def ping_connection(connection, branch):
        if branch:
            return
        try:
            connection.scalar(select([1]))
        except exc.DBAPIError as err:
            if err.connection_invalidated:
                connection.scalar(select([1]))
            else:
                raise
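
    # page through the XML responses LIMIT rows at a time so the whole
    # result set is never held in memory at once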
    for i in xrange(START, TOTAL, LIMIT):
        print '***** START INTERVAL: ', i
        # for any clean runs
        for response in session.query(Response).filter(
                Response.format == 'xml').limit(LIMIT).offset(i).all():
        # join_query = session.query(UniqueIdentifier.response_id)
        # for response in session.query(Response).filter(
        #         and_(Response.format == 'xml', ~Response.id.in_(join_query))
        # ).limit(LIMIT).offset(i).all():
            print '\tready'
            response_id = response.id
            if response.identifiers:
                continue
            print '\tgo'

            cleaned_content = response.cleaned_content

            # put it in a tempfile to deal with
            # very long files and paper over the
            # encoding, escaping junk
            handle, name = tempfile.mkstemp(suffix='.xml')
            write(handle, cleaned_content)
            close(handle)
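
            # run the extraction CLI in a subprocess with a hard
            # timeout so a hung parse can't wedge the whole batch;
            # the tempfile is removed whether or not the run succeeds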
            tc = TimedCmd(cmd.format(name, response.source_url))
            try:
                status, output, error = tc.run(timeout)
            except Exception as ex:
                print '******propagated failed extraction: ', response_id
                # traceback.print_exc()
                print
                continue
            finally:
                unlink(name)
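
            # anything the CLI wrote to stderr counts as a failed
            # extraction; log it and move on to the next response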
            if error:
                print '******error from cli: ', response_id
                print error
                print
                continue

            commits = []
            # the CLI emits one JSON object per line; note the loop
            # variable must not reuse `i`, which would clobber the
            # interval counter of the outer loop
            for line in output.split('\n'):
                if not line:
                    continue
                ident = js.loads(line)
                identifier = UniqueIdentifier(
                    response_id=response_id,
                    tag=ident.get('tag'),
                    extraction_type=ident.get('extraction_type'),
                    match_type=ident.get('match_type'),
                    original_text=ident.get('original_text'),
                    potential_identifier=ident.get('potential_identifier')
                )
                commits.append(identifier)
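
            # commit per response so a failed batch only rolls back its
            # own identifiers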
            try:
                session.add_all(commits)
                session.commit()
            except Exception as ex:
                print '**********failed commit: ', response_id
                print ex
                print
                session.rollback()

            print '\tcommitted'

    session.close()


if __name__ == '__main__':
    main()