Commit 0f1a6ae

fetch: update from std.Thread.Pool to std.Io

1 parent d0ba664

2 files changed: +33 −41 lines changed
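The whole change follows one pattern: work that used to be handed to an explicit std.Thread.Pool behind a std.Thread.WaitGroup is now dispatched through an `Io` instance via `Io.Group`. A minimal before/after sketch distilled from the hunks below (`worker` and `args` are placeholder names; the `Io.Group` calls are written exactly as they appear in this diff):

    // Before: explicit thread pool plus wait group.
    var wait_group: std.Thread.WaitGroup = .{};
    thread_pool.spawnWg(&wait_group, worker, .{args});
    wait_group.wait();

    // After: a group of async tasks tied to an `io` instance.
    var group: Io.Group = .init;
    group.async(io, worker, .{args});
    group.wait(io);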

src/Package/Fetch.zig

Lines changed: 28 additions & 32 deletions
@@ -38,15 +38,12 @@ const assert = std.debug.assert;
 const ascii = std.ascii;
 const Allocator = std.mem.Allocator;
 const Cache = std.Build.Cache;
-const ThreadPool = std.Thread.Pool;
-const WaitGroup = std.Thread.WaitGroup;
 const git = @import("Fetch/git.zig");
 const Package = @import("../Package.zig");
 const Manifest = Package.Manifest;
 const ErrorBundle = std.zig.ErrorBundle;
 
 arena: std.heap.ArenaAllocator,
-io: Io,
 location: Location,
 location_tok: std.zig.Ast.TokenIndex,
 hash_tok: std.zig.Ast.OptionalTokenIndex,
@@ -104,7 +101,8 @@ pub const LazyStatus = enum {
 
 /// Contains shared state among all `Fetch` tasks.
 pub const JobQueue = struct {
-    mutex: std.Thread.Mutex = .{},
+    io: Io,
+    mutex: Io.Mutex = .init,
     /// It's an array hash map so that it can be sorted before rendering the
     /// dependencies.zig source file.
     /// Protected by `mutex`.
@@ -115,8 +113,7 @@ pub const JobQueue = struct {
     all_fetches: std.ArrayList(*Fetch) = .empty,
 
     http_client: *std.http.Client,
-    thread_pool: *ThreadPool,
-    wait_group: WaitGroup = .{},
+    group: Io.Group = .init,
     global_cache: Cache.Directory,
     /// If true then, no fetching occurs, and:
     /// * The `global_cache` directory is assumed to be the direct parent
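With `io` plus the `Io.Mutex` and `Io.Group` fields living on `JobQueue`, call sites now construct the queue roughly as below. This is a condensed sketch assembled from the cmdFetch hunk in src/main.zig further down; the surrounding variables (`io`, `http_client`, `global_cache_directory`) are assumed to be in scope:

    var job_queue: Package.Fetch.JobQueue = .{
        .io = io,
        .http_client = &http_client,
        .global_cache = global_cache_directory,
        .recursive = false,
        .read_only = false,
        // `mutex` and `group` fall back to their `.init` defaults.
    };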
@@ -320,13 +317,14 @@ pub const Location = union(enum) {
 
 pub const RunError = error{
     OutOfMemory,
+    Canceled,
     /// This error code is intended to be handled by inspecting the
     /// `error_bundle` field.
     FetchFailed,
 };
 
 pub fn run(f: *Fetch) RunError!void {
-    const io = f.io;
+    const io = f.job_queue.io;
     const eb = &f.error_bundle;
     const arena = f.arena.allocator();
     const gpa = f.arena.child_allocator;
@@ -488,7 +486,7 @@ fn runResource(
     resource: *Resource,
     remote_hash: ?Package.Hash,
 ) RunError!void {
-    const io = f.io;
+    const io = f.job_queue.io;
     defer resource.deinit(io);
     const arena = f.arena.allocator();
     const eb = &f.error_bundle;
@@ -702,7 +700,8 @@ fn loadManifest(f: *Fetch, pkg_root: Cache.Path) RunError!void {
 }
 
 fn queueJobsForDeps(f: *Fetch) RunError!void {
-    const io = f.io;
+    const io = f.job_queue.io;
+
     assert(f.job_queue.recursive);
 
     // If the package does not have a build.zig.zon file then there are no dependencies.
@@ -722,8 +721,8 @@ fn queueJobsForDeps(f: *Fetch) RunError!void {
     const prog_names = try parent_arena.alloc([]const u8, deps.len);
     var new_fetch_index: usize = 0;
 
-    f.job_queue.mutex.lock();
-    defer f.job_queue.mutex.unlock();
+    try f.job_queue.mutex.lock(io);
+    defer f.job_queue.mutex.unlock(io);
 
     try f.job_queue.all_fetches.ensureUnusedCapacity(gpa, new_fetches.len);
     try f.job_queue.table.ensureUnusedCapacity(gpa, @intCast(new_fetches.len));
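The API change this hunk leans on: unlike std.Thread.Mutex.lock(), `Io.Mutex.lock(io)` is fallible, hence the `try`, so entering the critical section can now fail (the new `Canceled` member of `RunError` is the obvious candidate, though the lock's exact error set is not shown in this diff). The resulting pattern, sketched:

    // Sketch only: lock and unlock both thread the `Io` instance through.
    try f.job_queue.mutex.lock(io); // may fail, e.g. if the task is canceled
    defer f.job_queue.mutex.unlock(io);
    // ... mutate `table` / `all_fetches`, the fields documented above as
    // "Protected by `mutex`" ...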
@@ -792,7 +791,6 @@ fn queueJobsForDeps(f: *Fetch) RunError!void {
         f.job_queue.all_fetches.appendAssumeCapacity(new_fetch);
     }
     new_fetch.* = .{
-        .io = io,
         .arena = std.heap.ArenaAllocator.init(gpa),
         .location = location,
         .location_tok = dep.location_tok,
@@ -830,11 +828,9 @@ fn queueJobsForDeps(f: *Fetch) RunError!void {
         break :nf .{ new_fetches[0..new_fetch_index], prog_names[0..new_fetch_index] };
     };
 
-    // Now it's time to give tasks to the thread pool.
-    const thread_pool = f.job_queue.thread_pool;
-
+    // Now it's time to dispatch tasks.
     for (new_fetches, prog_names) |*new_fetch, prog_name| {
-        thread_pool.spawnWg(&f.job_queue.wait_group, workerRun, .{ new_fetch, prog_name });
+        f.job_queue.group.async(io, workerRun, .{ new_fetch, prog_name });
     }
 }
 
@@ -848,6 +844,7 @@ pub fn workerRun(f: *Fetch, prog_name: []const u8) void {
 
     run(f) catch |err| switch (err) {
         error.OutOfMemory => f.oom_flag = true,
+        error.Canceled => {},
         error.FetchFailed => {
             // Nothing to do because the errors are already reported in `error_bundle`,
             // and a reference is kept to the `Fetch` task inside `all_fetches`.
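Every caller of `run` now has to pick a cancellation policy. The two policies visible in this commit, side by side (condensed from this hunk and from the cmdFetch hunk in src/main.zig below; these are switch arms, not standalone code):

    // Inside a group worker (workerRun): swallow it; the owning Io.Group
    // is presumably being torn down and there is nothing useful to report.
    error.Canceled => {},

    // At the top level (cmdFetch): propagate it to the caller.
    error.OutOfMemory, error.Canceled => |e| return e,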
@@ -992,7 +989,7 @@ const FileType = enum {
 const init_resource_buffer_size = git.Packet.max_data_length;
 
 fn initResource(f: *Fetch, uri: std.Uri, resource: *Resource, reader_buffer: []u8) RunError!void {
-    const io = f.io;
+    const io = f.job_queue.io;
     const arena = f.arena.allocator();
     const eb = &f.error_bundle;
 
@@ -1281,12 +1278,16 @@ fn unpackTarball(f: *Fetch, out_dir: fs.Dir, reader: *Io.Reader) RunError!UnpackResult {
     return res;
 }
 
-fn unzip(f: *Fetch, out_dir: fs.Dir, reader: *Io.Reader) error{ ReadFailed, OutOfMemory, FetchFailed }!UnpackResult {
+fn unzip(
+    f: *Fetch,
+    out_dir: fs.Dir,
+    reader: *Io.Reader,
+) error{ ReadFailed, OutOfMemory, Canceled, FetchFailed }!UnpackResult {
     // We write the entire contents to a file first because zip files
     // must be processed back to front and they could be too large to
     // load into memory.
 
-    const io = f.io;
+    const io = f.job_queue.io;
     const cache_root = f.job_queue.global_cache;
     const prefix = "tmp/";
     const suffix = ".zip";
@@ -1306,6 +1307,7 @@ fn unzip(f: *Fetch, out_dir: fs.Dir, reader: *Io.Reader) error{ ReadFailed, OutOfMemory, FetchFailed }!UnpackResult {
         .read = true,
     }) catch |err| switch (err) {
         error.PathAlreadyExists => continue,
+        error.Canceled => return error.Canceled,
         else => |e| return f.fail(
             f.location_tok,
             try eb.printString("failed to create temporary zip file: {t}", .{e}),
@@ -1348,7 +1350,7 @@ fn unzip(f: *Fetch, out_dir: fs.Dir, reader: *Io.Reader) error{ ReadFailed, OutOfMemory, FetchFailed }!UnpackResult {
 }
 
 fn unpackGitPack(f: *Fetch, out_dir: fs.Dir, resource: *Resource.Git) anyerror!UnpackResult {
-    const io = f.io;
+    const io = f.job_queue.io;
     const arena = f.arena.allocator();
     // TODO don't try to get a gpa from an arena. expose this dependency higher up
     // because the backing of arena could be page allocator
@@ -1486,11 +1488,11 @@ const ComputedHash = struct {
 /// hashed* and must not be present on the file system when calling this
 /// function.
 fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!ComputedHash {
+    const io = f.job_queue.io;
     // All the path name strings need to be in memory for sorting.
     const arena = f.arena.allocator();
     const gpa = f.arena.child_allocator;
     const eb = &f.error_bundle;
-    const thread_pool = f.job_queue.thread_pool;
     const root_dir = pkg_path.root_dir.handle;
 
     // Collect all files, recursively, then sort.
@@ -1514,10 +1516,8 @@ fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!ComputedHash {
     {
         // The final hash will be a hash of each file hashed independently. This
         // allows hashing in parallel.
-        var wait_group: WaitGroup = .{};
-        // `computeHash` is called from a worker thread so there must not be
-        // any waiting without working or a deadlock could occur.
-        defer thread_pool.waitAndWork(&wait_group);
+        var group: Io.Group = .init;
+        defer group.wait(io);
 
         while (walker.next() catch |err| {
             try eb.addRootErrorMessage(.{ .msg = try eb.printString(
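Worth noting: the deleted comment warned that computeHash runs on a pool worker, so plain waiting could deadlock and waitAndWork had to be used instead. The replacement is an ordinary scoped group, which suggests `Io.Group.wait` is safe to call from within a task; treat that as an inference from this hunk, not a documented guarantee. The scoped pattern, sketched:

    var group: Io.Group = .init;
    defer group.wait(io); // joins every task spawned on `group` below

    // `workerHashFile`, `root_dir`, and `hashed_file` as used later in
    // this function; each file is hashed as an independent task.
    group.async(io, workerHashFile, .{ root_dir, hashed_file });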
@@ -1542,7 +1542,7 @@ fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!ComputedHash {
                 .fs_path = fs_path,
                 .failure = undefined, // to be populated by the worker
             };
-            thread_pool.spawnWg(&wait_group, workerDeleteFile, .{ root_dir, deleted_file });
+            group.async(io, workerDeleteFile, .{ root_dir, deleted_file });
             try deleted_files.append(deleted_file);
             continue;
         }
@@ -1570,7 +1570,7 @@ fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!ComputedHash {
             .failure = undefined, // to be populated by the worker
             .size = undefined, // to be populated by the worker
         };
-        thread_pool.spawnWg(&wait_group, workerHashFile, .{ root_dir, hashed_file });
+        group.async(io, workerHashFile, .{ root_dir, hashed_file });
         try all_files.append(hashed_file);
     }
 }
@@ -2241,7 +2241,6 @@ fn saveEmbedFile(comptime tarball_name: []const u8, dir: fs.Dir) !void {
 
 // Builds Fetch with required dependencies, clears dependencies on deinit().
 const TestFetchBuilder = struct {
-    thread_pool: ThreadPool,
     http_client: std.http.Client,
     global_cache_directory: Cache.Directory,
     job_queue: Fetch.JobQueue,
@@ -2256,13 +2255,12 @@ const TestFetchBuilder = struct {
     ) !*Fetch {
         const cache_dir = try cache_parent_dir.makeOpenPath("zig-global-cache", .{});
 
-        try self.thread_pool.init(.{ .allocator = allocator });
         self.http_client = .{ .allocator = allocator, .io = io };
         self.global_cache_directory = .{ .handle = cache_dir, .path = null };
 
         self.job_queue = .{
+            .io = io,
             .http_client = &self.http_client,
-            .thread_pool = &self.thread_pool,
             .global_cache = self.global_cache_directory,
             .recursive = false,
             .read_only = false,
@@ -2273,7 +2271,6 @@ const TestFetchBuilder = struct {
 
         self.fetch = .{
             .arena = std.heap.ArenaAllocator.init(allocator),
-            .io = io,
             .location = .{ .path_or_url = path_or_url },
             .location_tok = 0,
             .hash_tok = .none,
@@ -2309,7 +2306,6 @@ const TestFetchBuilder = struct {
         self.fetch.prog_node.end();
         self.global_cache_directory.handle.close();
         self.http_client.deinit();
-        self.thread_pool.deinit();
     }
 
     fn packageDir(self: *TestFetchBuilder) !fs.Dir {

src/main.zig

Lines changed: 5 additions & 9 deletions
@@ -5139,8 +5139,8 @@ fn cmdBuild(gpa: Allocator, arena: Allocator, io: Io, args: []const []const u8)
     defer fetch_prog_node.end();
 
     var job_queue: Package.Fetch.JobQueue = .{
+        .io = io,
         .http_client = &http_client,
-        .thread_pool = &thread_pool,
         .global_cache = dirs.global_cache,
         .read_only = false,
         .recursive = true,
@@ -5173,7 +5173,6 @@ fn cmdBuild(gpa: Allocator, arena: Allocator, io: Io, args: []const []const u8)
 
     var fetch: Package.Fetch = .{
         .arena = std.heap.ArenaAllocator.init(gpa),
-        .io = io,
         .location = .{ .relative_path = phantom_package_root },
         .location_tok = 0,
         .hash_tok = .none,
@@ -5207,10 +5206,8 @@ fn cmdBuild(gpa: Allocator, arena: Allocator, io: Io, args: []const []const u8)
         &fetch,
     );
 
-    job_queue.thread_pool.spawnWg(&job_queue.wait_group, Package.Fetch.workerRun, .{
-        &fetch, "root",
-    });
-    job_queue.wait_group.wait();
+    job_queue.group.async(io, Package.Fetch.workerRun, .{ &fetch, "root" });
+    job_queue.group.wait(io);
 
     try job_queue.consolidateErrors();
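The root fetch in cmdBuild now drives the queue's own group end to end. A condensed reading of this hunk; since queueJobsForDeps spawns recursive fetches onto the same `job_queue.group`, a single wait presumably joins the entire dependency tree:

    job_queue.group.async(io, Package.Fetch.workerRun, .{ &fetch, "root" });
    job_queue.group.wait(io); // join the root task and everything it spawned
    try job_queue.consolidateErrors();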

@@ -6899,8 +6896,8 @@ fn cmdFetch(
     defer global_cache_directory.handle.close();
 
     var job_queue: Package.Fetch.JobQueue = .{
+        .io = io,
         .http_client = &http_client,
-        .thread_pool = &thread_pool,
         .global_cache = global_cache_directory,
         .recursive = false,
         .read_only = false,
@@ -6912,7 +6909,6 @@ fn cmdFetch(
 
     var fetch: Package.Fetch = .{
         .arena = std.heap.ArenaAllocator.init(gpa),
-        .io = io,
         .location = .{ .path_or_url = path_or_url },
         .location_tok = 0,
         .hash_tok = .none,
@@ -6942,7 +6938,7 @@ fn cmdFetch(
     defer fetch.deinit();
 
     fetch.run() catch |err| switch (err) {
-        error.OutOfMemory => fatal("out of memory", .{}),
+        error.OutOfMemory, error.Canceled => |e| return e,
         error.FetchFailed => {}, // error bundle checked below
     };
 