Skip to content

Commit cd6cda2

Browse files
committed
file: Support zstd compression
1 parent b2e2bc4 commit cd6cda2

File tree

6 files changed

+120
-43
lines changed

6 files changed

+120
-43
lines changed

lib/kernel/src/file.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,9 @@ An IO device as returned by `open/2`.
282282
Size :: non_neg_integer(),
283283
Delay :: non_neg_integer()}
284284
| 'delayed_write' | {'read_ahead', Size :: pos_integer()}
285-
| 'read_ahead' | 'compressed' | 'compressed_one'
285+
| 'read_ahead' | 'compressed' | 'compressed_one' |
286+
{zstd, zstd:compress_parameters() |
287+
zstd:decompress_parameters()}
286288
| {'encoding', unicode:encoding()}
287289
| sync.
288290
-type deep_list() :: [char() | atom() | deep_list()].

lib/kernel/src/raw_file_io.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ match_list(_Other) -> false.
6060

6161
match_compressed(compressed) -> true;
6262
match_compressed(compressed_one) -> true;
63+
match_compressed({zstd, _}) -> true;
6364
match_compressed(_Other) -> false.
6465

6566
match_delayed({delayed_write, _Size, _Timeout}) -> true;

lib/kernel/src/raw_file_io_deflate.erl

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,28 @@ init({Owner, Secret, [compressed]}) ->
4242
monitor => Monitor,
4343
secret => Secret,
4444
position => 0,
45-
zlib => Z },
46-
{ok, opening, Data}.
45+
deflate => fun(Data) -> zlib:deflate(Z, Data) end,
46+
flush => fun() -> zlib:deflate(Z, [], finish) end },
47+
{ok, opening, Data};
48+
init({Owner, Secret, [{zstd, Parameters}]}) ->
49+
Monitor = monitor(process, Owner),
50+
try zstd:context(compress, Parameters) of
51+
{ok, Z} ->
52+
Data =
53+
#{ owner => Owner,
54+
monitor => Monitor,
55+
secret => Secret,
56+
position => 0,
57+
deflate => fun(Data) -> zstd:compress(Data, Z) end,
58+
flush => fun() ->
59+
{done, Data} = zstd:finish(Z, []),
60+
Data
61+
end},
62+
{ok, opening, Data}
63+
catch
64+
_:_ ->
65+
{error, badarg}
66+
end.
4767

