Skip to content

Commit 5ed927b

Browse files
laramielcopybara-github
authored andcommitted
Improvements to interal_os utilities.
Subprocess now supports Pipe redirects. PiperOrigin-RevId: 738088103 Change-Id: Iadaf5a30f7ad16fc2fbe66be0f6c16d618a68c06
1 parent 04ed68e commit 5ed927b

16 files changed

+477
-180
lines changed

tensorstore/internal/BUILD

+1
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,7 @@ tensorstore_cc_library(
418418
hdrs = ["flat_cord_builder.h"],
419419
deps = [
420420
"//tensorstore/internal/os:memory_region",
421+
"//tensorstore/util:span",
421422
"@com_google_absl//absl/base:core_headers",
422423
"@com_google_absl//absl/log:absl_check",
423424
"@com_google_absl//absl/strings:cord",

tensorstore/internal/flat_cord_builder.h

+11-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "absl/log/absl_check.h"
2727
#include "absl/strings/cord.h"
2828
#include "tensorstore/internal/os/memory_region.h"
29+
#include "tensorstore/util/span.h"
2930

3031
namespace tensorstore {
3132
namespace internal {
@@ -75,6 +76,10 @@ class FlatCordBuilder {
7576
inuse_ = size;
7677
}
7778

79+
tensorstore::span<char> available_span() {
80+
return tensorstore::span(region_.data() + inuse_, available());
81+
}
82+
7883
/// Append data to the builder.
7984
void Append(std::string_view sv) {
8085
if (ABSL_PREDICT_FALSE(sv.empty())) return;
@@ -83,7 +88,12 @@ class FlatCordBuilder {
8388
inuse_ += sv.size();
8489
}
8590

86-
absl::Cord Build() && { return std::move(region_).as_cord(); }
91+
absl::Cord Build() && {
92+
if (inuse_ == region_.size()) {
93+
return std::move(region_).as_cord();
94+
}
95+
return std::move(region_).as_cord().Subcord(0, inuse_);
96+
}
8797

8898
private:
8999
internal_os::MemoryRegion region_;

tensorstore/internal/os/BUILD

+7-5
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ tensorstore_cc_library(
7777
"//tensorstore/internal/tracing",
7878
"//tensorstore/util:quote_string",
7979
"//tensorstore/util:result",
80+
"//tensorstore/util:span",
8081
"//tensorstore/util:status",
8182
"@com_google_absl//absl/base:config",
8283
"@com_google_absl//absl/base:core_headers",
@@ -128,6 +129,7 @@ tensorstore_cc_test(
128129
deps = [
129130
":file_util",
130131
"//tensorstore/internal/testing:scoped_directory",
132+
"//tensorstore/util:span",
131133
"//tensorstore/util:status_testutil",
132134
"@com_google_absl//absl/status",
133135
"@com_google_absl//absl/strings",
@@ -170,6 +172,7 @@ tensorstore_cc_test(
170172
":file_util",
171173
"//tensorstore/internal/testing:scoped_directory",
172174
"//tensorstore/util:result",
175+
"//tensorstore/util:span",
173176
"//tensorstore/util:status",
174177
"//tensorstore/util:status_testutil",
175178
"@com_google_absl//absl/log:absl_check",
@@ -191,12 +194,13 @@ tensorstore_cc_library(
191194
tags = ["msvc"],
192195
deps = [
193196
":error_code",
197+
":file_util",
194198
":include_windows",
195199
":wstring",
196200
"//tensorstore/util:result",
197201
"//tensorstore/util:status",
198202
"@com_google_absl//absl/container:flat_hash_map",
199-
"@com_google_absl//absl/container:inlined_vector",
203+
"@com_google_absl//absl/container:flat_hash_set",
200204
"@com_google_absl//absl/log:absl_check",
201205
"@com_google_absl//absl/log:absl_log",
202206
"@com_google_absl//absl/status",
@@ -212,21 +216,20 @@ tensorstore_cc_test(
212216
"msvc",
213217
],
214218
deps = [
219+
":file_util",
215220
":subprocess",
216221
"//tensorstore/internal:env",
217222
"//tensorstore/internal:path",
218223
"//tensorstore/internal/testing:scoped_directory",
219224
"//tensorstore/util:result",
220-
"//tensorstore/util:status",
225+
"//tensorstore/util:span",
221226
"//tensorstore/util:status_testutil",
222227
"@com_google_absl//absl/container:flat_hash_map",
223228
"@com_google_absl//absl/log:absl_log",
224229
"@com_google_absl//absl/status",
225230
"@com_google_absl//absl/strings",
226231
"@com_google_absl//absl/time",
227232
"@com_google_googletest//:gtest",
228-
"@com_google_riegeli//riegeli/bytes:fd_reader",
229-
"@com_google_riegeli//riegeli/bytes:read_all",
230233
],
231234
)
232235

@@ -286,7 +289,6 @@ tensorstore_cc_library(
286289
hdrs = ["memory_region.h"],
287290
deps = [
288291
"//tensorstore/util:result",
289-
"@com_google_absl//absl/debugging:leak_check",
290292
"@com_google_absl//absl/log:absl_check",
291293
"@com_google_absl//absl/log:absl_log",
292294
"@com_google_absl//absl/strings:cord",

tensorstore/internal/os/file_lister_test.cc

+6-5
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "tensorstore/internal/os/file_util.h"
2929
#include "tensorstore/internal/testing/scoped_directory.h"
3030
#include "tensorstore/util/result.h"
31+
#include "tensorstore/util/span.h"
3132
#include "tensorstore/util/status.h"
3233
#include "tensorstore/util/status_testutil.h"
3334

@@ -39,10 +40,9 @@ using ::tensorstore::internal_os::FsyncDirectory;
3940
using ::tensorstore::internal_os::FsyncFile;
4041
using ::tensorstore::internal_os::MakeDirectory;
4142
using ::tensorstore::internal_os::OpenDirectoryDescriptor;
42-
using ::tensorstore::internal_os::OpenExistingFileForReading;
4343
using ::tensorstore::internal_os::OpenFileWrapper;
4444
using ::tensorstore::internal_os::OpenFlags;
45-
using ::tensorstore::internal_os::ReadFromFile;
45+
using ::tensorstore::internal_os::PReadFromFile;
4646
using ::tensorstore::internal_os::RecursiveFileList;
4747
using ::tensorstore::internal_os::WriteToFile;
4848
using ::tensorstore::internal_testing::ScopedTemporaryDirectory;
@@ -191,8 +191,8 @@ TEST(RecursiveFileListEntryTest, DeleteWithOpenFile) {
191191

192192
{
193193
// Open file; it should be deleted.
194-
auto f =
195-
OpenExistingFileForReading(absl::StrCat(tmpdir.path(), "/read.txt"));
194+
auto f = OpenFileWrapper(absl::StrCat(tmpdir.path(), "/read.txt"),
195+
OpenFlags::DefaultRead);
196196
EXPECT_THAT(f, IsOk());
197197

198198
std::vector<std::string> files;
@@ -213,7 +213,8 @@ TEST(RecursiveFileListEntryTest, DeleteWithOpenFile) {
213213
IsOk());
214214

215215
char buf[16];
216-
EXPECT_THAT(ReadFromFile(f->get(), buf, 3, 0), IsOkAndHolds(3));
216+
EXPECT_THAT(PReadFromFile(f->get(), tensorstore::span(buf, 3), 0),
217+
IsOkAndHolds(3));
217218
}
218219

219220
std::vector<std::string> files;

tensorstore/internal/os/file_lock.cc

+16-16
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ absl::Status FileLock::Delete() && {
5353
assert(fd_ != FileDescriptorTraits::Invalid());
5454

5555
auto fd = std::exchange(fd_, FileDescriptorTraits::Invalid());
56-
auto status = internal_os::DeleteOpenFile(fd, lock_path_);
56+
auto status = DeleteOpenFile(fd, lock_path_);
5757
Unlock(fd);
5858
FileDescriptorTraits::Close(fd);
5959
return MaybeAnnotateStatus(std::move(status), "Failed to clean lock file");
@@ -70,13 +70,13 @@ Result<FileLock> AcquireFileLock(std::string lock_path) {
7070
using private_t = FileLock::private_t;
7171
TENSORSTORE_ASSIGN_OR_RETURN(
7272
UniqueFileDescriptor fd,
73-
internal_os::OpenFileWrapper(lock_path, OpenFlags::DefaultWrite));
73+
OpenFileWrapper(lock_path, OpenFlags::DefaultWrite));
7474
FileInfo a, b;
7575
FileInfo* info = &a;
7676

7777
// Is this a network filesystem?
78-
TENSORSTORE_RETURN_IF_ERROR(internal_os::GetFileInfo(fd.get(), info));
79-
if (!internal_os::IsRegularFile(*info)) {
78+
TENSORSTORE_RETURN_IF_ERROR(GetFileInfo(fd.get(), info));
79+
if (!IsRegularFile(*info)) {
8080
return absl::FailedPreconditionError(
8181
absl::StrCat("Not a regular file: ", lock_path));
8282
}
@@ -85,20 +85,19 @@ Result<FileLock> AcquireFileLock(std::string lock_path) {
8585
while (true) {
8686
// Acquire lock.
8787
TENSORSTORE_ASSIGN_OR_RETURN(
88-
auto unlock_fn, internal_os::AcquireFdLock(fd.get()),
88+
auto unlock_fn, AcquireFdLock(fd.get()),
8989
MaybeAnnotateStatus(_, absl::StrCat("Failed to acquire lock on file: ",
9090
QuoteString(lock_path))));
9191

9292
// Reopening the file should give the same value since the lock is held.
9393
TENSORSTORE_ASSIGN_OR_RETURN(
9494
UniqueFileDescriptor other_fd,
95-
internal_os::OpenFileWrapper(lock_path, OpenFlags::DefaultWrite));
95+
OpenFileWrapper(lock_path,
96+
OpenFlags::DefaultWrite | OpenFlags::CloseOnExec));
9697

9798
FileInfo* other_info = info == &a ? &b : &a;
98-
TENSORSTORE_RETURN_IF_ERROR(
99-
internal_os::GetFileInfo(other_fd.get(), other_info));
100-
if (internal_os::GetDeviceId(a) == internal_os::GetDeviceId(b) &&
101-
internal_os::GetFileId(a) == internal_os::GetFileId(b)) {
99+
TENSORSTORE_RETURN_IF_ERROR(GetFileInfo(other_fd.get(), other_info));
100+
if (GetDeviceId(a) == GetDeviceId(b) && GetFileId(a) == GetFileId(b)) {
102101
// Lock was acquired successfully.
103102
return FileLock(private_t(), std::move(lock_path), fd.release(),
104103
std::move(unlock_fn));
@@ -122,10 +121,11 @@ Result<FileLock> AcquireExclusiveFile(std::string lock_path,
122121

123122
// Determine whether the lock file is stale.
124123
auto detect_stale_lock = [&]() mutable {
125-
auto read_fd = OpenFileWrapper(lock_path, OpenFlags::OpenReadOnly);
124+
auto read_fd = OpenFileWrapper(
125+
lock_path, OpenFlags::OpenReadOnly | OpenFlags::CloseOnExec);
126126
if (read_fd.ok()) {
127127
TENSORSTORE_RETURN_IF_ERROR(GetFileInfo(read_fd->get(), &info));
128-
if (!internal_os::IsRegularFile(info)) {
128+
if (!IsRegularFile(info)) {
129129
// A lock file must be a regular file, not a symlink or directory.
130130
return absl::FailedPreconditionError(
131131
absl::StrCat("Not a regular file: ", lock_path));
@@ -148,11 +148,11 @@ Result<FileLock> AcquireExclusiveFile(std::string lock_path,
148148
n++;
149149
if (m > 1000) m = 1000;
150150

151-
auto fd = internal_os::OpenFileWrapper(
152-
lock_path,
153-
OpenFlags::Create | OpenFlags::Exclusive | OpenFlags::OpenReadWrite);
151+
auto fd = OpenFileWrapper(
152+
lock_path, OpenFlags::Create | OpenFlags::Exclusive |
153+
OpenFlags::OpenReadWrite | OpenFlags::CloseOnExec);
154154
if (fd.ok()) {
155-
TENSORSTORE_RETURN_IF_ERROR(internal_os::GetFileInfo(fd->get(), &info));
155+
TENSORSTORE_RETURN_IF_ERROR(GetFileInfo(fd->get(), &info));
156156
return FileLock(private_t{}, std::move(lock_path), fd->release(),
157157
std::nullopt);
158158
}

tensorstore/internal/os/file_util.cc

+13-8
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,18 @@
2121
#include <string>
2222

2323
#include "tensorstore/util/result.h"
24+
#include "tensorstore/util/span.h"
2425
#include "tensorstore/util/status.h"
2526

2627
namespace tensorstore {
2728
namespace internal_os {
2829

2930
Result<std::string> ReadAllToString(const std::string& path) {
30-
TENSORSTORE_ASSIGN_OR_RETURN(
31-
auto fd, OpenFileWrapper(path, internal_os::OpenFlags::OpenReadOnly));
31+
TENSORSTORE_ASSIGN_OR_RETURN(auto fd,
32+
OpenFileWrapper(path, OpenFlags::OpenReadOnly));
3233

33-
internal_os::FileInfo info;
34-
TENSORSTORE_RETURN_IF_ERROR(internal_os::GetFileInfo(fd.get(), &info));
34+
FileInfo info;
35+
TENSORSTORE_RETURN_IF_ERROR(GetFileInfo(fd.get(), &info));
3536

3637
// Handle the case where the file is empty.
3738
std::string result(internal_os::GetSize(info), 0);
@@ -42,7 +43,8 @@ Result<std::string> ReadAllToString(const std::string& path) {
4243
size_t offset = 0;
4344
TENSORSTORE_ASSIGN_OR_RETURN(
4445
auto read,
45-
internal_os::ReadFromFile(fd.get(), &result[0], result.size(), 0));
46+
PReadFromFile(fd.get(), tensorstore::span(&result[0], result.size()),
47+
offset));
4648
offset += read;
4749

4850
while (true) {
@@ -53,7 +55,8 @@ Result<std::string> ReadAllToString(const std::string& path) {
5355
char buffer[4096];
5456
TENSORSTORE_ASSIGN_OR_RETURN(
5557
read,
56-
internal_os::ReadFromFile(fd.get(), buffer, sizeof(buffer), offset));
58+
PReadFromFile(fd.get(), tensorstore::span(buffer, sizeof(buffer)),
59+
offset));
5760
if (read > 0) {
5861
// Amortized resize; double the size of the buffer.
5962
if (read > result.size()) {
@@ -63,8 +66,10 @@ Result<std::string> ReadAllToString(const std::string& path) {
6366
}
6467
} else {
6568
TENSORSTORE_ASSIGN_OR_RETURN(
66-
read, internal_os::ReadFromFile(fd.get(), &result[offset],
67-
result.size() - offset, offset));
69+
read, PReadFromFile(
70+
fd.get(),
71+
tensorstore::span(&result[offset], result.size() - offset),
72+
offset));
6873
}
6974
if (read == 0) {
7075
result.resize(offset);

tensorstore/internal/os/file_util.h

+14-10
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,14 @@
2424

2525
#include <string>
2626
#include <string_view>
27-
#include <utility>
2827

2928
#include "absl/status/status.h"
3029
#include "absl/strings/cord.h"
3130
#include "absl/time/time.h"
3231
#include "tensorstore/internal/os/memory_region.h"
3332
#include "tensorstore/internal/os/unique_handle.h"
3433
#include "tensorstore/util/result.h"
34+
#include "tensorstore/util/span.h"
3535

3636
// Include system headers last to reduce impact of macros.
3737
#ifndef _WIN32
@@ -119,8 +119,10 @@ enum class OpenFlags : int {
119119
Create = O_CREAT,
120120
Append = O_APPEND,
121121
Exclusive = O_EXCL,
122+
CloseOnExec = O_CLOEXEC,
122123

123-
DefaultWrite = O_CREAT | O_WRONLY,
124+
DefaultRead = O_RDONLY | O_CLOEXEC,
125+
DefaultWrite = O_CREAT | O_WRONLY | O_CLOEXEC,
124126
};
125127

126128
inline OpenFlags operator|(OpenFlags a, OpenFlags b) {
@@ -136,20 +138,22 @@ inline OpenFlags operator&(OpenFlags a, OpenFlags b) {
136138
Result<UniqueFileDescriptor> OpenFileWrapper(const std::string& path,
137139
OpenFlags flags);
138140

139-
inline Result<UniqueFileDescriptor> OpenExistingFileForReading(
140-
const std::string& path) {
141-
return OpenFileWrapper(path, OpenFlags::OpenReadOnly);
142-
}
141+
/// Reads from an open file.
142+
///
143+
/// \param fd Open file descriptor.
144+
/// \param buffer[out] Pointer to memory where data will be stored.
145+
/// \returns Number of bytes read or a failure absl::Status code.
146+
Result<ptrdiff_t> ReadFromFile(FileDescriptor fd,
147+
tensorstore::span<char> buffer);
143148

144149
/// Reads from an open file.
145150
///
146151
/// \param fd Open file descriptor.
147-
/// \param buf[out] Pointer to memory where data will be stored.
148-
/// \param count Maximum number of bytes to read.
152+
/// \param buffer[out] Pointer to memory where data will be stored.
149153
/// \param offset Byte offset within file at which to start reading.
150154
/// \returns Number of bytes read or a failure absl::Status code.
151-
Result<ptrdiff_t> ReadFromFile(FileDescriptor fd, void* buf, size_t count,
152-
int64_t offset);
155+
Result<ptrdiff_t> PReadFromFile(FileDescriptor fd,
156+
tensorstore::span<char> buffer, int64_t offset);
153157

154158
/// Reads the entire file into a string.
155159
///

tensorstore/internal/os/file_util_posix.cc

+27-5
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
#include "tensorstore/internal/tracing/logged_trace_span.h"
5858
#include "tensorstore/util/quote_string.h"
5959
#include "tensorstore/util/result.h"
60+
#include "tensorstore/util/span.h"
6061
#include "tensorstore/util/status.h"
6162

6263
#if ABSL_HAVE_MMAP
@@ -257,15 +258,36 @@ Result<UniqueFileDescriptor> OpenFileWrapper(const std::string& path,
257258
}
258259
}
259260

260-
Result<ptrdiff_t> ReadFromFile(FileDescriptor fd, void* buf, size_t count,
261-
int64_t offset) {
261+
Result<ptrdiff_t> ReadFromFile(FileDescriptor fd,
262+
tensorstore::span<char> buffer) {
262263
LoggedTraceSpan tspan(__func__, detail_logging.Level(1),
263-
{{"fd", fd}, {"count", count}, {"offset", offset}});
264+
{{"fd", fd}, {"count", buffer.size()}});
264265
ssize_t n;
265266
do {
266267
PotentiallyBlockingRegion region;
267-
n = ::pread(fd, buf, count, static_cast<off_t>(offset));
268-
} while ((n < 0) && (errno == EINTR || errno == EAGAIN));
268+
n = ::read(fd, buffer.data(), buffer.size());
269+
} while ((n < 0) && (errno == EINTR));
270+
if (n >= 0) {
271+
return n;
272+
} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
273+
return 0;
274+
}
275+
auto status = StatusFromOsError(errno, "Failed to read from file");
276+
return std::move(tspan).EndWithStatus(std::move(status));
277+
}
278+
279+
Result<ptrdiff_t> PReadFromFile(FileDescriptor fd,
280+
tensorstore::span<char> buffer,
281+
int64_t offset) {
282+
LoggedTraceSpan tspan(
283+
__func__, detail_logging.Level(1),
284+
{{"fd", fd}, {"count", buffer.size()}, {"offset", offset}});
285+
ssize_t n;
286+
do {
287+
PotentiallyBlockingRegion region;
288+
n = ::pread(fd, buffer.data(), buffer.size(), static_cast<off_t>(offset));
289+
} while ((n < 0) &&
290+
(errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK));
269291
if (n >= 0) {
270292
return n;
271293
}

0 commit comments

Comments
 (0)