From 5ce5bf2b5ff31c823a83ff7bc9d21cad37a02ab9 Mon Sep 17 00:00:00 2001 From: nickhill Date: Tue, 17 Dec 2019 11:16:44 -0800 Subject: [PATCH 1/3] Refresh aspired servables/versions following config update Currently when the configured model list is updated via a call to handleReloadConfigRequest, the request thread blocks until any newly added models become available. Their availability however depends on the filesystem polling thread rescanning the filesystem at some periodic interval, meaning that there's an arbitrary delay before the requested changes actually take effect and the RPC returns. This problem may not be very noticeable with the default polling interval of 1 second, but seems undesirable for longer intervals and in particular makes API-based dynamic reconfiguration incompatible with the --file_system_poll_wait_seconds=0 setting (in this case all handleReloadConfigRequest calls time-out and do not take effect). --- .../file_system_storage_path_source.cc | 57 +++++++++++++------ .../file_system_storage_path_source.h | 4 ++ 2 files changed, 44 insertions(+), 17 deletions(-) diff --git a/tensorflow_serving/sources/storage_path/file_system_storage_path_source.cc b/tensorflow_serving/sources/storage_path/file_system_storage_path_source.cc index a9d200f461d..949a00fb1b2 100644 --- a/tensorflow_serving/sources/storage_path/file_system_storage_path_source.cc +++ b/tensorflow_serving/sources/storage_path/file_system_storage_path_source.cc @@ -290,11 +290,9 @@ Status PollFileSystemForConfig( // Determines if, for any servables in 'config', the file system doesn't // currently contain at least one version under its base path. -Status FailIfZeroVersions(const FileSystemStoragePathSourceConfig& config) { - std::map>> - versions_by_servable_name; - TF_RETURN_IF_ERROR( - PollFileSystemForConfig(config, &versions_by_servable_name)); +Status FailIfZeroVersions(const FileSystemStoragePathSourceConfig& config, + std::map>>& + versions_by_servable_name) { for (const auto& entry : versions_by_servable_name) { const string& servable = entry.first; const std::vector>& versions = entry.second; @@ -330,14 +328,33 @@ Status FileSystemStoragePathSource::UpdateConfig( const FileSystemStoragePathSourceConfig normalized_config = NormalizeConfig(config); - if (normalized_config.fail_if_zero_versions_at_startup() || // NOLINT - normalized_config.servable_versions_always_present()) { - TF_RETURN_IF_ERROR(FailIfZeroVersions(normalized_config)); + std::map>> + versions_by_servable_name; + + bool requireVersion = + normalized_config.fail_if_zero_versions_at_startup() || // NOLINT + normalized_config.servable_versions_always_present(); + + // Only poll filesystem here if necessary + if (requireVersion || aspired_versions_callback_) { + TF_RETURN_IF_ERROR(PollFileSystemForConfig(normalized_config, + &versions_by_servable_name)); + } + + if (requireVersion) { + TF_RETURN_IF_ERROR(FailIfZeroVersions(normalized_config, + versions_by_servable_name)); } if (aspired_versions_callback_) { TF_RETURN_IF_ERROR( UnaspireServables(GetDeletedServables(config_, normalized_config))); + // Always invoke callback after updating config - an RPC thread might be + // waiting for the corresponding events. This is especially important + // if config.file_system_poll_wait_seconds() == 0. + for (const auto& entry : versions_by_servable_name) { + LogVersionsAndInvokeCallback(entry.first, entry.second); + } } config_ = normalized_config; @@ -398,19 +415,25 @@ Status FileSystemStoragePathSource::PollFileSystemAndInvokeCallback() { << servable; continue; } - for (const ServableData& version : versions) { - if (version.status().ok()) { - VLOG(1) << "File-system polling update: Servable:" << version.id() - << "; Servable path: " << version.DataOrDie() - << "; Polling frequency: " - << config_.file_system_poll_wait_seconds(); - } - } - CallAspiredVersionsCallback(servable, versions); + LogVersionsAndInvokeCallback(servable, versions); } return Status::OK(); } +void FileSystemStoragePathSource::LogVersionsAndInvokeCallback( + const string& servable, + const std::vector>& versions) { + for (const ServableData &version : versions) { + if (version.status().ok()) { + VLOG(1) << "File-system polling update: Servable:" << version.id() + << "; Servable path: " << version.DataOrDie() + << "; Polling frequency: " + << config_.file_system_poll_wait_seconds(); + } + } + CallAspiredVersionsCallback(servable, versions); +} + Status FileSystemStoragePathSource::UnaspireServables( const std::set& servable_names) { for (const string& servable_name : servable_names) { diff --git a/tensorflow_serving/sources/storage_path/file_system_storage_path_source.h b/tensorflow_serving/sources/storage_path/file_system_storage_path_source.h index 15ba88c8972..87f82a2e9cf 100644 --- a/tensorflow_serving/sources/storage_path/file_system_storage_path_source.h +++ b/tensorflow_serving/sources/storage_path/file_system_storage_path_source.h @@ -90,6 +90,10 @@ class FileSystemStoragePathSource : public Source { // such child. Status PollFileSystemAndInvokeCallback(); + // Logs servable version information before invoking 'aspired_versions_callback_' + void LogVersionsAndInvokeCallback(const string& servable, + const std::vector>& versions); + // Sends empty aspired-versions lists for each servable in 'servable_names'. Status UnaspireServables(const std::set& servable_names) EXCLUSIVE_LOCKS_REQUIRED(mu_); From 0ef7c5a62d9f9cdbdeb5440aeba5092f4d0c9200 Mon Sep 17 00:00:00 2001 From: nickhill Date: Wed, 18 Dec 2019 14:03:59 -0800 Subject: [PATCH 2/3] Minor rearrangement of logic and rename var for readability --- .../file_system_storage_path_source.cc | 35 +++++++++---------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/tensorflow_serving/sources/storage_path/file_system_storage_path_source.cc b/tensorflow_serving/sources/storage_path/file_system_storage_path_source.cc index 949a00fb1b2..e320534fb27 100644 --- a/tensorflow_serving/sources/storage_path/file_system_storage_path_source.cc +++ b/tensorflow_serving/sources/storage_path/file_system_storage_path_source.cc @@ -328,32 +328,31 @@ Status FileSystemStoragePathSource::UpdateConfig( const FileSystemStoragePathSourceConfig normalized_config = NormalizeConfig(config); - std::map>> - versions_by_servable_name; - - bool requireVersion = + bool requireVersionPresent = normalized_config.fail_if_zero_versions_at_startup() || // NOLINT normalized_config.servable_versions_always_present(); // Only poll filesystem here if necessary - if (requireVersion || aspired_versions_callback_) { + if (requireVersionPresent || aspired_versions_callback_) { + std::map>> + versions_by_servable_name; TF_RETURN_IF_ERROR(PollFileSystemForConfig(normalized_config, &versions_by_servable_name)); - } - if (requireVersion) { - TF_RETURN_IF_ERROR(FailIfZeroVersions(normalized_config, - versions_by_servable_name)); - } + if (requireVersionPresent) { + TF_RETURN_IF_ERROR(FailIfZeroVersions(normalized_config, + versions_by_servable_name)); + } - if (aspired_versions_callback_) { - TF_RETURN_IF_ERROR( - UnaspireServables(GetDeletedServables(config_, normalized_config))); - // Always invoke callback after updating config - an RPC thread might be - // waiting for the corresponding events. This is especially important - // if config.file_system_poll_wait_seconds() == 0. - for (const auto& entry : versions_by_servable_name) { - LogVersionsAndInvokeCallback(entry.first, entry.second); + if (aspired_versions_callback_) { + TF_RETURN_IF_ERROR( + UnaspireServables(GetDeletedServables(config_, normalized_config))); + // Always invoke callback after updating config - an RPC thread might be + // waiting for the corresponding events. This is especially important + // if config.file_system_poll_wait_seconds() == 0. + for (const auto& entry : versions_by_servable_name) { + LogVersionsAndInvokeCallback(entry.first, entry.second); + } } } config_ = normalized_config; From eaba886f8bf7e9525ab6ce5bcf1c7b2057b8e9f8 Mon Sep 17 00:00:00 2001 From: nickhill Date: Fri, 7 Feb 2020 17:05:57 -0800 Subject: [PATCH 3/3] Split logging logic into separate method per @christisg's suggestion --- .../storage_path/file_system_storage_path_source.cc | 10 +++++----- .../storage_path/file_system_storage_path_source.h | 5 ++--- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/tensorflow_serving/sources/storage_path/file_system_storage_path_source.cc b/tensorflow_serving/sources/storage_path/file_system_storage_path_source.cc index e320534fb27..1762ce036be 100644 --- a/tensorflow_serving/sources/storage_path/file_system_storage_path_source.cc +++ b/tensorflow_serving/sources/storage_path/file_system_storage_path_source.cc @@ -351,7 +351,8 @@ Status FileSystemStoragePathSource::UpdateConfig( // waiting for the corresponding events. This is especially important // if config.file_system_poll_wait_seconds() == 0. for (const auto& entry : versions_by_servable_name) { - LogVersionsAndInvokeCallback(entry.first, entry.second); + LogVersions(entry.second); + CallAspiredVersionsCallback(entry.first, entry.second); } } } @@ -414,13 +415,13 @@ Status FileSystemStoragePathSource::PollFileSystemAndInvokeCallback() { << servable; continue; } - LogVersionsAndInvokeCallback(servable, versions); + LogVersions(versions); + CallAspiredVersionsCallback(servable, versions); } return Status::OK(); } -void FileSystemStoragePathSource::LogVersionsAndInvokeCallback( - const string& servable, +void FileSystemStoragePathSource::LogVersions( const std::vector>& versions) { for (const ServableData &version : versions) { if (version.status().ok()) { @@ -430,7 +431,6 @@ void FileSystemStoragePathSource::LogVersionsAndInvokeCallback( << config_.file_system_poll_wait_seconds(); } } - CallAspiredVersionsCallback(servable, versions); } Status FileSystemStoragePathSource::UnaspireServables( diff --git a/tensorflow_serving/sources/storage_path/file_system_storage_path_source.h b/tensorflow_serving/sources/storage_path/file_system_storage_path_source.h index 87f82a2e9cf..79226bdf17f 100644 --- a/tensorflow_serving/sources/storage_path/file_system_storage_path_source.h +++ b/tensorflow_serving/sources/storage_path/file_system_storage_path_source.h @@ -90,9 +90,8 @@ class FileSystemStoragePathSource : public Source { // such child. Status PollFileSystemAndInvokeCallback(); - // Logs servable version information before invoking 'aspired_versions_callback_' - void LogVersionsAndInvokeCallback(const string& servable, - const std::vector>& versions); + // Logs servable version information + void LogVersions(const std::vector>& versions); // Sends empty aspired-versions lists for each servable in 'servable_names'. Status UnaspireServables(const std::set& servable_names)