4868
opening({call, From}, {'$open', Secret, Filename, Modes}, #{ secret := Secret } = Data) ->
4969
case raw_file_io:open(Filename, Modes) of
@@ -113,9 +133,9 @@ opened(_Event, _Request, _Data) ->
113133
keep_state_and_data.
114134

115135
write(Data, IOVec) ->
116-
#{ handle := PrivateFd, position := Position, zlib := Z } = Data,
136+
#{ handle := PrivateFd, position := Position, deflate := Deflate } = Data,
117137
UncompressedSize = iolist_size(IOVec),
118-
case ?CALL_FD(PrivateFd, write, [zlib:deflate(Z, IOVec)]) of
138+
case ?CALL_FD(PrivateFd, write, [Deflate(IOVec)]) of
119139
ok -> {ok, Data#{ position := (Position + UncompressedSize) }};
120140
Other -> Other
121141
end.
@@ -152,8 +172,8 @@ position_1(#{ position := Current } = Data, Desired) when Current < Desired ->
152172
position_1(#{ position := Current }, Desired) when Current > Desired ->
153173
{error, einval}.
154174

155-
flush_deflate_state(#{ handle := PrivateFd, zlib := Z }) ->
156-
case ?CALL_FD(PrivateFd, write, [zlib:deflate(Z, [], finish)]) of
175+
flush_deflate_state(#{ handle := PrivateFd, flush := Flush }) ->
176+
case ?CALL_FD(PrivateFd, write, [Flush()]) of
157177
ok -> ok;
158178
Other -> Other
159179
end.

lib/kernel/src/raw_file_io_inflate.erl

Lines changed: 63 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
-behavior(gen_statem).
2626

2727
-export([init/1, callback_mode/0, terminate/3]).
28-
-export([opening/3, opened_gzip/3, opened_passthrough/3]).
28+
-export([opening/3, opened_active/3, opened_passthrough/3]).
2929

3030
-include("file_int.hrl").
3131

@@ -37,41 +37,71 @@ callback_mode() -> state_functions.
3737
init({Owner, Secret, [compressed]}) ->
3838
%% 'reset mode', which resets the inflate state at the end of every stream,
3939
%% allowing us to read concatenated gzip files.
40-
init(Owner, Secret, reset);
40+
init_zlib(Owner, Secret, reset);
4141
init({Owner, Secret, [compressed_one]}) ->
4242
%% 'cut mode', which stops the inflate after one member
4343
%% allowing us to read gzipped tar files
44-
init(Owner, Secret, cut).
44+
init_zlib(Owner, Secret, cut);
45+
init({Owner, Secret, [{zstd, Parameters}]}) ->
46+
%% zstd mode: data is decompressed through a zstd context created from
47+
%% Parameters; unlike the gzip modes there is no fallback to plain reads.
48+
init_zstd(Owner, Secret, Parameters).
4549

46-
init(Owner, Secret, Mode) ->
50+
init_zlib(Owner, Secret, Mode) ->
4751
Monitor = monitor(process, Owner),
4852
%% We're using the undocumented inflateInit/3 to set the mode
4953
Z = zlib:open(),
5054
ok = zlib:inflateInit(Z, ?GZIP_WBITS, Mode),
55+
5156
Data =
5257
#{ owner => Owner,
5358
monitor => Monitor,
5459
secret => Secret,
60+
choose_state => fun(PrivateFd) ->
61+
%% The old driver fell back to plain reads
62+
%% if the file didn't start with the magic
63+
%% gzip bytes.
64+
State =
65+
case ?CALL_FD(PrivateFd, read, [2]) of
66+
{ok, <<16#1F, 16#8B>>} ->
67+
opened_active;
68+
_Other ->
69+
opened_passthrough
70+
end,
71+
{ok, 0} = ?CALL_FD(PrivateFd, position, [0]),
72+
State
73+
end,
5574
position => 0,
5675
buffer => prim_buffer:new(),
57-
zlib => Z },
76+
inflate => fun(Data) -> zlib:inflate(Z, Data) end,
77+
reset => fun() -> zlib:inflateReset(Z) end },
5878
{ok, opening, Data}.
5979

60-
%% The old driver fell back to plain reads if the file didn't start with the
61-
%% magic gzip bytes.
62-
choose_decompression_state(PrivateFd) ->
63-
State =
64-
case ?CALL_FD(PrivateFd, read, [2]) of
65-
{ok, <<16#1F, 16#8B>>} -> opened_gzip;
66-
_Other -> opened_passthrough
67-
end,
68-
{ok, 0} = ?CALL_FD(PrivateFd, position, [0]),
69-
State.
80+
init_zstd(Owner, Secret, Parameters) ->
81+
Monitor = monitor(process, Owner),
82+
try zstd:context(decompress, Parameters) of
83+
{ok, Z} ->
84+
Data =
85+
#{ owner => Owner,
86+
monitor => Monitor,
87+
secret => Secret,
88+
choose_state => fun(_) -> opened_active end,
89+
position => 0,
90+
buffer => prim_buffer:new(),
91+
inflate => fun(Data) -> zstd:decompress(Data, Z) end,
92+
reset => fun() -> zstd:reset(Z) end},
93+
{ok, opening, Data}
94+
catch
95+
_:_ ->
96+
{error, badarg}
97+
end.
7098

71-
opening({call, From}, {'$open', Secret, Filename, Modes}, #{ secret := Secret } = Data) ->
99+
opening({call, From},
100+
{'$open', Secret, Filename, Modes},
101+
#{ secret := Secret, choose_state := ChooseState } = Data) ->
72102
case raw_file_io:open(Filename, Modes) of
73103
{ok, PrivateFd} ->
74-
NextState = choose_decompression_state(PrivateFd),
104+
NextState = ChooseState(PrivateFd),
75105
NewData = Data#{ handle => PrivateFd },
76106
{next_state, NextState, NewData, [{reply, From, ok}]};
77107
Other ->
@@ -109,16 +139,16 @@ opened_passthrough(_Event, _Request, _Data) ->
109139

110140
%%
111141

112-
opened_gzip(info, {'DOWN', Monitor, process, _Owner, _Reason}, #{ monitor := Monitor }) ->
142+
opened_active(info, {'DOWN', Monitor, process, _Owner, _Reason}, #{ monitor := Monitor }) ->
113143
{stop, shutdown};
114144

115-
opened_gzip(info, _Message, _Data) ->
145+
opened_active(info, _Message, _Data) ->
116146
keep_state_and_data;
117147

118-
opened_gzip({call, {Owner, _Tag} = From}, [close], #{ owner := Owner } = Data) ->
148+
opened_active({call, {Owner, _Tag} = From}, [close], #{ owner := Owner } = Data) ->
119149
internal_close(From, Data);
120150

121-
opened_gzip({call, {Owner, _Tag} = From}, [position, Mark], #{ owner := Owner } = Data) ->
151+
opened_active({call, {Owner, _Tag} = From}, [position, Mark], #{ owner := Owner } = Data) ->
122152
case position(Data, Mark) of
123153
{ok, NewData, Result} ->
124154
Response = {ok, Result},
@@ -127,7 +157,7 @@ opened_gzip({call, {Owner, _Tag} = From}, [position, Mark], #{ owner := Owner }
127157
{keep_state_and_data, [{reply, From, Other}]}
128158
end;
129159

130-
opened_gzip({call, {Owner, _Tag} = From}, [read, Size], #{ owner := Owner } = Data) ->
160+
opened_active({call, {Owner, _Tag} = From}, [read, Size], #{ owner := Owner } = Data) ->
131161
case read(Data, Size) of
132162
{ok, NewData, Result} ->
133163
Response = {ok, Result},
@@ -136,7 +166,7 @@ opened_gzip({call, {Owner, _Tag} = From}, [read, Size], #{ owner := Owner } = Da
136166
{keep_state_and_data, [{reply, From, Other}]}
137167
end;
138168

139-
opened_gzip({call, {Owner, _Tag} = From}, [read_line], #{ owner := Owner } = Data) ->
169+
opened_active({call, {Owner, _Tag} = From}, [read_line], #{ owner := Owner } = Data) ->
140170
case read_line(Data) of
141171
{ok, NewData, Result} ->
142172
Response = {ok, Result},
@@ -145,20 +175,20 @@ opened_gzip({call, {Owner, _Tag} = From}, [read_line], #{ owner := Owner } = Dat
145175
{keep_state_and_data, [{reply, From, Other}]}
146176
end;
147177

148-
opened_gzip({call, {Owner, _Tag} = From}, [write, _IOData], #{ owner := Owner }) ->
178+
opened_active({call, {Owner, _Tag} = From}, [write, _IOData], #{ owner := Owner }) ->
149179
Response = {error, ebadf},
150180
{keep_state_and_data, [{reply, From, Response}]};
151181

152-
opened_gzip({call, {Owner, _Tag} = From}, _Request, #{ owner := Owner }) ->
182+
opened_active({call, {Owner, _Tag} = From}, _Request, #{ owner := Owner }) ->
153183
Response = {error, enotsup},
154184
{keep_state_and_data, [{reply, From, Response}]};
155185

156-
opened_gzip({call, _From}, _Request, _Data) ->
186+
opened_active({call, _From}, _Request, _Data) ->
157187
%% The client functions filter this out, so we'll crash if the user does
158188
%% anything stupid on purpose.
159189
{shutdown, protocol_violation};
160190

161-
opened_gzip(_Event, _Request, _Data) ->
191+
opened_active(_Event, _Request, _Data) ->
162192
keep_state_and_data.
163193

164194
%%
@@ -178,8 +208,8 @@ read_1(Data, Buffer, BufferSize, ReadSize) when BufferSize < ReadSize ->
178208
#{ handle := PrivateFd } = Data,
179209
case ?CALL_FD(PrivateFd, read, [?INFLATE_CHUNK_SIZE]) of
180210
{ok, Compressed} ->
181-
#{ zlib := Z } = Data,
182-
Uncompressed = erlang:iolist_to_iovec(zlib:inflate(Z, Compressed)),
211+
#{ inflate := Inflate } = Data,
212+
Uncompressed = erlang:iolist_to_iovec(Inflate(Compressed)),
183213
prim_buffer:write(Buffer, Uncompressed),
184214
read_1(Data, Buffer, prim_buffer:size(Buffer), ReadSize);
185215
eof when BufferSize > 0 ->
@@ -198,10 +228,10 @@ read_line(#{ buffer := Buffer } = Data) ->
198228
end.
199229

200230
read_line_1(Data, Buffer, not_found) ->
201-
#{ handle := PrivateFd, zlib := Z } = Data,
231+
#{ handle := PrivateFd, inflate := Inflate } = Data,
202232
case ?CALL_FD(PrivateFd, read, [?INFLATE_CHUNK_SIZE]) of
203233
{ok, Compressed} ->
204-
Uncompressed = erlang:iolist_to_iovec(zlib:inflate(Z, Compressed)),
234+
Uncompressed = erlang:iolist_to_iovec(Inflate(Compressed)),
205235
prim_buffer:write(Buffer, Uncompressed),
206236
read_line_1(Data, Buffer, prim_buffer:find_byte_index(Buffer, $\n));
207237
eof ->
@@ -257,10 +287,10 @@ position_1(#{ position := Current } = Data, Desired) when Current < Desired ->
257287
Other -> Other
258288
end;
259289
position_1(#{ position := Current } = Data, Desired) when Current > Desired ->
260-
#{ handle := PrivateFd, buffer := Buffer, zlib := Z } = Data,
290+
#{ handle := PrivateFd, buffer := Buffer, reset := Reset } = Data,
261291
case ?CALL_FD(PrivateFd, position, [bof]) of
262292
{ok, 0} ->
263-
ok = zlib:inflateReset(Z),
293+
ok = Reset(),
264294
prim_buffer:wipe(Buffer),
265295
position_1(Data#{ position => 0 }, Desired);
266296
Other ->

lib/kernel/test/file_SUITE.erl

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@
7171
read_compressed_cooked/1, read_compressed_cooked_binary/1,
7272
read_cooked_tar_problem/1,
7373
write_compressed/1, compress_errors/1, catenated_gzips/1,
74-
compress_async_crash/1]).
74+
compress_async_crash/1,
75+
zstd/1]).
7576

7677
-export([ make_link/1, read_link_info_for_non_link/1, symlinks/1]).
7778

@@ -171,7 +172,8 @@ groups() ->
171172
[read_compressed_cooked, read_compressed_cooked_binary,
172173
read_cooked_tar_problem, read_not_really_compressed,
173174
write_compressed, compress_errors, catenated_gzips,
174-
compress_async_crash]},
175+
compress_async_crash,
176+
zstd]},
175177
{links, [],
176178
[make_link, read_link_info_for_non_link, symlinks]},
177179
{bench, [],
@@ -3026,6 +3028,25 @@ compress_async_crash_loop(N, Path, ExpectedData) ->
30263028
end,
30273029
compress_async_crash_loop(N - 1, Path, ExpectedData).
30283030

3031+
zstd(Config) when is_list(Config) ->
3032+
DataDir = proplists:get_value(data_dir, Config),
3033+
Path = filename:join(DataDir, "test.zstd"),
3034+
ExpectedData = <<"qwerty">>,
3035+
3036+
_ = ?FILE_MODULE:delete(Path),
3037+
{ok, FdW} = ?FILE_MODULE:open(Path, [write, binary, {zstd, #{}}]),
3038+
ok = ?FILE_MODULE:write(FdW, ExpectedData),
3039+
ok = ?FILE_MODULE:close(FdW),
3040+
3041+
{ok, FdR} = ?FILE_MODULE:open(Path, [read, binary, {zstd, #{}}]),
3042+
{ok, ExpectedData} = ?FILE_MODULE:read(FdR, 1 bsl 10),
3043+
ok = ?FILE_MODULE:close(FdR),
3044+
3045+
{ok, Compressed} = ?FILE_MODULE:read_file(Path),
3046+
ExpectedData = iolist_to_binary(zstd:decompress(Compressed)),
3047+
3048+
ok.
3049+
30293050
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
30303051

30313052
unicode(Config) when is_list(Config) ->

lib/stdlib/src/zstd.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,10 @@ did not create it.
7272
""".
7373
-moduledoc #{ since => "OTP 28.0" }.
7474

75-
-export_type([context/0, dict/0]).
75+
-export_type([context/0,
76+
dict/0,
77+
compress_parameters/0,
78+
decompress_parameters/0]).
7679

7780
-doc """
7881
A compression or decompression context that can be used

0 commit comments

Comments
 (0)