-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathcli.py
executable file
·144 lines (122 loc) · 5.11 KB
/
cli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/container-inspector for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
import logging
import os
import sys
import tempfile
import csv as csv_module
import json as json_module
from os import path
import click
from container_inspector import image
from container_inspector import dockerfile
from container_inspector import rootfs
# Module-level debug switch: flip to True to get DEBUG-level tracing on stdout.
TRACE = False
# Per-module logger, configured by the application unless TRACE is set below.
logger = logging.getLogger(__name__)
if TRACE:
    # Route full debug output to stdout for ad-hoc troubleshooting.
    logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)
    logger.setLevel(logging.DEBUG)
@click.command()
# Fix metavar casing ('IMAGE_path' -> 'IMAGE_PATH') for consistency with the
# other commands' help output.
@click.argument('image_path', metavar='IMAGE_PATH', type=click.Path(exists=True, readable=True))
@click.argument('extract_directory', metavar='TARGET_DIR', type=click.Path(exists=True, writable=True))
@click.help_option('-h', '--help')
def container_inspector_squash(image_path, extract_directory):
    """
    Given a Docker image at IMAGE_PATH, extract and squash that image in TARGET_DIR
    merging all layers in a single rootfs-like structure.
    """
    # Note: the docstring doubles as the click help text; a stray "'))" was
    # removed from it so the help reads cleanly.
    _container_inspector_squash(image_path, extract_directory)
def _container_inspector_squash(image_path, extract_directory):
    """
    Extract the single image found at ``image_path`` and rebuild its merged
    rootfs (all layers squashed together) under ``extract_directory``.

    Raise ValueError if ``image_path`` does not contain exactly one image.
    """
    images = get_images_from_dir_or_tarball(image_path)
    # Bug fix: the original tested ``== 1`` and therefore rejected exactly
    # the valid case (one image) while letting zero or multiple images
    # through to ``images[0]``. Squashing is only defined for one image.
    if len(images) != 1:
        raise ValueError("Can only squash one image at a time")
    img = images[0]
    # Normalize the target so "~" and relative paths work.
    target_loc = os.path.abspath(os.path.expanduser(extract_directory))
    rootfs.rebuild_rootfs(img, target_loc)
@click.command()
@click.argument('directory', metavar='DIR', type=click.Path(exists=True, readable=True))
@click.option('--json', is_flag=True, help='Print information as JSON.')
@click.option('--csv', is_flag=True, help='Print information as CSV.')
@click.help_option('-h', '--help')
def container_inspector_dockerfile(directory, json=False, csv=False):
    """
    Find source Dockerfile files in DIR. Print information as JSON or CSV to stdout.
    Output is printed to stdout. Use a ">" redirect to save in a file.
    """
    # Thin CLI wrapper: all logic lives in the testable helper below.
    # NOTE: the flags shadow the builtin ``json``/``csv`` names, hence the
    # ``json_module``/``csv_module`` aliases used at file top.
    _container_inspector_dockerfile(directory, json, csv)
def _container_inspector_dockerfile(directory, json=False, csv=False):
    """
    Collect Dockerfiles found under ``directory`` and print them to stdout
    as JSON (when ``json`` is True) and/or CSV (when ``csv`` is True).

    Raise click.UsageError if neither output format is requested.
    Return silently when no Dockerfile is found.
    """
    # An ``assert`` is stripped under "python -O", so validate explicitly.
    # click.UsageError also gives a proper CLI error message and exit code.
    if not (json or csv):
        raise click.UsageError('At least one of --json or --csv is required.')
    dir_loc = os.path.abspath(os.path.expanduser(directory))
    dockerfiles = dockerfile.collect_dockerfiles(location=dir_loc)
    if not dockerfiles:
        return
    if json:
        # ``dockerfiles`` maps location -> parsed data; only the data is
        # serialized.
        click.echo(json_module.dumps(list(dockerfiles.values()), indent=2))
    if csv:
        # Use a distinct name instead of rebinding ``dockerfiles`` so the
        # mapping and the flattened rows are never confused.
        flat = list(dockerfile.flatten_dockerfiles(dockerfiles))
        # All rows share the same keys; the first row provides the header.
        writer = csv_module.DictWriter(sys.stdout, flat[0].keys())
        writer.writeheader()
        for row in flat:
            writer.writerow(row)
@click.command()
@click.argument('image_path', metavar='IMAGE_PATH', type=click.Path(exists=True, readable=True))
@click.option('--extract-to', default=None, metavar='PATH', type=click.Path(exists=True, readable=True))
@click.option('--csv', is_flag=True, default=False, help='Print information as CSV instead of JSON.')
@click.help_option('-h', '--help')
def container_inspector(image_path, extract_to=None, csv=False):
    """
    Find Docker images and their layers in IMAGE_PATH.
    Print information as JSON by default or as CSV with --csv.
    Optionally extract images with extract-to.
    Output is printed to stdout. Use a ">" redirect to save in a file.
    """
    # Thin CLI wrapper: the helper returns the serialized report as a string
    # (or None when CSV is requested and nothing was found).
    results = _container_inspector(image_path, extract_to=extract_to, csv=csv)
    click.echo(results)
def _container_inspector(image_path, extract_to=None, csv=False, _layer_path_segments=2):
    """
    Inspect ``image_path`` for Docker images and return a serialized report:
    JSON text by default, or CSV text when ``csv`` is True.

    ``extract_to`` is an optional extraction directory forwarded to the
    image loader. Return None when CSV is requested and no data was found.
    """
    images = get_images_from_dir_or_tarball(image_path, extract_to=extract_to)

    if not csv:
        # JSON report: one dict per image.
        image_dicts = [
            img.to_dict(layer_path_segments=_layer_path_segments)
            for img in images
        ]
        return json_module.dumps(image_dicts, indent=2)

    # CSV report: one flattened row per image/layer record.
    rows = list(image.flatten_images_data(
        images=images,
        layer_path_segments=_layer_path_segments,
    ))
    if not rows:
        return
    from io import StringIO
    buffer = StringIO()
    # Every row carries the same keys; the first row supplies the header.
    writer = csv_module.DictWriter(buffer, rows[0].keys())
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()
def get_images_from_dir_or_tarball(image_path, extract_to=None, quiet=False):
    """
    Return a list of Image objects found at ``image_path``, which is either
    a directory containing an already-extracted image or an image tarball.

    For a tarball, extract the archive and its layers to ``extract_to`` or,
    when not provided, to a new temporary directory. Unless ``quiet`` is
    True, echo the extraction location.
    """
    image_loc = os.path.abspath(os.path.expanduser(image_path))
    # Bug fix: test the normalized ``image_loc``, not the raw ``image_path``.
    # A path such as "~/images" is a directory only after expanduser();
    # checking the raw string would wrongly fall through to the tarball
    # branch.
    if path.isdir(image_loc):
        images = image.Image.get_images_from_dir(image_loc)
    else:
        # assume tarball
        extract_to = extract_to or tempfile.mkdtemp()
        images = image.Image.get_images_from_tarball(
            archive_location=image_loc,
            extracted_location=extract_to,
            verify=True,
        )
        for img in images:
            img.extract_layers(extracted_location=extract_to)
        if not quiet:
            click.echo('Extracting image tarball to: {}'.format(extract_to))
    return images