forked from Cog-Creators/Red-DiscordBot
-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathalias_entry.py
More file actions
281 lines (241 loc) · 10.2 KB
/
alias_entry.py
File metadata and controls
281 lines (241 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
from typing import Tuple, Dict, Optional, List, Union
from re import findall
import discord
from discord.ext.commands.view import StringView # DEP-WARN
from redbot.core import commands, Config
from redbot.core.i18n import Translator
from redbot.core.utils import AsyncIter
# Translation callable for this module: user-facing messages below are wrapped as _("...").
_ = Translator("Alias", __file__)
class ArgParseError(Exception):
    """Raised when the numbered ``{n}`` placeholders of an alias command are invalid."""
class AliasEntry:
    """An object containing all required information about an alias"""

    name: str
    command: Union[Tuple[str], str]
    creator: int
    guild: Optional[int]
    uses: int

    def __init__(
        self, name: str, command: Union[Tuple[str], str], creator: int, guild: Optional[int]
    ):
        """
        Create an alias entry with a use count of zero.

        :param name: the name the alias is invoked with
        :param command: the command the alias stands for
        :param creator: ID stored as the alias' creator
        :param guild: guild ID the alias is stored under, or ``None``
        """
        super().__init__()
        self.name = name
        self.command = command
        self.creator = creator
        self.guild = guild
        self.uses = 0

    def inc(self):
        """
        Increases the `uses` stat by 1.

        :return: new use count
        """
        self.uses += 1
        return self.uses

    def get_extra_args_from_alias(self, message: discord.Message, prefix: str) -> List[str]:
        """
        When an alias is executed by a user in chat this function tries
        to get any extra arguments passed in with the call.

        Everything in ``message.content`` after ``len(prefix) + len(self.name)``
        characters is split into words with ``StringView.get_quoted_word``.
        Spaces are stripped from both ends of every word.

        :param message: the message that invoked the alias
        :param prefix: the prefix the alias was invoked with (only its length is used)
        :return: list of the extra argument words, in order of appearance
        """
        known_content_length = len(prefix) + len(self.name)
        remainder = message.content[known_content_length:]
        view = StringView(remainder)
        view.skip_ws()
        extra = []
        while not view.eof:
            prev = view.index
            try:
                word = view.get_quoted_word()
            except discord.ext.commands.errors.UnexpectedQuoteError:
                # A quote in an unexpected position: drop this word and carry on.
                view.skip_ws()
                continue
            if len(word) < view.index - prev:
                # The view consumed more characters than it returned, so put the
                # first and last consumed characters back around the word
                # (presumably the quote characters - confirm against StringView).
                word = "".join((view.buffer[prev], word, view.buffer[view.index - 1]))
            extra.append(word.strip(" "))
        return extra

    def to_json(self) -> dict:
        """Return the entry as a plain dict with the keys ``from_json`` reads."""
        return {
            "name": self.name,
            "command": self.command,
            "creator": self.creator,
            "guild": self.guild,
            "uses": self.uses,
        }

    @classmethod
    def from_json(cls, data: dict):
        """
        Build an entry from a dict as produced by ``to_json``.

        ``name``, ``command``, ``creator`` and ``guild`` are required keys;
        ``uses`` falls back to 0 when it is absent.
        """
        ret = cls(data["name"], data["command"], data["creator"], data["guild"])
        ret.uses = data.get("uses", 0)
        return ret
class AliasCache:
    """
    Reads and writes alias entries through ``config``.

    When ``cache_enabled`` is true, entries are also kept in an in-memory
    mapping of ``guild ID -> {alias name -> AliasEntry}``; the ``None`` key
    holds the global aliases. When it is false, lookups read from ``config``.
    """

    def __init__(self, config: Config, cache_enabled: bool = True):
        self.config = config
        self._cache_enabled = cache_enabled
        self._loaded = False
        # The ``None`` bucket (global aliases) always exists.
        self._aliases: Dict[Optional[int], Dict[str, AliasEntry]] = {None: {}}

    async def anonymize_aliases(self, user_id: int):
        """
        Replace ``user_id`` as creator with the placeholder ID ``0xDE1`` on
        every stored global and guild alias, updating the cache when enabled.
        """
        async with self.config.entries() as global_aliases:
            for a in global_aliases:
                if a.get("creator", 0) == user_id:
                    a["creator"] = 0xDE1
                    if self._cache_enabled:
                        self._aliases[None][a["name"]] = AliasEntry.from_json(a)

        all_guilds = await self.config.all_guilds()
        async for guild_id, guild_data in AsyncIter(all_guilds.items(), steps=100):
            # basically, don't build a context manager without a need.
            if not any(a.get("creator", 0) == user_id for a in guild_data["entries"]):
                continue
            async with self.config.guild_from_id(guild_id).entries() as entry_list:
                for a in entry_list:
                    if a.get("creator", 0) == user_id:
                        a["creator"] = 0xDE1
                        if self._cache_enabled:
                            # setdefault: do not fail if this guild has no bucket yet.
                            guild_cache = self._aliases.setdefault(guild_id, {})
                            guild_cache[a["name"]] = AliasEntry.from_json(a)

    async def load_aliases(self):
        """Fill the cache from config (no-op when the cache is disabled) and mark it loaded."""
        if not self._cache_enabled:
            self._loaded = True
            return
        for alias in await self.config.entries():
            self._aliases[None][alias["name"]] = AliasEntry.from_json(alias)

        all_guilds = await self.config.all_guilds()
        async for guild_id, guild_data in AsyncIter(all_guilds.items(), steps=100):
            guild_cache = self._aliases.setdefault(guild_id, {})
            for alias in guild_data["entries"]:
                guild_cache[alias["name"]] = AliasEntry.from_json(alias)
        self._loaded = True

    async def get_aliases(self, ctx: commands.Context) -> List[AliasEntry]:
        """Returns all possible aliases with the given context"""
        global_aliases = await self.get_global_aliases()
        server_aliases: List[AliasEntry] = []
        # Do not gate on the cache here: with the cache disabled the guild is
        # never a key of ``self._aliases``, and ``get_guild_aliases`` already
        # copes with a guild that has no cached bucket.
        if ctx.guild is not None:
            server_aliases = await self.get_guild_aliases(ctx.guild)
        return global_aliases + server_aliases

    async def get_guild_aliases(self, guild: discord.Guild) -> List[AliasEntry]:
        """Returns all guild specific aliases"""
        if self._cache_enabled:
            return list(self._aliases.get(guild.id, {}).values())
        return [AliasEntry.from_json(d) for d in await self.config.guild(guild).entries()]

    async def get_global_aliases(self) -> List[AliasEntry]:
        """Returns all global specific aliases"""
        if self._cache_enabled:
            return list(self._aliases[None].values())
        return [AliasEntry.from_json(d) for d in await self.config.entries()]

    async def get_alias(
        self, guild: Optional[discord.Guild], alias_name: str
    ) -> Optional[AliasEntry]:
        """
        Returns an AliasEntry object if the provided alias_name is a registered alias

        Global aliases are checked before the aliases of ``guild``.
        Returns ``None`` when no alias with that name is found.
        """
        if self._cache_enabled:
            if alias_name in self._aliases[None]:
                return self._aliases[None][alias_name]
            if guild is not None:
                return self._aliases.get(guild.id, {}).get(alias_name)
            return None

        server_data = []
        if guild is not None:
            # Pass the guild object, as every other ``config.guild(...)`` call
            # in this class does (IDs go through ``guild_from_id``).
            server_data = await self.config.guild(guild).entries()
        global_data = await self.config.entries()
        for data in [*global_data, *server_data]:
            if data["name"] == alias_name:
                return AliasEntry.from_json(data)
        return None

    @staticmethod
    def format_command_for_alias(command: str) -> str:
        """
        Renumber the ``{n}`` placeholders of ``command`` so they start at 0.

        :param command: command text, optionally containing ``{n}`` placeholders
        :return: the command with its lowest placeholder renumbered to ``{0}``
        :raises ArgParseError: if a placeholder has no number (``{}``) or the
            placeholder numbers are not sequential
        """
        # This was present in add_alias previously
        # Made this into a separate method so as to reuse the same code in edit_alias
        indices = findall(r"{(\d*)}", command)
        if indices:
            try:
                # Convert the whole capture: taking only its first character
                # would turn "{10}" into 1.
                indices = [int(a) for a in indices]
            except ValueError:
                # ``\d*`` also matches nothing, i.e. a bare "{}".
                raise ArgParseError(_("Arguments must be specified with a number."))
            low = min(indices)
            indices = [a - low for a in indices]
            high = max(indices)
            gaps = set(indices).symmetric_difference(range(high + 1))
            if gaps:
                raise ArgParseError(
                    _("Arguments must be sequential. Missing arguments: ")
                    + ", ".join(str(i + low) for i in sorted(gaps))
                )
            # Positional argument k is the text "{k - low}", so formatting
            # shifts every placeholder down by ``low``.
            command = command.format(*(f"{{{i}}}" for i in range(-low, high + low + 1)))
        return command

    async def add_alias(
        self, ctx: commands.Context, alias_name: str, command: str, global_: bool = False
    ) -> AliasEntry:
        """
        Create an alias owned by ``ctx.author`` and store it.

        The alias is stored globally when ``global_`` is true, otherwise under
        ``ctx.guild``. Raises ArgParseError if ``command`` has invalid placeholders.

        :return: the new entry
        """
        command = self.format_command_for_alias(command)

        if global_:
            alias = AliasEntry(alias_name, command, ctx.author.id, None)
            settings = self.config
            if self._cache_enabled:
                self._aliases[None][alias.name] = alias
        else:
            alias = AliasEntry(alias_name, command, ctx.author.id, ctx.guild.id)
            settings = self.config.guild(ctx.guild)
            if self._cache_enabled:
                self._aliases.setdefault(ctx.guild.id, {})[alias.name] = alias

        async with settings.entries() as curr_aliases:
            curr_aliases.append(alias.to_json())

        return alias

    async def edit_alias(
        self, ctx: commands.Context, alias_name: str, command: str, global_: bool = False
    ) -> bool:
        """
        Replace the command of the stored alias named ``alias_name``.

        Raises ArgParseError if ``command`` has invalid placeholders.

        :return: ``True`` if the alias was found and updated, ``False`` otherwise
        """
        command = self.format_command_for_alias(command)

        if global_:
            settings = self.config
        else:
            settings = self.config.guild(ctx.guild)

        async with settings.entries() as aliases:
            for index, alias in enumerate(aliases):
                if alias["name"] != alias_name:
                    continue
                alias_edited = AliasEntry.from_json(alias)
                alias_edited.command = command
                aliases[index] = alias_edited.to_json()

                if self._cache_enabled:
                    cache_key = None if global_ else ctx.guild.id
                    # setdefault: do not fail if this guild has no bucket yet.
                    self._aliases.setdefault(cache_key, {})[alias_edited.name] = alias_edited
                return True

        return False

    async def delete_alias(
        self, ctx: commands.Context, alias_name: str, global_: bool = False
    ) -> bool:
        """
        Remove the stored alias named ``alias_name``.

        :return: ``True`` if the alias was found and removed, ``False`` otherwise
        """
        if global_:
            settings = self.config
        else:
            settings = self.config.guild(ctx.guild)

        async with settings.entries() as aliases:
            for alias in aliases:
                if alias["name"] != alias_name:
                    continue
                # Removing while iterating is safe here: we return right after.
                aliases.remove(alias)
                if self._cache_enabled:
                    cache_key = None if global_ else ctx.guild.id
                    # pop with a default: a missing cache entry must not abort the delete.
                    self._aliases.get(cache_key, {}).pop(alias_name, None)
                return True

        return False