Skip to content

Commit

Permalink
Fix building index images in parallel (#183)
Browse files Browse the repository at this point in the history
Using sys.argv and environment variables make calling entrypoint
pubtools-iib-add-bundles is not thread safe, so following changes are
made.

- Call entrypoint pubtools-iib-add-bundles with parameters instead of
  using sys.argv.
- Add setting required environment variables for all threads before
  running the thread pool and cleaning them after that.
  • Loading branch information
zxiong authored Sep 5, 2023
1 parent 3bec27c commit 25d9b71
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 27 deletions.
54 changes: 33 additions & 21 deletions pubtools/_quay/operator_pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import yaml
from concurrent import futures
from concurrent.futures.thread import ThreadPoolExecutor
import os

import requests
from requests.adapters import HTTPAdapter
Expand Down Expand Up @@ -685,28 +686,39 @@ def build_index_images(self):

num_thread_build_index_images = self.target_settings.get("num_thread_build_index_images", 5)

with ThreadPoolExecutor(max_workers=num_thread_build_index_images) as executor:
future_results = {
executor.submit(
lambda param: self.iib_add_bundles(
bundles=param.bundles,
index_image=param.index_image,
deprecation_list=param.deprecation_list,
build_tags=param.build_tags,
target_settings=param.target_settings,
),
param,
): param
for param in build_index_image_params
}
for future in futures.as_completed(future_results):
build_details = future.result()
param = future_results[future]
iib_results[param.tag + "-" + param.origin] = {
"iib_result": build_details,
"signing_keys": param.signing_keys,
"destination_tags": param.destination_tags,
orig_environ = os.environ.copy()
_, environ_vars = self.pubtools_iib_get_common_args(self.target_settings)
for key in environ_vars:
os.environ[key] = environ_vars[key]

try:
with ThreadPoolExecutor(max_workers=num_thread_build_index_images) as executor:
future_results = {
executor.submit(
lambda param: self.iib_add_bundles(
bundles=param.bundles,
index_image=param.index_image,
deprecation_list=param.deprecation_list,
build_tags=param.build_tags,
target_settings=param.target_settings,
),
param,
): param
for param in build_index_image_params
}
for future in futures.as_completed(future_results):
build_details = future.result()
param = future_results[future]
iib_results[param.tag + "-" + param.origin] = {
"iib_result": build_details,
"signing_keys": param.signing_keys,
"destination_tags": param.destination_tags,
}
finally:
os.environ.update(orig_environ)
to_delete = [key for key in os.environ if key not in orig_environ]
for key in to_delete:
del os.environ[key]

return iib_results

Expand Down
11 changes: 6 additions & 5 deletions pubtools/_quay/utils/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,22 +102,23 @@ def setup_entry_point_cli(entry_tuple, name, args, environ_vars):
environ_vars (dict):
Env variable names and values to set for the entrypoint.
"""
orig_argv = sys.argv[:]
orig_environ = os.environ.copy()

try:
# First argv element is always the entry point name.
# For a console_scripts entry point, this will be the same value
# as if the script was invoked directly. For any other kind of entry point,
# this value is probably meaningless.
sys.argv = [name]
sys.argv.extend(args)
for key in environ_vars:
os.environ[key] = environ_vars[key]
entry_point_func = pkg_resources.load_entry_point(*entry_tuple)
yield entry_point_func
if args:
func_args = [name]
func_args.extend(args)
yield functools.partial(entry_point_func, func_args)
else:
yield entry_point_func
finally:
sys.argv = orig_argv[:]
os.environ.update(orig_environ)

to_delete = [key for key in os.environ if key not in orig_environ]
Expand Down
28 changes: 27 additions & 1 deletion tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@


def test_run_entrypoint(caplog):
def side_effect_entrypoint():
def side_effect_entrypoint(*args):
print("output generated by entrypoint")
LOG.warning("log generated by entrypoint")
LOG.info("environment variable: {0}".format(os.environ["PYXIS_PASSWORD"]))
Expand Down Expand Up @@ -53,6 +53,32 @@ def side_effect_entrypoint():
compare_logs(caplog, expected_logs)


def test_run_entrypoint_without_args(caplog):
def side_effect_entrypoint(*args):
print("output generated by entrypoint")
LOG.warning("log generated by entrypoint")
LOG.info("environment variable: {0}".format(os.environ["PYXIS_PASSWORD"]))
return {"pyxis": "data"}

with mock_entry_point(
"pubtools-pyxis", "console_scripts", "pubtools-pyxis-get-operator-indices"
) as entrypoint:
entrypoint.side_effect = side_effect_entrypoint
ret_val = misc.run_entrypoint(
("pubtools-pyxis", "console_scripts", "pubtools-pyxis-get-operator-indices"),
"pubtools-pyxis-get-operator-indices",
[],
{"PYXIS_PASSWORD": "some-password"},
)
expected_logs = [
"Running task with arguments:",
"pubtools-pyxis-get-operator-indices",
"log generated by entrypoint",
"environment variable: some-password",
]
compare_logs(caplog, expected_logs)


def test_get_internal_repo_name():
internal_name = misc.get_internal_container_repo_name("namespace/repo")
assert internal_name == "namespace----repo"
Expand Down

0 comments on commit 25d9b71

Please sign in to comment.