Skip to content

Commit 098a2ed

Browse files
committed
Merge branch 'main' of https://github.com/apache/celeborn into diskFullReserveSlotsRejection
2 parents 7f7ffac + 71a7d0a commit 098a2ed

43 files changed

Lines changed: 1936 additions & 223 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/docker-build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ jobs:
3333
tar -xzf apache-celeborn-${VERSION}-bin.tgz
3434
3535
- name: Login to Docker Hub
36-
uses: docker/login-action@v3
36+
uses: docker/login-action@4907a6ddec9925e35a0a9e82d7399ccc52663121
3737
with:
3838
username: ${{ secrets.DOCKERHUB_USER }}
3939
password: ${{ secrets.DOCKERHUB_TOKEN }}

.github/workflows/grafana.yml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,13 @@ jobs:
4444
check-latest: true
4545
# See https://github.com/grafana/dashboard-linter
4646
- name: Install dashboard-linter
47-
run: go install github.com/grafana/dashboard-linter@latest
47+
run: |
48+
git clone https://github.com/grafana/dashboard-linter.git
49+
cd dashboard-linter
50+
go build -o dashboard-linter
4851
- name: Lint Celeborn Dashboards
4952
run: |
5053
for dashboard in assets/grafana/*.json; do
5154
python3 dev/lint_grafana.py "$dashboard"
52-
dashboard-linter lint "$dashboard" --config assets/grafana/.lint --strict
55+
./dashboard-linter/dashboard-linter lint "$dashboard" --config assets/grafana/.lint --strict
5356
done

assets/grafana/celeborn-dashboard.json

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4063,6 +4063,182 @@
40634063
],
40644064
"title": "metrics_PartitionFileSizeBytes_Max",
40654065
"type": "timeseries"
4066+
},
4067+
{
4068+
"datasource": {
4069+
"type": "prometheus",
4070+
"uid": "${DS_PROMETHEUS}"
4071+
},
4072+
"description": "Cumulative count of metadata DB (RocksDB) operations performed by the worker, broken down by operation (read|write) and status (success|fail).",
4073+
"fieldConfig": {
4074+
"defaults": {
4075+
"color": {
4076+
"mode": "palette-classic"
4077+
},
4078+
"custom": {
4079+
"axisCenteredZero": false,
4080+
"axisColorMode": "text",
4081+
"axisLabel": "",
4082+
"axisPlacement": "auto",
4083+
"barAlignment": 0,
4084+
"drawStyle": "line",
4085+
"fillOpacity": 0,
4086+
"gradientMode": "none",
4087+
"hideFrom": {
4088+
"legend": false,
4089+
"tooltip": false,
4090+
"viz": false
4091+
},
4092+
"lineInterpolation": "linear",
4093+
"lineWidth": 1,
4094+
"pointSize": 5,
4095+
"scaleDistribution": {
4096+
"type": "linear"
4097+
},
4098+
"showPoints": "auto",
4099+
"spanNulls": false,
4100+
"stacking": {
4101+
"group": "A",
4102+
"mode": "none"
4103+
},
4104+
"thresholdsStyle": {
4105+
"mode": "off"
4106+
}
4107+
},
4108+
"mappings": [],
4109+
"thresholds": {
4110+
"mode": "absolute",
4111+
"steps": [
4112+
{
4113+
"color": "green"
4114+
}
4115+
]
4116+
}
4117+
},
4118+
"overrides": []
4119+
},
4120+
"gridPos": {
4121+
"h": 8,
4122+
"w": 12,
4123+
"x": 0,
4124+
"y": 216
4125+
},
4126+
"id": 270,
4127+
"options": {
4128+
"legend": {
4129+
"calcs": [],
4130+
"displayMode": "list",
4131+
"placement": "bottom",
4132+
"showLegend": true
4133+
},
4134+
"tooltip": {
4135+
"mode": "single",
4136+
"sort": "none"
4137+
}
4138+
},
4139+
"targets": [
4140+
{
4141+
"datasource": {
4142+
"type": "prometheus",
4143+
"uid": "${DS_PROMETHEUS}"
4144+
},
4145+
"editorMode": "code",
4146+
"expr": "metrics_MetadataOperationStatusCount_Count{role=\"Worker\", instance=~\"${instance}\"}",
4147+
"legendFormat": "{{dbBackend}} {{operation}}/{{status}} ${baseLegend}",
4148+
"range": true,
4149+
"refId": "A"
4150+
}
4151+
],
4152+
"title": "metrics_MetadataOperationStatusCount_Count",
4153+
"type": "timeseries"
4154+
},
4155+
{
4156+
"datasource": {
4157+
"type": "prometheus",
4158+
"uid": "${DS_PROMETHEUS}"
4159+
},
4160+
"description": "Number of metadata DB (RocksDB) operations on the worker over the last 1 minute, by operation (read|write) and status (success|fail). Useful for spotting RocksDB error spikes that the cumulative chart visually flattens.",
4161+
"fieldConfig": {
4162+
"defaults": {
4163+
"color": {
4164+
"mode": "palette-classic"
4165+
},
4166+
"custom": {
4167+
"axisCenteredZero": false,
4168+
"axisColorMode": "text",
4169+
"axisLabel": "",
4170+
"axisPlacement": "auto",
4171+
"barAlignment": 0,
4172+
"drawStyle": "line",
4173+
"fillOpacity": 0,
4174+
"gradientMode": "none",
4175+
"hideFrom": {
4176+
"legend": false,
4177+
"tooltip": false,
4178+
"viz": false
4179+
},
4180+
"lineInterpolation": "linear",
4181+
"lineWidth": 1,
4182+
"pointSize": 5,
4183+
"scaleDistribution": {
4184+
"type": "linear"
4185+
},
4186+
"showPoints": "auto",
4187+
"spanNulls": false,
4188+
"stacking": {
4189+
"group": "A",
4190+
"mode": "none"
4191+
},
4192+
"thresholdsStyle": {
4193+
"mode": "off"
4194+
}
4195+
},
4196+
"mappings": [],
4197+
"thresholds": {
4198+
"mode": "absolute",
4199+
"steps": [
4200+
{
4201+
"color": "green"
4202+
}
4203+
]
4204+
}
4205+
},
4206+
"overrides": []
4207+
},
4208+
"gridPos": {
4209+
"h": 8,
4210+
"w": 12,
4211+
"x": 12,
4212+
"y": 216
4213+
},
4214+
"id": 271,
4215+
"options": {
4216+
"legend": {
4217+
"calcs": [],
4218+
"displayMode": "list",
4219+
"placement": "bottom",
4220+
"showLegend": true
4221+
},
4222+
"tooltip": {
4223+
"mode": "single",
4224+
"sort": "none"
4225+
}
4226+
},
4227+
"targets": [
4228+
{
4229+
"datasource": {
4230+
"type": "prometheus",
4231+
"uid": "${DS_PROMETHEUS}"
4232+
},
4233+
"editorMode": "code",
4234+
"expr": "increase(metrics_MetadataOperationStatusCount_Count{role=\"Worker\", instance=~\"${instance}\"}[1m])",
4235+
"legendFormat": "{{dbBackend}} {{operation}}/{{status}} ${baseLegend}",
4236+
"range": true,
4237+
"refId": "A"
4238+
}
4239+
],
4240+
"title": "metrics_MetadataOperationStatusCount_increase_1m",
4241+
"type": "timeseries"
40664242
}
40674243
],
40684244
"title": "Worker",

common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717

1818
package org.apache.celeborn.common.network.util;
1919

20-
import java.nio.channels.spi.SelectorProvider;
20+
import static org.apache.celeborn.common.network.util.IOMode.NIO;
21+
2122
import java.util.ArrayList;
2223
import java.util.Collections;
2324
import java.util.HashMap;
@@ -31,16 +32,17 @@
3132
import io.netty.buffer.PooledByteBufAllocator;
3233
import io.netty.buffer.UnpooledByteBufAllocator;
3334
import io.netty.channel.Channel;
34-
import io.netty.channel.DefaultSelectStrategyFactory;
3535
import io.netty.channel.EventLoopGroup;
36+
import io.netty.channel.IoHandlerFactory;
37+
import io.netty.channel.MultiThreadIoEventLoopGroup;
3638
import io.netty.channel.ServerChannel;
37-
import io.netty.channel.epoll.EpollEventLoopGroup;
39+
import io.netty.channel.epoll.EpollIoHandler;
3840
import io.netty.channel.epoll.EpollServerSocketChannel;
3941
import io.netty.channel.epoll.EpollSocketChannel;
40-
import io.netty.channel.kqueue.KQueueEventLoopGroup;
42+
import io.netty.channel.kqueue.KQueueIoHandler;
4143
import io.netty.channel.kqueue.KQueueServerSocketChannel;
4244
import io.netty.channel.kqueue.KQueueSocketChannel;
43-
import io.netty.channel.nio.NioEventLoopGroup;
45+
import io.netty.channel.nio.NioIoHandler;
4446
import io.netty.channel.socket.nio.NioServerSocketChannel;
4547
import io.netty.channel.socket.nio.NioSocketChannel;
4648
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -75,23 +77,29 @@ public static EventLoopGroup createEventLoop(
7577
IOMode mode, int numThreads, boolean conflictAvoidChooserEnable, String threadPrefix) {
7678
ThreadFactory threadFactory = createThreadFactory(threadPrefix);
7779

80+
IoHandlerFactory handlerFactory;
7881
switch (mode) {
7982
case NIO:
80-
return conflictAvoidChooserEnable
81-
? new NioEventLoopGroup(
82-
numThreads,
83-
new ThreadPerTaskExecutor(threadFactory),
84-
ConflictAvoidEventExecutorChooserFactory.INSTANCE,
85-
SelectorProvider.provider(),
86-
DefaultSelectStrategyFactory.INSTANCE)
87-
: new NioEventLoopGroup(numThreads, threadFactory);
83+
handlerFactory = NioIoHandler.newFactory();
84+
break;
8885
case EPOLL:
89-
return new EpollEventLoopGroup(numThreads, threadFactory);
86+
handlerFactory = EpollIoHandler.newFactory();
87+
break;
9088
case KQUEUE:
91-
return new KQueueEventLoopGroup(numThreads, threadFactory);
89+
handlerFactory = KQueueIoHandler.newFactory();
90+
break;
9291
default:
9392
throw new IllegalArgumentException("Unknown io mode: " + mode);
9493
}
94+
if (mode == NIO && conflictAvoidChooserEnable) {
95+
return new MultiThreadIoEventLoopGroup(
96+
numThreads,
97+
new ThreadPerTaskExecutor(threadFactory),
98+
ConflictAvoidEventExecutorChooserFactory.INSTANCE,
99+
handlerFactory);
100+
} else {
101+
return new MultiThreadIoEventLoopGroup(numThreads, threadFactory, handlerFactory);
102+
}
95103
}
96104

97105
/** Returns the correct (client) SocketChannel class based on IOMode. */

