11import os
22import zipfile
33
4+ import yaml
5+
46from astrbot .core import logger
57from astrbot .core .utils .astrbot_path import get_astrbot_plugin_path
68from astrbot .core .utils .io import ensure_dir , remove_dir
79
810from ..star .star import StarMetadata
911from ..updator import RepoZipUpdator
1012
13+ PLUGIN_METADATA_FILENAMES = ("metadata.yaml" , "metadata.yml" )
14+ PLUGIN_METADATA_REQUIRED_FIELDS = ("name" , "desc" , "version" , "author" )
15+
1116
1217class PluginUpdator (RepoZipUpdator ):
1318 def __init__ (self , repo_mirror : str = "" , verify : str | bool | None = None ) -> None :
@@ -58,6 +63,7 @@ async def update(
5863 elif repo_url :
5964 await self .download_from_repo_url (plugin_path , repo_url , proxy = proxy )
6065
66+ self .validate_plugin_archive (plugin_path + ".zip" )
6167 try :
6268 remove_dir (plugin_path )
6369 except BaseException as e :
@@ -69,7 +75,135 @@ async def update(
6975
7076 return plugin_path
7177
78+ @classmethod
79+ def find_plugin_metadata_entry (cls , entries : list [str ]) -> str | None :
80+ """Find AstrBot plugin metadata in archive entries.
81+
82+ Args:
83+ entries: Zip archive member names.
84+
85+ Returns:
86+ The original archive entry name for plugin metadata, or None.
87+ """
88+ update_dir = cls ._resolve_archive_root_dir (entries )
89+ portable_update_dir = os .path .normpath (update_dir ).replace ("\\ " , "/" )
90+ if portable_update_dir == "." :
91+ portable_update_dir = ""
92+
93+ entries_by_portable_path = {}
94+ for entry in entries :
95+ portable_entry = os .path .normpath (entry ).replace ("\\ " , "/" )
96+ if portable_entry in ("" , "." ):
97+ continue
98+ entries_by_portable_path [portable_entry ] = entry
99+
100+ metadata_candidates = (
101+ [
102+ f"{ portable_update_dir } /{ filename } "
103+ for filename in PLUGIN_METADATA_FILENAMES
104+ ]
105+ if portable_update_dir
106+ else list (PLUGIN_METADATA_FILENAMES )
107+ )
108+ for candidate in metadata_candidates :
109+ if candidate in entries_by_portable_path :
110+ return entries_by_portable_path [candidate ]
111+ return None
112+
113+ @staticmethod
114+ def validate_plugin_metadata (metadata : object , metadata_label : str ) -> None :
115+ """Validate AstrBot plugin metadata content.
116+
117+ Args:
118+ metadata: Parsed metadata YAML content.
119+ metadata_label: Metadata filename or archive entry for error messages.
120+
121+ Raises:
122+ ValueError: If metadata is malformed or misses required fields.
123+ """
124+ if not isinstance (metadata , dict ):
125+ raise ValueError (f"{ metadata_label } 格式错误。" )
126+
127+ normalized_metadata = dict (metadata )
128+ if "desc" not in normalized_metadata and "description" in normalized_metadata :
129+ normalized_metadata ["desc" ] = normalized_metadata ["description" ]
130+
131+ missing_fields = [
132+ field
133+ for field in PLUGIN_METADATA_REQUIRED_FIELDS
134+ if field not in normalized_metadata
135+ ]
136+ if missing_fields :
137+ raise ValueError (
138+ f"{ metadata_label } 中缺少必需字段: { ', ' .join (missing_fields )} 。"
139+ )
140+
141+ invalid_fields = [
142+ field
143+ for field in PLUGIN_METADATA_REQUIRED_FIELDS
144+ if not isinstance (normalized_metadata [field ], str )
145+ or not normalized_metadata [field ].strip ()
146+ ]
147+ if invalid_fields :
148+ raise ValueError (
149+ f"{ metadata_label } 中字段 { ', ' .join (invalid_fields )} 必须是非空字符串。"
150+ )
151+
152+ @classmethod
153+ def inspect_plugin_archive (cls , zip_path : str ) -> dict [str , object ]:
154+ """Inspect plugin metadata in an AstrBot plugin archive.
155+
156+ Args:
157+ zip_path: Path to the plugin archive.
158+
159+ Returns:
160+ A dict containing the metadata entry and parsed metadata.
161+
162+ Raises:
163+ ValueError: If the archive is not a valid AstrBot plugin.
164+ """
165+ try :
166+ with zipfile .ZipFile (zip_path , "r" ) as z :
167+ metadata_entry = cls .find_plugin_metadata_entry (z .namelist ())
168+ if metadata_entry is None :
169+ raise ValueError (
170+ "压缩包不是合法的 AstrBot 插件:未找到 metadata.yaml 或 metadata.yml。"
171+ )
172+
173+ try :
174+ metadata_text = z .read (metadata_entry ).decode ("utf-8" )
175+ metadata = yaml .safe_load (metadata_text )
176+ except UnicodeDecodeError as exc :
177+ raise ValueError (f"{ metadata_entry } 必须使用 UTF-8 编码。" ) from exc
178+ except yaml .YAMLError as exc :
179+ raise ValueError (f"{ metadata_entry } 格式错误。" ) from exc
180+
181+ cls .validate_plugin_metadata (metadata , metadata_entry )
182+ return {
183+ "metadata_entry" : metadata_entry ,
184+ "metadata" : metadata ,
185+ }
186+ except zipfile .BadZipFile as exc :
187+ raise ValueError ("插件压缩包格式错误。" ) from exc
188+
189+ @classmethod
190+ def validate_plugin_archive (cls , zip_path : str ) -> str :
191+ """Validate that an archive contains a valid AstrBot plugin.
192+
193+ Args:
194+ zip_path: Path to the plugin archive.
195+
196+ Returns:
197+ The archive entry name of the plugin metadata file.
198+
199+ Raises:
200+ ValueError: If the archive is not a valid AstrBot plugin.
201+ """
202+ inspection = cls .inspect_plugin_archive (zip_path )
203+ return str (inspection ["metadata_entry" ])
204+
72205 def unzip_file (self , zip_path : str , target_dir : str ) -> None :
206+ self .validate_plugin_archive (zip_path )
73207 ensure_dir (target_dir )
74208 logger .info (f"Extracting archive: { zip_path } " )
75209 with zipfile .ZipFile (zip_path , "r" ) as z :
0 commit comments