Skip to content
This repository was archived by the owner on Jul 22, 2024. It is now read-only.

Commit 67cd5c6

Browse files
author
John Dunham
authored
Merge pull request #688 from mew2057/json-fix
Reducing number of json files #679
2 parents 04be4b2 + 9280605 commit 67cd5c6

File tree

3 files changed

+124
-6
lines changed

3 files changed

+124
-6
lines changed

csm_big_data/beats/config/filebeat.yml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,32 @@
11
filebeat.inputs:
22
- type: log
33
enabled: true
4+
close_removed: 1h
5+
close_inactive: 1h
46
paths:
57
- /var/log/ibm/csm/csm_transaction.log.*
68
tags: ["transaction"]
79

810
- type: log
911
enabled: true
12+
close_removed: 1h
13+
close_inactive: 1h
1014
paths:
1115
- /var/log/ibm/csm/csm_allocation_metrics.log.*
1216
tags: ["allocation","metrics"]
1317

1418
- type: log
1519
enabled: true
20+
close_removed: 1h
21+
close_inactive: 1h
1622
paths:
1723
- "/var/log/ibm/csm/archive/*.json"
1824
tags: ["archive"]
1925

2026
- type: log
2127
enabled: true
28+
close_removed: 1h
29+
close_inactive: 1h
2230
paths:
2331
- /var/log/ibm/csm/csm_ras_events.log
2432
tags: ["csm","ras"]
@@ -27,8 +35,8 @@ filebeat.config.modules:
2735
path: ${path.config}/modules.d/*.yml
2836
reload.enabled: false
2937

30-
name: "master"
3138

39+
name: "master"
3240
setup.kibana:
3341
host: _KIBANA_HOST_PORT_
3442

csmdb/sql/csm_db_history_archive.py

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
from datetime import date
3737
import threading
3838
from multiprocessing.dummy import Pool as ThreadPool
39+
from csm_db_rollup import rollupDir
3940

4041
DEFAULT_LOG='''/var/log/ibm/csm/db/csm_db_archive_script.log'''
4142
DEFAULT_TARGET='''/var/log/ibm/csm/archive'''
@@ -47,6 +48,19 @@
4748
# Additional Formatting style
4849
line1 = "---------------------------------------------------------------------------------------------------------"
4950

51+
def sanitize_string(v):
    """Decode a byte string to unicode, silently dropping invalid UTF-8 bytes."""
    cleaned = v.decode('utf-8', 'ignore')
    return cleaned
53+
54+
def sanitize_dict(d):
    """Recursively decode every byte-string value in *d* to unicode.

    Nested dicts are sanitized via recursion; byte-string values are decoded
    with sanitize_string() (UTF-8, invalid sequences dropped); every other
    value is stored back unchanged.  Mutates *d* in place and returns it.

    d -- the dict to sanitize (e.g. a DB row before json.dumps()).

    NOTE: `iteritems()` was replaced with `items()` and `isinstance(v, str)`
    with `isinstance(v, bytes)` -- identical behavior on Python 2 (where
    `bytes is str`) while also running correctly under Python 3.
    """
    for k, v in d.items():
        if isinstance(v, dict):
            d[k] = sanitize_dict(v)
        elif isinstance(v, bytes):
            d[k] = sanitize_string(v)
        else:
            d[k] = v
    return d
63+
5064
# username defined
5165
username = commands.getoutput("whoami")
5266

@@ -168,11 +182,11 @@ def dump_table( db, user, table_name, count, target_dir, is_ras=False ):
168182
# Append the logs to the file.
169183
try:
170184
with open(file_name, 'a') as file:
171-
172185
colnames = [desc[0] for desc in cursor.description]
173186
for row in cursor:
174187
file.write('{{ "type":"db-{0}", "data":{1} }}\n'.format(
175-
table_name, json.dumps(dict(zip(colnames, row)), default=str)))
188+
table_name, json.dumps(sanitize_dict(dict(zip(colnames, row))),
189+
default=str)))
176190
except Exception as e:
177191
print "[INFO] Exception caught: {0}".format(e)
178192
logger.info("Exception caught: {0}".format(e))
@@ -226,6 +240,13 @@ def main(args):
226240
# Verifies path exists.
227241
if not os.path.exists(args.target):
228242
os.makedirs(args.target)
243+
244+
# Make the temp directory for staging the tables for dumping.
245+
# This is used before writing to mitigate potential data loss.
246+
temp_dir = "{0}/tmp".format(args.target)
247+
if not os.path.exists(temp_dir):
248+
os.makedirs(temp_dir)
249+
229250

230251
# Process the script detail info. for screen and logging.
231252
logging.info("DB Name: | {0}".format(args.db))
@@ -244,18 +265,21 @@ def main(args):
244265

245266
pool = ThreadPool(int(args.threads))
246267

247-
tmp_list = pool.map( lambda table: dump_table( args.db, args.user, table, args.count, args.target ), TABLES )
268+
tmp_list = pool.map( lambda table: dump_table( args.db, args.user, table, args.count, temp_dir ), TABLES )
248269

249270
for entry in tmp_list:
250271
if entry is None:0
251272
else:
252273
print entry
253274

254275
for table in RAS_TABLES:
255-
entry = dump_table( args.db, args.user, table, args.count, args.target, True)
276+
entry = dump_table( args.db, args.user, table, args.count, temp_dir, True)
256277
if entry is None:0
257278
else:
258-
print entry
279+
print (entry, args.target)
280+
281+
# After the tables are dumped, it's time to merge them into the weekly report.
282+
rollupDir(temp_dir,"..")
259283

260284
# Process the finishing info. for screen and logging.
261285
ft = datetime.datetime.now()

csmdb/sql/csm_db_rollup.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
#!/bin/python
2+
# encoding: utf-8
3+
#================================================================================
4+
#
5+
# csm_db_rollup.py
6+
#
7+
# © Copyright IBM Corporation 2015-2019. All Rights Reserved
8+
#
9+
# This program is licensed under the terms of the Eclipse Public License
10+
# v1.0 as published by the Eclipse Foundation and available at
11+
# http://www.eclipse.org/legal/epl-v10.html
12+
#
13+
# U.S. Government Users Restricted Rights: Use, duplication or disclosure
14+
# restricted by GSA ADP Schedule Contract with IBM Corp.
15+
#
16+
#================================================================================
17+
18+
#================================================================================
19+
# usage ./csm_db_rollup.py <archive directory> # DEFAULT :/var/log/ibm/csm/archive
20+
# current_version 1.0
21+
# date_created: 05-17-2019
22+
# date_modified: 05-17-2019
23+
#================================================================================
24+
import os
25+
import sys
26+
from datetime import datetime
27+
28+
DEFAULT_TARGET='''/var/log/ibm/csm/archive'''
29+
30+
def rollupDir (directory, archiveTarget="old"):
    ''' Condense daily archive files in *directory* into weekly rollup files.

    Files must follow the pattern `<table>.archive.<YYYY-MM-DD>.json`.  Each
    matching file's contents are appended to
    `<archiveTarget>/<table>-<YYYY-WW>.json` (WW = week of year, Sunday-first
    per strftime %U) and the source file is then removed.  Empty files are
    deleted; hidden and non-matching files are left untouched.

    directory -- A directory containing a collection of archive files to condense (string).
    archiveTarget -- A directory (relative to *directory*) to store the rollups in.
    '''
    # If the directory is not present return early.
    if not os.path.exists(directory):
        print("{0} does not exist".format(directory))
        return

    # Create an archive directory as needed.
    archiveDir="{0}/{1}".format(directory,archiveTarget)
    if not os.path.exists(archiveDir):
        os.makedirs(archiveDir)

    # Only consider regular files; sub-directories (e.g. the rollup dir) are skipped.
    files=[f for f in os.listdir(directory) if os.path.isfile("{0}/{1}".format(directory,f))]
    for f in files:
        # Skip Hidden Files.
        if f[0] == '.':
            continue

        iFile="{0}/{1}".format(directory,f)

        # If the file doesn't match our <table>.archive.<date>.json pattern ignore it.
        # BUGFIX: was a bare `except:` which also swallowed KeyboardInterrupt
        # and masked real errors; only the expected parse failures are caught.
        try:
            sFile=f.split(".")
            weekStr=datetime.strptime(sFile[2],"%Y-%m-%d").strftime("%Y-%U")
            oFile="{0}/{1}-{2}.json".format(archiveDir, sFile[0], weekStr)
        except (IndexError, ValueError):
            continue

        # Get the contents first.
        with open(iFile,'r') as inputFile:
            contents=inputFile.read()
        # remove and skip empties.
        if len(contents) == 0:
            os.remove(iFile)
            continue

        # Archive the contents; remove the source only after a successful write.
        with open(oFile, 'a') as ofile:
            ofile.write(contents)
        os.remove(iFile) # Remove only if the write was good!
77+
78+
def main(args):
    ''' Entry point: roll up the archive directory named on the command line.

    args -- the process argument vector (sys.argv); args[1], when present,
            overrides DEFAULT_TARGET as the directory to roll up.
    '''
    target = DEFAULT_TARGET
    if len(args) > 1:
        # BUGFIX: was `sys.argv[0]`, which is the script name itself;
        # the user-supplied target directory is the first real argument.
        target = args[1]

    rollupDir(target)

if __name__ == "__main__":
    sys.exit(main(sys.argv))

0 commit comments

Comments
 (0)