Skip to content

Commit 8605484

Browse files
authored
[FLINK-37620][state/forst] ForSt Sync mode support remote storage (#26422)
1 parent 14e85ec commit 8605484

File tree

11 files changed

+219
-164
lines changed

11 files changed

+219
-164
lines changed

Diff for: docs/content.zh/docs/ops/state/disaggregated_state.md

+14
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,20 @@ state.backend.forst.primary-dir: s3://your-bucket/forst-state
150150
checkpoint and fast recovery, since the ForSt will perform file copy between the primary
151151
storage location and the checkpoint directory during checkpointing and recovery.
152152

153+
#### ForSt Local Storage Location
154+
155+
By default, ForSt will **ONLY** disaggregate state when asynchronous APIs (State V2) are used. When
156+
using synchronous state APIs in DataStream and SQL jobs, ForSt will only serve as **local state store**.
157+
Since a job may contain multiple ForSt instances with mixed API usage, synchronous local state access
158+
along with asynchronous remote state access could help achieve better overall throughput.
159+
If you want the operators with synchronous state APIs to store state in remote, the following configuration will help:
160+
```yaml
161+
state.backend.forst.sync.enforce-local: false
162+
```
163+
And you can specify the local storage location via:
164+
```yaml
165+
state.backend.forst.local-dir: path-to-local-dir
166+
```
153167

154168
#### ForSt File Cache
155169

Diff for: docs/content/docs/ops/state/disaggregated_state.md

+14
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,20 @@ state.backend.forst.primary-dir: s3://your-bucket/forst-state
150150
checkpoint and fast recovery, since the ForSt will perform file copy between the primary
151151
storage location and the checkpoint directory during checkpointing and recovery.
152152

153+
#### ForSt Local Storage Location
154+
155+
By default, ForSt will **ONLY** disaggregate state when asynchronous APIs (State V2) are used. When
156+
using synchronous state APIs in DataStream and SQL jobs, ForSt will only serve as **local state store**.
157+
Since a job may contain multiple ForSt instances with mixed API usage, synchronous local state access
158+
along with asynchronous remote state access could help achieve better overall throughput.
159+
If you want the operators with synchronous state APIs to store state in remote, the following configuration will help:
160+
```yaml
161+
state.backend.forst.sync.enforce-local: false
162+
```
163+
And you can specify the local storage location via:
164+
```yaml
165+
state.backend.forst.local-dir: path-to-local-dir
166+
```
153167

154168
#### ForSt File Cache
155169

Diff for: docs/layouts/shortcodes/generated/forst_configuration.html

+6
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,12 @@
116116
<td>String</td>
117117
<td>The primary directory where ForSt puts its SST files. By default, it will be the same as the checkpoint directory. Recognized shortcut name is 'checkpoint-dir', which means that ForSt shares the directory with checkpoint, and 'local-dir', which means that ForSt will use the local directory of TaskManager.</td>
118118
</tr>
119+
<tr>
120+
<td><h5>state.backend.forst.sync.enforce-local</h5></td>
121+
<td style="word-wrap: break-word;">true</td>
122+
<td>Boolean</td>
123+
<td>Whether to enforce local state for operators in synchronous mode when enabling disaggregated state. This is useful in cases where both synchronous operators and asynchronous operators are used in the same job.</td>
124+
</tr>
119125
<tr>
120126
<td><h5>state.backend.forst.timer-service.cache-size</h5></td>
121127
<td style="word-wrap: break-word;">128</td>

Diff for: docs/layouts/shortcodes/generated/state_backend_forst_section.html

+6
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@
5050
<td>String</td>
5151
<td>The primary directory where ForSt puts its SST files. By default, it will be the same as the checkpoint directory. Recognized shortcut name is 'checkpoint-dir', which means that ForSt shares the directory with checkpoint, and 'local-dir', which means that ForSt will use the local directory of TaskManager.</td>
5252
</tr>
53+
<tr>
54+
<td><h5>state.backend.forst.sync.enforce-local</h5></td>
55+
<td style="word-wrap: break-word;">true</td>
56+
<td>Boolean</td>
57+
<td>Whether to enforce local state for operators in synchronous mode when enabling disaggregated state. This is useful in cases where both synchronous operators and asynchronous operators are used in the same job.</td>
58+
</tr>
5359
<tr>
5460
<td><h5>state.backend.forst.timer-service.cache-size</h5></td>
5561
<td style="word-wrap: break-word;">128</td>

Diff for: flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java

-1
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,6 @@ private ForStRestoreOperation getForStRestoreOperation(
359359
// env. We expect to directly use the dfs directory in flink env or local directory as
360360
// working dir. We will implement this in ForStDB later, but before that, we achieved this
361361
// by setting the dbPath to "/" when the dfs directory existed.
362-
// TODO: use localForStPath as dbPath after ForSt Support mixing local-dir and remote-dir
363362
Path instanceForStPath =
364363
optionsContainer.getRemoteForStPath() == null
365364
? optionsContainer.getLocalForStPath()

Diff for: flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java

+11
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,17 @@ public class ForStOptions {
6666
CHECKPOINT_DIR_AS_PRIMARY_SHORTCUT,
6767
LOCAL_DIR_AS_PRIMARY_SHORTCUT));
6868

69+
@Documentation.Section(Documentation.Sections.STATE_BACKEND_FORST)
70+
public static final ConfigOption<Boolean> SYNC_ENFORCE_LOCAL =
71+
ConfigOptions.key("state.backend.forst.sync.enforce-local")
72+
.booleanType()
73+
.defaultValue(true)
74+
.withDescription(
75+
"Whether to enforce local state for operators in synchronous mode when"
76+
+ " enabling disaggregated state. This is useful in cases where "
77+
+ "both synchronous operators and asynchronous operators are used "
78+
+ "in the same job.");
79+
6980
@Documentation.Section(Documentation.Sections.STATE_BACKEND_FORST)
7081
public static final ConfigOption<String> CACHE_DIRECTORY =
7182
ConfigOptions.key("state.backend.forst.cache.dir")

Diff for: flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java

+43-42
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.annotation.VisibleForTesting;
2323
import org.apache.flink.api.common.ExecutionConfig;
2424
import org.apache.flink.api.common.JobID;
25+
import org.apache.flink.api.java.tuple.Tuple2;
2526
import org.apache.flink.configuration.CheckpointingOptions;
2627
import org.apache.flink.configuration.ConfigOption;
2728
import org.apache.flink.configuration.Configuration;
@@ -188,8 +189,12 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend
188189
/** The recovery claim mode. */
189190
private RecoveryClaimMode recoveryClaimMode = RecoveryClaimMode.DEFAULT;
190191

192+
/** Whether to share the ForSt remote directory with checkpoint directory. */
191193
private boolean remoteShareWithCheckpoint;
192194

195+
/** Whether to use local directory as primary directory in synchronous mode. */
196+
private boolean forceSyncLocal;
197+
193198
// ------------------------------------------------------------------------
194199

195200
/** Creates a new {@code ForStStateBackend} for storing state. */
@@ -203,6 +208,7 @@ public ForStStateBackend() {
203208
this.useIngestDbRestoreMode = TernaryBoolean.UNDEFINED;
204209
this.rescalingUseDeleteFilesInRange = TernaryBoolean.UNDEFINED;
205210
this.remoteShareWithCheckpoint = false;
211+
this.forceSyncLocal = true;
206212
}
207213

208214
/**
@@ -237,6 +243,7 @@ private ForStStateBackend(
237243
: new Path(remoteDirStr);
238244
}
239245
}
246+
this.forceSyncLocal = config.get(ForStOptions.SYNC_ENFORCE_LOCAL);
240247

241248
this.priorityQueueConfig =
242249
ForStPriorityQueueConfig.fromOtherAndConfiguration(
@@ -409,31 +416,7 @@ public <K> ForStKeyedStateBackend<K> createAsyncKeyedStateBackend(
409416

410417
lazyInitializeForJob(env, fileCompatibleIdentifier);
411418

412-
String opChildPath =
413-
String.format(
414-
"op_%s_attempt_%s",
415-
fileCompatibleIdentifier, env.getTaskInfo().getAttemptNumber());
416-
417-
Path localBasePath =
418-
new Path(
419-
new File(new File(getNextStoragePath(), jobId.toHexString()), opChildPath)
420-
.getAbsolutePath());
421-
Path remoteBasePath = null;
422-
if (remoteForStDirectory != null) {
423-
remoteBasePath =
424-
new Path(new Path(remoteForStDirectory, jobId.toHexString()), opChildPath);
425-
} else if (remoteShareWithCheckpoint) {
426-
if (env.getCheckpointStorageAccess() instanceof FsCheckpointStorageAccess) {
427-
Path sharedStateDirectory =
428-
((FsCheckpointStorageAccess) env.getCheckpointStorageAccess())
429-
.getSharedStateDirectory();
430-
remoteBasePath = new Path(sharedStateDirectory, opChildPath);
431-
LOG.info("Set remote ForSt directory to checkpoint directory {}", remoteBasePath);
432-
} else {
433-
LOG.warn(
434-
"Remote ForSt directory can't be set, because checkpoint directory isn't on file system.");
435-
}
436-
}
419+
Tuple2<Path, Path> localAndRemoteBasePath = getForStBasePath(fileCompatibleIdentifier, env);
437420

438421
final OpaqueMemoryResource<ForStSharedResources> sharedResources =
439422
ForStOperationUtils.allocateSharedCachesIfConfigured(
@@ -448,8 +431,8 @@ public <K> ForStKeyedStateBackend<K> createAsyncKeyedStateBackend(
448431
final ForStResourceContainer resourceContainer =
449432
createOptionsAndResourceContainer(
450433
sharedResources,
451-
localBasePath,
452-
remoteBasePath,
434+
localAndRemoteBasePath.f0,
435+
localAndRemoteBasePath.f1,
453436
env.getCheckpointStorageAccess(),
454437
parameters.getMetricGroup(),
455438
nativeMetricOptions.isStatisticsEnabled());
@@ -505,17 +488,7 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
505488

506489
lazyInitializeForJob(env, fileCompatibleIdentifier);
507490

508-
Path instanceBasePath =
509-
new Path(
510-
new File(
511-
getNextStoragePath(),
512-
"job_"
513-
+ jobId
514-
+ "_op_"
515-
+ fileCompatibleIdentifier
516-
+ "_uuid_"
517-
+ UUID.randomUUID())
518-
.getAbsolutePath());
491+
Tuple2<Path, Path> localAndRemoteBasePath = getForStBasePath(fileCompatibleIdentifier, env);
519492

520493
LocalRecoveryConfig localRecoveryConfig =
521494
env.getTaskStateManager().createLocalRecoveryConfig();
@@ -533,10 +506,10 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
533506
final ForStResourceContainer resourceContainer =
534507
createOptionsAndResourceContainer(
535508
sharedResources,
536-
instanceBasePath,
537-
null,
509+
localAndRemoteBasePath.f0,
510+
forceSyncLocal ? null : localAndRemoteBasePath.f1,
538511
env.getCheckpointStorageAccess(),
539-
null,
512+
parameters.getMetricGroup(),
540513
nativeMetricOptions.isStatisticsEnabled());
541514

542515
ExecutionConfig executionConfig = env.getExecutionConfig();
@@ -549,7 +522,6 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
549522
new ForStSyncKeyedStateBackendBuilder<>(
550523
parameters.getOperatorIdentifier(),
551524
env.getUserCodeClassLoader().asClassLoader(),
552-
instanceBasePath,
553525
resourceContainer,
554526
stateName -> resourceContainer.getColumnOptions(),
555527
parameters.getKvStateRegistry(),
@@ -818,6 +790,35 @@ private ReadableConfig mergeConfigurableOptions(ReadableConfig base, ReadableCon
818790
return configuration;
819791
}
820792

793+
Tuple2<Path, Path> getForStBasePath(String operatorIdentifier, Environment env) {
794+
String opChildPath =
795+
String.format(
796+
"op_%s_attempt_%s",
797+
operatorIdentifier, env.getTaskInfo().getAttemptNumber());
798+
799+
Path localBasePath =
800+
new Path(
801+
new File(new File(getNextStoragePath(), jobId.toHexString()), opChildPath)
802+
.getAbsolutePath());
803+
Path remoteBasePath = null;
804+
if (remoteForStDirectory != null) {
805+
remoteBasePath =
806+
new Path(new Path(remoteForStDirectory, jobId.toHexString()), opChildPath);
807+
} else if (remoteShareWithCheckpoint) {
808+
if (env.getCheckpointStorageAccess() instanceof FsCheckpointStorageAccess) {
809+
Path sharedStateDirectory =
810+
((FsCheckpointStorageAccess) env.getCheckpointStorageAccess())
811+
.getSharedStateDirectory();
812+
remoteBasePath = new Path(sharedStateDirectory, opChildPath);
813+
LOG.info("Set remote ForSt directory to checkpoint directory {}", remoteBasePath);
814+
} else {
815+
LOG.warn(
816+
"Remote ForSt directory can't be set, because checkpoint directory isn't on file system.");
817+
}
818+
}
819+
return Tuple2.of(localBasePath, remoteBasePath);
820+
}
821+
821822
@VisibleForTesting
822823
ForStResourceContainer createOptionsAndResourceContainer(@Nullable Path localBasePath) {
823824
return createOptionsAndResourceContainer(null, localBasePath, null, null, null, false);

Diff for: flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStPriorityQueueConfig.java

+9-9
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import static org.apache.flink.state.forst.ForStOptions.TIMER_SERVICE_FACTORY;
2929
import static org.apache.flink.util.Preconditions.checkNotNull;
3030

31-
/** The configuration of rocksDB priority queue state implementation. */
31+
/** The configuration of ForSt priority queue state implementation. */
3232
public class ForStPriorityQueueConfig implements Serializable {
3333

3434
private static final long serialVersionUID = 1L;
@@ -39,17 +39,17 @@ public class ForStPriorityQueueConfig implements Serializable {
3939
private @Nullable ForStStateBackend.PriorityQueueStateType priorityQueueStateType;
4040

4141
/** cache size per keyGroup for rocksDB priority queue state. */
42-
private int rocksDBPriorityQueueSetCacheSize;
42+
private int forStDBPriorityQueueSetCacheSize;
4343

4444
public ForStPriorityQueueConfig() {
4545
this(null, UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE);
4646
}
4747

4848
public ForStPriorityQueueConfig(
4949
ForStStateBackend.PriorityQueueStateType priorityQueueStateType,
50-
int rocksDBPriorityQueueSetCacheSize) {
50+
int forStDBPriorityQueueSetCacheSize) {
5151
this.priorityQueueStateType = priorityQueueStateType;
52-
this.rocksDBPriorityQueueSetCacheSize = rocksDBPriorityQueueSetCacheSize;
52+
this.forStDBPriorityQueueSetCacheSize = forStDBPriorityQueueSetCacheSize;
5353
}
5454

5555
/**
@@ -70,10 +70,10 @@ public void setPriorityQueueStateType(ForStStateBackend.PriorityQueueStateType t
7070
* Gets the cache size of rocksDB priority queue set. It will fall back to the default value if
7171
* it is not explicitly set.
7272
*/
73-
public int getRocksDBPriorityQueueSetCacheSize() {
74-
return rocksDBPriorityQueueSetCacheSize == UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE
73+
public int getForStDBPriorityQueueSetCacheSize() {
74+
return forStDBPriorityQueueSetCacheSize == UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE
7575
? FORST_TIMER_SERVICE_FACTORY_CACHE_SIZE.defaultValue()
76-
: rocksDBPriorityQueueSetCacheSize;
76+
: forStDBPriorityQueueSetCacheSize;
7777
}
7878

7979
public static ForStPriorityQueueConfig fromOtherAndConfiguration(
@@ -83,10 +83,10 @@ public static ForStPriorityQueueConfig fromOtherAndConfiguration(
8383
? config.get(TIMER_SERVICE_FACTORY)
8484
: other.priorityQueueStateType;
8585
int cacheSize =
86-
(other.rocksDBPriorityQueueSetCacheSize
86+
(other.forStDBPriorityQueueSetCacheSize
8787
== UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE)
8888
? config.get(FORST_TIMER_SERVICE_FACTORY_CACHE_SIZE)
89-
: other.rocksDBPriorityQueueSetCacheSize;
89+
: other.forStDBPriorityQueueSetCacheSize;
9090
return new ForStPriorityQueueConfig(priorityQueueType, cacheSize);
9191
}
9292

0 commit comments

Comments
 (0)