Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ def desktop_test(
)

error_vms = report.get_error_vm_list()
missing_vm_names = report.get_missing_vm_names(data.vm_names)
if missing_vm_names:
print(f"[red]|ERROR| The following VMs are missing in the report: {missing_vm_names}")
if len(error_vms) > 0:
print(f"[red]|ERROR| Tests for the following VMs have errors: {error_vms}")
else:
Expand Down
68 changes: 67 additions & 1 deletion tests/desktop_tests/tools/desktop_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,41 @@ def get_error_vm_list(self) -> list[str]:
df = self.report.read(self.path)
return df[df['Exit_code'] != 'Passed']['Vm_name'].unique()

def get_reported_vm_names(self, df: Optional[pd.DataFrame] = None) -> list[str]:
"""
Returns VM names from the report.
:param df: Optional report DataFrame.
:return: List of VM names.
"""
if df is None:
if not isfile(self.path):
raise FileNotFoundError(f"[red]|ERROR| Report not found: {self.path}")
df = self.report.read(self.path)

if df.empty or 'Vm_name' not in df.columns:
return []

vm_names = df['Vm_name'].dropna().astype(str).map(str.strip)
return self._unique_preserve_order([name for name in vm_names if name])

def get_missing_vm_names(self, expected_vm_names: list[str], df: Optional[pd.DataFrame] = None) -> list[str]:
"""
Returns expected VM names missing in the report.
:param expected_vm_names: List of expected VM names.
:param df: Optional report DataFrame.
:return: List of missing VM names.
"""
if not expected_vm_names:
return []

expected_clean = [str(name).strip() for name in expected_vm_names if name]
if not expected_clean:
return []

reported_vm_names = set(self.get_reported_vm_names(df=df))
missing_vm_names = [name for name in expected_clean if name not in reported_vm_names]
return self._unique_preserve_order(missing_vm_names)

def get_full(self, version: str) -> str:
File.delete(self.path, stdout=False) if isfile(self.path) else ...
self.report.merge(
Expand Down Expand Up @@ -134,7 +169,13 @@ def _get_os_name(row: pd.Series) -> str:
def exists(self) -> bool:
return isfile(self.path)

def send_to_tg(self, data):
def send_to_tg(self, data) -> None:
"""
Send report to Telegram.
:param data: DesktopTestData instance.
:param expected_vm_names: List of expected VM names to validate report completeness.
:return: None
"""
if not isfile(self.path):
return print(f"[red]|ERROR| Report for sending to telegram not exists: {self.path}")

Expand All @@ -145,6 +186,12 @@ def send_to_tg(self, data):
package_not_exists_os = self._get_os_list_by_status(df, self.portal_data.test_status.not_exists_package)
failed_create_vm_os = self._get_os_list_by_status(df, self.portal_data.test_status.failed_create_vm)

missing_vm_names = (
self.get_missing_vm_names(data.vm_names, df=df)
if data.vm_names
else []
)

caption_parts = [
f"{data.title} desktop editor tests completed on version: `{update_info}{data.version}`\n\n",
f"Package: `{data.package_name}`\n",
Expand All @@ -156,6 +203,9 @@ def send_to_tg(self, data):
if failed_create_vm_os:
caption_parts.append(f"Failed to create VM for OS: `{', '.join(failed_create_vm_os)}`\n\n")

if missing_vm_names:
caption_parts.append(f"Missing VMs in report: `{', '.join(missing_vm_names)}`\n\n")

caption_parts.append(f"Number of tested Os: `{self.get_total_count('Exit_code')}`")
caption = ''.join(caption_parts)
Telegram(token=data.tg_token, chat_id=data.tg_chat_id).send_document(self.path, caption=caption)
Expand Down Expand Up @@ -186,6 +236,22 @@ def _writer(self, mode: str, message: list, delimiter='\t', encoding='utf-8'):
def _write_titles(self):
self._writer(mode='w', message=['Os', 'Vm_name', 'Version', 'Test_name', 'Package_name', 'Exit_code'])

@staticmethod
def _unique_preserve_order(items: list[str]) -> list[str]:
"""
Remove duplicates while preserving order.
:param items: List of items.
:return: Unique list.
"""
result = []
seen = set()
for item in items:
if item in seen:
continue
seen.add(item)
result.append(item)
return result

@staticmethod
def _get_thread_result(future):
"""
Expand Down