# SPDX-License-Identifier: GPL-3.0-only
import asyncio
import logging
import os
from pathlib import Path
from textwrap import dedent
from packageurl import PackageURL
from hermeto import APP_NAME
from hermeto.core.config import get_config
from hermeto.core.constants import Mode
from hermeto.core.errors import NotAGitRepo, PackageRejected, UnsupportedFeature
from hermeto.core.models.input import BundlerBinaryFilters, Request
from hermeto.core.models.output import EnvironmentVariable, ProjectFile, RequestOutput
from hermeto.core.models.property_semantics import Property, PropertySet
from hermeto.core.models.sbom import Component, create_backend_annotation
from hermeto.core.package_managers.bundler.parser import (
GemDependency,
GemPlatformSpecificDependency,
GitDependency,
ParseResult,
PathDependency,
parse_lockfile,
)
from hermeto.core.package_managers.general import async_download_files, get_vcs_qualifiers
from hermeto.core.rooted_path import RootedPath
from hermeto.core.scm import get_repo_id
log = logging.getLogger(__name__)
# Output-dir-relative directory that receives the overriding Bundler config.
# It is handed to Bundler through BUNDLE_APP_CONFIG and its "config" file is
# produced by _prepare_for_hermetic_build.
CONFIG_OVERRIDE = "bundler/config_override"
def fetch_bundler_source(request: Request) -> RequestOutput:
    """Resolve and process all bundler packages.

    Each package in ``request.bundler_packages`` is resolved in order (components
    collected, gem archives downloaded, git dependencies cloned). The results are
    then combined with the environment variables and the overriding Bundler config
    file that a hermetic build needs.

    :param request: request carrying the packages, the source dir and the output dir
    :return: components, environment variables, project files and annotations
    """
    resolved = [
        _resolve_bundler_package(
            package_dir=request.source_dir.join_within_root(pkg.path),
            output_dir=request.output_dir,
            binary_filters=pkg.binary,
        )
        for pkg in request.bundler_packages
    ]
    components: list[Component] = [comp for comps, _ in resolved for comp in comps]
    git_paths = [entry for _, entries in resolved for entry in entries]

    environment_variables = _prepare_environment_variables_for_hermetic_build(git_paths)
    project_files: list[ProjectFile] = [
        _prepare_for_hermetic_build(request.source_dir, request.output_dir)
    ]

    backend_annotation = create_backend_annotation(components, "bundler")
    annotations = [backend_annotation] if backend_annotation else []

    return RequestOutput.from_obj_list(
        components=components,
        environment_variables=environment_variables,
        project_files=project_files,
        annotations=annotations,
    )
# Aliases for the triples collected per git dependency: the gem name as declared
# in the lockfile, the directory name of its local clone under deps/bundler, and
# the remote URL that gets redirected to that clone via url.insteadOf.
DepName = str
FSDepName = str
DepURL = str
def _resolve_bundler_package(
    package_dir: RootedPath,
    output_dir: RootedPath,
    binary_filters: BundlerBinaryFilters | None = None,
) -> tuple[list[Component], list[tuple[DepName, FSDepName, DepURL]]]:
    """Process a request for a single bundler package.

    Parses the package's lockfile, records a component for the main package and
    for every dependency, downloads gem archives into ``<output_dir>/deps/bundler``
    and clones git dependencies into the same directory.

    :param package_dir: directory holding the lockfile to process
    :param output_dir: root of the output tree; deps land under ``deps/bundler``
    :param binary_filters: optional filters passed through to the lockfile parser
    :return: the components found, plus one ``(gem name, clone dirname, url)``
        triple per git dependency
    :raises NotAGitRepo: when package_dir is not inside a git repository and the
        configured mode is not permissive
    """
    gem_cache_dir = output_dir.join_within_root("deps", "bundler")
    gem_cache_dir.path.mkdir(parents=True, exist_ok=True)

    parsed_deps = parse_lockfile(package_dir, binary_filters)
    main_name, main_version = _get_main_package_name_and_version(package_dir, parsed_deps)

    vcs_qualifiers = None
    try:
        vcs_qualifiers = get_vcs_qualifiers(package_dir.root)
    except NotAGitRepo:
        # Outside a git repo there is nothing to qualify the main purl with;
        # only the permissive mode tolerates that.
        if get_config().mode != Mode.PERMISSIVE:
            raise

    main_purl = PackageURL(
        type="gem",
        name=main_name,
        version=main_version,
        qualifiers=vcs_qualifiers,
        subpath=str(package_dir.subpath_from_root),
    )
    components = [Component(name=main_name, version=main_version, purl=main_purl.to_string())]
    git_paths: list[tuple[DepName, FSDepName, DepURL]] = []
    pending_downloads: dict[str, RootedPath] = {}

    for dep in parsed_deps:
        props: list[Property] = []
        if isinstance(dep, GemPlatformSpecificDependency):
            pending_downloads[dep.remote_location] = dep.download_location(gem_cache_dir)
            props = PropertySet(bundler_package_binary=True).to_properties()
        elif isinstance(dep, GemDependency):
            pending_downloads[dep.remote_location] = dep.download_location(gem_cache_dir)
        elif isinstance(dep, GitDependency):
            dep.download_to(gem_cache_dir)
            clone_dirname = dep.repo_name + "-" + dep.ref[:12]
            git_paths.append((dep.name, clone_dirname, str(dep.url)))
        # Other records (e.g. PathDependency) need no fetching; they only get a component.
        components.append(
            Component(name=dep.name, version=dep.version, purl=dep.purl, properties=props)
        )

    if pending_downloads:
        asyncio.run(
            async_download_files(
                files_to_download=pending_downloads,
                concurrency_limit=get_config().runtime.concurrency_limit,
            )
        )

    return components, git_paths
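def _get_main_package_name_and_version(
    package_dir: RootedPath,
    dependencies: ParseResult,
) -> tuple[str, str | None]:
    """
    Get main package name and version.

    The main package is the package that is being processed by our application,
    not any of its dependencies. The lockfile is consulted first; when it does not
    describe the package itself, the name is derived from the git 'origin' remote
    and the version is left unknown.

    :param package_dir: directory of the package being processed
    :param dependencies: parsed Gemfile.lock contents
    :return: ``(name, version)``; version is None when only the remote name was available
    :raises PackageRejected: when neither the lockfile nor an origin remote yields a name
    """
    name_and_version = _get_name_and_version_from_lockfile(dependencies)
    if name_and_version is not None:
        return name_and_version

    # fallback to origin remote
    try:
        name = _get_repo_name_from_origin_remote(package_dir)
    # if the git repository does not have an origin remote
    except UnsupportedFeature as err:
        # chain explicitly so the original failure stays attached as the cause
        raise PackageRejected(
            reason="Failed to extract package name from origin remote",
            solution=(
                f"Please specify package name and version in a way that {APP_NAME} understands,\n"
                f"or make sure that the directory {APP_NAME} is processing is a git repository with\n"
                f"an 'origin' remote, in which case {APP_NAME} will infer the package name from the remote URL."
            ),
        ) from err
    return name, None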
def _get_name_and_version_from_lockfile(dependencies: ParseResult) -> tuple[str, str] | None:
"""
Extract the package name and version from dependencies in the Gemfile.lock.
Gemfile.lock only contains the name and version of the package. If the gemspec file
is explicitly defined in the Gemfile, Bundler will create a path dependency record
representing the gem in the package directory.
Note that having a gemspec file is an edge case when the package is not an actual gem.
But it is possible to include a gemspec file in the package directory that defines its name,
version, and other metadata even if the package is not a gem. So we respect this edge case.
See design doc for more details:
https://github.com/hermetoproject/hermeto/blob/main/docs/design/bundler.md
"""
for dep in dependencies:
if isinstance(dep, PathDependency) and dep.subpath == ".":
return dep.name, dep.version
return None
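def _get_repo_name_from_origin_remote(package_dir: RootedPath) -> str:
    """Extract repository name from git origin remote in the package directory.

    :param package_dir: directory of the package; its root must be a git repository
    :return: the last path component of the origin URL (suffix stripped), joined
        with the package's subpath inside the repository
    :raises PackageRejected: when the package directory is not inside a git repository
    """
    try:
        repo_id = get_repo_id(package_dir.root)
    except NotAGitRepo as err:
        # chain explicitly so the original failure stays attached as the cause
        raise PackageRejected(
            reason="Unable to infer package name from origin URL",
            solution=(
                "Provide valid metadata in the package files or ensure "
                "the package files are in a git repository whose 'origin' remote has a valid URL."
            ),
        ) from err

    # e.g. (constructed) ".../org/repo.git" -> "repo"; a subpath "gems/foo" then
    # yields "repo/gems/foo"
    repo_name = Path(repo_id.parsed_origin_url.path).stem
    return str(Path(repo_name, package_dir.subpath_from_root))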
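def _prepare_environment_variables_for_hermetic_build(
    git_paths: list[tuple[DepName, FSDepName, DepURL]] | None = None,
) -> list[EnvironmentVariable]:
    """Build the environment variables a hermetic ``bundle install`` needs.

    BUNDLE_APP_CONFIG always points Bundler at the overriding config written under
    CONFIG_OVERRIDE. When git dependencies were fetched, git is additionally told
    to resolve each remote URL to its pre-fetched local clone under ``deps/bundler``.

    :param git_paths: ``(gem name, clone dirname, remote url)`` triples, one per git
        dependency; None or empty means no git redirects are emitted
    :return: the environment variables, with ``${output_dir}`` left as a template
    :raises UnsupportedFeature: when one remote URL would map to more than one clone
    """
    env_vars = [
        # Contains path to a directory where a new config could be found.
        EnvironmentVariable(name="BUNDLE_APP_CONFIG", value="${output_dir}/" + CONFIG_OVERRIDE),
    ]
    if not git_paths:
        return env_vars

    # Redirect git remote URLs to pre-fetched local clones via git's
    # GIT_CONFIG_COUNT/KEY/VALUE mechanism. This injects url.insteadOf
    # entries without replacing the global git config.
    # See: https://git-scm.com/docs/git-config#ENVIRONMENT
    _check_for_duplicate_git_urls(git_paths)

    # Several gems can come from the same clone (same repo, same ref); emit the
    # redirect once per clone, in first-seen order, rather than once per gem.
    redirects = dict.fromkeys((dirname, url) for _packname, dirname, url in git_paths)

    # (key, value) pairs for GIT_CONFIG_KEY_N / GIT_CONFIG_VALUE_N
    git_config: list[tuple[str, str]] = []
    for dirname, url in redirects:
        clone_file_url = "file://${output_dir}/deps/bundler/" + dirname + "/"
        git_config.append((f"url.{clone_file_url}.insteadOf", url))
    # Newer git versions restrict the file:// transport by default
    # (protocol.file.allow); the redirected clones above are only reachable
    # through it. The override lives in this process environment only, not in
    # any persistent git config.
    git_config.append(("protocol.file.allow", "always"))

    env_vars.append(EnvironmentVariable(name="GIT_CONFIG_COUNT", value=str(len(git_config))))
    for idx, (key, value) in enumerate(git_config):
        env_vars.append(EnvironmentVariable(name=f"GIT_CONFIG_KEY_{idx}", value=key))
        env_vars.append(EnvironmentVariable(name=f"GIT_CONFIG_VALUE_{idx}", value=value))
    return env_vars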
def _check_for_duplicate_git_urls(
git_paths: list[tuple[DepName, FSDepName, DepURL]],
) -> None:
"""Raise if multiple git deps share the same URL with different revisions."""
url_to_dirs: dict[str, set[str]] = {}
for _packname, dirname, url in git_paths:
url_to_dirs.setdefault(url, set()).add(dirname)
for url, dirs in url_to_dirs.items():
if len(dirs) > 1:
raise UnsupportedFeature(
f"Multiple git dependencies point to the same repository ({url}) "
f"but use different revisions: {', '.join(sorted(dirs))}. "
"This is not supported because git's url.insteadOf redirect "
"can only map a repository URL to a single local clone."
)
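def _prepare_for_hermetic_build(source_dir: RootedPath, output_dir: RootedPath) -> ProjectFile:
    """Prepare a package for hermetic build by injecting necessary config.

    The returned project file is the Bundler config that BUNDLE_APP_CONFIG points
    at. It starts from the user's existing config, if any (the package's own
    ``.bundle/config``, else the ``config`` file in a user-provided BUNDLE_APP_CONFIG
    directory), and appends the settings that make ``bundle install`` offline.

    :param source_dir: root of the source being processed
    :param output_dir: root of the output tree; the config lands under CONFIG_OVERRIDE
    :return: the config file to write, with ``${output_dir}`` left as a template
    """
    hermetic_config = dedent(
        """
        BUNDLE_CACHE_PATH: "${output_dir}/deps/bundler"
        BUNDLE_DEPLOYMENT: "true"
        BUNDLE_NO_PRUNE: "true"
        BUNDLE_ALLOW_OFFLINE_INSTALL: "true"
        BUNDLE_DISABLE_VERSION_CHECK: "true"
        BUNDLE_VERSION: "system"
        """
    )

    existing_config = _find_existing_bundle_config(source_dir)
    # The hermetic settings are appended after any user settings so they come last
    # in the file. NOTE(review): this presumes Bundler keeps the last occurrence of
    # a duplicated key -- confirm against Bundler's config loading.
    user_part = existing_config.read_text() if existing_config is not None else ""
    config_data = user_part + hermetic_config

    overriding_bundler_config_path = output_dir.join_within_root(CONFIG_OVERRIDE, "config").path
    return ProjectFile(abspath=overriding_bundler_config_path, template=config_data)


def _find_existing_bundle_config(source_dir: RootedPath) -> Path | None:
    """Return the user's pre-existing Bundler config file, or None if there is none.

    The in-repo ``.bundle/config`` wins; otherwise the ``config`` file inside the
    directory named by BUNDLE_APP_CONFIG is used. A BUNDLE_APP_CONFIG directory
    without a config file counts as "no user config" instead of being an error.

    :param source_dir: root of the source being processed
    """
    in_repo_config = source_dir.join_within_root(".bundle/config").path
    if in_repo_config.is_file():
        return in_repo_config

    # Corner case: a user decides to define their own alternate config.
    # In this scenario the application must try to copy over user-defined variables
    # to its overriding alternate config.
    alternative_config = os.getenv("BUNDLE_APP_CONFIG")
    if alternative_config is not None:
        user_config = Path(alternative_config, "config")
        if user_config.is_file():
            return user_config
    return None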