11import os
22import socket
3+ import subprocess
34import sys
5+ import time
6+ from datetime import datetime as dt
47from unittest .mock import Mock
58
69import pytest
10+ from packaging .version import Version
711from peewee import SqliteDatabase
12+ from PyQt6 .QtCore import QCoreApplication
813
914import vorta
1015import vorta .application
1116import vorta .borg .jobs_manager
1217
18+ # Used for conditional setup depending on test context
19+ from tests .unit .test_constants import TEST_SOURCE_DIR , TEST_TEMP_DIR
20+ from vorta .store .models import (
21+ ArchiveModel ,
22+ BackupProfileModel ,
23+ EventLogModel ,
24+ RepoModel ,
25+ RepoPassword ,
26+ SchemaVersion ,
27+ SettingsModel ,
28+ SourceFileModel ,
29+ WifiSettingModel ,
30+ )
31+ from vorta .utils import borg_compat
32+ from vorta .views .main_window import ArchiveTab , MainWindow
33+
34+
35+ def disconnect_all (signal ):
36+ """
37+ Disconnect ALL handlers from a Qt signal.
38+ Unlike signal.disconnect() without arguments which only disconnects ONE handler,
39+ this function disconnects all connected handlers by calling disconnect in a loop
40+ until TypeError is raised (indicating no more handlers are connected).
41+ """
42+ while True :
43+ try :
44+ signal .disconnect ()
45+ except TypeError :
46+ # No more handlers connected
47+ break
48+
49+
50+ def all_workers_finished (jobs_manager ):
51+ """
52+ Check if all worker threads have actually exited.
53+ This is more thorough than is_worker_running() which only checks current_job,
54+ because threads may still be alive briefly after current_job is set to None.
55+ """
56+ for worker in jobs_manager .workers .values ():
57+ if worker .is_alive ():
58+ return False
59+ return True
60+
61+
62+ models = [
63+ RepoModel ,
64+ RepoPassword ,
65+ BackupProfileModel ,
66+ SourceFileModel ,
67+ SettingsModel ,
68+ ArchiveModel ,
69+ WifiSettingModel ,
70+ EventLogModel ,
71+ SchemaVersion ,
72+ ]
73+
74+
75+ def load_window (qapp : vorta .application .VortaApp ):
76+ """
77+ Reload the main window of the given application.
78+ Used to repopulate fields after loading mock data.
79+ """
80+ qapp .main_window .deleteLater ()
81+ # Skip QCoreApplication.processEvents() - it can trigger D-Bus operations that hang in CI.
82+ # Use a small sleep instead to allow deleteLater to be processed.
83+ time .sleep (0.1 )
84+ del qapp .main_window
85+ qapp .main_window = MainWindow (qapp )
86+
1387
1488def pytest_configure (config ):
1589 pytest ._wait_defaults = {'timeout' : 20000 }
@@ -56,6 +130,12 @@ def _mock_get_network_status_monitor():
56130
57131 vorta .utils .get_network_status_monitor = _mock_get_network_status_monitor
58132
133+ # Add custom markers
134+ config .addinivalue_line (
135+ "markers" ,
136+ "min_borg_version(): set minimum required borg version for a test" ,
137+ )
138+
59139
60140@pytest .fixture (scope = 'session' )
61141def qapp (tmpdir_factory ):
@@ -82,3 +162,280 @@ def qapp(tmpdir_factory):
82162 yield qapp
83163 mock_db .close ()
84164 qapp .quit ()
165+
166+
167+ @pytest .fixture (scope = 'function' , autouse = True )
168+ def borg_version (request ):
169+ """
170+ Determine the real borg version if this is an integration test.
171+ Otherwise, inject a dummy version to save time and dependencies.
172+ """
173+ is_integration = "tests/integration/" in request .node .nodeid
174+ if not is_integration :
175+ return '1.2.4' , Version ('1.2.4' )
176+
177+ borg_ver = os .getenv ('BORG_VERSION' )
178+ if not borg_ver :
179+ borg_ver = subprocess .run (['borg' , '--version' ], stdout = subprocess .PIPE ).stdout .decode ('utf-8' )
180+ borg_ver = borg_ver .split (' ' )[1 ].strip ()
181+
182+ # test window does not automatically set borg version
183+ borg_compat .set_version (borg_ver , borg_compat .path )
184+
185+ parsed_borg_version = Version (borg_ver )
186+ return borg_ver , parsed_borg_version
187+
188+
189+ @pytest .fixture (autouse = True )
190+ def min_borg_version (borg_version , request ):
191+ marker = request .node .get_closest_marker ('min_borg_version' )
192+ if marker :
193+ parsed_borg_version = borg_version [1 ]
194+ req_version = marker .args [0 ]
195+ if parsed_borg_version < Version (req_version ):
196+ pytest .skip (f'skipped due to borg version requirement for test: { req_version } ' )
197+
198+
199+ @pytest .fixture (scope = 'function' )
200+ def create_test_repo (tmpdir_factory , borg_version ):
201+ repo_path = tmpdir_factory .mktemp ('repo' )
202+ source_files_dir = tmpdir_factory .mktemp ('borg_src' )
203+
204+ is_borg_v2 = borg_version [1 ] >= Version ('2.0.0b1' )
205+
206+ if is_borg_v2 :
207+ subprocess .run (['borg' , '-r' , str (repo_path ), 'rcreate' , '--encryption=none' ], check = True )
208+ else :
209+ subprocess .run (['borg' , 'init' , '--encryption=none' , str (repo_path )], check = True )
210+
211+ def create_archive (timestamp , name ):
212+ if is_borg_v2 :
213+ subprocess .run (
214+ ['borg' , '-r' , str (repo_path ), 'create' , '--timestamp' , timestamp , name , str (source_files_dir )],
215+ cwd = str (repo_path ),
216+ check = True ,
217+ )
218+ else :
219+ subprocess .run (
220+ ['borg' , 'create' , '--timestamp' , timestamp , f'{ repo_path } ::{ name } ' , str (source_files_dir )],
221+ cwd = str (repo_path ),
222+ check = True ,
223+ )
224+
225+ # Setup dummy directory structure and mock files
226+ file_path = os .path .join (source_files_dir , 'file' )
227+ with open (file_path , 'w' ) as f :
228+ f .write ('test' )
229+
230+ dir_path = os .path .join (source_files_dir , 'dir' )
231+ os .mkdir (dir_path )
232+
233+ file_path = os .path .join (dir_path , 'file' )
234+ with open (file_path , 'w' ) as f :
235+ f .write ('test' )
236+
237+ create_archive ('2023-06-14T01:00:00' , 'test-archive1' )
238+
239+ symlink_path = os .path .join (dir_path , 'symlink' )
240+ os .symlink (file_path , symlink_path )
241+
242+ hardlink_path = os .path .join (dir_path , 'hardlink' )
243+ os .link (file_path , hardlink_path )
244+
245+ fifo_path = os .path .join (dir_path , 'fifo' )
246+ os .mkfifo (fifo_path )
247+
248+ supports_chrdev = True
249+ try :
250+ chrdev_path = os .path .join (dir_path , 'chrdev' )
251+ os .mknod (chrdev_path , mode = 0o600 | 0o020000 )
252+ except PermissionError :
253+ supports_chrdev = False
254+
255+ create_archive ('2023-06-14T02:00:00' , 'test-archive2' )
256+
257+ os .rename (dir_path , os .path .join (source_files_dir , 'dir1' ))
258+ create_archive ('2023-06-14T03:00:00' , 'test-archive3' )
259+
260+ for file in os .listdir (os .path .join (source_files_dir , 'dir1' )):
261+ os .rename (os .path .join (source_files_dir , 'dir1' , file ), os .path .join (source_files_dir , 'dir1' , file + '1' ))
262+ create_archive ('2023-06-14T04:00:00' , 'test-archive4' )
263+
264+ for file in os .listdir (os .path .join (source_files_dir , 'dir1' )):
265+ os .remove (os .path .join (source_files_dir , 'dir1' , file ))
266+ create_archive ('2023-06-14T05:00:00' , 'test-archive5' )
267+
268+ os .chmod (os .path .join (source_files_dir , 'dir1' ), 0o700 )
269+ create_archive ('2023-06-14T06:00:00' , 'test-archive6' )
270+
271+ return repo_path , source_files_dir , supports_chrdev
272+
273+
274+ @pytest .fixture
275+ def window_load (qapp ):
276+ return lambda : load_window (qapp )
277+
278+
279+ @pytest .fixture (scope = 'function' , autouse = True )
280+ def init_db (request , qapp , qtbot , tmpdir_factory ):
281+ is_integration = "tests/integration/" in request .node .nodeid
282+
283+ tmp_db = tmpdir_factory .mktemp ('Vorta' ).join ('settings.sqlite' )
284+ mock_db = SqliteDatabase (
285+ str (tmp_db ),
286+ pragmas = {
287+ 'journal_mode' : 'wal' ,
288+ },
289+ )
290+ vorta .store .connection .init_db (mock_db )
291+
292+ # Common Settings
293+ keyring_setting = SettingsModel .get (key = 'use_system_keyring' )
294+ keyring_setting .value = False
295+ keyring_setting .save ()
296+
297+ default_profile = BackupProfileModel (name = 'Default' )
298+ default_profile .save ()
299+
300+ if is_integration :
301+ try :
302+ repo_path , source_dir , _ = request .getfixturevalue ('create_test_repo' )
303+ except pytest .FixtureLookupError :
304+ repo_path , source_dir , _ = '/tmp/fake_repo' , '/tmp/fake_src' , False
305+ new_repo = RepoModel (url = repo_path )
306+ else :
307+ new_repo = RepoModel (url = 'i0fi93@i593.repo.borgbase.com:repo' )
308+
309+ new_repo .encryption = 'none'
310+ new_repo .save ()
311+
312+ default_profile .repo = new_repo .id
313+ default_profile .dont_run_on_metered_networks = False
314+ default_profile .validation_on = False
315+ default_profile .save ()
316+
317+ if is_integration :
318+ source_dir_model = SourceFileModel (
319+ dir = source_dir , repo = new_repo , dir_size = 12 , dir_files_count = 3 , path_isdir = True
320+ )
321+ source_dir_model .save ()
322+
323+ qapp .main_window .deleteLater ()
324+ del qapp .main_window
325+ qapp .main_window = MainWindow (qapp )
326+ qapp .scheduler .schedule_changed .disconnect ()
327+
328+ else :
329+ # Unit test extra data
330+ test_archive = ArchiveModel (snapshot_id = '99999' , name = 'test-archive' , time = dt (2000 , 1 , 1 , 0 , 0 ), repo = 1 )
331+ test_archive .save ()
332+
333+ test_archive1 = ArchiveModel (snapshot_id = '99998' , name = 'test-archive1' , time = dt (2000 , 1 , 1 , 0 , 0 ), repo = 1 )
334+ test_archive1 .save ()
335+
336+ source_dir_model = SourceFileModel (
337+ dir = TEST_SOURCE_DIR , repo = new_repo , dir_size = 100 , dir_files_count = 18 , path_isdir = True
338+ )
339+ source_dir_model .save ()
340+
341+ disconnect_all (qapp .scheduler .schedule_changed )
342+
343+ if 'window_load' not in request .fixturenames :
344+ load_window (qapp )
345+
346+ yield
347+
348+ qapp .jobs_manager .cancel_all_jobs ()
349+
350+ if is_integration :
351+ qtbot .waitUntil (lambda : all_workers_finished (qapp .jobs_manager ), ** pytest ._wait_defaults )
352+ QCoreApplication .processEvents ()
353+ disconnect_all (qapp .backup_finished_event )
354+ disconnect_all (qapp .scheduler .schedule_changed )
355+ else :
356+ timeout = pytest ._wait_defaults .get ('timeout' , 20000 ) / 1000
357+ start = time .time ()
358+ while not all_workers_finished (qapp .jobs_manager ):
359+ if time .time () - start > timeout :
360+ break
361+ time .sleep (0.1 )
362+
363+ disconnect_all (qapp .backup_finished_event )
364+ disconnect_all (qapp .scheduler .schedule_changed )
365+
366+ qapp .jobs_manager .workers .clear ()
367+ qapp .jobs_manager .jobs .clear ()
368+ mock_db .close ()
369+
370+
371+ @pytest .fixture
372+ def choose_file_dialog (request , tmpdir ):
373+ is_integration = "tests/integration/" in request .node .nodeid
374+
375+ class MockFileDialog :
376+ def __init__ (self , * args , ** kwargs ):
377+ if is_integration :
378+ self .directory = kwargs .get ('directory' , None )
379+ self .subdirectory = kwargs .get ('subdirectory' , None )
380+
381+ def open (self , func ):
382+ func ()
383+
384+ def selectedFiles (self ):
385+ if not is_integration :
386+ return [TEST_TEMP_DIR ]
387+
388+ if hasattr (self , 'subdirectory' ) and self .subdirectory :
389+ return [str (tmpdir .join (self .subdirectory ))]
390+ elif hasattr (self , 'directory' ) and self .directory :
391+ return [str (self .directory )]
392+ else :
393+ return [str (tmpdir )]
394+
395+ return MockFileDialog
396+
397+
398+ @pytest .fixture
399+ def borg_json_output ():
400+ open_files = []
401+
402+ def _read_json (subcommand ):
403+ stdout = open (f'tests/unit/borg_json_output/{ subcommand } _stdout.json' )
404+ stderr = open (f'tests/unit/borg_json_output/{ subcommand } _stderr.json' )
405+ open_files .append (stdout )
406+ open_files .append (stderr )
407+ return stdout , stderr
408+
409+ yield _read_json
410+
411+ for f in open_files :
412+ try :
413+ f .close ()
414+ except Exception :
415+ pass
416+
417+
418+ @pytest .fixture
419+ def rootdir (request ):
420+ is_integration = "tests/integration/" in request .node .nodeid
421+ base_dir = os .path .dirname (os .path .abspath (__file__ ))
422+ if is_integration :
423+ return os .path .join (base_dir , 'integration' )
424+ return os .path .join (base_dir , 'unit' )
425+
426+
427+ @pytest .fixture ()
428+ def archive_env (request , qapp , qtbot ):
429+ is_integration = "tests/integration/" in request .node .nodeid
430+ main : MainWindow = qapp .main_window
431+ tab : ArchiveTab = main .archiveTab
432+ main .tabWidget .setCurrentIndex (3 )
433+
434+ if is_integration :
435+ tab .refresh_archive_list ()
436+ qtbot .waitUntil (lambda : tab .archiveTable .rowCount () > 0 , ** pytest ._wait_defaults )
437+ else :
438+ tab .populate_from_profile ()
439+ qtbot .waitUntil (lambda : tab .archiveTable .rowCount () == 2 , ** pytest ._wait_defaults )
440+
441+ return main , tab
0 commit comments