|
47 | 47 | #include <thread>
|
48 | 48 | #include <utility>
|
49 | 49 | #include <vector>
|
| 50 | +#if OPENTELEMETRY_HAVE_EXCEPTIONS |
| 51 | +# include <exception> |
| 52 | +#endif |
50 | 53 |
|
51 | 54 | #if !defined(__CYGWIN__) && defined(_WIN32)
|
52 | 55 | # ifndef WIN32_LEAN_AND_MEAN
|
@@ -1424,71 +1427,90 @@ class OPENTELEMETRY_LOCAL_SYMBOL OtlpFileSystemBackend : public OtlpFileAppender
|
1424 | 1427 | return;
|
1425 | 1428 | }
|
1426 | 1429 |
|
1427 |
| - std::lock_guard<std::mutex> lock_guard_caller{file_->background_thread_lock}; |
1428 |
| - if (file_->background_flush_thread) |
| 1430 | +#if OPENTELEMETRY_HAVE_EXCEPTIONS |
| 1431 | + try |
1429 | 1432 | {
|
1430 |
| - return; |
1431 |
| - } |
1432 |
| - |
1433 |
| - std::shared_ptr<FileStats> concurrency_file = file_; |
1434 |
| - std::chrono::microseconds flush_interval = options_.flush_interval; |
1435 |
| - file_->background_flush_thread.reset(new std::thread([concurrency_file, flush_interval]() { |
1436 |
| - std::chrono::system_clock::time_point last_free_job_timepoint = |
1437 |
| - std::chrono::system_clock::now(); |
1438 |
| - std::size_t last_record_count = 0; |
| 1433 | +#endif |
1439 | 1434 |
|
1440 |
| - while (true) |
| 1435 | + std::lock_guard<std::mutex> lock_guard_caller{file_->background_thread_lock}; |
| 1436 | + if (file_->background_flush_thread) |
1441 | 1437 | {
|
1442 |
| - std::chrono::system_clock::time_point now = std::chrono::system_clock::now(); |
1443 |
| - // Exit flush thread if there is not data to flush more than one minute. |
1444 |
| - if (now - last_free_job_timepoint > std::chrono::minutes{1}) |
1445 |
| - { |
1446 |
| - break; |
1447 |
| - } |
| 1438 | + return; |
| 1439 | + } |
1448 | 1440 |
|
1449 |
| - if (concurrency_file->is_shutdown.load(std::memory_order_acquire)) |
1450 |
| - { |
1451 |
| - break; |
1452 |
| - } |
| 1441 | + std::shared_ptr<FileStats> concurrency_file = file_; |
| 1442 | + std::chrono::microseconds flush_interval = options_.flush_interval; |
| 1443 | + file_->background_flush_thread.reset(new std::thread([concurrency_file, flush_interval]() { |
| 1444 | + std::chrono::system_clock::time_point last_free_job_timepoint = |
| 1445 | + std::chrono::system_clock::now(); |
| 1446 | + std::size_t last_record_count = 0; |
1453 | 1447 |
|
| 1448 | + while (true) |
1454 | 1449 | {
|
1455 |
| - std::unique_lock<std::mutex> lk(concurrency_file->background_thread_waker_lock); |
1456 |
| - concurrency_file->background_thread_waker_cv.wait_for(lk, flush_interval); |
1457 |
| - } |
| 1450 | + std::chrono::system_clock::time_point now = std::chrono::system_clock::now(); |
| 1451 | + // Exit flush thread if there is not data to flush more than one minute. |
| 1452 | + if (now - last_free_job_timepoint > std::chrono::minutes{1}) |
| 1453 | + { |
| 1454 | + break; |
| 1455 | + } |
1458 | 1456 |
|
1459 |
| - { |
1460 |
| - std::size_t current_record_count = |
1461 |
| - concurrency_file->record_count.load(std::memory_order_acquire); |
1462 |
| - std::lock_guard<std::mutex> lock_guard{concurrency_file->file_lock}; |
1463 |
| - if (current_record_count != last_record_count) |
| 1457 | + if (concurrency_file->is_shutdown.load(std::memory_order_acquire)) |
1464 | 1458 | {
|
1465 |
| - last_record_count = current_record_count; |
1466 |
| - last_free_job_timepoint = std::chrono::system_clock::now(); |
| 1459 | + break; |
1467 | 1460 | }
|
1468 | 1461 |
|
1469 |
| - if (concurrency_file->current_file) |
1470 | 1462 | {
|
1471 |
| - fflush(concurrency_file->current_file.get()); |
| 1463 | + std::unique_lock<std::mutex> lk(concurrency_file->background_thread_waker_lock); |
| 1464 | + concurrency_file->background_thread_waker_cv.wait_for(lk, flush_interval); |
1472 | 1465 | }
|
1473 | 1466 |
|
1474 |
| - concurrency_file->flushed_record_count.store(current_record_count, |
1475 |
| - std::memory_order_release); |
1476 |
| - } |
| 1467 | + { |
| 1468 | + std::size_t current_record_count = |
| 1469 | + concurrency_file->record_count.load(std::memory_order_acquire); |
| 1470 | + std::lock_guard<std::mutex> lock_guard{concurrency_file->file_lock}; |
| 1471 | + if (current_record_count != last_record_count) |
| 1472 | + { |
| 1473 | + last_record_count = current_record_count; |
| 1474 | + last_free_job_timepoint = std::chrono::system_clock::now(); |
| 1475 | + } |
1477 | 1476 |
|
1478 |
| - concurrency_file->background_thread_waiter_cv.notify_all(); |
1479 |
| - } |
| 1477 | + if (concurrency_file->current_file) |
| 1478 | + { |
| 1479 | + fflush(concurrency_file->current_file.get()); |
| 1480 | + } |
1480 | 1481 |
|
1481 |
| - // Detach running thread because it will exit soon |
1482 |
| - std::unique_ptr<std::thread> background_flush_thread; |
1483 |
| - { |
1484 |
| - std::lock_guard<std::mutex> lock_guard_inner{concurrency_file->background_thread_lock}; |
1485 |
| - background_flush_thread.swap(concurrency_file->background_flush_thread); |
1486 |
| - } |
1487 |
| - if (background_flush_thread && background_flush_thread->joinable()) |
1488 |
| - { |
1489 |
| - background_flush_thread->detach(); |
1490 |
| - } |
1491 |
| - })); |
| 1482 | + concurrency_file->flushed_record_count.store(current_record_count, |
| 1483 | + std::memory_order_release); |
| 1484 | + } |
| 1485 | + |
| 1486 | + concurrency_file->background_thread_waiter_cv.notify_all(); |
| 1487 | + } |
| 1488 | + |
| 1489 | + // Detach running thread because it will exit soon |
| 1490 | + std::unique_ptr<std::thread> background_flush_thread; |
| 1491 | + { |
| 1492 | + std::lock_guard<std::mutex> lock_guard_inner{concurrency_file->background_thread_lock}; |
| 1493 | + background_flush_thread.swap(concurrency_file->background_flush_thread); |
| 1494 | + } |
| 1495 | + if (background_flush_thread && background_flush_thread->joinable()) |
| 1496 | + { |
| 1497 | + background_flush_thread->detach(); |
| 1498 | + } |
| 1499 | + })); |
| 1500 | +#if OPENTELEMETRY_HAVE_EXCEPTIONS |
| 1501 | + } |
| 1502 | + catch (std::exception &e) |
| 1503 | + { |
| 1504 | + OTEL_INTERNAL_LOG_WARN("[OTLP FILE Client] Try to spawn background but got a exception: " |
| 1505 | + << e.what() << ".Data writing may experience some delays."); |
| 1506 | + } |
| 1507 | + catch (...) |
| 1508 | + { |
| 1509 | + OTEL_INTERNAL_LOG_WARN( |
| 1510 | + "[OTLP FILE Client] Try to spawn background but got a unknown exception.Data writing may " |
| 1511 | + "experience some delays."); |
| 1512 | + } |
| 1513 | +#endif |
1492 | 1514 | }
|
1493 | 1515 |
|
1494 | 1516 | private:
|
|
0 commit comments