Skip to content

Commit 744f4f1

Browse files
authored
Add IncKHop Alogrithm (#458) (#459)
1 parent cace54f commit 744f4f1

57 files changed

Lines changed: 2001 additions & 122 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/config/keys/ConnectorConfigKeys.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,32 @@ public class ConnectorConfigKeys implements Serializable {
8989
.key("geaflow.dsl.skip.header")
9090
.defaultValue(false)
9191
.description("Whether skip the header for csv format.");
92+
93+
public static final ConfigKey GEAFLOW_DSL_SOURCE_FILE_PARALLEL_MOD = ConfigKeys
94+
.key("geaflow.dsl.source.file.parallel.mod")
95+
.defaultValue(false)
96+
.description("Whether read single file by index");
97+
98+
public static final ConfigKey GEAFLOW_DSL_SINK_FILE_COLLISION = ConfigKeys
99+
.key("geaflow.dsl.sink.file.collision")
100+
.defaultValue("newfile")
101+
.description("Whether create new file when collision occurs.");
102+
103+
public static final ConfigKey GEAFLOW_DSL_FILE_LINE_SPLIT_SIZE = ConfigKeys
104+
.key("geaflow.dsl.file.line.split.size")
105+
.defaultValue(-1)
106+
.description("file line split size set by user");
107+
108+
public static final ConfigKey GEAFLOW_DSL_SOURCE_ENABLE_UPLOAD_METRICS = ConfigKeys
109+
.key("geaflow.dsl.source.enable.upload.metrics")
110+
.defaultValue(true)
111+
.description("source enable upload metrics");
112+
113+
public static final ConfigKey GEAFLOW_DSL_SINK_ENABLE_SKIP = ConfigKeys
114+
.key("geaflow.dsl.sink.enable.skip")
115+
.defaultValue(false)
116+
.description("sink enable skip");
117+
92118
}
93119

94120

geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/config/keys/DSLConfigKeys.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ public class DSLConfigKeys implements Serializable {
4545
.noDefaultValue()
4646
.description("The table type.");
4747

48+
public static final ConfigKey GEAFLOW_DSL_TABLE_PARALLELISM = ConfigKeys
49+
.key("geaflow.dsl.table.parallelism")
50+
.defaultValue(1)
51+
.description("The table parallelism.");
52+
4853
public static final ConfigKey GEAFLOW_DSL_MAX_TRAVERSAL = ConfigKeys
4954
.key("geaflow.dsl.max.traversal")
5055
.defaultValue(64)

geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/config/keys/FrameworkConfigKeys.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
public class FrameworkConfigKeys implements Serializable {
2222

23+
private static final long serialVersionUID = 0L;
24+
2325
public static final ConfigKey ENABLE_EXTRA_OPTIMIZE = ConfigKeys
2426
.key("geaflow.extra.optimize.enable")
2527
.defaultValue(false)
@@ -151,5 +153,15 @@ public class FrameworkConfigKeys implements Serializable {
151153
.defaultValue(false)
152154
.description("whether enable iteration asp mode, disabled by default");
153155

156+
public static final ConfigKey ADD_INVOKE_VIDS_EACH_ITERATION = ConfigKeys
157+
.key("geaflow.add.invoke.vids.each.iteration")
158+
.defaultValue(true)
159+
.description("");
160+
161+
public static final ConfigKey UDF_MATERIALIZE_GRAPH_IN_FINISH = ConfigKeys
162+
.key("geaflow.udf.materialize.graph.in.finish")
163+
.defaultValue(false)
164+
.description("in dynmic graph, whether udf function materialize graph in finish");
165+
154166
}
155167

geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/driver/Driver.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ public Boolean executePipeline(Pipeline pipeline) {
103103
try {
104104
return future.get();
105105
} catch (Throwable e) {
106+
LOGGER.error(e.getMessage(), e);
106107
throw new GeaflowRuntimeException(e);
107108
}
108109
}

geaflow/geaflow-core/geaflow-engine/geaflow-shuffle/src/main/java/com/antgroup/geaflow/shuffle/pipeline/slice/AbstractSlice.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public PipelineSliceReader createSliceReader(long startBatchId, PipelineSliceLis
5959
throw new GeaflowRuntimeException("slice is already created:" + sliceId);
6060
}
6161

62-
LOGGER.info("creating reader for {} {} with startBatch:{}",
62+
LOGGER.debug("creating reader for {} {} with startBatch:{}",
6363
taskLogTag, sliceId, startBatchId);
6464

6565
sliceReader = new DisposableSliceReader(this, startBatchId, listener);

geaflow/geaflow-core/geaflow-engine/geaflow-shuffle/src/main/java/com/antgroup/geaflow/shuffle/pipeline/slice/PipelineSlice.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ public void release() {
141141
isReleased = true;
142142
}
143143

144-
LOGGER.info("{}: released {} with bufferSize:{}", taskLogTag, sliceId, bufferSize);
144+
LOGGER.debug("{}: released {} with bufferSize:{}", taskLogTag, sliceId, bufferSize);
145145
if (reader != null) {
146146
reader.release();
147147
}

geaflow/geaflow-core/geaflow-engine/geaflow-shuffle/src/main/java/com/antgroup/geaflow/shuffle/pipeline/slice/SliceManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public void register(SliceId sliceId, IPipelineSlice slice) {
3636
if (this.slices.containsKey(sliceId)) {
3737
throw new GeaflowRuntimeException("slice already registered: " + sliceId);
3838
}
39-
LOGGER.info("register slice {} {}", sliceId, slice.getClass().getSimpleName());
39+
LOGGER.debug("register slice {} {}", sliceId, slice.getClass().getSimpleName());
4040
this.slices.put(sliceId, slice);
4141
synchronized (this.pipeline2slices) {
4242
long pipelineId = sliceId.getWriterId().getPipelineId();

geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/main/java/com/antgroup/geaflow/operator/impl/graph/compute/dynamic/cache/TemporaryGraphCache.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,14 @@
1616

1717
import com.antgroup.geaflow.model.graph.edge.IEdge;
1818
import com.antgroup.geaflow.model.graph.vertex.IVertex;
19-
import java.util.ArrayList;
20-
import java.util.HashMap;
21-
import java.util.HashSet;
22-
import java.util.List;
23-
import java.util.Map;
24-
import java.util.Set;
19+
import java.util.*;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
2522

2623
public class TemporaryGraphCache<K, VV, EV> {
2724

25+
private static final Logger LOGGER = LoggerFactory.getLogger(TemporaryGraphCache.class);
26+
2827
private final Set<K> vertexIds;
2928
private final Map<K, IVertex<K, VV>> vertices;
3029
private final Map<K, List<IEdge<K, EV>>> vertexEdges;

geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/main/java/com/antgroup/geaflow/operator/impl/graph/traversal/dynamic/AbstractDynamicGraphVertexCentricTraversalOp.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import com.antgroup.geaflow.api.graph.function.vc.IncVertexCentricTraversalFunction.IncVertexCentricTraversalFuncContext;
2121
import com.antgroup.geaflow.api.graph.function.vc.IncVertexCentricTraversalFunction.TraversalHistoricalGraph;
2222
import com.antgroup.geaflow.collector.ICollector;
23+
import com.antgroup.geaflow.common.config.Configuration;
24+
import com.antgroup.geaflow.common.config.keys.FrameworkConfigKeys;
2325
import com.antgroup.geaflow.model.graph.message.DefaultGraphMessage;
2426
import com.antgroup.geaflow.model.graph.message.IGraphMessage;
2527
import com.antgroup.geaflow.model.record.RecordArgs.GraphRecordNames;
@@ -53,6 +55,7 @@ public abstract class AbstractDynamicGraphVertexCentricTraversalOp<K, VV, EV, M,
5355
protected IncGraphVCTraversalCtxImpl graphVCTraversalCtx;
5456
protected IncVertexCentricTraversalFunction<K, VV, EV, M, R> incVcTraversalFunction;
5557

58+
protected boolean addInvokeVIdsEachIteration = false;
5659
protected Set<K> invokeVIds;
5760
protected List<ITraversalResponse<R>> responses;
5861

@@ -76,6 +79,8 @@ public void open(OpContext opContext) {
7679
this.graphVCTraversalCtx = new IncGraphVCTraversalCtxImpl(getIdentify(), messageCollector);
7780
this.incVcTraversalFunction.open(this.graphVCTraversalCtx);
7881

82+
this.addInvokeVIdsEachIteration = Configuration.getBoolean(FrameworkConfigKeys.ADD_INVOKE_VIDS_EACH_ITERATION,
83+
opContext.getRuntimeContext().getConfiguration().getConfigMap());
7984
this.invokeVIds = new HashSet<>();
8085
this.responses = new ArrayList<>();
8186

@@ -103,7 +108,9 @@ public void doFinishIteration(long iterations) {
103108
this.graphMsgBox.processInMessage(new MsgProcessFunc<K, M>() {
104109
@Override
105110
public void process(K vertexId, List<M> messages) {
106-
invokeVIds.add(vertexId);
111+
if (addInvokeVIdsEachIteration) {
112+
invokeVIds.add(vertexId);
113+
}
107114
graphVCTraversalCtx.init(iterations, vertexId);
108115
incVcTraversalFunction.compute(vertexId, messages.iterator());
109116
}

geaflow/geaflow-dsl/geaflow-dsl-common/src/main/java/com/antgroup/geaflow/dsl/common/algo/AlgorithmRuntimeContext.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
package com.antgroup.geaflow.dsl.common.algo;
1616

17+
import com.antgroup.geaflow.common.config.Configuration;
1718
import com.antgroup.geaflow.dsl.common.data.Row;
1819
import com.antgroup.geaflow.dsl.common.data.RowEdge;
1920
import com.antgroup.geaflow.dsl.common.types.GraphSchema;
@@ -24,6 +25,10 @@ public interface AlgorithmRuntimeContext<K, M> {
2425

2526
List<RowEdge> loadEdges(EdgeDirection direction);
2627

28+
List<RowEdge> loadStaticEdges(EdgeDirection direction);
29+
30+
List<RowEdge> loadDynamicEdges(EdgeDirection direction);
31+
2732
void sendMessage(K vertexId, M message);
2833

2934
void updateVertexValue(Row value);
@@ -33,4 +38,6 @@ public interface AlgorithmRuntimeContext<K, M> {
3338
long getCurrentIterationId();
3439

3540
GraphSchema getGraphSchema();
41+
42+
Configuration getConfig();
3643
}

0 commit comments

Comments
 (0)