Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ codecov:
coverage:
precision: 2
round: down
range: "75...100"
range: "70...100"
status:
project:
default:
target: 75% # the required coverage value
target: 70% # the required coverage value
threshold: 1% # the leniency in hitting the target

# Specify coverage report paths for multi-project build
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,4 @@ public String getJobIndexName() {
public String getJobId() {
return this.jobId;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.jobscheduler.spi;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.index.seqno.SequenceNumbers;

import java.io.IOException;
import java.time.Instant;
import java.util.Objects;

import static java.util.Objects.requireNonNull;

public final class StatusHistoryModel implements ToXContentObject, Writeable {
public static final String JOB_INDEX_NAME = "job_index_name";
public static final String JOB_ID = "job_id";
public static final String START_TIME = "start_time";
public static final String END_TIME = "end_time";
public static final String COMPLETION_STATUS = "completion_status";

private final String jobIndexName;
private final String jobId;
private final Instant startTime;
private final Instant endTime;
private final int status;
private final long seqNo;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to keep track of seqNo and primaryTerm. I'd prefer to only keep references to the new logic being introduced and not keep track of these metadata fields.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current design adds to the history list when the lock is created and updates the entry when the lock in released. If the sequence number and primary term are not tracked the history service should be changed to append only.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think its necessary to track these, but I suppose it doesn't hurt either.

FYI I see that the usage is here in the LockService.

UpdateRequest updateRequest = new UpdateRequest().index(LOCK_INDEX_NAME)
                .id(updateLock.getLockId())
                .setIfSeqNo(updateLock.getSeqNo())
                .setIfPrimaryTerm(updateLock.getPrimaryTerm())
                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
                .doc(updateLock.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
                .fetchSource(true);

According to the javadoc, this is the purpose:

/**
   * only perform this update request if the document's modification was assigned the given
   * sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}
   *
   * If the document last modification was assigned a different sequence number a
   * {@link org.opensearch.index.engine.VersionConflictEngineException} will be thrown.
   */

private final long primaryTerm;

public StatusHistoryModel(String jobIndexName, String jobId, Instant startTime, Instant endTime, int status) {
this(jobIndexName, jobId, startTime, endTime, status, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
}

public StatusHistoryModel(
String jobIndexName,
String jobId,
Instant startTime,
Instant endTime,
int status,
long seqNo,
long primaryTerm
) {
this.jobIndexName = jobIndexName;
this.jobId = jobId;
this.startTime = startTime;
this.endTime = endTime;
this.status = status;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
}

public StatusHistoryModel(StreamInput in) throws IOException {
this(in.readString(), in.readString(), in.readInstant(), in.readOptionalInstant(), in.readInt(), in.readLong(), in.readLong());
}

public static StatusHistoryModel parse(final XContentParser parser, long seqNo, long primaryTerm) throws IOException {
String jobIndexName = null;
String jobId = null;
Instant startTime = null;
Instant endTime = null;
Integer status = null;

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case JOB_INDEX_NAME:
jobIndexName = parser.text();
break;
case JOB_ID:
jobId = parser.text();
break;
case START_TIME:
startTime = Instant.ofEpochSecond(parser.longValue());
break;
case END_TIME:
if (parser.currentToken() != XContentParser.Token.VALUE_NULL) {
endTime = Instant.ofEpochSecond(parser.longValue());
}
break;
case COMPLETION_STATUS:
status = parser.intValue();
break;
default:
throw new IllegalArgumentException("Unknown field " + fieldName);
}
}

return new StatusHistoryModel(
requireNonNull(jobIndexName, "JobIndexName cannot be null"),
requireNonNull(jobId, "JobId cannot be null"),
requireNonNull(startTime, "startTime cannot be null"),
endTime,
requireNonNull(status, "status cannot be null"),
seqNo,
primaryTerm
);
}

@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject()
.field(JOB_INDEX_NAME, this.jobIndexName)
.field(JOB_ID, this.jobId)
.field(START_TIME, this.startTime.getEpochSecond())
.field(COMPLETION_STATUS, this.status);

if (this.endTime != null) {
builder.field(END_TIME, this.endTime.getEpochSecond());
} else {
builder.nullField(END_TIME);
}

return builder.endObject();
}

public String getJobIndexName() {
return jobIndexName;
}

public String getJobId() {
return jobId;
}

public Instant getStartTime() {
return startTime;
}

public Instant getEndTime() {
return endTime;
}

public int getStatus() {
return status;
}

public long getSeqNo() {
return seqNo;
}

public long getPrimaryTerm() {
return primaryTerm;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
StatusHistoryModel that = (StatusHistoryModel) o;
return seqNo == that.seqNo
&& primaryTerm == that.primaryTerm
&& jobIndexName.equals(that.jobIndexName)
&& jobId.equals(that.jobId)
&& startTime.equals(that.startTime)
&& Objects.equals(endTime, that.endTime)
&& status == that.status;
}

@Override
public int hashCode() {
return Objects.hash(jobIndexName, jobId, startTime, endTime, status, seqNo, primaryTerm);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(this.jobIndexName);
out.writeString(this.jobId);
out.writeInstant(this.startTime);
out.writeOptionalInstant(this.endTime);
out.writeInt(this.status);
out.writeLong(this.seqNo);
out.writeLong(this.primaryTerm);
}
}
Loading
Loading