-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.py
More file actions
198 lines (164 loc) · 6.56 KB
/
main.py
File metadata and controls
198 lines (164 loc) · 6.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
from asyncio import sleep
import logging
from discord import Intents, Activity, ActivityType, Interaction, app_commands, FFmpegPCMAudio, ui, ButtonStyle, VoiceChannel
from discord.ext import commands
import utils as u
from config import Config
# region logging-init
# Module-level logger for this file.
l = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
root_logger = logging.getLogger()
root_logger.handlers.clear()  # drop the default handler installed by basicConfig
# Install a colored console handler first so that anything logged while the
# configuration is being loaded already uses the project formatter.
shandler = logging.StreamHandler()
shandler.setFormatter(u.CustomFormatter(colorful=True))
root_logger.addHandler(shandler)
# Load the configuration.
c = Config().config
# Apply the configured verbosity. setLevel() (instead of assigning to `.level`)
# validates the value and invalidates the cached isEnabledFor() results, so
# loggers that already emitted DEBUG records above honour the new level.
root_logger.setLevel(logging.DEBUG if c.debug else logging.INFO)
# Rebuild the console handler from scratch, discarding any handler that was
# attached to the root logger while the configuration was loading.
root_logger.handlers.clear()
shandler = logging.StreamHandler()
shandler.setFormatter(u.CustomFormatter(colorful=True))
root_logger.addHandler(shandler)
# Optionally mirror the log to a file (without color codes).
if c.log_file:
    log_file_path = u.get_path(c.log_file)
    l.info(f'Saving logs to {log_file_path}')
    fhandler = logging.FileHandler(log_file_path, encoding='utf-8', errors='ignore')
    fhandler.setFormatter(u.CustomFormatter(colorful=False))
    root_logger.addHandler(fhandler)
# endregion logging-init
# Bot client: "/" is the command prefix, all gateway intents are requested and
# the proxy comes from the loaded configuration.
bot = commands.Bot("/", intents=Intents.all(), proxy=c.proxy)
# Song ids collected by do_search(); /play indexes into this list.
ids = []
# Display names ("artist - title") parallel to `ids`.
names = []
# True while the player loop is running; /play then only queues the track.
playing = False
# Queue of song ids waiting to be played (appended and popped by /play).
playlist_id = []
# Display names parallel to `playlist_id`.
playlist_name = []
# Set once the application commands have been synced successfully.
_tree_synced = False


@bot.event
async def on_ready():
    """Set the bot presence and sync the slash commands.

    The ready event may be dispatched more than once (for example after a
    reconnect), so the command tree is synced only the first time; the flag
    is set after a successful sync so a failed attempt is retried on the
    next ready event.
    """
    global _tree_synced
    activity = Activity(
        name="Type / and select this bot to start music player.",
        type=ActivityType.playing,
    )
    await bot.change_presence(activity=activity)
    if not _tree_synced:
        await bot.tree.sync()
        _tree_synced = True
# player = app_commands.Group(name="player", description="A simple player to play music into Voice Channel.")
# bot.tree.add_command(player)
@bot.tree.command(name="search", description="Search music by name.")
@app_commands.describe(music_name="Music Name")
async def search(interaction: Interaction, music_name: str):
    """Handle ``/search``: look up songs by name and show the first page.

    Args:
        interaction: The slash-command interaction to respond to.
        music_name: Search keywords entered by the user.
    """
    global ids, names, offset, num
    await interaction.response.send_message(f"Searching for `{music_name}`...")
    # A new search starts from a clean slate: drop the previous results AND
    # the pagination state, otherwise numbering and paging would continue
    # from wherever the previous search left off.
    ids = []
    names = []
    offset = 0
    num = 0
    content = await do_search(music_name, 0, False)
    await interaction.edit_original_response(content=content, view=Control(music_name))
# Legacy module-level page offset. Control keeps its page position on the
# instance; the name stays defined for code that still refers to it.
offset = 0


