diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index aa474fc..5a208a3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -41,12 +41,22 @@ jobs: brew install subversion mercurial if: matrix.os == 'macos-latest' + - name: Install dependencies (macOS) + run: | + brew install subversion mercurial breezy + if: matrix.os == 'macos-latest' + - name: Install dependencies (Ubuntu) run: | sudo apt-get update - sudo apt-get install -y --no-install-recommends subversion mercurial + sudo apt-get install -y --no-install-recommends subversion mercurial brz if: startsWith(matrix.os, 'ubuntu') + - name: Install breezy pip (Ubuntu 24.04) + run: | + python -m pip install --upgrade breezy # Adds breezy to the PYTHONPATH + if: startsWith(matrix.os, 'ubuntu-24.04') + - name: Test with pytest run: | pip install --upgrade .[test] diff --git a/test/test_bzr.py b/test/test_bzr.py new file mode 100644 index 0000000..a4ec73c --- /dev/null +++ b/test/test_bzr.py @@ -0,0 +1,130 @@ +"""Integration tests for BzrClient class.""" + +import os +import tarfile +import tempfile +import unittest +from shutil import which + +from vcs2l.clients.bzr import BzrClient +from vcs2l.util import rmtree + +bzr = which('bzr') + + +@unittest.skipIf(not bzr, '`bzr` was not found') +class TestCheckout(unittest.TestCase): + """Simple integration tests for BzrClient checkout functionality.""" + + def setUp(self): + """Set up test fixtures.""" + self.test_dir = tempfile.mkdtemp() + self.repo_link = 'https://github.com/octocat/Hello-World.git' + self.repo_path = os.path.join(self.test_dir, 'test_repo') + + def tearDown(self): + """Clean up after tests.""" + if os.path.exists(self.test_dir): + rmtree(self.test_dir) + + def test_checkout_repository(self): + """Test checkout with a valid repository URL.""" + client = BzrClient(self.repo_path) + result = client.checkout(self.repo_link) + + self.assertTrue(result) + self.assertTrue(os.path.exists(self.repo_path)) + + def test_checkout_invalid_url(self): + """Test checkout with invalid URL raises ValueError.""" + client = BzrClient(self.repo_path) + + with self.assertRaises(ValueError): + client.checkout(None) + + with self.assertRaises(ValueError): + client.checkout('') + + def test_checkout_existing_directory_fails(self): + """Test checkout fails when directory already exists with content.""" + # Create the target directory and put a file in it + os.makedirs(self.repo_path, exist_ok=True) + test_file = os.path.join(self.repo_path, 'existing_file.txt') + with open(test_file, 'w', encoding='utf-8') as f: + f.write('This file already exists') + + client = BzrClient(self.repo_path) + + with self.assertRaises(RuntimeError) as context: + client.checkout(self.repo_link) + + # Verify the error message mentions the path + self.assertIn(self.repo_path, str(context.exception)) + self.assertIn('Target path exists and is not empty', str(context.exception)) + + +@unittest.skipIf(not bzr, '`bzr` was not found') +class TestExportRepository(unittest.TestCase): + """Integration tests for BzrClient export_repository functionality.""" + + def setUp(self): + self.test_dir = tempfile.mkdtemp() + self.repo_path = os.path.join(self.test_dir, 'test_repo') + self.export_path = os.path.join(self.test_dir, 'export_test') + self.client = BzrClient(self.repo_path) + # Use a git repository instead of launchpad for future-proofing + self.repo_url = 'https://github.com/octocat/Hello-World.git' + + def tearDown(self): + if os.path.exists(self.test_dir): + rmtree(self.test_dir) + + def test_export_repository(self): + """Test export repository.""" + self.client.checkout(self.repo_url) + + # Change to the repository directory for export + original_cwd = os.getcwd() + try: + os.chdir(self.repo_path) + + # Test export with a specific revision + result = self.client.export_repository(None, self.export_path) + self.assertTrue(result, 'Export should return True on success') + + archive_path = self.export_path + '.tar.gz' + self.assertTrue( + os.path.exists(archive_path), 'Archive file should be created' + ) + + # Verify the archive is a valid tar.gz file + with tarfile.open(archive_path, 'r:gz') as tar: + members = tar.getnames() + self.assertGreater(len(members), 0, 'Archive should contain files') + + finally: + os.chdir(original_cwd) + + def test_export_repository_git_version_unsupported(self): + """Test export repository with unsupported version for git repo.""" + self.client.checkout(self.repo_url) + + # Change to the repository directory for export + original_cwd = os.getcwd() + try: + os.chdir(self.repo_path) + + # Test export with invalid version + result = self.client.export_repository('999999999', self.export_path) + self.assertFalse(result, 'Version is not supported for git repositories.') + + archive_path = self.export_path + '.tar.gz' + self.assertFalse( + os.path.exists(archive_path), 'Archive should not be created on failure' + ) + finally: + os.chdir(original_cwd) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/test_git.py b/test/test_git.py new file mode 100644 index 0000000..24cb3d1 --- /dev/null +++ b/test/test_git.py @@ -0,0 +1,145 @@ +"""Unit tests for GitClient checkout functionality""" + +import os +import tarfile +import tempfile +import unittest + +from vcs2l.clients.git import GitClient +from vcs2l.util import rmtree + + +class TestCheckout(unittest.TestCase): + """Test cases for GitClient checkout method""" + + def setUp(self): + # Create a temporary directory for testing + self.test_dir = tempfile.mkdtemp() + self.repo_path = os.path.join(self.test_dir, 'test_repo') + + def tearDown(self): + if os.path.exists(self.test_dir): + rmtree(self.test_dir) + + def test_checkout_specific_branch(self): + """Test checking out a specific branch""" + client = GitClient(self.repo_path) + + url = 'https://github.com/octocat/Hello-World.git' + success = client.checkout(url, version='test', shallow=True) + + self.assertTrue(success, 'Checkout should succeed') + self.assertTrue(os.path.exists(self.repo_path), 'Repo directory should exist') + self.assertTrue( + os.path.isdir(os.path.join(self.repo_path, '.git')), + 'Should be a git repository', + ) + + def test_checkout_nonexistent_repo(self): + """Test checking out a non-existent repository should fail""" + client = GitClient(self.repo_path) + + url = 'https://github.com/this/repo/does/not/exist.git' + success = client.checkout(url, verbose=True) + + self.assertFalse(success, 'Checkout should fail for non-existent repo') + + def test_checkout_to_existing_directory(self): + """Test checking out to a non-empty directory should fail""" + # Create a non-empty directory + os.makedirs(self.repo_path) + with open( + os.path.join(self.repo_path, 'existing_file.txt'), 'w', encoding='utf-8' + ) as f: + f.write('This file already exists') + + client = GitClient(self.repo_path) + url = 'https://github.com/octocat/Hello-World.git' + success = client.checkout(url) + + self.assertFalse(success, 'Checkout should fail for non-empty directory') + + +class TestExportRepository(unittest.TestCase): + """Test cases for GitClient export_repository method""" + + def setUp(self): + # Create a temporary directory for testing + self.test_dir = tempfile.mkdtemp() + self.repo_path = os.path.join(self.test_dir, 'test_repo') + self.export_dir = os.path.join(self.test_dir, 'exports') + os.makedirs(self.export_dir, exist_ok=True) + + self.test_repo_url = 'https://github.com/octocat/Hello-World.git' + self.client = GitClient(self.repo_path) + + def tearDown(self): + if os.path.exists(self.test_dir): + rmtree(self.test_dir) + + def test_export_specific_branch(self): + """Test exporting the repository at a specific branch""" + success = self.client.checkout(self.test_repo_url, version='test') + + if not success: + self.fail('Failed to clone test repository') + + basepath = os.path.join(self.export_dir, 'repo_export') + filepath = self.client.export_repository('test', basepath) + + self.assertTrue(os.path.exists(filepath), 'Export file should exist') + self.assertGreater( + os.path.getsize(filepath), 0, 'Export file should not be empty' + ) + # Verify it's a valid tar.gz file + try: + if filepath and isinstance(filepath, str): + with tarfile.open(filepath, 'r:gz') as tar: + members = tar.getnames() + self.assertIn('README', members, 'Should contain README file') + else: + self.fail('Exported file path is invalid') + except tarfile.ReadError: + self.fail('Exported file should be a valid tar.gz archive') + + def test_export_with_local_changes_uses_temp_dir(self): + """Test that export uses temp directory when there are local changes""" + # First clone the repository + success = self.client.checkout(self.test_repo_url) + if not success: + self.fail('Failed to clone test repository') + + # Create a local change + test_file = os.path.join(self.repo_path, 'test_change.txt') + with open(test_file, 'w', encoding='utf-8') as f: + f.write('test change') + + basepath = os.path.join(self.export_dir, 'local_changes_test') + filepath = self.client.export_repository(None, basepath) + + self.assertIsNotNone(filepath, 'Export should succeed even with local changes') + self.assertTrue(os.path.exists(filepath), 'Export file should exist') + self.assertGreater( + os.path.getsize(filepath), 0, 'Export file should not be empty' + ) + + # Clean up + os.remove(test_file) + + def test_export_invalid_branch(self): + """Test exporting a non-existent branch should fail gracefully""" + success = self.client.checkout(self.test_repo_url) + + if not success: + self.fail('Failed to clone test repository') + + basepath = os.path.join(self.export_dir, 'nonexistent_export') + result = self.client.export_repository('nonexistent-branch-12345', basepath) + + self.assertEqual( + result, False, 'Export should return False for non-existent ref' + ) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/test_hg.py b/test/test_hg.py new file mode 100644 index 0000000..2438db7 --- /dev/null +++ b/test/test_hg.py @@ -0,0 +1,123 @@ +"""Integration tests for HgClient checkout functionality""" + +import os +import subprocess +import tempfile +import unittest +from shutil import which + +from vcs2l.clients.hg import HgClient +from vcs2l.util import rmtree + +hg = which('hg') + + +@unittest.skipIf(not hg, '`hg` was not found') +class TestCheckout(unittest.TestCase): + """Test cases for HgClient checkout method.""" + + def setUp(self): + self.test_dir = tempfile.mkdtemp() + self.repo_url = 'https://www.mercurial-scm.org/repo/hello' + self.repo_path = os.path.join(self.test_dir, 'test_repo') + + def tearDown(self): + if os.path.exists(self.test_dir): + rmtree(self.test_dir) + + def test_checkout_with_version(self): + """Test checkout with a specific version/revision.""" + client = HgClient(self.repo_path) + result = client.checkout(self.repo_url, version='1') + + self.assertTrue(result, 'Checkout with version should return True on success') + self.assertTrue( + os.path.exists(self.repo_path), 'Repository directory should be created' + ) + self.assertTrue( + HgClient.is_repository(self.repo_path), + 'Should create valid Mercurial repository', + ) + + def test_checkout_invalid_url(self): + """Test checkout with an invalid URL.""" + client = HgClient(self.repo_path) + result = client.checkout('https://invalid-url-for-testing.com/repo') + + self.assertFalse(result, 'Checkout with invalid URL should return False') + + def test_checkout_existing_directory(self): + """Test checkout fails when directory already exists with content.""" + # Create the target directory and put a file in it + os.makedirs(self.repo_path, exist_ok=True) + test_file = os.path.join(self.repo_path, 'existing_file.txt') + with open(test_file, 'w', encoding='utf-8') as f: + f.write('This file already exists') + + client = HgClient(self.repo_path) + result = client.checkout(self.repo_url) + + self.assertFalse( + result, 'Checkout should return False when directory exists with content' + ) + + +@unittest.skipIf(not hg, '`hg` was not found') +class TestExportRepository(unittest.TestCase): + """Integration tests for HgClient export_repository functionality.""" + + def setUp(self): + """Set up test fixtures for each test""" + self.test_dir = tempfile.mkdtemp(prefix='hg_test_') + self.repo_path = os.path.join(self.test_dir, 'hello') + self.export_base_path = os.path.join(self.test_dir, 'exported_repo') + self.repo_url = 'https://www.mercurial-scm.org/repo/hello' + + self.hg_client = HgClient(self.repo_path) + + self._ensure_repo_cloned() + + def tearDown(self): + """Clean up test fixtures""" + if os.path.exists(self.test_dir): + rmtree(self.test_dir) + + def _ensure_repo_cloned(self): + """Ensure the test repository is cloned locally""" + if not os.path.exists(self.repo_path): + try: + subprocess.run( + ['hg', 'clone', '--rev', 'tip', self.repo_url, self.repo_path], + check=True, + capture_output=True, + text=True, + ) + except subprocess.CalledProcessError as e: + self.fail(f'Failed to clone repository: {e.stderr}') + + def test_export_repository_specific_revision(self): + """Test exporting a specific revision""" + result = self.hg_client.export_repository('1', self.export_base_path) + self.assertTrue(result, "Export should succeed for revision '1'") + + # Verify files were created correctly + tar_gz_path = self.export_base_path + '.tar.gz' + self.assertTrue(os.path.exists(tar_gz_path)) + + def test_export_repository_invalid_revision(self): + """Test exporting with an invalid revision""" + invalid_revision = 'nonexistent123456789' + + result = self.hg_client.export_repository( + invalid_revision, self.export_base_path + ) + self.assertFalse(result, 'Export should fail for invalid revision') + + tar_gz_path = self.export_base_path + '.tar.gz' + tar_path = self.export_base_path + '.tar' + self.assertFalse(os.path.exists(tar_gz_path)) + self.assertFalse(os.path.exists(tar_path)) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/test_svn.py b/test/test_svn.py new file mode 100644 index 0000000..d618030 --- /dev/null +++ b/test/test_svn.py @@ -0,0 +1,152 @@ +"""Integration tests for SvnClient class.""" + +import os +import subprocess +import tarfile +import tempfile +import unittest +from shutil import which + +from vcs2l.clients.svn import SvnClient +from vcs2l.util import rmtree + +svn = which('svn') + + +@unittest.skipIf(not svn, '`svn` was not found') +class TestCheckout(unittest.TestCase): + """Test cases for SvnClient checkout method.""" + + def setUp(self): + # Create a temporary directory for testing + self.test_dir = tempfile.mkdtemp(prefix='svn_test_') + self.repo_url = 'https://svn.apache.org/repos/asf/abdera/abdera2/' + self.client = SvnClient(self.test_dir) + + def tearDown(self): + if os.path.exists(self.test_dir): + rmtree(self.test_dir) + + def test_checkout_version(self): + """Test checkout repository with specific revision""" + version = '1928014' # Specific revision number + + result = self.client.checkout(self.repo_url, version=version) + + self.assertTrue(result, 'Checkout with specific version should succeed') + + # Verify that .svn directory exists + svn_dir = os.path.join(self.test_dir, '.svn') + self.assertTrue( + os.path.exists(svn_dir), '.svn directory should exist after checkout' + ) + + # Verify the checked out revision (using svn info) + try: + result = subprocess.run( + ['svn', 'info', '--show-item', 'revision'], + check=True, + cwd=self.test_dir, + capture_output=True, + text=True, + timeout=30, + ) + if result.returncode == 0: + actual_revision = result.stdout.strip() + self.assertEqual( + actual_revision, + version, + f'Checked out revision should be {version}, got {actual_revision}', + ) + except ( + subprocess.TimeoutExpired, + subprocess.SubprocessError, + FileNotFoundError, + ): + self.fail('Failed to run svn info to verify revision') + + def test_invalid_version(self): + """Test that checkout fails with invalid revision number""" + invalid_version = '999999999' + + result = self.client.checkout(self.repo_url, version=invalid_version) + + self.assertFalse(result, 'Checkout with invalid revision should fail') + + def test_existing_non_empty_dir_should_fail(self): + """Test that checkout fails if target directory is not empty""" + # Create a file in the test directory first + test_file = os.path.join(self.test_dir, 'existing_file.txt') + with open(test_file, 'w', encoding='utf-8') as f: + f.write('This file already exists') + + with self.assertRaises(RuntimeError) as context: + self.client.checkout(self.repo_url) + + self.assertIn('Target path exists and is not empty', str(context.exception)) + + +@unittest.skipIf(not svn, '`svn` was not found') +class TestSvnExportRepository(unittest.TestCase): + """Integration tests for SvnClient export_repository functionality.""" + + def setUp(self): + self.test_dir = tempfile.mkdtemp() + self.repo_url = 'https://svn.apache.org/repos/asf/abdera/abdera2/' + self.repo_path = os.path.join(self.test_dir, 'test_repo') + self.export_path = os.path.join(self.test_dir, 'export_test') + + def tearDown(self): + if os.path.exists(self.test_dir): + rmtree(self.test_dir) + + def test_export_repository_with_version(self): + """Test export repository with specific version.""" + client = SvnClient(self.repo_path) + client.checkout(self.repo_url) + + # Change to the repository directory for export + original_cwd = os.getcwd() + try: + os.chdir(self.repo_path) + + result = client.export_repository('1928014', self.export_path) + self.assertTrue(result, 'Export should return True on success') + + # Verify tar.gz file was created + archive_path = self.export_path + '.tar.gz' + self.assertTrue( + os.path.exists(archive_path), 'Archive file should be created' + ) + + # Verify the archive is a valid tar.gz file + with tarfile.open(archive_path, 'r:gz') as tar: + members = tar.getnames() + self.assertGreater(len(members), 0, 'Archive should contain files') + + finally: + os.chdir(original_cwd) + + def test_export_repository_invalid_version(self): + """Test export repository with invalid version returns False.""" + client = SvnClient(self.repo_path) + client.checkout(self.repo_url) + + original_cwd = os.getcwd() + try: + os.chdir(self.repo_path) + + result = client.export_repository('999999999', self.export_path) + self.assertFalse(result, 'Export should return False for invalid version') + + archive_path = self.export_path + '.tar.gz' + self.assertFalse( + os.path.exists(archive_path), 'Archive should not be created on failure' + ) + + finally: + os.chdir(original_cwd) + + +if __name__ == '__main__': + unittest.main() diff --git a/vcs2l/clients/__init__.py b/vcs2l/clients/__init__.py index 93e4da2..906271d 100644 --- a/vcs2l/clients/__init__.py +++ b/vcs2l/clients/__init__.py @@ -1,42 +1,42 @@ vcs2l_clients = [] try: - from .bzr import BzrClient + from vcs2l.clients.bzr import BzrClient vcs2l_clients.append(BzrClient) except ImportError: pass try: - from .git import GitClient + from vcs2l.clients.git import GitClient vcs2l_clients.append(GitClient) except ImportError: pass try: - from .hg import HgClient + from vcs2l.clients.hg import HgClient vcs2l_clients.append(HgClient) except ImportError: pass try: - from .svn import SvnClient + from vcs2l.clients.svn import SvnClient vcs2l_clients.append(SvnClient) except ImportError: pass try: - from .tar import TarClient + from vcs2l.clients.tar import TarClient vcs2l_clients.append(TarClient) except ImportError: pass try: - from .zip import ZipClient + from vcs2l.clients.zip import ZipClient vcs2l_clients.append(ZipClient) except ImportError: @@ -47,3 +47,24 @@ raise RuntimeError( 'Multiple vcs clients share the same type: ' + ', '.join(sorted(_client_types)) ) + + +def get_vcs_client(vcs_type, uri): + """ + Get a VCS client instance for the specified type and URI. + + Args: + vcs_type: The type of VCS (e.g., 'git', 'svn', 'hg', etc.) + uri: The repository URI/path + + Returns: + An instance of the appropriate VCS client + + Raises: + ValueError: If no client supports the specified type + """ + for client_class in vcs2l_clients: + if client_class.type == vcs_type: + return client_class(uri) + + raise ValueError(f'No VCS client found for type: {vcs_type}') diff --git a/vcs2l/clients/bzr.py b/vcs2l/clients/bzr.py index 02bd03c..8da5fa7 100644 --- a/vcs2l/clients/bzr.py +++ b/vcs2l/clients/bzr.py @@ -194,6 +194,66 @@ def _get_parent_branch(self): result['output'] = branch return result + def export_repository(self, version, basepath): + """Export the bzr repository at a given version to a tar.gz file.""" + self._check_executable() + + # Construct the bzr export command + cmd = [BzrClient._executable, 'export', '--format=tgz', basepath + '.tar.gz'] + + if version: + cmd.append('--revision') + cmd.append(version) + + result = self._run_command(cmd) + + return result['returncode'] == 0 + + def checkout(self, url, version=None, verbose=False, shallow=False, timeout=None): + """Creates a local Bazaar branch from a remote repository.""" + if url is None or url.strip() == '': + raise ValueError('Invalid empty url: "%s"' % url) + + # Check if directory exists and is not empty + if os.path.exists(self.path) and os.listdir(self.path): + raise RuntimeError('Target path exists and is not empty: %s' % self.path) + + # Create parent directory if it doesn't exist, but not the target directory itself + # This is because 'bzr branch' will create the target directory. + parent_dir = os.path.dirname(self.path) + if parent_dir and not os.path.exists(parent_dir): + result = self._create_path() + if result and result.get('returncode'): + return False + + self._check_executable() + + # Build the branch command + cmd = ['bzr', 'branch'] + + if verbose: + cmd.append('--verbose') + + if version: + cmd.extend(['--revision', str(version)]) + + # let bzr create the target directory + cmd.extend([url, self.path]) + + # Run the command with proper timeout handling + try: + result = self._run_command(cmd) + except Exception as e: + print('Branch command failed with exception: %s' % str(e)) + return False + + if result['returncode'] != 0: + error_msg = result.get('output', 'Unknown error') + print('Branch failed: %s' % error_msg) + return False + + return True + def _check_executable(self): assert BzrClient._executable is not None, "Could not find 'bzr' executable" diff --git a/vcs2l/clients/git.py b/vcs2l/clients/git.py index 4f013a8..8d4be31 100644 --- a/vcs2l/clients/git.py +++ b/vcs2l/clients/git.py @@ -782,6 +782,94 @@ def _check_color(self, cmd): if GitClient._config_color_is_auto: cmd[1:1] = '-c', 'color.ui=always' + def export_repository(self, version, basepath): + """Export repository to a tar.gz file at the specified version.""" + if not GitClient.is_repository(self.path): + return False + + self._check_executable() + filepath = f'{basepath}.tar.gz' + + cmd = [ + GitClient._executable, + 'archive', + '--format=tar.gz', + f'--output={filepath}', + version or 'HEAD', + ] + result = self._run_command(cmd) + return filepath if result['returncode'] == 0 else False + + def checkout(self, url, version=None, verbose=False, shallow=False, timeout=None): + """Clone a git repository to the specified path and checkout a specific version.""" + if url is None or url.strip() == '': + raise ValueError('Invalid empty url: "%s"' % url) + + self._check_executable() + + if os.path.exists(self.path): + if os.path.isdir(self.path): + # Check if directory is empty + if os.listdir(self.path): + return False + else: + return False + else: + # Create parent directories + try: + os.makedirs(self.path, exist_ok=True) + except OSError: + return False + + try: + cmd_clone = [GitClient._executable, 'clone'] + + if shallow: + cmd_clone.extend(['--depth', '1']) + if self.get_git_version() >= [1, 7, 10]: + cmd_clone.append('--no-single-branch') + + if version is None: + cmd_clone.append('--recursive') + + cmd_clone.extend([url, '.']) + + result_clone = self._run_command(cmd_clone) + if result_clone['returncode']: + return False + + # Checkout specific version if provided + if version: + cmd_checkout = [GitClient._executable, 'checkout', version] + result_checkout = self._run_command(cmd_checkout) + + if result_checkout['returncode']: + # Try fetching all refs first + cmd_fetch = [GitClient._executable, 'fetch', '--all', '--tags'] + result_fetch = self._run_command(cmd_fetch) + if result_fetch['returncode'] == 0: + result_checkout = self._run_command(cmd_checkout) + + if result_checkout['returncode']: + return False + + # Update submodules after version checkout + cmd_submodules = [ + GitClient._executable, + 'submodule', + 'update', + '--init', + '--recursive', + ] + result_submodules = self._run_command(cmd_submodules) + if result_submodules['returncode']: + return False + + return True + + except Exception: + return False + def _check_executable(self): assert GitClient._executable is not None, "Could not find 'git' executable" @@ -797,6 +885,24 @@ def _get_hash_ref_tuples(self, ls_remote_output): tuples.append((hash_, ref)) return tuples + def _get_current_version(self): + """Get current HEAD SHA.""" + cmd = [GitClient._executable, 'rev-parse', 'HEAD'] + result = self._run_command(cmd) + return result['output'].strip() if result['returncode'] == 0 else None + + def _get_version_sha(self, version): + """Get SHA for a given version.""" + cmd = [GitClient._executable, 'rev-parse', version] + result = self._run_command(cmd) + return result['output'].strip() if result['returncode'] == 0 else None + + def _has_no_local_changes(self): + """Check if there are no local changes.""" + cmd = [GitClient._executable, 'diff', '--quiet'] + result = self._run_command(cmd) + return result['returncode'] == 0 + if not GitClient._executable: GitClient._executable = which('git') diff --git a/vcs2l/clients/hg.py b/vcs2l/clients/hg.py index 9d41ebb..c880721 100644 --- a/vcs2l/clients/hg.py +++ b/vcs2l/clients/hg.py @@ -1,3 +1,4 @@ +import gzip import os from shutil import which from threading import Lock @@ -339,6 +340,88 @@ def _check_color(self, cmd): if HgClient._config_color: cmd[1:1] = '--color', 'always' + def export_repository(self, version, basepath): + """Export the hg repository at a given version to a tar.gz file.""" + self._check_executable() + + cmd = [ + HgClient._executable, + 'archive', + '-t', + 'tar', + '-r', + version, + basepath + '.tar', + ] + result = self._run_command(cmd) + if result['returncode']: + print('Failed to export hg repository: %s' % result['output']) + return False + + try: + with open(basepath + '.tar', 'rb') as tar_file: + with gzip.open(basepath + '.tar.gz', 'wb') as gzip_file: + gzip_file.writelines(tar_file) + finally: + try: + os.remove(basepath + '.tar') + except OSError: + print('Could not remove intermediate tar file %s.tar' % basepath) + + return True + + def checkout(self, url, version='', verbose=False, shallow=False, timeout=None): + """Checkout the repository from the given URL.""" + if url is None or url.strip() == '': + raise ValueError('Invalid empty url: "%s"' % url) + + self._check_executable() + + # Check if path exists and handle accordingly + if os.path.exists(self.path): + if os.path.isdir(self.path): + if os.listdir(self.path): + return False + else: + return False + + # Ensure parent directory exists + parent_dir = os.path.dirname(self.path) + if parent_dir: + try: + os.makedirs(parent_dir, exist_ok=True) + except OSError: + return False + + try: + # Clone repository + cmd_clone = [HgClient._executable, 'clone', url, self.path] + result_clone = self._run_command(cmd_clone) + + if result_clone['returncode'] != 0: + print( + "Could not clone repository '%s': %s" + % (url, result_clone['output']) + ) + return False + + # Checkout specific version if provided + if version and version.strip(): + cmd_checkout = [HgClient._executable, 'update', version.strip()] + result_checkout = self._run_command(cmd_checkout) + + if result_checkout['returncode'] != 0: + print( + "Could not checkout '%s': %s" + % (version, result_checkout['output']) + ) + return False + + return True + + except Exception: + return False + def _check_executable(self): assert HgClient._executable is not None, "Could not find 'hg' executable" diff --git a/vcs2l/clients/svn.py b/vcs2l/clients/svn.py index b10ff86..49242d4 100644 --- a/vcs2l/clients/svn.py +++ b/vcs2l/clients/svn.py @@ -1,8 +1,11 @@ import os +import tarfile +import tempfile from shutil import which -from xml.etree.ElementTree import fromstring +from xml.etree.ElementTree import ParseError, fromstring from vcs2l.clients.vcs_base import VcsClientBase +from vcs2l.util import rmtree class SvnClient(VcsClientBase): @@ -258,6 +261,103 @@ def validate(self, command): return {'cmd': cmd, 'cwd': self.path, 'output': output, 'returncode': None} + def export_repository(self, version, basepath): + """Export the svn repository at a given version to a tar.gz file.""" + self._check_executable() + + # Create temporary directory for export + temp_dir = tempfile.mkdtemp() + + try: + # Get current repository URL + cmd_info = [SvnClient._executable, 'info', '--xml'] + result_info = self._run_command(cmd_info) + if result_info['returncode']: + print('Could not determine url: ' + result_info['output']) + return False + + try: + root = fromstring(result_info['output']) + entry = root.find('entry') + + if entry is None: + print('No entry found in SVN info XML output') + return False + + url = entry.findtext('url') + if url is None: + print('No URL found in SVN info XML output') + return False + + except ParseError: + print('Could not parse SVN info XML output') + return False + + # Build export URL with version + export_url = url + if version: + export_url += '@' + str(version) + + cmd_export = [ + SvnClient._executable, + 'export', + '--force', + export_url, + temp_dir, + ] + result_export = self._run_command(cmd_export) + + if result_export['returncode']: + print('Could not export repository: ' + result_export['output']) + return False + + # Create tar.gz archive + with tarfile.open(basepath + '.tar.gz', 'w:gz') as tar: + tar.add(temp_dir, arcname='') + return True + + except tarfile.TarError as e: + print('Could not create tar.gz archive: %s' % e) + return False + + finally: + rmtree(temp_dir) + + def checkout(self, url, version=None, verbose=False, shallow=False, timeout=None): + """Checkout the repository from the given URL.""" + + if url is None or url.strip() == '': + raise ValueError('Invalid empty url: "%s"' % url) + + # Check if directory exists and is not empty + if os.path.exists(self.path) and os.listdir(self.path): + raise RuntimeError('Target path exists and is not empty: %s' % self.path) + + # Create the directory if it doesn't exist + not_exist = self._create_path() + if not_exist: + print(not_exist['output']) + return False + + self._check_executable() + + cmd = [SvnClient._executable, '--non-interactive', 'checkout'] + + if version: + cmd.extend(['-r', str(version)]) + + # Add URL and target directory + cmd.extend([url, '.']) + + result = self._run_command(cmd) + + if result['returncode']: + if result['output']: + print('Checkout failed: %s' % result['output']) + return False + + return True + def _check_executable(self): assert SvnClient._executable is not None, "Could not find 'svn' executable" diff --git a/vcs2l/clients/tar.py b/vcs2l/clients/tar.py index ffafa70..4f84e05 100644 --- a/vcs2l/clients/tar.py +++ b/vcs2l/clients/tar.py @@ -4,6 +4,7 @@ from urllib.error import URLError from vcs2l.clients.vcs_base import VcsClientBase, load_url, test_url +from vcs2l.errors import Vcs2lError from vcs2l.util import rmtree @@ -113,3 +114,9 @@ def validate(self, command): 'output': "Tarball url '%s' exists" % command.url, 'returncode': None, } + + def export_repository(self, version, basepath): + raise Vcs2lError('export repository not implemented for extracted tars') + + def checkout(self, url, version=None, verbose=False, shallow=False, timeout=None): + raise Vcs2lError('checkout not implemented for extracted tars.') diff --git a/vcs2l/clients/vcs_base.py b/vcs2l/clients/vcs_base.py index 50ebd36..a3013da 100644 --- a/vcs2l/clients/vcs_base.py +++ b/vcs2l/clients/vcs_base.py @@ -69,6 +69,41 @@ def _create_path(self): } return None + def get_path(self): + """Get the local path of the repository.""" + return self.path + + def export_repository(self, version, basepath): + """Export the repository at the given version to the given basepath. + + Args: + version: the version to export + basepath: the path to export to + Returns: + A dict with keys 'cmd', 'cwd', 'output', and 'returncode'. + """ + raise NotImplementedError( + 'Base class export_repository method must be overridden for client type %s ' + % self.__class__.type + ) + + def checkout(self, url, version=None, verbose=False, shallow=False, timeout=None): + """Checkout the repository from the given URL. + + Args: + url: the URL to checkout from + version: the version to checkout (branch, tag, or revision) + verbose: whether to run the command in verbose mode + shallow: whether to perform a shallow checkout (if supported) + timeout: timeout for network operations (if supported) + Returns: + True on success, False otherwise. + """ + raise NotImplementedError( + 'Base class checkout method must be overridden for client type %s ' + % self._vcs_type_name + ) + def run_command(cmd, cwd, env=None): if not os.path.exists(cwd): diff --git a/vcs2l/clients/zip.py b/vcs2l/clients/zip.py index 8218481..d43b4ac 100644 --- a/vcs2l/clients/zip.py +++ b/vcs2l/clients/zip.py @@ -4,6 +4,7 @@ from urllib.error import URLError from vcs2l.clients.vcs_base import VcsClientBase, load_url, test_url +from vcs2l.errors import Vcs2lError from vcs2l.util import rmtree @@ -128,3 +129,9 @@ def validate(self, command): 'output': "Zip url '%s' exists" % command.url, 'returncode': None, } + + def export_repository(self, version, basepath): + raise Vcs2lError('export repository not implemented for extracted zips') + + def checkout(self, url, version=None, verbose=False, shallow=False, timeout=None): + raise Vcs2lError('checkout not implemented for extracted zips.')