-
Notifications
You must be signed in to change notification settings - Fork 2.2k
Add reconfig poll patch #1801
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Add reconfig poll patch #1801
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -293,9 +293,9 @@ Status PollFileSystemForConfig( | |
|
||
// Determines if, for any servables in 'config', the file system doesn't | ||
// currently contain at least one version under its base path. | ||
Status FailIfZeroVersions(const FileSystemStoragePathSourceConfig& config) { | ||
std::map<string, std::vector<ServableData<StoragePath>>> | ||
versions_by_servable_name; | ||
Status FailIfZeroVersions(const FileSystemStoragePathSourceConfig& config, | ||
std::map<string, std::vector<ServableData<StoragePath>>>& | ||
versions_by_servable_name) { | ||
TF_RETURN_IF_ERROR( | ||
PollFileSystemForConfig(config, &versions_by_servable_name)); | ||
for (const auto& entry : versions_by_servable_name) { | ||
|
@@ -333,15 +333,35 @@ Status FileSystemStoragePathSource::UpdateConfig( | |
const FileSystemStoragePathSourceConfig normalized_config = | ||
NormalizeConfig(config); | ||
|
||
if (normalized_config.fail_if_zero_versions_at_startup() || // NOLINT | ||
normalized_config.servable_versions_always_present()) { | ||
TF_RETURN_IF_ERROR(FailIfZeroVersions(normalized_config)); | ||
} | ||
bool requireVersionPresent = | ||
normalized_config.fail_if_zero_versions_at_startup() || // NOLINT | ||
normalized_config.servable_versions_always_present(); | ||
|
||
// Only poll filesystem here if necessary | ||
if (requireVersionPresent || aspired_versions_callback_) { | ||
std::map<string, std::vector<ServableData<StoragePath>>> | ||
versions_by_servable_name; | ||
TF_RETURN_IF_ERROR(PollFileSystemForConfig(normalized_config, | ||
&versions_by_servable_name)); | ||
|
||
if (aspired_versions_callback_) { | ||
TF_RETURN_IF_ERROR( | ||
UnaspireServables(GetDeletedServables(config_, normalized_config))); | ||
if (requireVersionPresent) { | ||
TF_RETURN_IF_ERROR(FailIfZeroVersions(normalized_config, | ||
versions_by_servable_name)); | ||
} | ||
|
||
if (aspired_versions_callback_) { | ||
TF_RETURN_IF_ERROR( | ||
UnaspireServables(GetDeletedServables(config_, normalized_config))); | ||
// Always invoke callback after updating config - an RPC thread might be | ||
// waiting for the corresponding events. This is especially important | ||
// if config.file_system_poll_wait_seconds() == 0. | ||
for (const auto& entry : versions_by_servable_name) { | ||
LogVersions(entry.second); | ||
CallAspiredVersionsCallback(entry.first, entry.second); | ||
} | ||
} | ||
} | ||
|
||
config_ = normalized_config; | ||
|
||
return Status::OK(); | ||
|
@@ -401,19 +421,24 @@ Status FileSystemStoragePathSource::PollFileSystemAndInvokeCallback() { | |
<< servable; | ||
continue; | ||
} | ||
for (const ServableData<StoragePath>& version : versions) { | ||
if (version.status().ok()) { | ||
VLOG(1) << "File-system polling update: Servable:" << version.id() | ||
<< "; Servable path: " << version.DataOrDie() | ||
<< "; Polling frequency: " | ||
<< config_.file_system_poll_wait_seconds(); | ||
} | ||
} | ||
LogVersions(versions); | ||
CallAspiredVersionsCallback(servable, versions); | ||
} | ||
return Status::OK(); | ||
} | ||
|
||
void FileSystemStoragePathSource::LogVersions( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As per the other comment, let's put this implementation outside of the class definition, under anonymous namespace. |
||
const std::vector<ServableData<StoragePath>>& versions) { | ||
for (const ServableData<StoragePath> &version : versions) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use auto& |
||
if (version.status().ok()) { | ||
VLOG(1) << "File-system polling update: Servable:" << version.id() | ||
<< "; Servable path: " << version.DataOrDie() | ||
<< "; Polling frequency: " | ||
<< config_.file_system_poll_wait_seconds(); | ||
} | ||
} | ||
} | ||
|
||
Status FileSystemStoragePathSource::UnaspireServables( | ||
const std::set<string>& servable_names) { | ||
for (const string& servable_name : servable_names) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -90,6 +90,9 @@ class FileSystemStoragePathSource : public Source<StoragePath> { | |
// such child. | ||
Status PollFileSystemAndInvokeCallback(); | ||
|
||
// Logs servable version information | ||
void LogVersions(const std::vector<ServableData<StoragePath>>& versions); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see any reasons to make it a class method as its logic doesn't involve reading/manipulating class state. Can we remove this declaration from the .h file and add it as a function wrapped with anonymous namespace in the .cc file (the only place where it's used)? |
||
|
||
// Sends empty aspired-versions lists for each servable in 'servable_names'. | ||
Status UnaspireServables(const std::set<string>& servable_names) | ||
TF_EXCLUSIVE_LOCKS_REQUIRED(mu_); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like PollFileSystemForConfig is already called in line 344. So it should be fine to remove it from here and pass a read-only arg versions_by_servable_name to FailIfZeroVersions.