2626#include " pretranslate.h"
2727
2828#include " configuration.h"
29+ #include " errors.h"
2930#include " progress.h"
3031#include " str_helpers.h"
3132#include " tm/transmem.h"
3233
3334#include < wx/stopwatch.h>
3435
36+ #include < boost/thread/thread.hpp>
37+
38+ #include < algorithm>
3539#include < atomic>
40+ #include < chrono>
3641#include < deque>
3742#include < mutex>
43+ #include < thread>
44+
45+ using namespace std ::chrono_literals;
3846
3947
4048namespace pretranslate
4149{
4250
43- Stats PreTranslateCatalog (CatalogPtr catalog, const CatalogItemArray& range, PreTranslateOptions options, dispatch::cancellation_token_ptr cancellation_token)
51+ namespace
4452{
45- wxStopWatch sw;
4653
47- if (range.empty ())
48- return {};
4954
50- const bool use_local_tm = Config::UseTM ();
51- if (!use_local_tm)
52- return {};
55+ struct JobMetadata
56+ {
57+ Language srclang, lang;
58+ unsigned nplurals;
59+ PreTranslateOptions options;
60+ };
5361
54- TranslationMemory& tm = TranslationMemory::Get ();
55- auto srclang = catalog->GetSourceLanguage ();
56- auto lang = catalog->GetLanguage ();
57- const auto flags = options.flags ;
5862
59- Progress top_progress (1 );
60- top_progress.message (_ (L" Preparing strings…" ));
63+ /* *
64+ Base class for a worker implementing the pre-translation process.
65+
66+ Typically runs the work in some background threads, modifying catalog items passed to it.
67+ */
68+ class Worker
69+ {
70+ public:
71+ Worker (const JobMetadata& meta) : m_metadata(meta), m_completed(false ) {}
72+ virtual ~Worker () {}
73+
74+ // / Add another item for processing
75+ void upload (CatalogItemPtr item)
76+ {
77+ std::lock_guard lock (m_mutex);
78+ if (m_completed)
79+ return ;
80+ m_queue.push_back (item);
81+ }
82+
83+ /* *
84+ Call to mark the job as done with adding items to the queue.
85+ Can only be called from the primary thread, i.e. one that created the worker.
86+ */
87+ void upload_completed ()
88+ {
89+ std::lock_guard lock (m_mutex);
90+ m_completed = true ;
91+ }
92+
93+ // / Is processing of the entire queue finished?
94+ bool is_finished () const
95+ {
96+ std::lock_guard lock (m_mutex);
97+ return m_completed && m_queue.empty ();
98+ }
6199
62- // Function to apply fetched suggestions to a catalog item:
63- auto process_results = [=](CatalogItemPtr dt, unsigned index, const SuggestionsList& results) -> ResType
100+ /* *
101+ Perform some amount of work on the primary thread. May take some time, but shouldn't take a _long_ time.
102+
103+ Returns true if there is more work to do, false otherwise.
104+
105+ Should accommodate cancellation by checking the cancellation token and returning false if cancellation
106+ is requested, but may e.g. do it inside http request handling, not immediately.
107+
108+ Can only be called from the primary thread, i.e. one that created the worker.
109+ */
110+ virtual bool pump (dispatch::cancellation_token_ptr) = 0;
111+
112+ // / Assignable next worker to process items this worker couldn't handle
113+ Worker *next_worker = nullptr ;
114+
115+ // / Assignable stats collector for processed items
116+ std::shared_ptr<Stats> stats;
117+
118+ protected:
119+ ResType process_results (CatalogItemPtr dt, unsigned index, const SuggestionsList& results)
64120 {
65121 if (results.empty ())
66122 return ResType::None;
123+
124+ const auto flags = m_metadata.options .flags ;
67125 auto & res = results.front ();
126+
68127 if ((flags & PreTranslate_OnlyExact) && !res.IsExactMatch ())
69128 return ResType::Rejected;
70129
@@ -89,33 +148,96 @@ Stats PreTranslateCatalog(CatalogPtr catalog, const CatalogItemArray& range, Pre
89148 dt->SetFuzzy (isFuzzy);
90149
91150 return res.IsExactMatch () ? ResType::Exact : ResType::Fuzzy;
92- };
151+ }
93152
94- Stats stats;
153+ ResType process_result (CatalogItemPtr dt, unsigned index, const Suggestion& result)
154+ {
155+ return process_results (dt, index, SuggestionsList{result});
156+ }
95157
96- std::vector<dispatch::future<ResType>> operations;
97- for (auto dt: range)
158+ void clear_queue ()
98159 {
99- if (dt->IsTranslated () && !dt->IsFuzzy ())
100- continue ;
160+ std::lock_guard lock (m_mutex);
161+ m_queue.clear ();
162+ m_completed = true ;
163+ }
164+
165+ protected:
166+ JobMetadata m_metadata;
167+
168+ mutable std::mutex m_mutex;
169+ std::deque<CatalogItemPtr> m_queue;
170+ std::atomic<bool > m_completed;
171+ };
172+
173+
174+ class LocalDBWorker : public Worker
175+ {
176+ public:
177+ LocalDBWorker (const JobMetadata& meta)
178+ : Worker(meta), m_tm(TranslationMemory::Get())
179+ {
180+ const auto nthreads = std::clamp (std::thread::hardware_concurrency (), 4u , 16u );
181+ for (unsigned i = 0 ; i < nthreads; ++i)
182+ {
183+ m_threads.create_thread ([this ]{ thread_worker (); });
184+ }
185+ }
186+
187+ bool pump (dispatch::cancellation_token_ptr cancellation_token) override
188+ {
189+ if (cancellation_token->is_cancelled ())
190+ {
191+ clear_queue ();
192+ // fall through to wait for threads to finish and exit
193+ }
101194
102- stats.input_strings_count ++;
195+ if (is_finished ())
196+ {
197+ m_threads.join_all ();
198+ return false ;
199+ }
200+
201+ return true ;
202+ }
103203
104- operations.push_back (dispatch::async ([=,&tm]() -> ResType
204+ private:
205+ void thread_worker ()
206+ {
207+ CatalogItemPtr dt;
208+
209+ while (true )
105210 {
106- if (cancellation_token->is_cancelled ())
107- return {};
211+ // pop one item of work:
212+ {
213+ std::lock_guard lock (m_mutex);
214+ if (m_queue.empty ())
215+ {
216+ if (m_completed)
217+ {
218+ break ; // no more work to do
219+ }
220+ else
221+ {
222+ std::this_thread::yield ();
223+ continue ; // wait for more work to be added
224+ }
225+ }
226+
227+ dt = std::move (m_queue.front ());
228+ m_queue.pop_front ();
229+ }
108230
109- auto results = tm .Search (srclang, lang, str::to_wstring (dt->GetString ()));
231+ auto results = m_tm .Search (m_metadata. srclang , m_metadata. lang , str::to_wstring (dt->GetString ()));
110232 auto rt = process_results (dt, 0 , results);
111233
112234 if (translated (rt) && dt->HasPlural ())
113235 {
114- switch (lang .nplurals () )
236+ switch (m_metadata .nplurals )
115237 {
116238 case 2 : // "simple" English-like plurals
117239 {
118- auto results_plural = tm .Search (srclang, lang, str::to_wstring (dt->GetPluralString ()));
240+ auto results_plural = m_tm .Search (m_metadata. srclang , m_metadata. lang , str::to_wstring (dt->GetPluralString ()));
119241 process_results (dt, 1 , results_plural);
120242 }
121243 case 1 : // nothing else to do
@@ -124,26 +246,126 @@ Stats PreTranslateCatalog(CatalogPtr catalog, const CatalogItemArray& range, Pre
124246 }
125247 }
126248
127- return rt;
128- }));
249+ if (next_worker)
250+ {
251+ if (!translated (rt))
252+ {
253+ // no usable translation, request elsewhere
254+ next_worker->upload (dt);
255+ continue ;
256+ }
257+ else
258+ {
259+ // usable local translation, but try to find better quality elsewhere if possible
260+ auto score = results.front ().score ;
261+ if (score < 0.95 )
262+ {
263+ next_worker->upload (dt);
264+ continue ;
265+ }
266+ }
267+ }
268+
269+ // if the item wasn't passed to next worker, count it
270+ if (stats)
271+ {
272+ stats->inc_processed ();
273+ stats->add (rt);
274+ }
275+ }
129276 }
130277
278+ private:
279+ boost::thread_group m_threads;
280+ TranslationMemory& m_tm;
281+ };
282+
283+
284+ } // anonymous namespace
285+
286+
287+ std::shared_ptr<Stats> PreTranslateCatalog (CatalogPtr catalog,
288+ const CatalogItemArray& range,
289+ PreTranslateOptions options,
290+ dispatch::cancellation_token_ptr cancellation_token)
291+ {
292+ wxStopWatch sw;
293+
294+ auto stats = std::make_shared<Stats>();
295+
296+ if (range.empty ())
297+ return stats;
298+
299+ const bool use_local_tm = Config::UseTM ();
300+ if (!use_local_tm)
301+ return stats;
302+
303+ JobMetadata metadata;
304+ metadata.srclang = catalog->GetSourceLanguage ();
305+ metadata.lang = catalog->GetLanguage ();
306+ metadata.nplurals = metadata.lang .nplurals ();
307+ metadata.options = options;
308+
309+ auto worker_local = use_local_tm ? std::make_unique<LocalDBWorker>(metadata) : nullptr ;
310+
311+ if (worker_local)
312+ worker_local->stats = stats;
313+
314+ Worker *worker_ingest = worker_local.get ();
315+
316+ Progress top_progress (1 );
317+ top_progress.message (_ (L" Preparing strings…" ));
318+
319+ // Feed in the work to the worker:
320+ for (auto dt: range)
131321 {
132- Progress progress ((int )operations.size ());
133- progress.message (_ (L" Pre-translating from translation memory…" ));
322+ if (dt->IsTranslated () && !dt->IsFuzzy ())
323+ continue ;
324+
325+ stats->input_strings_count ++;
326+ worker_ingest->upload (dt);
327+ }
328+ worker_ingest->upload_completed ();
329+
330+ // Wait for completion:
331+ Progress progress (stats->input_strings_count , top_progress, 1 );
332+ progress.message (_ (L" Pre-translating…" ));
134333
135- for (auto & op: operations)
334+ int last_matched = 0 ;
335+ bool more_work = true ;
336+ while (more_work)
337+ {
338+ try
136339 {
137- if (cancellation_token->is_cancelled ())
138- break ;
340+ std::this_thread::sleep_for (10ms);
341+
342+ // pump the workers:
343+ more_work = false ;
344+ if (worker_local)
345+ {
346+ if (worker_local->pump (cancellation_token))
347+ {
348+ more_work = true ;
349+ }
350+ else
351+ {
352+ worker_local.reset ();
353+ }
354+ }
139355
140- auto rt = op.get ();
141- stats.add (rt);
142- if (translated (rt))
356+ // update progress bar:
357+ if (last_matched != stats->matched )
143358 {
144- progress.message (wxString::Format (wxPLURAL (" Pre-translated %u string" , " Pre-translated %u strings" , stats.matched ), stats.matched ));
359+ last_matched = stats->matched ;
360+ progress.message (wxString::Format (wxPLURAL (" Pre-translated %u string" , " Pre-translated %u strings" , last_matched), last_matched));
145361 }
146- progress.increment ();
362+ progress.set (stats->input_strings_processed );
363+ }
364+ catch (...)
365+ {
366+ stats->errors ++;
367+ wxLogError (" %s" , DescribeCurrentException ());
368+ break ;
147369 }
148370 }
149371
0 commit comments