class Control(ui.View):
    """Previous/Next pagination buttons attached to a search result message.

    Each view remembers its own query and page offset, so several result
    messages can be paged independently instead of sharing one global cursor.

    Args:
        music_name: The search keywords this view pages through.
        offset: Offset of the page currently shown (0 for the first page).
    """

    # Number of results per page; do_search requests pages with limit=30.
    PAGE_SIZE = 30

    def __init__(self, music_name, offset: int = 0):
        super().__init__()
        self.music_name = music_name
        self.offset = offset

    async def _show_page(self, interaction: Interaction, new_offset: int, previous: bool):
        """Fetch the page at *new_offset* and replace the message with it."""
        content = await do_search(self.music_name, new_offset, previous)
        view = Control(self.music_name, new_offset)
        await interaction.edit_original_response(content=content, view=view)

    # NOTE: the callback names `approve` / `reject` are kept as-is because the
    # view exposes its buttons under these attribute names.
    @ui.button(label="Previous", style=ButtonStyle.danger, emoji="👈")
    async def approve(self, interaction: Interaction, button: ui.Button):
        """Show the previous page; does nothing on the first page."""
        await interaction.response.defer()
        if self.offset <= 0:
            return  # already on the first page, nothing to fetch
        await self._show_page(interaction, max(0, self.offset - self.PAGE_SIZE), True)

    @ui.button(label="Next", style=ButtonStyle.success, emoji="👉")
    async def reject(self, interaction: Interaction, button: ui.Button):
        """Show the next page."""
        await interaction.response.defer()
        await self._show_page(interaction, self.offset + self.PAGE_SIZE, False)
# Legacy running counter. Result numbers are derived from the page offset in
# do_search; the name stays defined for code that still refers to it.
num = 0


def _store_at(items, index, value):
    """Write *value* at *index*, padding *items* with None if it is too short."""
    if index >= len(items):
        items.extend([None] * (index + 1 - len(items)))
    items[index] = value


async def do_search(music_name: str, offset: int, previous: bool) -> str:
    """Fetch one page of search results and render it as a message.

    Every result is shown with a 1-based number equal to its absolute position
    in the result set (``offset + index + 1``) and is stored in the module
    level ``ids`` / ``names`` lists at that position minus one, so the number a
    user sees is the number ``ids[music_num - 1]`` resolves. Revisiting a page
    overwrites the same slots instead of appending duplicates.

    Args:
        music_name: Search keywords.
        offset: Offset of the first result of the requested page.
        previous: Retained for backward compatibility; numbering no longer
            depends on the paging direction.

    Returns:
        The message text to display (results, or an error notice).
    """
    from urllib.parse import quote  # local import keeps this block self-contained

    # Percent-encode the user input so characters such as '&', '#' or spaces
    # cannot break or alter the query string.
    keywords = quote(music_name, safe="")
    search = await u.get_json(
        f"{c.ncm_api}/cloudsearch?keywords={keywords}&limit=30&offset={offset}&type=1"
    )
    if not search:
        return "Search failed!"

    songs = (search.get("result") or {}).get("songs") or []
    if search.get("code") != 200 or not songs:
        return "**No search result. Check your music name and try again**"

    lines = ["**Search result:**"]
    for position, song in enumerate(songs, start=offset):
        artists = song.get("ar") or []
        artist = artists[0]["name"] if artists else "Unknown artist"
        title = artist + " - " + song["name"]
        _store_at(ids, position, song["id"])
        _store_at(names, position, title)
        lines.append(f"{position + 1}. {title}")
    lines.append("**Type /play <music_num> to start player.**")
    return "\n".join(lines)
# NOTE(review): the function name shadows the builtin `list` at module scope;
# it is kept unchanged here, but avoid calling `list(...)` in this module.
@bot.tree.command(name="list", description="See the playlist.")
async def list(interaction: Interaction):
    """Handle ``/list``: show the tracks currently waiting in the playlist.

    This command only reads the playlist; it must not touch the search result
    lists, otherwise later ``/play`` lookups would resolve to wrong entries.

    Args:
        interaction: The slash-command interaction to respond to.
    """
    if not playlist_id or not playlist_name:
        await interaction.response.send_message("**No items in the playlist**")
        return
    lines = ["**Playlist:**"]
    lines.extend(f"{index}. {title}" for index, title in enumerate(playlist_name))
    await interaction.response.send_message("\n".join(lines) + "\n")
