Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #4332: setting ZSTD_NBTHREADS=0 via environment variable #4334

Merged
merged 5 commits into from
Mar 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/zstd.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ extern "C" {
/*------ Version ------*/
#define ZSTD_VERSION_MAJOR 1
#define ZSTD_VERSION_MINOR 5
#define ZSTD_VERSION_RELEASE 7
#define ZSTD_VERSION_RELEASE 8
#define ZSTD_VERSION_NUMBER (ZSTD_VERSION_MAJOR *100*100 + ZSTD_VERSION_MINOR *100 + ZSTD_VERSION_RELEASE)

/*! ZSTD_versionNumber() :
Expand Down
68 changes: 33 additions & 35 deletions programs/zstdcli.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
#endif

#ifndef ZSTDCLI_NBTHREADS_DEFAULT
#define ZSTDCLI_NBTHREADS_DEFAULT MAX(1, MIN(4, UTIL_countLogicalCores() / 4))
#define ZSTDCLI_NBTHREADS_DEFAULT (unsigned)(MAX(1, MIN(4, UTIL_countLogicalCores() / 4)))
#endif


Expand Down Expand Up @@ -94,6 +94,7 @@ static U32 g_ldmBucketSizeLog = LDM_PARAM_DEFAULT;


#define DEFAULT_ACCEL 1
#define NBWORKERS_AUTOCPU 0

typedef enum { cover, fastCover, legacy } dictType;

Expand Down Expand Up @@ -685,6 +686,12 @@ static void printVersion(void)
DISPLAYOUT("lz4 version %s\n", FIO_lz4Version());
DISPLAYOUT("lzma version %s\n", FIO_lzmaVersion());

#ifdef ZSTD_MULTITHREAD
DISPLAYOUT("supports Multithreading \n");
#else
DISPLAYOUT("single-thread operations only \n");
#endif

/* posix support */
#ifdef _POSIX_C_SOURCE
DISPLAYOUT("_POSIX_C_SOURCE defined: %ldL\n", (long) _POSIX_C_SOURCE);
Expand Down Expand Up @@ -745,7 +752,7 @@ static void printActualCParams(const char* filename, const char* dictFileName, i

/* Environment variables for parameter setting */
#define ENV_CLEVEL "ZSTD_CLEVEL"
#define ENV_NBTHREADS "ZSTD_NBTHREADS" /* takes lower precedence than directly specifying -T# in the CLI */
#define ENV_NBWORKERS "ZSTD_NBTHREADS" /* takes lower precedence than directly specifying -T# in the CLI */

/* pick up environment variable */
static int init_cLevel(void) {
Expand Down Expand Up @@ -775,26 +782,28 @@ static int init_cLevel(void) {
return ZSTDCLI_CLEVEL_DEFAULT;
}

static unsigned init_nbWorkers(void) {
#ifdef ZSTD_MULTITHREAD
static int default_nbThreads(void) {
const char* const env = getenv(ENV_NBTHREADS);
const char* const env = getenv(ENV_NBWORKERS);
if (env != NULL) {
const char* ptr = env;
if ((*ptr>='0') && (*ptr<='9')) {
unsigned nbThreads;
if (readU32FromCharChecked(&ptr, &nbThreads)) {
DISPLAYLEVEL(2, "Ignore environment variable setting %s=%s: numeric value too large \n", ENV_NBTHREADS, env);
DISPLAYLEVEL(2, "Ignore environment variable setting %s=%s: numeric value too large \n", ENV_NBWORKERS, env);
return ZSTDCLI_NBTHREADS_DEFAULT;
} else if (*ptr == 0) {
return (int)nbThreads;
return nbThreads;
}
}
DISPLAYLEVEL(2, "Ignore environment variable setting %s=%s: not a valid unsigned value \n", ENV_NBTHREADS, env);
DISPLAYLEVEL(2, "Ignore environment variable setting %s=%s: not a valid unsigned value \n", ENV_NBWORKERS, env);
}

return ZSTDCLI_NBTHREADS_DEFAULT;
}
#else
return 1;
#endif
}

#define NEXT_FIELD(ptr) { \
if (*argument == '=') { \
Expand Down Expand Up @@ -874,13 +883,15 @@ int main(int argCount, const char* argv[])
singleThread = 0,
defaultLogicalCores = 0,
showDefaultCParams = 0,
ultra=0,
contentSize=1,
removeSrcFile=0;
ZSTD_ParamSwitch_e mmapDict=ZSTD_ps_auto;
contentSize = 1,
removeSrcFile = 0,
cLevel = init_cLevel(),
ultra = 0,
cLevelLast = MINCLEVEL - 1; /* for benchmark range */
unsigned nbWorkers = init_nbWorkers();
ZSTD_ParamSwitch_e mmapDict = ZSTD_ps_auto;
ZSTD_ParamSwitch_e useRowMatchFinder = ZSTD_ps_auto;
FIO_compressionType_t cType = FIO_zstdCompression;
int nbWorkers = -1; /* -1 means unset */
double compressibility = -1.0; /* lorem ipsum generator */
unsigned bench_nbSeconds = 3; /* would be better if this value was synchronized from bench */
size_t chunkSize = 0;
Expand All @@ -890,8 +901,6 @@ int main(int argCount, const char* argv[])
FIO_progressSetting_e progress = FIO_ps_auto;
zstd_operation_mode operation = zom_compress;
ZSTD_compressionParameters compressionParams;
int cLevel = init_cLevel();
int cLevelLast = MINCLEVEL - 1; /* lower than minimum */
unsigned recursive = 0;
unsigned memLimit = 0;
FileNamesTable* filenames = UTIL_allocateFileNamesTable((size_t)argCount); /* argCount >= 1 */
Expand Down Expand Up @@ -930,15 +939,15 @@ int main(int argCount, const char* argv[])
programName = lastNameFromPath(programName);

/* preset behaviors */
if (exeNameMatch(programName, ZSTD_ZSTDMT)) nbWorkers=0, singleThread=0;
if (exeNameMatch(programName, ZSTD_ZSTDMT)) nbWorkers=NBWORKERS_AUTOCPU, singleThread=0;
if (exeNameMatch(programName, ZSTD_UNZSTD)) operation=zom_decompress;
if (exeNameMatch(programName, ZSTD_CAT)) { operation=zom_decompress; FIO_overwriteMode(prefs); forceStdout=1; followLinks=1; FIO_setPassThroughFlag(prefs, 1); outFileName=stdoutmark; g_displayLevel=1; } /* supports multiple formats */
if (exeNameMatch(programName, ZSTD_ZCAT)) { operation=zom_decompress; FIO_overwriteMode(prefs); forceStdout=1; followLinks=1; FIO_setPassThroughFlag(prefs, 1); outFileName=stdoutmark; g_displayLevel=1; } /* behave like zcat, also supports multiple formats */
if (exeNameMatch(programName, ZSTD_GZ)) { /* behave like gzip */
suffix = GZ_EXTENSION; cType = FIO_gzipCompression; removeSrcFile=1;
dictCLevel = cLevel = 6; /* gzip default is -6 */
}
if (exeNameMatch(programName, ZSTD_GUNZIP)) { operation=zom_decompress; removeSrcFile=1; } /* behave like gunzip, also supports multiple formats */
if (exeNameMatch(programName, ZSTD_GUNZIP)) { operation=zom_decompress; removeSrcFile=1; } /* behave like gunzip, also supports multiple formats */
if (exeNameMatch(programName, ZSTD_GZCAT)) { operation=zom_decompress; FIO_overwriteMode(prefs); forceStdout=1; followLinks=1; FIO_setPassThroughFlag(prefs, 1); outFileName=stdoutmark; g_displayLevel=1; } /* behave like gzcat, also supports multiple formats */
if (exeNameMatch(programName, ZSTD_LZMA)) { suffix = LZMA_EXTENSION; cType = FIO_lzmaCompression; removeSrcFile=1; } /* behave like lzma */
if (exeNameMatch(programName, ZSTD_UNLZMA)) { operation=zom_decompress; cType = FIO_lzmaCompression; removeSrcFile=1; } /* behave like unlzma, also supports multiple formats */
Expand Down Expand Up @@ -1081,7 +1090,7 @@ int main(int argCount, const char* argv[])
continue;
}
#endif
if (longCommandWArg(&argument, "--threads")) { NEXT_INT32(nbWorkers); continue; }
if (longCommandWArg(&argument, "--threads")) { NEXT_UINT32(nbWorkers); continue; }
if (longCommandWArg(&argument, "--memlimit")) { NEXT_UINT32(memLimit); continue; }
if (longCommandWArg(&argument, "--memory")) { NEXT_UINT32(memLimit); continue; }
if (longCommandWArg(&argument, "--memlimit-decompress")) { NEXT_UINT32(memLimit); continue; }
Expand Down Expand Up @@ -1287,7 +1296,7 @@ int main(int argCount, const char* argv[])
/* nb of threads (hidden option) */
case 'T':
argument++;
nbWorkers = (int)readU32FromChar(&argument);
nbWorkers = readU32FromChar(&argument);
break;

/* Dictionary Selection level */
Expand Down Expand Up @@ -1332,28 +1341,17 @@ int main(int argCount, const char* argv[])
DISPLAYLEVEL(3, WELCOME_MESSAGE);

#ifdef ZSTD_MULTITHREAD
if ((operation==zom_decompress) && (nbWorkers > 1)) {
DISPLAYLEVEL(2, "Warning : decompression does not support multi-threading\n");
}
Comment on lines -1335 to -1337
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was added due to Issue #2918. This may be a useful warning to keep.

Copy link
Contributor Author

@Cyan4973 Cyan4973 Mar 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Issue is, with current code,
there is nothing to tell that nbWorkers wasn't set.
Whether it's set via the environment variable, or via the core count detection, it's necessarily set to "some value".
There is no difference with a user setting this value manually via command line.

Therefore, on any modern system with >= 8 logical cores, nbWorkers >= 2, and consequently always triggers this warning.

This could be solved by adding yet another variable, which would track if nbWorkers was set explicitly, or if it happens to be set to a value >1 just as a consequence of automatic core count detection. We would also have to decide if setting nbWorkers via environment variable count or not to trigger this warning.

So it's a bit more work, a bit more questions, and specific to the topic of displaying a warning during decompression mode. It felt it could deserve its own PR for proper focus.

if ((nbWorkers==0) && (!singleThread)) {
/* automatically set # workers based on # of reported cpus */
if ((nbWorkers==NBWORKERS_AUTOCPU) && (!singleThread)) {
/* automatically set # workers based on # of reported cpu cores */
if (defaultLogicalCores) {
nbWorkers = UTIL_countLogicalCores();
nbWorkers = (unsigned)UTIL_countLogicalCores();
DISPLAYLEVEL(3, "Note: %d logical core(s) detected \n", nbWorkers);
} else {
nbWorkers = UTIL_countPhysicalCores();
nbWorkers = (unsigned)UTIL_countPhysicalCores();
DISPLAYLEVEL(3, "Note: %d physical core(s) detected \n", nbWorkers);
}
}
/* Resolve to default if nbWorkers is still unset */
if (nbWorkers == -1) {
if (operation == zom_decompress) {
nbWorkers = 1;
} else {
nbWorkers = default_nbThreads();
}
}
if (operation != zom_bench)
if (operation == zom_compress)
DISPLAYLEVEL(4, "Compressing with %u worker threads \n", nbWorkers);
#else
(void)singleThread; (void)nbWorkers; (void)defaultLogicalCores;
Expand Down
4 changes: 0 additions & 4 deletions tests/cli-tests/compression/multi-threaded.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,3 @@ zstd -T0 -f file -q ; zstd -t file.zst
zstd -T0 --auto-threads=logical -f file -q ; zstd -t file.zst
zstd -T0 --auto-threads=physical -f file -q ; zstd -t file.zst
zstd -T0 --jobsize=1M -f file -q ; zstd -t file.zst
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a test here that tests the environment variable works?

If we print the number of threads under some verbosity we could do something like:

ZSTD_NBTHREADS=0 zstd -f file -vv 2>&1 | grep "physical core(s) detected"
zstd -tq file.zst
ZSTD_NBTHREADS=4 zstd -f file -vv 2>&1 | grep "4 threads"
zstd -tq file.zst

Copy link
Contributor Author

@Cyan4973 Cyan4973 Mar 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, definitely,
I was considering to do it in another PR, in order to observe the test failure under current conditions, i.e. without the patch.
But I could also just add it to this PR as a follow up commit.


# multi-thread decompression warning test
zstd -T0 -f file -q ; zstd -t file.zst; zstd -T0 -d file.zst -o file3
zstd -T0 -f file -q ; zstd -t file.zst; zstd -T2 -d file.zst -o file4
5 changes: 0 additions & 5 deletions tests/cli-tests/compression/multi-threaded.sh.stderr.exact
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,3 @@ file.zst : 65537 bytes
file.zst : 65537 bytes
file.zst : 65537 bytes
file.zst : 65537 bytes
file.zst : 65537 bytes
file.zst : 65537 bytes
file.zst : 65537 bytes
Warning : decompression does not support multi-threading
file.zst : 65537 bytes
22 changes: 9 additions & 13 deletions tests/playTests.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/sh

set -e # exit immediately on error
# set -x # print commands before execution (debug)
set -x # print commands before execution (debug)

unset ZSTD_CLEVEL
unset ZSTD_NBTHREADS
Expand All @@ -16,13 +16,7 @@ datagen() {
"$DATAGEN_BIN" "$@"
}

zstd() {
if [ -z "$EXE_PREFIX" ]; then
"$ZSTD_BIN" "$@"
else
"$EXE_PREFIX" "$ZSTD_BIN" "$@"
fi
}
alias zstd='$EXE_PREFIX $ZSTD_BIN'

sudoZstd() {
if [ -z "$EXE_PREFIX" ]; then
Expand Down Expand Up @@ -1563,14 +1557,16 @@ then
println "\n===> zstdmt environment variable tests "
echo "multifoo" >> mt_tmp
ZSTD_NBTHREADS=-3 zstd -f mt_tmp # negative value, warn and revert to default setting
ZSTD_NBTHREADS='' zstd -f mt_tmp # empty env var, warn and revert to default setting
ZSTD_NBTHREADS=- zstd -f mt_tmp # malformed env var, warn and revert to default setting
ZSTD_NBTHREADS=a zstd -f mt_tmp # malformed env var, warn and revert to default setting
ZSTD_NBTHREADS=+a zstd -f mt_tmp # malformed env var, warn and revert to default setting
ZSTD_NBTHREADS='' zstd -f mt_tmp # empty env var, warn and revert to default setting
ZSTD_NBTHREADS=- zstd -f mt_tmp # malformed env var, warn and revert to default setting
ZSTD_NBTHREADS=a zstd -f mt_tmp # malformed env var, warn and revert to default setting
ZSTD_NBTHREADS=+a zstd -f mt_tmp # malformed env var, warn and revert to default setting
ZSTD_NBTHREADS=3a7 zstd -f mt_tmp # malformed env var, warn and revert to default setting
ZSTD_NBTHREADS=50000000000 zstd -f mt_tmp # numeric value too large, warn and revert to default setting=
ZSTD_NBTHREADS=2 zstd -f mt_tmp # correct usage
ZSTD_NBTHREADS=1 zstd -f mt_tmp # correct usage: single thread
ZSTD_NBTHREADS=1 zstd -f mt_tmp # correct usage: single worker
ZSTD_NBTHREADS=4 zstd -f mt_tmp -vv 2>&1 | $GREP "4 worker threads" # check message
ZSTD_NBTHREADS=0 zstd -f mt_tmp -vv 2>&1 | $GREP "core(s) detected" # check core count autodetection is triggered
# temporary envvar changes in the above tests would actually persist in macos /bin/sh
unset ZSTD_NBTHREADS
rm -f mt_tmp*
Expand Down