Skip to content

Commit

Permalink
HADOOP-19270 Use stable sort in commandQueue (#7038)
Browse files Browse the repository at this point in the history
  • Loading branch information
geatrigger authored Mar 4, 2025
1 parent f552664 commit 103b054
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public interface AuditCommandParser {

/**
* Initialize this parser with the given configuration. Guaranteed to be
* called prior to any calls to {@link #parse(Text, Function)}.
* called prior to any calls to {@link #parse(Long, Text, Function)}.
*
* @param conf The Configuration to be used to set up this parser.
* @throws IOException if error on initializing a parser.
Expand All @@ -46,14 +46,15 @@ public interface AuditCommandParser {
* between the start of the audit log and this command) into absolute
* timestamps.
*
* @param sequence Sequence order of input line.
* @param inputLine Single input line to convert.
* @param relativeToAbsolute Function converting relative timestamps
* (in milliseconds) to absolute timestamps
* (in milliseconds).
* @return A command representing the input line.
* @throws IOException if error on parsing.
*/
AuditReplayCommand parse(Text inputLine,
AuditReplayCommand parse(Long sequence, Text inputLine,
Function<Long, Long> relativeToAbsolute) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void initialize(Configuration conf) throws IOException {
}

@Override
public AuditReplayCommand parse(Text inputLine,
public AuditReplayCommand parse(Long sequence, Text inputLine,
Function<Long, Long> relativeToAbsolute) throws IOException {
Matcher m = logLineParseRegex.matcher(inputLine.toString());
if (!m.find()) {
Expand Down Expand Up @@ -147,7 +147,8 @@ public AuditReplayCommand parse(Text inputLine,
}
}

return new AuditReplayCommand(relativeToAbsolute.apply(relativeTimestamp),
return new AuditReplayCommand(sequence,
relativeToAbsolute.apply(relativeTimestamp),
// Split the UGI on space to remove the auth and proxy portions of it
SPACE_SPLITTER.split(parameterMap.get("ugi")).iterator().next(),
parameterMap.get("cmd").replace("(options:", "(options="),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ public void initialize(Configuration conf) throws IOException {
}

@Override
public AuditReplayCommand parse(Text inputLine,
public AuditReplayCommand parse(Long sequence, Text inputLine,
Function<Long, Long> relativeToAbsolute) throws IOException {
String[] fields = inputLine.toString().split(FIELD_SEPARATOR);
long absoluteTimestamp = relativeToAbsolute
.apply(Long.parseLong(fields[0]));
return new AuditReplayCommand(absoluteTimestamp, fields[1], fields[2],
return new AuditReplayCommand(sequence, absoluteTimestamp, fields[1], fields[2],
fields[3], fields[4], fields[5]);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,17 @@ class AuditReplayCommand implements Delayed {
private static final Pattern SIMPLE_UGI_PATTERN = Pattern
.compile("([^/@ ]*).*?");

private Long sequence;
private long absoluteTimestamp;
private String ugi;
private String command;
private String src;
private String dest;
private String sourceIP;

AuditReplayCommand(long absoluteTimestamp, String ugi, String command,
AuditReplayCommand(Long sequence, long absoluteTimestamp, String ugi, String command,
String src, String dest, String sourceIP) {
this.sequence = sequence;
this.absoluteTimestamp = absoluteTimestamp;
this.ugi = ugi;
this.command = command;
Expand All @@ -60,6 +62,9 @@ class AuditReplayCommand implements Delayed {
this.sourceIP = sourceIP;
}

Long getSequence() {
return sequence;
}
long getAbsoluteTimestamp() {
return absoluteTimestamp;
}
Expand Down Expand Up @@ -103,8 +108,12 @@ public long getDelay(TimeUnit unit) {

@Override
public int compareTo(Delayed o) {
return Long.compare(absoluteTimestamp,
((AuditReplayCommand) o).absoluteTimestamp);
int result = Long.compare(absoluteTimestamp,
((AuditReplayCommand) o).absoluteTimestamp);
if (result != 0) {
return result;
}
return Long.compare(sequence, ((AuditReplayCommand) o).sequence);
}

/**
Expand All @@ -122,9 +131,10 @@ boolean isPoison() {
* information besides a timestamp; other getter methods wil return null.
*/
private static final class PoisonPillCommand extends AuditReplayCommand {
private static final Long DEFAULT_SEQUENCE = -1L;

private PoisonPillCommand(long absoluteTimestamp) {
super(absoluteTimestamp, null, null, null, null, null);
super(DEFAULT_SEQUENCE, absoluteTimestamp, null, null, null, null, null);
}

@Override
Expand All @@ -144,9 +154,9 @@ public boolean equals(Object other) {
return false;
}
AuditReplayCommand o = (AuditReplayCommand) other;
return absoluteTimestamp == o.absoluteTimestamp && ugi.equals(o.ugi)
&& command.equals(o.command) && src.equals(o.src) && dest.equals(o.dest)
&& sourceIP.equals(o.sourceIP);
return sequence.equals(o.sequence) && absoluteTimestamp == o.absoluteTimestamp
&& ugi.equals(o.ugi) && command.equals(o.command) && src.equals(o.src)
&& dest.equals(o.dest) && sourceIP.equals(o.sourceIP);
}

@Override
Expand All @@ -156,8 +166,8 @@ public int hashCode() {

@Override
public String toString() {
return String.format("AuditReplayCommand(absoluteTimestamp=%d, ugi=%s, "
+ "command=%s, src=%s, dest=%s, sourceIP=%s",
absoluteTimestamp, ugi, command, src, dest, sourceIP);
return String.format("AuditReplayCommand(sequence=%d, absoluteTimestamp=%d, "
+ "ugi=%s, command=%s, src=%s, dest=%s, sourceIP=%s",
sequence, absoluteTimestamp, ugi, command, src, dest, sourceIP);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ public enum CommandType {
private int numThreads;
private double rateFactor;
private long highestTimestamp;
private Long highestSequence;
private List<AuditReplayThread> threads;
private DelayQueue<AuditReplayCommand> commandQueue;
private Function<Long, Long> relativeToAbsoluteTimestamp;
Expand Down Expand Up @@ -246,7 +247,7 @@ public void setup(final Mapper.Context context) throws IOException {
@Override
public void map(LongWritable lineNum, Text inputLine, Mapper.Context context)
throws IOException, InterruptedException {
AuditReplayCommand cmd = commandParser.parse(inputLine,
AuditReplayCommand cmd = commandParser.parse(lineNum.get(), inputLine,
relativeToAbsoluteTimestamp);
long delay = cmd.getDelay(TimeUnit.MILLISECONDS);
// Prevent from loading too many elements into memory all at once
Expand All @@ -255,6 +256,7 @@ public void map(LongWritable lineNum, Text inputLine, Mapper.Context context)
}
commandQueue.put(cmd);
highestTimestamp = cmd.getAbsoluteTimestamp();
highestSequence = cmd.getSequence();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class TestAuditLogDirectParser {

private static final long START_TIMESTAMP = 10000;
private AuditLogDirectParser parser;
private Long sequence = 1L;

@Before
public void setup() throws Exception {
Expand All @@ -53,55 +54,55 @@ private Text getAuditString(String timestamp, String ugi, String cmd,
public void testSimpleInput() throws Exception {
Text in = getAuditString("1970-01-01 00:00:11,000", "fakeUser",
"listStatus", "sourcePath", "null");
AuditReplayCommand expected = new AuditReplayCommand(1000, "fakeUser",
AuditReplayCommand expected = new AuditReplayCommand(sequence, 1000, "fakeUser",
"listStatus", "sourcePath", "null", "0.0.0.0");
assertEquals(expected, parser.parse(in, Function.identity()));
assertEquals(expected, parser.parse(sequence, in, Function.identity()));
}

@Test
public void testInputWithEquals() throws Exception {
Text in = getAuditString("1970-01-01 00:00:11,000", "fakeUser",
"listStatus", "day=1970", "null");
AuditReplayCommand expected = new AuditReplayCommand(1000, "fakeUser",
AuditReplayCommand expected = new AuditReplayCommand(sequence, 1000, "fakeUser",
"listStatus", "day=1970", "null", "0.0.0.0");
assertEquals(expected, parser.parse(in, Function.identity()));
assertEquals(expected, parser.parse(sequence, in, Function.identity()));
}

@Test
public void testInputWithRenameOptions() throws Exception {
Text in = getAuditString("1970-01-01 00:00:11,000", "fakeUser",
"rename (options=[TO_TRASH])", "sourcePath", "destPath");
AuditReplayCommand expected = new AuditReplayCommand(1000, "fakeUser",
AuditReplayCommand expected = new AuditReplayCommand(sequence, 1000, "fakeUser",
"rename (options=[TO_TRASH])", "sourcePath", "destPath", "0.0.0.0");
assertEquals(expected, parser.parse(in, Function.identity()));
assertEquals(expected, parser.parse(sequence, in, Function.identity()));
}

@Test
public void testInputWithTokenAuth() throws Exception {
Text in = getAuditString("1970-01-01 00:00:11,000", "fakeUser (auth:TOKEN)",
"create", "sourcePath", "null");
AuditReplayCommand expected = new AuditReplayCommand(1000, "fakeUser",
AuditReplayCommand expected = new AuditReplayCommand(sequence, 1000, "fakeUser",
"create", "sourcePath", "null", "0.0.0.0");
assertEquals(expected, parser.parse(in, Function.identity()));
assertEquals(expected, parser.parse(sequence, in, Function.identity()));
}

@Test
public void testInputWithProxyUser() throws Exception {
Text in = getAuditString("1970-01-01 00:00:11,000",
"proxyUser (auth:TOKEN) via fakeUser", "create", "sourcePath", "null");
AuditReplayCommand expected = new AuditReplayCommand(1000, "proxyUser",
"create", "sourcePath", "null", "0.0.0.0");
assertEquals(expected, parser.parse(in, Function.identity()));
AuditReplayCommand expected = new AuditReplayCommand(sequence, 1000,
"proxyUser", "create", "sourcePath", "null", "0.0.0.0");
assertEquals(expected, parser.parse(sequence, in, Function.identity()));
}

@Test
public void testParseDefaultDateFormat() throws Exception {
Text in = getAuditString("1970-01-01 13:00:00,000",
"ignored", "ignored", "ignored", "ignored");
AuditReplayCommand expected = new AuditReplayCommand(
AuditReplayCommand expected = new AuditReplayCommand(sequence,
13 * 60 * 60 * 1000 - START_TIMESTAMP,
"ignored", "ignored", "ignored", "ignored", "0.0.0.0");
assertEquals(expected, parser.parse(in, Function.identity()));
assertEquals(expected, parser.parse(sequence, in, Function.identity()));
}

@Test
Expand All @@ -114,9 +115,9 @@ public void testParseCustomDateFormat() throws Exception {
parser.initialize(conf);
Text in = getAuditString("1970-01-01 01:00:00,000 PM",
"ignored", "ignored", "ignored", "ignored");
AuditReplayCommand expected = new AuditReplayCommand(13 * 60 * 60 * 1000,
AuditReplayCommand expected = new AuditReplayCommand(sequence, 13 * 60 * 60 * 1000,
"ignored", "ignored", "ignored", "ignored", "0.0.0.0");
assertEquals(expected, parser.parse(in, Function.identity()));
assertEquals(expected, parser.parse(sequence, in, Function.identity()));
}

@Test
Expand All @@ -128,9 +129,9 @@ public void testParseCustomTimeZone() throws Exception {
parser.initialize(conf);
Text in = getAuditString("1970-01-01 01:00:00,000",
"ignored", "ignored", "ignored", "ignored");
AuditReplayCommand expected = new AuditReplayCommand(0,
AuditReplayCommand expected = new AuditReplayCommand(sequence, 0,
"ignored", "ignored", "ignored", "ignored", "0.0.0.0");
assertEquals(expected, parser.parse(in, Function.identity()));
assertEquals(expected, parser.parse(sequence, in, Function.identity()));
}

@Test
Expand All @@ -144,9 +145,9 @@ public void testParseCustomAuditLineFormat() throws Exception {
conf.set(AuditLogDirectParser.AUDIT_LOG_PARSE_REGEX_KEY,
"CUSTOM FORMAT \\((?<timestamp>.+?)\\) (?<message>.+)");
parser.initialize(conf);
AuditReplayCommand expected = new AuditReplayCommand(0,
AuditReplayCommand expected = new AuditReplayCommand(sequence, 0,
"fakeUser", "fakeCommand", "src", "null", "0.0.0.0");
assertEquals(expected, parser.parse(auditLine, Function.identity()));
assertEquals(expected, parser.parse(sequence, auditLine, Function.identity()));
}

}

0 comments on commit 103b054

Please sign in to comment.