forked from openvinotoolkit/open_model_zoo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_convert.py
67 lines (52 loc) · 2.55 KB
/
test_convert.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
"""
Copyright (c) 2018-2024 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import threading
import warnings
from accuracy_checker.annotation_converters.convert import AtomicWriteFileHandle
def thread_access_file(file_path, data_dict, thread_id, write_lines):
if os.path.exists(file_path):
with open(file_path, 'r') as file:
read_lines = len(file.readlines())
# when a new thread reads a file, all lines must already be written
if read_lines != write_lines:
warn_message = f"Thread {thread_id}: Incorrect number of lines read from {file_path} ({read_lines} != {write_lines})"
warnings.warn(warn_message)
data_dict['assert'] = warn_message
else:
with AtomicWriteFileHandle(file_path, 'wt') as file:
for i in range(write_lines):
file.write(f"Thread {thread_id}:Line{i} {data_dict[thread_id]}\n")
class TestAtomicWriteFileHandle:
def test_multithreaded_atomic_file_write(self):
target_file_path = "test_atomic_file.txt"
threads = []
num_threads = 10
write_lines = 10
data_chunks = [f"Data chunk {i}" for i in range(num_threads)]
threads_dict = {i: data_chunks[i] for i in range(len(data_chunks))}
if os.path.exists(target_file_path):
os.remove(target_file_path)
for i in range(num_threads):
thread = threading.Thread(target=thread_access_file, args=(target_file_path, threads_dict, i, write_lines))
threads.append(thread)
for thread in threads:
thread.start()
for thread in threads:
thread.join()
with open(target_file_path, 'r') as file:
lines = file.readlines()
os.remove(target_file_path)
# check asserts passed from threads
assert 'assert' not in threads_dict.keys() , threads_dict['assert']
assert sum(1 for line in lines for data_chunk in data_chunks if data_chunk in line) == write_lines, f"data_chunks data not found in the {target_file_path} file"