Skip to content

Commit 3b9a672

Browse files
committed
Refactor pretranslation to use helper worker classes
1 parent d68939f commit 3b9a672

File tree

4 files changed

+296
-57
lines changed

4 files changed

+296
-57
lines changed

src/pretranslate.cpp

Lines changed: 265 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -26,45 +26,104 @@
2626
#include "pretranslate.h"
2727

2828
#include "configuration.h"
29+
#include "errors.h"
2930
#include "progress.h"
3031
#include "str_helpers.h"
3132
#include "tm/transmem.h"
3233

3334
#include <wx/stopwatch.h>
3435

36+
#include <boost/thread/thread.hpp>
37+
38+
#include <algorithm>
3539
#include <atomic>
40+
#include <chrono>
3641
#include <deque>
3742
#include <mutex>
43+
#include <thread>
44+
45+
using namespace std::chrono_literals;
3846

3947

4048
namespace pretranslate
4149
{
4250

43-
Stats PreTranslateCatalog(CatalogPtr catalog, const CatalogItemArray& range, PreTranslateOptions options, dispatch::cancellation_token_ptr cancellation_token)
51+
namespace
4452
{
45-
wxStopWatch sw;
4653

47-
if (range.empty())
48-
return {};
4954

50-
const bool use_local_tm = Config::UseTM();
51-
if (!use_local_tm)
52-
return {};
55+
struct JobMetadata
56+
{
57+
Language srclang, lang;
58+
unsigned nplurals;
59+
PreTranslateOptions options;
60+
};
5361

54-
TranslationMemory& tm = TranslationMemory::Get();
55-
auto srclang = catalog->GetSourceLanguage();
56-
auto lang = catalog->GetLanguage();
57-
const auto flags = options.flags;
5862

59-
Progress top_progress(1);
60-
top_progress.message(_(L"Preparing strings…"));
63+
/**
64+
Base class for a worker implementing the pre-translation process.
65+
66+
Typically runs the work in some background threads, modifying catalog items passed to it.
67+
*/
68+
class Worker
69+
{
70+
public:
71+
Worker(const JobMetadata& meta) : m_metadata(meta), m_completed(false) {}
72+
virtual ~Worker() {}
73+
74+
/// Add another item for processing
75+
void upload(CatalogItemPtr item)
76+
{
77+
std::lock_guard lock(m_mutex);
78+
if (m_completed)
79+
return;
80+
m_queue.push_back(item);
81+
}
6182

62-
// Function to apply fetched suggestions to a catalog item:
63-
auto process_results = [=](CatalogItemPtr dt, unsigned index, const SuggestionsList& results) -> ResType
83+
/**
84+
Call to signal that the caller is done adding items to the queue.
85+
Can only be called from the primary thread, i.e. one that created the worker.
86+
*/
87+
void upload_completed()
88+
{
89+
std::lock_guard lock(m_mutex);
90+
m_completed = true;
91+
}
92+
93+
/// Is processing of the entire queue finished?
94+
bool is_finished() const
95+
{
96+
std::lock_guard lock(m_mutex);
97+
return m_completed && m_queue.empty();
98+
}
99+
100+
/**
101+
Perform some amount of work on the primary thread. May take some time, but it shouldn't be a _long_ time.
102+
103+
Returns true if there is more work to do, false otherwise.
104+
105+
Should accommodate cancellation by checking the cancellation token and returning false if cancellation
106+
is requested, but may e.g. do it inside http request handling, not immediately.
107+
108+
Can only be called from the primary thread, i.e. one that created the worker.
109+
*/
110+
virtual bool pump(dispatch::cancellation_token_ptr) = 0;
111+
112+
/// Assignable next worker to process items this worker couldn't handle
113+
Worker *next_worker = nullptr;
114+
115+
/// Assignable stats collector for processed items
116+
std::shared_ptr<Stats> stats;
117+
118+
protected:
119+
ResType process_results(CatalogItemPtr dt, unsigned index, const SuggestionsList& results)
64120
{
65121
if (results.empty())
66122
return ResType::None;
123+
124+
const auto flags = m_metadata.options.flags;
67125
auto& res = results.front();
126+
68127
if ((flags & PreTranslate_OnlyExact) && !res.IsExactMatch())
69128
return ResType::Rejected;
70129

@@ -89,33 +148,96 @@ Stats PreTranslateCatalog(CatalogPtr catalog, const CatalogItemArray& range, Pre
89148
dt->SetFuzzy(isFuzzy);
90149

91150
return res.IsExactMatch() ? ResType::Exact : ResType::Fuzzy;
92-
};
151+
}
93152

94-
Stats stats;
153+
ResType process_result(CatalogItemPtr dt, unsigned index, const Suggestion& result)
154+
{
155+
return process_results(dt, index, SuggestionsList{result});
156+
}
95157

96-
std::vector<dispatch::future<ResType>> operations;
97-
for (auto dt: range)
158+
void clear_queue()
98159
{
99-
if (dt->IsTranslated() && !dt->IsFuzzy())
100-
continue;
160+
std::lock_guard lock(m_mutex);
161+
m_queue.clear();
162+
m_completed = true;
163+
}
164+
165+
protected:
166+
JobMetadata m_metadata;
101167

102-
stats.input_strings_count++;
168+
mutable std::mutex m_mutex;
169+
std::deque<CatalogItemPtr> m_queue;
170+
std::atomic<bool> m_completed;
171+
};
103172

104-
operations.push_back(dispatch::async([=,&tm]() -> ResType
173+
174+
class LocalDBWorker : public Worker
175+
{
176+
public:
177+
LocalDBWorker(const JobMetadata& meta)
178+
: Worker(meta), m_tm(TranslationMemory::Get())
179+
{
180+
const auto nthreads = std::clamp(std::thread::hardware_concurrency(), 4u, 16u);
181+
for (unsigned i = 0; i < nthreads; ++i)
105182
{
106-
if (cancellation_token->is_cancelled())
107-
return {};
183+
m_threads.create_thread([this]{ thread_worker(); });
184+
}
185+
}
108186

109-
auto results = tm.Search(srclang, lang, str::to_wstring(dt->GetString()));
187+
bool pump(dispatch::cancellation_token_ptr cancellation_token) override
188+
{
189+
if (cancellation_token->is_cancelled())
190+
{
191+
clear_queue();
192+
// fall through to wait for threads to finish and exit
193+
}
194+
195+
if (is_finished())
196+
{
197+
m_threads.join_all();
198+
return false;
199+
}
200+
201+
return true;
202+
}
203+
204+
private:
205+
void thread_worker()
206+
{
207+
CatalogItemPtr dt;
208+
209+
while (true)
210+
{
211+
// pop one item of work:
212+
{
213+
std::lock_guard lock(m_mutex);
214+
if (m_queue.empty())
215+
{
216+
if (m_completed)
217+
{
218+
break; // no more work to do
219+
}
220+
else
221+
{
222+
std::this_thread::yield();
223+
continue; // wait for more work to be added
224+
}
225+
}
226+
227+
dt = std::move(m_queue.front());
228+
m_queue.pop_front();
229+
}
230+
231+
auto results = m_tm.Search(m_metadata.srclang, m_metadata.lang, str::to_wstring(dt->GetString()));
110232
auto rt = process_results(dt, 0, results);
111233

112234
if (translated(rt) && dt->HasPlural())
113235
{
114-
switch (lang.nplurals())
236+
switch (m_metadata.nplurals)
115237
{
116238
case 2: // "simple" English-like plurals
117239
{
118-
auto results_plural = tm.Search(srclang, lang, str::to_wstring(dt->GetPluralString()));
240+
auto results_plural = m_tm.Search(m_metadata.srclang, m_metadata.lang, str::to_wstring(dt->GetPluralString()));
119241
process_results(dt, 1, results_plural);
120242
}
121243
case 1: // nothing else to do
@@ -124,26 +246,129 @@ Stats PreTranslateCatalog(CatalogPtr catalog, const CatalogItemArray& range, Pre
124246
}
125247
}
126248

127-
return rt;
128-
}));
249+
if (next_worker)
250+
{
251+
if (!translated(rt))
252+
{
253+
// no usable translation, request elsewhere
254+
next_worker->upload(dt);
255+
continue;
256+
}
257+
else
258+
{
259+
// usable local translation, but try to find better quality elsewhere if possible
260+
auto score = results.front().score;
261+
if (score < 0.95)
262+
{
263+
next_worker->upload(dt);
264+
continue;
265+
}
266+
}
267+
}
268+
269+
// if the item wasn't passed to next worker, count it
270+
if (stats)
271+
{
272+
stats->inc_processed();
273+
stats->add(rt);
274+
}
275+
}
129276
}
130277

278+
private:
279+
boost::thread_group m_threads;
280+
TranslationMemory& m_tm;
281+
};
282+
283+
284+
} // anonymous namespace
285+
286+
287+
std::shared_ptr<Stats> PreTranslateCatalog(CatalogPtr catalog,
288+
const CatalogItemArray& range,
289+
PreTranslateOptions options,
290+
dispatch::cancellation_token_ptr cancellation_token)
291+
{
292+
wxStopWatch sw;
293+
294+
auto stats = std::make_shared<Stats>();
295+
296+
if (range.empty())
297+
return stats;
298+
299+
Progress top_progress(1);
300+
top_progress.message(_(L"Preparing strings…"));
301+
302+
const bool use_local_tm = Config::UseTM();
303+
if (!use_local_tm)
304+
return stats;
305+
306+
JobMetadata metadata;
307+
metadata.srclang = catalog->GetSourceLanguage();
308+
metadata.lang = catalog->GetLanguage();
309+
metadata.nplurals = metadata.lang.nplurals();
310+
metadata.options = options;
311+
312+
auto worker_local = use_local_tm ? std::make_unique<LocalDBWorker>(metadata) : nullptr;
313+
314+
if (worker_local)
315+
worker_local->stats = stats;
316+
317+
Worker *worker_ingest = worker_local.get();
318+
319+
Progress top_progress(1);
320+
top_progress.message(_(L"Preparing strings…"));
321+
322+
// Feed in the work to the worker:
323+
for (auto dt: range)
131324
{
132-
Progress progress((int)operations.size());
133-
progress.message(_(L"Pre-translating from translation memory…"));
325+
if (dt->IsTranslated() && !dt->IsFuzzy())
326+
continue;
327+
328+
stats->input_strings_count++;
329+
worker_ingest->upload(dt);
330+
}
331+
worker_ingest->upload_completed();
332+
333+
// Wait for completion:
334+
Progress progress(stats->input_strings_count, top_progress, 1);
335+
progress.message(_(L"Pre-translating…"));
134336

135-
for (auto& op: operations)
337+
int last_matched = 0;
338+
bool more_work = true;
339+
while (more_work)
340+
{
341+
try
136342
{
137-
if (cancellation_token->is_cancelled())
138-
break;
343+
std::this_thread::sleep_for(10ms);
344+
345+
// pump the workers:
346+
more_work = false;
347+
if (worker_local)
348+
{
349+
if (worker_local->pump(cancellation_token))
350+
{
351+
more_work = true;
352+
}
353+
else
354+
{
355+
worker_local.reset();
356+
}
357+
}
139358

140-
auto rt = op.get();
141-
stats.add(rt);
142-
if (translated(rt))
359+
// update progress bar:
360+
if (last_matched != stats->matched)
143361
{
144-
progress.message(wxString::Format(wxPLURAL("Pre-translated %u string", "Pre-translated %u strings", stats.matched), stats.matched));
362+
last_matched = stats->matched;
363+
progress.message(wxString::Format(wxPLURAL("Pre-translated %u string", "Pre-translated %u strings", last_matched), last_matched));
145364
}
146-
progress.increment();
365+
progress.set(stats->input_strings_processed);
366+
}
367+
catch (...)
368+
{
369+
stats->errors++;
370+
wxLogError("%s", DescribeCurrentException());
371+
break;
147372
}
148373
}
149374

0 commit comments

Comments
 (0)