diff --git a/tensorflow_serving/sources/storage_path/file_system_storage_path_source.cc b/tensorflow_serving/sources/storage_path/file_system_storage_path_source.cc index a9d200f461d..1762ce036be 100644 --- a/tensorflow_serving/sources/storage_path/file_system_storage_path_source.cc +++ b/tensorflow_serving/sources/storage_path/file_system_storage_path_source.cc @@ -290,11 +290,9 @@ Status PollFileSystemForConfig( // Determines if, for any servables in 'config', the file system doesn't // currently contain at least one version under its base path. -Status FailIfZeroVersions(const FileSystemStoragePathSourceConfig& config) { - std::map>> - versions_by_servable_name; - TF_RETURN_IF_ERROR( - PollFileSystemForConfig(config, &versions_by_servable_name)); +Status FailIfZeroVersions(const FileSystemStoragePathSourceConfig& config, + std::map>>& + versions_by_servable_name) { for (const auto& entry : versions_by_servable_name) { const string& servable = entry.first; const std::vector>& versions = entry.second; @@ -330,14 +328,33 @@ Status FileSystemStoragePathSource::UpdateConfig( const FileSystemStoragePathSourceConfig normalized_config = NormalizeConfig(config); - if (normalized_config.fail_if_zero_versions_at_startup() || // NOLINT - normalized_config.servable_versions_always_present()) { - TF_RETURN_IF_ERROR(FailIfZeroVersions(normalized_config)); - } + bool requireVersionPresent = + normalized_config.fail_if_zero_versions_at_startup() || // NOLINT + normalized_config.servable_versions_always_present(); - if (aspired_versions_callback_) { - TF_RETURN_IF_ERROR( - UnaspireServables(GetDeletedServables(config_, normalized_config))); + // Only poll filesystem here if necessary + if (requireVersionPresent || aspired_versions_callback_) { + std::map>> + versions_by_servable_name; + TF_RETURN_IF_ERROR(PollFileSystemForConfig(normalized_config, + &versions_by_servable_name)); + + if (requireVersionPresent) { + TF_RETURN_IF_ERROR(FailIfZeroVersions(normalized_config, + versions_by_servable_name)); + } + + if (aspired_versions_callback_) { + TF_RETURN_IF_ERROR( + UnaspireServables(GetDeletedServables(config_, normalized_config))); + // Always invoke callback after updating config - an RPC thread might be + // waiting for the corresponding events. This is especially important + // if config.file_system_poll_wait_seconds() == 0. + for (const auto& entry : versions_by_servable_name) { + LogVersions(entry.second); + CallAspiredVersionsCallback(entry.first, entry.second); + } + } } config_ = normalized_config; @@ -398,19 +415,24 @@ Status FileSystemStoragePathSource::PollFileSystemAndInvokeCallback() { << servable; continue; } - for (const ServableData& version : versions) { - if (version.status().ok()) { - VLOG(1) << "File-system polling update: Servable:" << version.id() - << "; Servable path: " << version.DataOrDie() - << "; Polling frequency: " - << config_.file_system_poll_wait_seconds(); - } - } + LogVersions(versions); CallAspiredVersionsCallback(servable, versions); } return Status::OK(); } +void FileSystemStoragePathSource::LogVersions( + const std::vector>& versions) { + for (const ServableData &version : versions) { + if (version.status().ok()) { + VLOG(1) << "File-system polling update: Servable:" << version.id() + << "; Servable path: " << version.DataOrDie() + << "; Polling frequency: " + << config_.file_system_poll_wait_seconds(); + } + } +} + Status FileSystemStoragePathSource::UnaspireServables( const std::set& servable_names) { for (const string& servable_name : servable_names) { diff --git a/tensorflow_serving/sources/storage_path/file_system_storage_path_source.h b/tensorflow_serving/sources/storage_path/file_system_storage_path_source.h index 15ba88c8972..79226bdf17f 100644 --- a/tensorflow_serving/sources/storage_path/file_system_storage_path_source.h +++ b/tensorflow_serving/sources/storage_path/file_system_storage_path_source.h @@ -90,6 +90,9 @@ class FileSystemStoragePathSource : public Source { // such child. Status PollFileSystemAndInvokeCallback(); + // Logs servable version information + void LogVersions(const std::vector>& versions); + // Sends empty aspired-versions lists for each servable in 'servable_names'. Status UnaspireServables(const std::set& servable_names) EXCLUSIVE_LOCKS_REQUIRED(mu_);