@bot.tree.command(name="play", description="Play a music into Voice Channel.")
@app_commands.describe(music_num="Music Num")
async def play(interaction: Interaction, music_num: int, channel: VoiceChannel | None = None):
    """Handle ``/play``: queue a search result and start the player if idle.

    Args:
        interaction: The slash-command interaction to respond to.
        music_num: 1-based number of the search result to play.
        channel: Voice channel to join; defaults to the caller's current
            voice channel.
    """
    global playing
    if not ids or not names:
        await interaction.response.send_message("**Please search a music first!**")
        return

    # Reject numbers outside the result list: 0 or negatives would silently
    # wrap around to the end of the list, larger values would raise IndexError.
    available = min(len(ids), len(names))
    music_id = music_name = None
    if 1 <= music_num <= available:
        music_id = ids[music_num - 1]
        music_name = names[music_num - 1]
    if music_id is None or music_name is None:  # out of range or empty slot
        await interaction.response.send_message(
            f"**Invalid music number! Choose a number between 1 and {available}.**"
        )
        return

    voice_chan = channel
    if voice_chan is None:
        # `voice` is missing for non-member users and None when the member is
        # not connected to any voice channel.
        voice_state = getattr(interaction.user, "voice", None)
        voice_chan = getattr(voice_state, "channel", None)
        if voice_chan is None:
            await interaction.response.send_message("**Please join the Voice Channel first, or specific a voice channel manually!**")
            return

    playlist_id.append(music_id)
    playlist_name.append(music_name)
    if playing:
        await interaction.response.send_message(f"Added `{music_name}` to playlist.")
        return

    # Claim the player before the first await so a second /play issued while
    # we are still connecting is queued instead of starting a second player.
    playing = True
    voice_cli = None
    try:
        await interaction.response.send_message("Starting player...")
        voice_cli = await voice_chan.connect()
        while playlist_id:
            await start_play(playlist_id.pop(0), playlist_name.pop(0), interaction, voice_cli)
    finally:
        # Always release the player and leave the channel, even when
        # connecting or playback raised.
        playing = False
        if voice_cli is not None:
            await voice_cli.disconnect()
async def start_play(music_id, music_name, interaction: Interaction, voice_cli):
    """Resolve a playable URL for one track and play it to completion.

    Args:
        music_id: Id of the song to resolve through the match API.
        music_name: Display name announced in the channel.
        interaction: Interaction whose original response shows player status.
        voice_cli: Connected voice client used for playback.
    """
    global playing
    match = await u.get_json(f"{c.unm_api}/match?id={music_id}&server=qq,pyncmd")
    url = None
    # "匹配成功" is the API's "match succeeded" message. Use .get() so an
    # unexpected payload is treated as a failed match instead of a KeyError.
    if match and match.get("message") == "匹配成功":
        url = (match.get("data") or {}).get("url")

    if url:
        voice_cli.play(FFmpegPCMAudio(url))
        playing = True
        await interaction.edit_original_response(content="Player started.")
        await interaction.channel.send(content=f"Now playing: {music_name}")  # type: ignore
        # Poll until the track ends; a paused track is still in progress.
        while voice_cli.is_playing() or voice_cli.is_paused():
            await sleep(1)
    else:
        # Tell the user why nothing is playing instead of failing silently.
        l.warning("No playable source found for %s (id=%s)", music_name, music_id)
        await interaction.channel.send(content=f"Could not find a playable source for: {music_name}")  # type: ignore
    await interaction.edit_original_response(content="Player stopped.")
# Start the bot with the token from the loaded configuration.
bot.run(c.token)