-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdio_importer.py
More file actions
144 lines (108 loc) · 4.24 KB
/
dio_importer.py
File metadata and controls
144 lines (108 loc) · 4.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import argparse
import traceback
import json
from utils.logging import Logger
from utils.queries import ESConnection
from sys import exit
logger = Logger("DoParser")
def prepare_indices(es_conn, session):
    """Create the per-session ES indices and the DIO ingest pipeline.

    Creates the main trace index ("criba_trace_<session>"), its companion
    "-paths" index, and the DIO ingest pipeline, then returns the name of
    the main trace index.
    """
    trace_index = "criba_trace_{}".format(session)
    es_conn.createNewDIOTracerIndex(trace_index)
    es_conn.createNewIndex("{}-paths".format(trace_index))
    es_conn.createDIOIngestPipeleine()
    return trace_index
def bulk_data(es_conn, bulk, bulk_size, bulk_start_index, bulk_end_index, index, session):
    """Flush *bulk* to ES through the "split-events-pipeline" ingest pipeline.

    Returns a (new_bulk, new_start_index, end_index) triple: an empty batch
    positioned at *bulk_end_index*, ready to accumulate the next records.
    On bulk errors every error is logged and the process terminates via
    _finish().  (*bulk_size* is accepted for signature compatibility but
    not used here.)
    """
    errors, took = es_conn.bulkIndex(bulk, bulk_start_index, index, session, "split-events-pipeline")
    if not errors:
        logger.debug("bulked {} records ({} to {}) in {} ms".format(len(bulk), bulk_start_index, bulk_end_index, took))
    else:
        logger.error("Got following errors while bulking: ")
        for err, count in errors.items():
            logger.error("\t- {} (x{})".format(err, count))
        _finish()  # logs and exits the process
    # hand back an empty batch whose start is the end of the flushed range
    return [], bulk_end_index, bulk_end_index
def parse_tracer(es_conn, session, filename, bulk_size=1000):
    """Parse a DIO trace file (one JSON object per line) and index it into ES.

    Records are bulk-indexed in batches of *bulk_size* via bulk_data().
    While parsing, the minimum "call_timestamp" and maximum
    "return_timestamp" are tracked; after all records are sent, a summary
    "duration" document is indexed.

    If *session* is not None it overrides each record's "session_name";
    otherwise the session name is adopted from the first record.

    Indices are created lazily (on the first flush) so the session name —
    possibly read from the file itself — is known by then.
    """
    logger.info("Parsing file: {}".format(filename))
    # parse file and save records to ES
    bulk = []
    bulk_start_index = 0
    bulk_end_index = 0
    min_time_str = ""
    max_time_str = ""
    # min_t == -1 means "no call_timestamp seen yet"
    time = {"min_t": -1, "max_t": 0, "duration": 0}
    index = None
    if session is not None:
        newSessionName = True
        logger.info("Session name: {}".format(session))
    else:
        newSessionName = False
    try:
        with open(filename, 'r') as file:
            # read file line by line
            for line in file:
                # flush records if size equals bulk_size
                if len(bulk) == bulk_size:
                    if index is None:
                        index = prepare_indices(es_conn, session)
                    bulk, bulk_start_index, bulk_end_index = bulk_data(es_conn, bulk, bulk_size, bulk_start_index, bulk_end_index, index, session)
                # parse line to a json object
                jsonObject = json.loads(line)
                if newSessionName:
                    jsonObject["session_name"] = session
                elif session is None:
                    # adopt the session name carried by the first record
                    session = jsonObject["session_name"]
                    logger.info("Session name: {}".format(session))
                if ("call_timestamp" in jsonObject) and (time["min_t"] == -1 or jsonObject["call_timestamp"] < time["min_t"]):
                    time["min_t"] = jsonObject["call_timestamp"]
                    min_time_str = jsonObject["time_called"]
                if ("return_timestamp" in jsonObject) and (jsonObject["return_timestamp"] > time["max_t"]):
                    time["max_t"] = jsonObject["return_timestamp"]
                    # NOTE(review): the max side also reads "time_called" —
                    # looks like it might have been meant to be a
                    # "time_returned"-style field; confirm against the trace format
                    max_time_str = jsonObject["time_called"]
                # add json object to list of records
                bulk.append(jsonObject)
                bulk_end_index += 1
        # verify if there is any record to flush
        if len(bulk) > 0:
            if index is None:
                index = prepare_indices(es_conn, session)
            bulk, bulk_start_index, bulk_end_index = bulk_data(es_conn, bulk, bulk_size, bulk_start_index, bulk_end_index, index, session)
        logger.info("Sent %d records to ES" % bulk_end_index)
        if index is None:
            # Empty trace file: no indices were ever created and no timestamps
            # were collected.  Previously this fell through and crashed calling
            # docIndex with index=None and a bogus duration (min_t == -1).
            logger.error("No records found in file; skipping duration doc")
            return
        # Creating doc of duration
        time["duration"] = time["max_t"] - time["min_t"]
        time["session_name"] = session
        time["min_t"] = min_time_str
        time["max_t"] = max_time_str
        es_conn.docIndex(index, time, "{}_{}".format(session, bulk_end_index + 1))
        logger.info("Sent duration doc to ES: {}".format(time))
    except IOError:
        logger.error("could not load the provided file")
def _start():
    # Log the start-of-run banner.
    logger.info("DoParser Started!")
def _finish():
    """Log the end-of-run banner and terminate the process with status 0."""
    logger.info("DoParser Finished!")
    raise SystemExit(0)  # equivalent to sys.exit(0)
def main():
    """CLI entry point: parse arguments and import a DIO trace file into ES."""
    parser = argparse.ArgumentParser(description='Parses DIO trace file and export to ElasticSearch')
    parser.add_argument('-u', '--url', default="http://cloud124:31111", type=str, help='elasticSearch URL')
    parser.add_argument('--session', help='session name', default=None, nargs='?')
    parser.add_argument('--size', metavar='size', default=1000, type=int, help='bulk size')
    parser.add_argument('-d', '--debug', action='store_true', default=False, help='Debug mode')
    parser.add_argument('file', help='DIO trace file', default=None, nargs='?')
    args = parser.parse_args()
    # BUG FIX: was "_start" (a bare reference, not a call) — the start
    # banner was never logged.
    _start()
    try:
        if args.debug:
            logger.setLevel("debug")
        es_conn = ESConnection(args.url)
        if args.file is None:  # "is None" over "== None" (identity check)
            logger.error("A valid file must be provided.")
            _finish()
        parse_tracer(es_conn, args.session, args.file, args.size)
    except Exception as e:
        # top-level boundary: report and fall through to the normal exit path
        logger.error("Got an unexpected error: %s" % e)
        traceback.print_exception(type(e), e, e.__traceback__)
    _finish()
# Run only when executed as a script, not when imported as a module.
if __name__ == '__main__':
    main()