-
-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathdatabase.py
481 lines (407 loc) · 16.9 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
#handles the database interactions
import os, json, datetime, re
import sqlite3 as sql
from random import sample
data_dir = "data"
def create_connection():
#create connection, create db if doesn't exist
conn = None
try:
conn = sql.connect(os.path.join(data_dir, "matteo.db"))
# enable write-ahead log for performance and resilience
conn.execute('pragma journal_mode=wal')
return conn
except:
print("oops, db connection no work")
return conn
def initialcheck():
conn = create_connection()
soulscream_table_check_string = """ CREATE TABLE IF NOT EXISTS soulscreams (
counter integer PRIMARY KEY,
name text NOT NULL,
soulscream text NOT NULL,
timestamp text NOT NULL
); """
player_cache_table_check_string = """ CREATE TABLE IF NOT EXISTS players (
counter integer PRIMARY KEY,
name text NOT NULL,
json_string text NOT NULL,
timestamp text NOT NULL
); """
player_table_check_string = """ CREATE TABLE IF NOT EXISTS user_designated_players (
user_id integer PRIMARY KEY,
user_name text,
player_id text NOT NULL,
player_name text NOT NULL,
player_json_string text NOT NULL
);"""
player_stats_table_check_string = """ CREATE TABLE IF NOT EXISTS stats (
counter integer PRIMARY KEY,
id text,
name text,
json_string text,
outs_pitched integer DEFAULT 0,
walks_allowed integer DEFAULT 0,
hits_allowed integer DEFAULT 0,
strikeouts_given integer DEFAULT 0,
runs_allowed integer DEFAULT 0,
plate_appearances integer DEFAULT 0,
walks_taken integer DEFAULT 0,
sacrifices integer DEFAULT 0,
hits integer DEFAULT 0,
home_runs integer DEFAULT 0,
total_bases integer DEFAULT 0,
rbis integer DEFAULT 0,
strikeouts_taken integer DEFAULT 0
);"""
teams_table_check_string = """ CREATE TABLE IF NOT EXISTS teams (
counter integer PRIMARY KEY,
name text NOT NULL,
team_json_string text NOT NULL,
timestamp text NOT NULL,
owner_id integer
); """
one_big_league_check_string = """ CREATE TABLE IF NOT EXISTS one_big_league (
counter integer PRIMARY KEY,
team_name text NOT NULL,
teams_beaten_list text,
current_opponent_pool text,
obl_points int DEFAULT 0,
rival_name text
);"""
if conn is not None:
c = conn.cursor()
c.execute(soulscream_table_check_string)
c.execute(player_cache_table_check_string)
c.execute(player_table_check_string)
c.execute(player_stats_table_check_string)
c.execute(teams_table_check_string)
c.execute(one_big_league_check_string)
conn.commit()
conn.close()
def get_stats(player_name):
conn = create_connection()
if conn is not None:
c = conn.cursor()
c.execute("SELECT * FROM players WHERE name=?", (player_name,))
player = c.fetchone()
try:
cachetime = datetime.datetime.fromisoformat(player[3])
if datetime.datetime.now(datetime.timezone.utc) - cachetime >= datetime.timedelta(days = 7):
#delete old cache
c.execute("DELETE FROM players WHERE name=?", (player_name,))
conn.commit()
conn.close()
return None
except TypeError:
conn.close()
return None
conn.close()
return player[2] #returns a json_string
conn.close()
return None
def cache_stats(name, json_string):
conn = create_connection()
store_string = """ INSERT INTO players(name, json_string, timestamp)
VALUES (?,?, ?) """
if conn is not None:
c = conn.cursor()
c.execute(store_string, (name, json_string, datetime.datetime.now(datetime.timezone.utc)))
conn.commit()
conn.close()
def get_soulscream(username):
conn = create_connection()
#returns none if not found or more than 3 days old
if conn is not None:
c = conn.cursor()
c.execute("SELECT * FROM soulscreams WHERE name=?", (username,))
scream = c.fetchone()
try:
cachetime = datetime.datetime.fromisoformat(scream[3])
if datetime.datetime.now(datetime.timezone.utc) - cachetime >= datetime.timedelta(days = 7):
#delete old cache
c.execute("DELETE FROM soulscreams WHERE name=?", (username,))
conn.commit()
conn.close()
return None
except TypeError:
conn.close()
return None
conn.close()
return scream[2]
conn.close()
return None
def cache_soulscream(username, soulscream):
conn = create_connection()
store_string = """ INSERT INTO soulscreams(name, soulscream, timestamp)
VALUES (?,?, ?) """
if conn is not None:
c = conn.cursor()
c.execute(store_string, (username, soulscream, datetime.datetime.now(datetime.timezone.utc)))
conn.commit()
conn.close()
def designate_player(user, player_json):
conn = create_connection()
store_string = """ INSERT INTO user_designated_players(user_id, user_name, player_id, player_name, player_json_string)
VALUES (?, ?, ?, ?, ?)"""
user_player = get_user_player_conn(conn, user)
c = conn.cursor()
if user_player is not None:
c.execute("DELETE FROM user_designated_players WHERE user_id=?", (user.id,)) #delete player if already exists
c.execute(store_string, (user.id, user.name, player_json["id"], player_json["name"], json.dumps(player_json)))
conn.commit()
conn.close()
def get_user_player_conn(conn, user):
try:
if conn is not None:
c = conn.cursor()
c.execute("SELECT player_json_string FROM user_designated_players WHERE user_id=?", (user.id,))
try:
return json.loads(c.fetchone()[0])
except TypeError:
return False
else:
conn.close()
return False
except:
conn.close()
return False
conn.close()
return False
def get_user_player(user):
conn = create_connection()
player = get_user_player_conn(conn, user)
conn.close()
return player
def save_team(name, team_json_string, user_id):
conn = create_connection()
try:
if conn is not None:
c = conn.cursor()
store_string = """ INSERT INTO teams(name, team_json_string, timestamp, owner_id)
VALUES (?,?, ?, ?) """
c.execute(store_string, (re.sub('[^A-Za-z0-9 ]+', '', name), team_json_string, datetime.datetime.now(datetime.timezone.utc), user_id)) #this regex removes all non-standard characters
conn.commit()
conn.close()
return True
conn.close()
return False
except:
return False
conn.close()
return False
def update_team(name, team_json_string):
conn = create_connection()
try:
if conn is not None:
c = conn.cursor()
store_string = "UPDATE teams SET team_json_string = ? WHERE name=?"
c.execute(store_string, (team_json_string, (re.sub('[^A-Za-z0-9 ]+', '', name)))) #this regex removes all non-standard characters
conn.commit()
conn.close()
return True
conn.close()
return False
except:
conn.close()
return False
conn.close()
return False
def get_team(name, owner=False):
conn = create_connection()
if conn is not None:
c = conn.cursor()
if not owner:
c.execute("SELECT team_json_string FROM teams WHERE name=?", (re.sub('[^A-Za-z0-9 ]+', '', name),)) #see above note re: regex
else:
c.execute("SELECT * FROM teams WHERE name=?", (re.sub('[^A-Za-z0-9 ]+', '', name),)) #see above note re: regex
team = c.fetchone()
conn.close()
return team #returns a json string if owner is false, otherwise returns (counter, name, team_json_string, timestamp, owner_id)
conn.close()
return None
def delete_team(team):
conn = create_connection()
if conn is not None:
try:
c = conn.cursor()
c.execute("DELETE FROM teams WHERE name=?", (re.sub('[^A-Za-z0-9 ]+', '', team.name),))
conn.commit()
conn.close()
return True
except:
conn.close()
return False
conn.close()
return False
def assign_owner(team_name, owner_id):
conn = create_connection()
if conn is not None:
try:
c = conn.cursor()
c.execute("UPDATE teams SET owner_id = ? WHERE name = ?",(owner_id, re.sub('[^A-Za-z0-9 ]+', '', team_name)))
conn.commit()
conn.close()
return True
except:
conn.close()
return False
conn.close()
return False
def get_all_teams():
conn = create_connection()
if conn is not None:
c = conn.cursor()
c.execute("SELECT team_json_string FROM teams")
team_strings = c.fetchall()
conn.close()
return team_strings
conn.close()
return None
def get_all_team_names():
conn = create_connection()
if conn is not None:
c = conn.cursor()
c.execute("SELECT name FROM teams")
team_names = c.fetchall()
team_names_out = [name for (name,) in team_names]
conn.close()
return team_names_out
conn.close()
return None
def get_filtered_teams(i_filter_list):
teams_list = get_all_team_names()
out_list = []
filter_list = [re.sub('[^A-Za-z0-9 %]+', '', filter_team) for filter_team in i_filter_list]
for team in teams_list:
if re.sub('[^A-Za-z0-9 %]+', '', team) not in filter_list:
out_list.append(team)
return out_list
def search_teams(search_string):
conn = create_connection()
if conn is not None:
c = conn.cursor()
c.execute("SELECT team_json_string, counter FROM teams WHERE name LIKE ?",(re.sub('[^A-Za-z0-9 %]+', '', f"%{search_string}%"),))
team_json_strings = c.fetchall()
conn.close()
return team_json_strings
conn.close()
return None
def add_stats(player_game_stats_list):
conn = create_connection()
if conn is not None:
c=conn.cursor()
for (name, player_stats_dic) in player_game_stats_list:
c.execute("SELECT * FROM stats WHERE name=?",(name,))
this_player = c.fetchone()
if this_player is not None:
for stat in player_stats_dic.keys():
c.execute(f"SELECT {stat} FROM stats WHERE name=?",(name,))
old_value = int(c.fetchone()[0])
c.execute(f"UPDATE stats SET {stat} = ? WHERE name=?",(player_stats_dic[stat]+old_value,name))
else:
c.execute("INSERT INTO stats(name) VALUES (?)",(name,))
for stat in player_stats_dic.keys():
c.execute(f"UPDATE stats SET {stat} = ? WHERE name=?",(player_stats_dic[stat],name))
conn.commit()
conn.close()
def add_team_obl(team):
conn = create_connection()
if conn is not None:
c=conn.cursor()
opponents = sample(get_filtered_teams([team.name]), 5)
c.execute("INSERT INTO one_big_league(team_name, current_opponent_pool) VALUES (?, ?)", (team.name, list_to_newline_string(opponents)))
conn.commit()
conn.close()
def save_obl_results(winning_team, losing_team, xvi_team = None):
conn = create_connection()
if conn is not None:
c=conn.cursor()
try:
c.execute("SELECT teams_beaten_list, current_opponent_pool, obl_points FROM one_big_league WHERE team_name = ?", (winning_team.name,))
beaten_string, opponents_string, obl_points = c.fetchone()
except:
return
beaten_teams = newline_string_to_list(beaten_string)
opponent_teams = newline_string_to_list(opponents_string)
if re.sub('[^A-Za-z0-9 %]+', '', losing_team.name) in opponent_teams:
beaten_teams.append(losing_team.name)
try:
opponent_teams = sample(get_filtered_teams([winning_team.name] + beaten_teams), 5)
except ValueError:
opponent_teams = get_filtered_teams([winning_team.name] + beaten_teams)
obl_points += 1
c.execute("UPDATE one_big_league SET teams_beaten_list = ?, current_opponent_pool = ?, obl_points = ? WHERE team_name = ?", (list_to_newline_string(beaten_teams), list_to_newline_string(opponent_teams), obl_points, winning_team.name))
conn.commit()
conn.close()
if xvi_team is not None:
add_obl_point(xvi_team)
return
def add_obl_point(team):
conn = create_connection()
if conn is not None:
c=conn.cursor()
c.execute("SELECT obl_points FROM one_big_league WHERE team_name = ?", (team.name,))
xvi_obl_points = c.fetchone()[0]
xvi_obl_points += 1
c.execute("UPDATE one_big_league SET obl_points = ? WHERE team_name = ?", (xvi_obl_points, team.name))
conn.commit()
conn.close()
return
def get_obl_stats(team, full = False):
conn = create_connection()
if conn is not None:
c=conn.cursor()
opponents_string = None
while opponents_string is None:
c.execute("SELECT teams_beaten_list, current_opponent_pool, rival_name, obl_points FROM one_big_league WHERE team_name = ?", (team.name,))
try:
beaten_string, opponents_string, rival_name, obl_points = c.fetchone()
except TypeError: #add team to OBL
obl_points = 0
add_team_obl(team)
beaten_teams = newline_string_to_list(beaten_string)
opponent_teams = opponents_string
teams_list = [name for name, points in obl_leaderboards()]
rank = teams_list.index(team.name) + 1
if not full:
return (obl_points, opponent_teams, rank)
else:
return (obl_points, beaten_teams, opponent_teams, rank, rival_name)
conn.close()
return (None, None)
def obl_leaderboards():
conn = create_connection()
if conn is not None:
c=conn.cursor()
c.execute("SELECT team_name, obl_points FROM one_big_league ORDER BY obl_points DESC")
teams_list = c.fetchall()
return teams_list #element (team_name, obl_points)
conn.close()
return False
def set_obl_rival(base_team, rival):
conn = create_connection()
if conn is not None:
c=conn.cursor()
c.execute("UPDATE one_big_league SET rival_name = ? WHERE team_name = ?", (rival.name, base_team.name))
conn.commit()
conn.close()
def clear_obl():
conn = create_connection()
if conn is not None:
c=conn.cursor()
c.execute("DELETE FROM one_big_league")
conn.commit()
conn.close()
def list_to_newline_string(list):
string = ""
for element in list:
if string != "":
string += "\n"
string += element
return string
def newline_string_to_list(string):
if string is not None and string != "":
return string.split("\n")
else:
return []