common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,22 +22,34 @@
2222

2323
public class StorageInfo implements Serializable {
2424
public enum Type {
25-
MEMORY(0),
26-
HDD(1),
27-
SSD(2),
28-
HDFS(3),
29-
OSS(4),
30-
S3(5);
25+
MEMORY(0, false, MEMORY_MASK),
26+
HDD(1, false, LOCAL_DISK_MASK),
27+
SSD(2, false, LOCAL_DISK_MASK),
28+
HDFS(3, true, HDFS_MASK),
29+
OSS(4, true, OSS_MASK),
30+
S3(5, true, S3_MASK);
3131

3232
private final int value;
33+
private final boolean isDFS;
34+
private final int mask;
3335

34-
Type(int value) {
36+
Type(int value, boolean isDFS, int mask) {
3537
this.value = value;
38+
this.isDFS = isDFS;
39+
this.mask = mask;
3640
}
3741

3842
public int getValue() {
3943
return value;
4044
}
45+
46+
public boolean isDFS() {
47+
return isDFS;
48+
}
49+
50+
public int getMask() {
51+
return mask;
52+
}
4153
}
4254

4355
public static final Map<Integer, Type> typesMap = new HashMap<>();
@@ -232,6 +244,11 @@ public boolean S3Available() {
232244
return S3Available(availableStorageTypes);
233245
}
234246

247+
public static boolean isAvailable(Type type, int availableStorageTypes) {
248+
return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
249+
|| (availableStorageTypes & type.getMask()) > 0;
250+
}
251+
235252
@Override
236253
public boolean equals(Object o) {
237254
if (this == o) return true;

0 commit comments

Comments
 (0)