From 5f60a68aab8b6fd4b3fd4d1300fba65316941fdb Mon Sep 17 00:00:00 2001 From: Piotr Wolnowski Date: Tue, 14 May 2024 09:27:40 +0200 Subject: [PATCH 1/7] Add AtomicWriteFileHandle to utils.py --- .../accuracy_checker/utils.py | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/tools/accuracy_checker/accuracy_checker/utils.py b/tools/accuracy_checker/accuracy_checker/utils.py index 8e791364876..8dfb12a6921 100644 --- a/tools/accuracy_checker/accuracy_checker/utils.py +++ b/tools/accuracy_checker/accuracy_checker/utils.py @@ -24,6 +24,7 @@ import sys import zlib import re +import tempfile from enum import Enum from pathlib import Path @@ -981,3 +982,38 @@ def ov_new_api_available(): return True except ImportError: return False + + +class AtomicWriteFileHandle: + """Ensure the file is written once in case of multi processes or threads.""" + + def __init__(self, file_path, open_mode): + self.target_path = file_path + self.mode = open_mode + self.temp_fd, self.temp_path = tempfile.mkstemp() + self.temp_file = os.fdopen(self.temp_fd, open_mode) + + def write(self, data): + self.temp_file.write(data) + + def writelines(self, lines): + self.temp_file.writelines(lines) + + def close(self): + if not self.temp_file.closed: + self.temp_file.close() + if not os.path.exists(self.target_path): + os.rename(self.temp_path, self.target_path) + else: + os.remove(self.temp_path) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + # Mimic other file object methods as needed + def __getattr__(self, item): + """Delegate attribute access to the underlying temporary file object.""" + return getattr(self.temp_file, item) From b34be8eae2d184cf000b854b26fdb078a84d6827 Mon Sep 17 00:00:00 2001 From: Piotr Wolnowski Date: Tue, 14 May 2024 09:30:13 +0200 Subject: [PATCH 2/7] Update convert.py with AtomicWriteFileHandle --- .../accuracy_checker/annotation_converters/convert.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tools/accuracy_checker/accuracy_checker/annotation_converters/convert.py b/tools/accuracy_checker/accuracy_checker/annotation_converters/convert.py index 6d508fb0c83..bea91977570 100644 --- a/tools/accuracy_checker/accuracy_checker/annotation_converters/convert.py +++ b/tools/accuracy_checker/accuracy_checker/annotation_converters/convert.py @@ -34,7 +34,8 @@ ) from ..data_readers import KaldiFrameIdentifier, KaldiMatrixIdentifier from ..utils import ( - get_path, OrderedSet, cast_to_bool, is_relative_to, start_telemetry, send_telemetry_event, end_telemetry + get_path, OrderedSet, cast_to_bool, is_relative_to, start_telemetry, send_telemetry_event, + end_telemetry, AtomicWriteFileHandle ) from ..data_analyzer import BaseDataAnalyzer from .format_converter import BaseFormatConverter @@ -327,7 +328,7 @@ def save_annotation(annotation, meta, annotation_file, meta_file, dataset_config annotation_dir = annotation_file.parent if not annotation_dir.exists(): annotation_dir.mkdir(parents=True) - with annotation_file.open('wb') as file: + with AtomicWriteFileHandle(annotation_file,'wb') as file: if conversion_meta: pickle.dump(conversion_meta, file) for representation in annotation: @@ -337,7 +338,7 @@ def save_annotation(annotation, meta, annotation_file, meta_file, dataset_config meta_dir = meta_file.parent if not meta_dir.exists(): meta_dir.mkdir(parents=True) - with meta_file.open('wt') as file: + with AtomicWriteFileHandle(meta_file, 'wt') as file: json.dump(meta, file) From 3a201fc3d63aaa13a0601239403584bfcf1250b2 Mon Sep 17 00:00:00 2001 From: Piotr Wolnowski Date: Tue, 14 May 2024 09:31:49 +0200 Subject: [PATCH 3/7] Test AtomicWriteFileHandle with test_utils.py --- tools/accuracy_checker/tests/test_utils.py | 36 +++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/tools/accuracy_checker/tests/test_utils.py b/tools/accuracy_checker/tests/test_utils.py index cd01ee69f39..0f97858f877 100644 --- a/tools/accuracy_checker/tests/test_utils.py +++ b/tools/accuracy_checker/tests/test_utils.py @@ -14,7 +14,9 @@ limitations under the License. """ -from accuracy_checker.utils import concat_lists, contains_all, contains_any, overrides, zipped_transform +import os +import threading +from accuracy_checker.utils import concat_lists, contains_all, contains_any, overrides, zipped_transform, AtomicWriteFileHandle def test_concat_lists(): @@ -125,3 +127,35 @@ class C: assert overrides(B, 'foo', A) assert not overrides(C, 'foo', A) + + +def thread_write_to_file(file_path, data, thread_id): + with AtomicWriteFileHandle(file_path, 'wt') as file: + file.write(f"Thread {thread_id}: {data}\n") + + +class TestAtomicWriteFileHandle: + + def test_multithreaded_atomic_file_write(self): + target_file_path = "test_atomic_file.txt" + threads = [] + num_threads = 8 + data_chunks = [f"Data chunk {i}" for i in range(num_threads)] + + if os.path.exists(target_file_path): + os.remove(target_file_path) + + for i in range(num_threads): + thread = threading.Thread(target=thread_write_to_file, args=(target_file_path, data_chunks[i], i)) + threads.append(thread) + thread.start() + + for thread in threads: + thread.join() + + with open(target_file_path, 'r') as file: + lines = file.readlines() + + os.remove(target_file_path) + + assert any(data_chunk in line for line in lines for data_chunk in data_chunks), f"data_chunks data not found in the file" From 27a6a5e401d2c6a37d5f17e97f68b847887ff003 Mon Sep 17 00:00:00 2001 From: Piotr Wolnowski Date: Tue, 14 May 2024 09:49:58 +0200 Subject: [PATCH 4/7] Start all threads at once in TestAtomicWriteFileHandle --- tools/accuracy_checker/tests/test_utils.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tools/accuracy_checker/tests/test_utils.py b/tools/accuracy_checker/tests/test_utils.py index 0f97858f877..0eee0e47514 100644 --- a/tools/accuracy_checker/tests/test_utils.py +++ b/tools/accuracy_checker/tests/test_utils.py @@ -148,7 +148,9 @@ def test_multithreaded_atomic_file_write(self): for i in range(num_threads): thread = threading.Thread(target=thread_write_to_file, args=(target_file_path, data_chunks[i], i)) threads.append(thread) - thread.start() + + for thread in threads: + thread.start() for thread in threads: thread.join() From be57f8f805acea3fef8bc29d568d863a83ee39c9 Mon Sep 17 00:00:00 2001 From: Piotr Wolnowski Date: Wed, 15 May 2024 12:06:38 +0200 Subject: [PATCH 5/7] Create temporary file in target directory --- tools/accuracy_checker/accuracy_checker/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/accuracy_checker/accuracy_checker/utils.py b/tools/accuracy_checker/accuracy_checker/utils.py index 8dfb12a6921..b756505f300 100644 --- a/tools/accuracy_checker/accuracy_checker/utils.py +++ b/tools/accuracy_checker/accuracy_checker/utils.py @@ -990,7 +990,7 @@ class AtomicWriteFileHandle: def __init__(self, file_path, open_mode): self.target_path = file_path self.mode = open_mode - self.temp_fd, self.temp_path = tempfile.mkstemp() + self.temp_fd, self.temp_path = tempfile.mkstemp(dir=os.path.dirname(file_path)) self.temp_file = os.fdopen(self.temp_fd, open_mode) def write(self, data): From 0ecb28381a399356d0537adb37335b7f5e3f8901 Mon Sep 17 00:00:00 2001 From: Piotr Wolnowski Date: Wed, 15 May 2024 16:01:25 +0200 Subject: [PATCH 6/7] Avoid too much lines in utils.py --- .../annotation_converters/convert.py | 41 +++++++++++- .../accuracy_checker/utils.py | 36 ---------- tools/accuracy_checker/tests/test_convert.py | 67 +++++++++++++++++++ tools/accuracy_checker/tests/test_utils.py | 38 +---------- 4 files changed, 106 insertions(+), 76 deletions(-) create mode 100644 tools/accuracy_checker/tests/test_convert.py diff --git a/tools/accuracy_checker/accuracy_checker/annotation_converters/convert.py b/tools/accuracy_checker/accuracy_checker/annotation_converters/convert.py index bea91977570..7066bc3b219 100644 --- a/tools/accuracy_checker/accuracy_checker/annotation_converters/convert.py +++ b/tools/accuracy_checker/accuracy_checker/annotation_converters/convert.py @@ -16,9 +16,10 @@ import warnings import platform import sys - +import os import copy import json +import tempfile from pathlib import Path import pickle # nosec B403 # disable import-pickle check from argparse import ArgumentParser @@ -34,8 +35,7 @@ ) from ..data_readers import KaldiFrameIdentifier, KaldiMatrixIdentifier from ..utils import ( - get_path, OrderedSet, cast_to_bool, is_relative_to, start_telemetry, send_telemetry_event, - end_telemetry, AtomicWriteFileHandle + get_path, OrderedSet, cast_to_bool, is_relative_to, start_telemetry, send_telemetry_event, end_telemetry ) from ..data_analyzer import BaseDataAnalyzer from .format_converter import BaseFormatConverter @@ -410,3 +410,38 @@ def analyze_dataset(annotations, metadata): else: metadata = {'data_analysis': data_analysis} return metadata + +class AtomicWriteFileHandle: + """Ensure the file is written once in case of multi processes or threads.""" + + def __init__(self, file_path, open_mode): + self.target_path = file_path + self.mode = open_mode + + self.temp_fd, self.temp_path = tempfile.mkstemp(dir=os.path.dirname(file_path)) + self.temp_file = os.fdopen(self.temp_fd, open_mode) + + def write(self, data): + self.temp_file.write(data) + + def writelines(self, lines): + self.temp_file.writelines(lines) + + def close(self): + if not self.temp_file.closed: + self.temp_file.close() + if not os.path.exists(self.target_path): + os.rename(self.temp_path, self.target_path) + else: + os.remove(self.temp_path) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + # Mimic other file object methods as needed + def __getattr__(self, item): + """Delegate attribute access to the underlying temporary file object.""" + return getattr(self.temp_file, item) diff --git a/tools/accuracy_checker/accuracy_checker/utils.py b/tools/accuracy_checker/accuracy_checker/utils.py index b756505f300..8e791364876 100644 --- a/tools/accuracy_checker/accuracy_checker/utils.py +++ b/tools/accuracy_checker/accuracy_checker/utils.py @@ -24,7 +24,6 @@ import sys import zlib import re -import tempfile from enum import Enum from pathlib import Path @@ -982,38 +981,3 @@ def ov_new_api_available(): return True except ImportError: return False - - -class AtomicWriteFileHandle: - """Ensure the file is written once in case of multi processes or threads.""" - - def __init__(self, file_path, open_mode): - self.target_path = file_path - self.mode = open_mode - self.temp_fd, self.temp_path = tempfile.mkstemp(dir=os.path.dirname(file_path)) - self.temp_file = os.fdopen(self.temp_fd, open_mode) - - def write(self, data): - self.temp_file.write(data) - - def writelines(self, lines): - self.temp_file.writelines(lines) - - def close(self): - if not self.temp_file.closed: - self.temp_file.close() - if not os.path.exists(self.target_path): - os.rename(self.temp_path, self.target_path) - else: - os.remove(self.temp_path) - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.close() - - # Mimic other file object methods as needed - def __getattr__(self, item): - """Delegate attribute access to the underlying temporary file object.""" - return getattr(self.temp_file, item) diff --git a/tools/accuracy_checker/tests/test_convert.py b/tools/accuracy_checker/tests/test_convert.py new file mode 100644 index 00000000000..36fcacff624 --- /dev/null +++ b/tools/accuracy_checker/tests/test_convert.py @@ -0,0 +1,67 @@ +""" +Copyright (c) 2018-2024 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import os +import threading +import warnings +from accuracy_checker.annotation_converters.convert import AtomicWriteFileHandle + +def thread_access_file(file_path, data_dict, thread_id, write_lines): + if os.path.exists(file_path): + with open(file_path, 'r') as file: + read_lines = len(file.readlines()) + # when new thread sees file it must have all lines already written + if read_lines != write_lines: + warn_message = f"Thread {thread_id}: Incorrect number of lines read from {file_path} ({read_lines} != {write_lines})" + warnings.warn(warn_message) + data_dict['assert'] = warn_message + else: + with AtomicWriteFileHandle(file_path, 'wt') as file: + for i in range(write_lines): + file.write(f"Thread {thread_id}:Line{i} {data_dict[thread_id]}\n") + +class TestAtomicWriteFileHandle: + + def test_multithreaded_atomic_file_write(self): + target_file_path = "test_atomic_file.txt" + threads = [] + num_threads = 10 + write_lines = 10 + data_chunks = [f"Data chunk {i}" for i in range(num_threads)] + threads_dict = {i: data_chunks[i] for i in range(len(data_chunks))} + + if os.path.exists(target_file_path): + os.remove(target_file_path) + + for i in range(num_threads): + thread = threading.Thread(target=thread_access_file, args=(target_file_path, threads_dict, i, write_lines)) + threads.append(thread) + + for thread in threads: + thread.start() + + for i,thread in enumerate(threads): + thread.join() + + with open(target_file_path, 'r') as file: + lines = file.readlines() + + os.remove(target_file_path) + + # check asserts passed from threads + assert 'assert' not in threads_dict.keys() , threads_dict['assert'] + + assert sum(1 for line in lines for data_chunk in data_chunks if data_chunk in line) == write_lines, f"data_chunks data not found in the {target_file_path} file" diff --git a/tools/accuracy_checker/tests/test_utils.py b/tools/accuracy_checker/tests/test_utils.py index 0eee0e47514..cd01ee69f39 100644 --- a/tools/accuracy_checker/tests/test_utils.py +++ b/tools/accuracy_checker/tests/test_utils.py @@ -14,9 +14,7 @@ limitations under the License. """ -import os -import threading -from accuracy_checker.utils import concat_lists, contains_all, contains_any, overrides, zipped_transform, AtomicWriteFileHandle +from accuracy_checker.utils import concat_lists, contains_all, contains_any, overrides, zipped_transform def test_concat_lists(): @@ -127,37 +125,3 @@ class C: assert overrides(B, 'foo', A) assert not overrides(C, 'foo', A) - - -def thread_write_to_file(file_path, data, thread_id): - with AtomicWriteFileHandle(file_path, 'wt') as file: - file.write(f"Thread {thread_id}: {data}\n") - - -class TestAtomicWriteFileHandle: - - def test_multithreaded_atomic_file_write(self): - target_file_path = "test_atomic_file.txt" - threads = [] - num_threads = 8 - data_chunks = [f"Data chunk {i}" for i in range(num_threads)] - - if os.path.exists(target_file_path): - os.remove(target_file_path) - - for i in range(num_threads): - thread = threading.Thread(target=thread_write_to_file, args=(target_file_path, data_chunks[i], i)) - threads.append(thread) - - for thread in threads: - thread.start() - - for thread in threads: - thread.join() - - with open(target_file_path, 'r') as file: - lines = file.readlines() - - os.remove(target_file_path) - - assert any(data_chunk in line for line in lines for data_chunk in data_chunks), f"data_chunks data not found in the file" From a05702f5511a40cc54ea718b50683ae54fc51cae Mon Sep 17 00:00:00 2001 From: Piotr Wolnowski Date: Wed, 15 May 2024 16:09:54 +0200 Subject: [PATCH 7/7] Correct description remove unnecessary enumeration --- tools/accuracy_checker/tests/test_convert.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/accuracy_checker/tests/test_convert.py b/tools/accuracy_checker/tests/test_convert.py index 36fcacff624..d34f4959b53 100644 --- a/tools/accuracy_checker/tests/test_convert.py +++ b/tools/accuracy_checker/tests/test_convert.py @@ -23,7 +23,7 @@ def thread_access_file(file_path, data_dict, thread_id, write_lines): if os.path.exists(file_path): with open(file_path, 'r') as file: read_lines = len(file.readlines()) - # when new thread sees file it must have all lines already written + # when a new thread reads a file, all lines must already be written if read_lines != write_lines: warn_message = f"Thread {thread_id}: Incorrect number of lines read from {file_path} ({read_lines} != {write_lines})" warnings.warn(warn_message) @@ -53,7 +53,7 @@ def test_multithreaded_atomic_file_write(self): for thread in threads: thread.start() - for i,thread in enumerate(threads): + for thread in threads: thread.join() with open(target_file_path, 'r') as file: