Skip to content
This repository was archived by the owner on Aug 6, 2025. It is now read-only.

Commit 26bed05

Browse files
authored
Switch to multithreading, lock around layer pushes (#2)
1 parent 0ad7894 commit 26bed05

File tree

4 files changed

+27
-8
lines changed

4 files changed

+27
-8
lines changed

clients/logging/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ def log_and_raise(self, severity, error_msg, *args, **kwargs):
115115

116116
# format the exception into the raised error message if we got one
117117
if 'exc' in kwargs:
118-
error_msg = '{0}: {1}'.format(error_msg, kwargs['exc'].lower())
118+
error_msg = '{0}: {1}'.format(error_msg, kwargs['exc'])
119119

120120
exception_type = kwargs.get('exc_type', RuntimeError)
121121

core/processor.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,9 @@ def process(self):
6464
manifest = self._get_manifest(tmp_dir_name)
6565
self._logger.debug('Extracted archive manifest', manifest=manifest)
6666

67-
# prepare proc pool, note tarfile is not thread safe https://bugs.python.org/issue23649
68-
with multiprocessing.pool.Pool(processes=self._parallel) as pool:
67+
# prepare thread pool, note tarfile is not thread safe https://bugs.python.org/issue23649
68+
# so if full extraction is not done beforehand, this is not safe
69+
with multiprocessing.pool.ThreadPool(processes=self._parallel) as pool:
6970
for image_config in manifest:
7071
res = pool.apply_async(
7172
process_image,

core/registry.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import hashlib
66
import urllib.parse
77
import time
8+
import threading
89

910
import humanfriendly
1011
import requests
@@ -41,6 +42,8 @@ def __init__(
4142
if self._login:
4243
self._basicauth = requests.auth.HTTPBasicAuth(self._login, self._password)
4344

45+
self._layer_locks = {}
46+
4447
self._logger.debug(
4548
'Initialized',
4649
registry_url=self._registry_url,
@@ -78,10 +81,7 @@ def process_image(self, tmp_dir_name, image_config):
7881
# push individual image layers
7982
layers = image_config["Layers"]
8083
for layer in layers:
81-
self._logger.info('Pushing layer', layer=layer)
82-
push_url = self._initialize_push(image)
83-
layer_path = os.path.join(tmp_dir_name, layer)
84-
self._push_layer(layer_path, push_url)
84+
self._process_layer(layer, image, tmp_dir_name)
8585

8686
# then, push image config
8787
self._logger.info(
@@ -150,6 +150,24 @@ def _push_manifest(self, manifest, image, tag):
150150
content=response.content,
151151
)
152152

153+
def _process_layer(self, layer, image, tmp_dir_name):
154+
155+
# isolate layer key
156+
layer_key = os.path.dirname(layer)
157+
158+
# pushing the layer in parallel from different images might result in 500 internal server error
159+
self._logger.debug('Acquiring layer lock', layer_key=layer_key)
160+
self._layer_locks.setdefault(layer_key, threading.Lock())
161+
self._layer_locks[layer_key].acquire()
162+
try:
163+
self._logger.info('Pushing layer', layer=layer)
164+
push_url = self._initialize_push(image)
165+
layer_path = os.path.join(tmp_dir_name, layer)
166+
self._push_layer(layer_path, push_url)
167+
finally:
168+
self._logger.debug('Releasing layer lock', layer_key=layer_key)
169+
self._layer_locks[layer_key].release()
170+
153171
def _initialize_push(self, repository):
154172
"""
155173
Request a push URL for the image repository for a layer or manifest

dockerregistrypusher.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ def register_arguments(parser):
6363
parser.add_argument(
6464
'-p',
6565
'--parallel',
66-
help='Control parallelism (multi-processing)',
66+
help='Control parallelism (threads)',
6767
type=int,
6868
default=1,
6969
)

0 commit comments

Comments
 (0)