Skip to content

Commit ec9b3bf

Browse files
authored
refactor: apply some Java best practices (#181)
Signed-off-by: Keran Yang <[email protected]>
1 parent 1abeb29 commit ec9b3bf

34 files changed

+1016
-1094
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,18 @@
11
package io.numaproj.numaflow.accumulator;
22

33
class Constants {
4-
public static final String SUCCESS = "SUCCESS";
5-
6-
public static final String DELIMITER = ":";
7-
8-
public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64;
9-
10-
public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/accumulator.sock";
11-
12-
public static final String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/accumulator-server-info";
13-
14-
public static final int DEFAULT_PORT = 50051;
15-
16-
public static final String DEFAULT_HOST = "localhost";
17-
18-
public static final String EOF = "EOF";
4+
public static final String SUCCESS = "SUCCESS";
5+
public static final String DELIMITER = ":";
6+
public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64;
7+
public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/accumulator.sock";
8+
public static final String DEFAULT_SERVER_INFO_FILE_PATH =
9+
"/var/run/numaflow/accumulator-server-info";
10+
public static final int DEFAULT_PORT = 50051;
11+
public static final String DEFAULT_HOST = "localhost";
12+
public static final String EOF = "EOF";
13+
14+
// Private constructor to prevent instantiation
15+
private Constants() {
16+
throw new IllegalStateException("Utility class 'Constants' should not be instantiated");
17+
}
1918
}
Original file line numberDiff line numberDiff line change
@@ -1,65 +1,61 @@
11
package io.numaproj.numaflow.accumulator.model;
22

3-
import lombok.Getter;
4-
53
import java.time.Instant;
64
import java.util.Map;
5+
import lombok.Getter;
76

8-
/**
9-
* Message is used to wrap the data returned by Accumulator functions.
10-
*/
7+
/** Message is used to wrap the data returned by Accumulator functions. */
118
@Getter
129
public class Message {
13-
private final Instant eventTime;
14-
private final Instant watermark;
15-
private final Map<String, String> headers;
16-
private final String id;
17-
private String[] keys;
18-
private byte[] value;
19-
private String[] tags;
10+
private final Instant eventTime;
11+
private final Instant watermark;
12+
private final Map<String, String> headers;
13+
private final String id;
14+
private String[] keys;
15+
private byte[] value;
16+
private String[] tags;
2017

21-
/**
22-
* Constructor for constructing message from Datum, it is advised to use the
23-
* incoming Datum to construct the message, because event time, watermark, id
24-
* and headers of the message are derived from the Datum. Only use custom
25-
* implementation of the Datum if you know what you are doing.
26-
*
27-
* @param datum {@link Datum} object
28-
*/
29-
public Message(Datum datum) {
30-
this.keys = datum.getKeys();
31-
this.value = datum.getValue();
32-
this.headers = datum.getHeaders();
33-
this.eventTime = datum.getEventTime();
34-
this.watermark = datum.getWatermark();
35-
this.id = datum.getID();
36-
this.tags = null;
37-
}
18+
/**
19+
* Constructor for constructing message from Datum, it is advised to use the incoming Datum to
20+
* construct the message, because event time, watermark, id and headers of the message are derived
21+
* from the Datum. Only use custom implementation of the Datum if you know what you are doing.
22+
*
23+
* @param datum {@link Datum} object
24+
*/
25+
public Message(Datum datum) {
26+
this.keys = datum.getKeys();
27+
this.value = datum.getValue();
28+
this.headers = datum.getHeaders();
29+
this.eventTime = datum.getEventTime();
30+
this.watermark = datum.getWatermark();
31+
this.id = datum.getID();
32+
this.tags = null;
33+
}
3834

39-
/*
40-
* sets the value of the message
41-
*
42-
* @param value byte array of the value
43-
*/
44-
public void setValue(byte[] value) {
45-
this.value = value;
46-
}
35+
/*
36+
* sets the value of the message
37+
*
38+
* @param value byte array of the value
39+
*/
40+
public void setValue(byte[] value) {
41+
this.value = value;
42+
}
4743

48-
/*
49-
* sets the keys of the message
50-
*
51-
* @param keys string array of the keys
52-
*/
53-
public void setKeys(String[] keys) {
54-
this.keys = keys;
55-
}
44+
/*
45+
* sets the keys of the message
46+
*
47+
* @param keys string array of the keys
48+
*/
49+
public void setKeys(String[] keys) {
50+
this.keys = keys;
51+
}
5652

57-
/*
58-
* sets the tags of the message, tags are used for conditional forwarding
59-
*
60-
* @param tags string array of the tags
61-
*/
62-
public void setTags(String[] tags) {
63-
this.tags = tags;
64-
}
53+
/*
54+
* sets the tags of the message, tags are used for conditional forwarding
55+
*
56+
* @param tags string array of the tags
57+
*/
58+
public void setTags(String[] tags) {
59+
this.tags = tags;
60+
}
6561
}
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,17 @@
11
package io.numaproj.numaflow.batchmapper;
22

33
class Constants {
4-
public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64;
5-
6-
public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/batchmap.sock";
7-
8-
public static final String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/mapper-server-info";
9-
10-
public static final int DEFAULT_PORT = 50051;
11-
12-
public static final String DEFAULT_HOST = "localhost";
13-
14-
public static final String SUCCESS = "SUCCESS";
15-
16-
public static final String MAP_MODE_KEY = "MAP_MODE";
17-
18-
public static final String MAP_MODE = "batch-map";
4+
public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64;
5+
public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/batchmap.sock";
6+
public static final String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/mapper-server-info";
7+
public static final int DEFAULT_PORT = 50051;
8+
public static final String DEFAULT_HOST = "localhost";
9+
public static final String SUCCESS = "SUCCESS";
10+
public static final String MAP_MODE_KEY = "MAP_MODE";
11+
public static final String MAP_MODE = "batch-map";
12+
13+
// Private constructor to prevent instantiation
14+
private Constants() {
15+
throw new IllegalStateException("Utility class 'Constants' should not be instantiated");
16+
}
1917
}
20-

src/main/java/io/numaproj/numaflow/batchmapper/Message.java

+47-51
Original file line numberDiff line numberDiff line change
@@ -2,57 +2,53 @@
22

33
import lombok.Getter;
44

5-
/**
6-
* Message is used to wrap the data returned by Mapper.
7-
*/
8-
5+
/** Message is used to wrap the data returned by Mapper. */
96
@Getter
107
public class Message {
11-
public static final String DROP = "U+005C__DROP__";
12-
13-
private final String[] keys;
14-
private final byte[] value;
15-
private final String[] tags;
16-
17-
18-
/**
19-
* used to create Message with value, keys and tags(used for conditional forwarding)
20-
*
21-
* @param value message value
22-
* @param keys message keys
23-
* @param tags message tags which will be used for conditional forwarding
24-
*/
25-
public Message(byte[] value, String[] keys, String[] tags) {
26-
this.keys = keys;
27-
this.value = value;
28-
this.tags = tags;
29-
}
30-
31-
/**
32-
* used to create Message with value.
33-
*
34-
* @param value message value
35-
*/
36-
public Message(byte[] value) {
37-
this(value, null, null);
38-
}
39-
40-
/**
41-
* used to create Message with value and keys.
42-
*
43-
* @param value message value
44-
* @param keys message keys
45-
*/
46-
public Message(byte[] value, String[] keys) {
47-
this(value, keys, null);
48-
}
49-
50-
/**
51-
* creates a Message which will be dropped
52-
*
53-
* @return returns the Message which will be dropped
54-
*/
55-
public static Message toDrop() {
56-
return new Message(new byte[0], null, new String[]{DROP});
57-
}
8+
private static final String[] DROP_TAGS = {"U+005C__DROP__"};
9+
private final String[] keys;
10+
private final byte[] value;
11+
private final String[] tags;
12+
13+
/**
14+
* used to create Message with value, keys and tags(used for conditional forwarding)
15+
*
16+
* @param value message value
17+
* @param keys message keys
18+
* @param tags message tags which will be used for conditional forwarding
19+
*/
20+
public Message(byte[] value, String[] keys, String[] tags) {
21+
// defensive copy - once the Message is created, the caller should not be able to modify it.
22+
this.keys = keys == null ? null : keys.clone();
23+
this.value = value == null ? null : value.clone();
24+
this.tags = tags == null ? null : tags.clone();
25+
}
26+
27+
/**
28+
* used to create Message with value.
29+
*
30+
* @param value message value
31+
*/
32+
public Message(byte[] value) {
33+
this(value, null, null);
34+
}
35+
36+
/**
37+
* used to create Message with value and keys.
38+
*
39+
* @param value message value
40+
* @param keys message keys
41+
*/
42+
public Message(byte[] value, String[] keys) {
43+
this(value, keys, null);
44+
}
45+
46+
/**
47+
* creates a Message which will be dropped
48+
*
49+
* @return returns the Message which will be dropped
50+
*/
51+
public static Message toDrop() {
52+
return new Message(new byte[0], null, DROP_TAGS);
53+
}
5854
}

src/main/java/io/numaproj/numaflow/info/ServerInfo.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
package io.numaproj.numaflow.info;
22

3+
import static java.util.Map.entry;
4+
35
import com.fasterxml.jackson.annotation.JsonProperty;
6+
import java.util.Map;
47
import lombok.AllArgsConstructor;
58
import lombok.Getter;
69
import lombok.NoArgsConstructor;
710
import lombok.Setter;
811

9-
import java.util.Map;
10-
11-
import static java.util.Map.entry;
12-
1312
/**
1413
* Server Information to be used by client to determine:
1514
* - protocol: what is right protocol to use (UDS or TCP)
@@ -22,7 +21,7 @@
2221
@Setter
2322
@NoArgsConstructor
2423
@AllArgsConstructor
25-
public class ServerInfo {
24+
public final class ServerInfo {
2625
// Specify the minimum Numaflow version required by the current SDK version
2726
// To update this value, please follow the instructions for
2827
// MINIMUM_NUMAFLOW_VERSION in

src/main/java/io/numaproj/numaflow/info/ServerInfoAccessor.java

-3
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package io.numaproj.numaflow.info;
22

33
public interface ServerInfoAccessor {
4-
String INFO_EOF = "U+005C__END__";
5-
64
/**
75
* Get current runtime numaflow-java SDK version.
86
*/
@@ -11,7 +9,6 @@ public interface ServerInfoAccessor {
119
/**
1210
* Delete filePath if it exists.
1311
* Write serverInfo to filePath in Json format.
14-
* Append {@link ServerInfoAccessor#INFO_EOF} as a new line to indicate end of file.
1512
*
1613
* @param serverInfo server information POJO
1714
* @param filePath file path to write to

0 commit comments

Comments
 (0)