Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
Patrick Stuedi <stu@zurich.ibm.com>
Adrian Schuepbach <dri@zurich.ibm.com>
Jonas Pfefferle <jpf@zurich.ibm.com>
Animesh Trivedi <atr@zurich.ibm.com>

8 changes: 6 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<groupId>com.ibm.darpc</groupId>
<artifactId>darpc</artifactId>
<packaging>jar</packaging>
<version>1.3</version>
<version>1.4</version>
<name>darpc</name>
<description>DaRPC (Data Center RPC) is a Java library for low latency Remote Procedure Call (RPC)</description>
<url>http://github.com/zrlio/darpc</url>
Expand All @@ -30,7 +30,11 @@
<name>Animesh Trivedi</name>
<email>atr@zurich.ibm.com</email>
</developer>
</developers>
<developer>
<name>Adrian Schuepbach</name>
<email>dri@zurich.ibm.com</email>
</developer>
</developers>

<scm>
<connection>scm:git:git://github.com/zrlio/darpc.git</connection>
Expand Down
22 changes: 11 additions & 11 deletions src/main/java/com/ibm/darpc/DaRPCClientGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@
import com.ibm.disni.rdma.verbs.RdmaCmId;

public class DaRPCClientGroup<R extends DaRPCMessage, T extends DaRPCMessage> extends DaRPCEndpointGroup<DaRPCClientEndpoint<R,T>, R, T> {
public static <R extends DaRPCMessage, T extends DaRPCMessage> DaRPCClientGroup<R, T> createClientGroup(DaRPCProtocol<R, T> protocol, int timeout, int maxinline, int recvQueue, int sendQueue) throws Exception {
DaRPCClientGroup<R,T> group = new DaRPCClientGroup<R,T>(protocol, timeout, maxinline, recvQueue, sendQueue);
public static <R extends DaRPCMessage, T extends DaRPCMessage> DaRPCClientGroup<R, T> createClientGroup(DaRPCProtocol<R, T> protocol, DaRPCMemPool<DaRPCClientEndpoint<R,T>, R, T> memPool, int timeout, int maxinline, int recvQueue, int sendQueue) throws Exception {
DaRPCClientGroup<R,T> group = new DaRPCClientGroup<R,T>(protocol, memPool, timeout, maxinline, recvQueue, sendQueue);
group.init(new RpcClientFactory<R,T>(group));
return group;
}
private DaRPCClientGroup(DaRPCProtocol<R, T> protocol, int timeout, int maxinline, int recvQueue, int sendQueue)
}

private DaRPCClientGroup(DaRPCProtocol<R, T> protocol, DaRPCMemPool<DaRPCClientEndpoint<R,T>, R, T> memPool, int timeout, int maxinline, int recvQueue, int sendQueue)
throws Exception {
super(protocol, timeout, maxinline, recvQueue, sendQueue);
super(protocol, memPool, timeout, maxinline, recvQueue, sendQueue);
}


@Override
public void allocateResources(DaRPCClientEndpoint<R, T> endpoint) throws Exception {
Expand All @@ -38,18 +38,18 @@ public IbvQP createQpProvider(DaRPCClientEndpoint<R, T> endpoint) throws IOExcep
IbvQP qp = this.createQP(endpoint.getIdPriv(), endpoint.getPd(), cq);
return qp;
}

public static class RpcClientFactory<R extends DaRPCMessage, T extends DaRPCMessage> implements RdmaEndpointFactory<DaRPCClientEndpoint<R,T>> {
private DaRPCClientGroup<R, T> group;

public RpcClientFactory(DaRPCClientGroup<R, T> group){
this.group = group;
}

@Override
public DaRPCClientEndpoint<R,T> createEndpoint(RdmaCmId id, boolean serverSide) throws IOException {
return new DaRPCClientEndpoint<R,T>(group, id, serverSide);
}
}
}

}
129 changes: 59 additions & 70 deletions src/main/java/com/ibm/darpc/DaRPCEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.LinkedList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
Expand All @@ -39,113 +38,103 @@

public abstract class DaRPCEndpoint<R extends DaRPCMessage, T extends DaRPCMessage> extends RdmaEndpoint {
private static final Logger logger = LoggerFactory.getLogger("com.ibm.darpc");
private static final int headerSize = 4;
static final int HEADERSIZE = 4; //size of the ticket

public abstract void dispatchReceive(ByteBuffer buffer, int ticket, int recvIndex) throws IOException;
public abstract void dispatchSend(int ticket) throws IOException;

private DaRPCEndpointGroup<? extends DaRPCEndpoint<R,T>, R, T> rpcGroup;
private ByteBuffer dataBuffer;
private IbvMr dataMr;
private ByteBuffer receiveBuffer;
private ByteBuffer sendBuffer;
private ByteBuffer[] recvBufs;
private ByteBuffer[] sendBufs;
private SVCPostRecv[] recvCall;
private SVCPostSend[] sendCall;
private ConcurrentHashMap<Integer, SVCPostSend> pendingPostSend;
private ArrayBlockingQueue<SVCPostSend> freePostSend;
private AtomicLong ticketCount;
private int pipelineLength;
final private int sendPipelineLength;
final private int recvPipelineLength;
private int payloadSize;
private int rawBufferSize;
private int maxinline;
private AtomicLong messagesSent;
private AtomicLong messagesReceived;


public DaRPCEndpoint(DaRPCEndpointGroup<? extends DaRPCEndpoint<R,T>, R, T> endpointGroup, RdmaCmId idPriv, boolean serverSide) throws IOException {
super(endpointGroup, idPriv, serverSide);
this.rpcGroup = endpointGroup;
this.maxinline = rpcGroup.getMaxInline();
this.payloadSize = rpcGroup.getBufferSize();
this.rawBufferSize = headerSize + this.payloadSize;
this.pipelineLength = rpcGroup.recvQueueSize();
this.freePostSend = new ArrayBlockingQueue<SVCPostSend>(pipelineLength);
this.rawBufferSize = HEADERSIZE + this.payloadSize;
this.sendPipelineLength = rpcGroup.sendQueueSize();
this.recvPipelineLength = rpcGroup.recvQueueSize();
this.freePostSend = new ArrayBlockingQueue<SVCPostSend>(sendPipelineLength);
this.pendingPostSend = new ConcurrentHashMap<Integer, SVCPostSend>();
this.recvBufs = new ByteBuffer[pipelineLength];
this.sendBufs = new ByteBuffer[pipelineLength];
this.recvCall = new SVCPostRecv[pipelineLength];
this.sendCall = new SVCPostSend[pipelineLength];
this.recvBufs = new ByteBuffer[recvPipelineLength];
this.sendBufs = new ByteBuffer[sendPipelineLength];
this.recvCall = new SVCPostRecv[recvPipelineLength];
this.sendCall = new SVCPostSend[sendPipelineLength];
this.ticketCount = new AtomicLong(0);
this.messagesSent = new AtomicLong(0);
this.messagesReceived = new AtomicLong(0);
logger.info("RPC client endpoint, with payload buffer size = " + payloadSize + ", pipeline " + pipelineLength);
logger.info("RPC client endpoint, with payload buffer size = " + payloadSize + ", send pipeline "
+ sendPipelineLength + ", receive pipeline " + recvPipelineLength);
}

public void init() throws IOException {
int sendBufferOffset = pipelineLength * rawBufferSize;

/* Main data buffer for sends and receives. Will be split into two regions,
* one for sends and one for receives.
*/
dataBuffer = ByteBuffer.allocateDirect(pipelineLength * rawBufferSize * 2);
/* Only do one memory registration with the IB card. */
dataMr = registerMemory(dataBuffer).execute().free().getMr();

/* Receive memory region is the first half of the main buffer. */
dataBuffer.limit(dataBuffer.position() + sendBufferOffset);
receiveBuffer = dataBuffer.slice();

/* Send memory region is the second half of the main buffer. */
dataBuffer.position(sendBufferOffset);
dataBuffer.limit(dataBuffer.position() + sendBufferOffset);
sendBuffer = dataBuffer.slice();

for(int i = 0; i < pipelineLength; i++) {
/* Create single receive buffers within the receive region in form of slices. */
receiveBuffer.position(i * rawBufferSize);
receiveBuffer.limit(receiveBuffer.position() + rawBufferSize);
recvBufs[i] = receiveBuffer.slice();

/* Create single send buffers within the send region in form of slices. */
sendBuffer.position(i * rawBufferSize);
sendBuffer.limit(sendBuffer.position() + rawBufferSize);
sendBufs[i] = sendBuffer.slice();
for(int i = 0; i < sendPipelineLength; i++) {
try {
sendBufs[i] = rpcGroup.getWRBuffer(this);
} catch (Exception e) {
throw new IOException(e);
}

this.recvCall[i] = setupRecvTask(i);
this.sendCall[i] = setupSendTask(i);
freePostSend.add(sendCall[i]);
}
for(int i = 0; i < recvPipelineLength; i++) {
try {
recvBufs[i] = rpcGroup.getWRBuffer(this);
} catch (Exception e) {
throw new IOException(e);
}

this.recvCall[i] = setupRecvTask(i);
recvCall[i].execute();
}
}

@Override
public synchronized void close() throws IOException, InterruptedException {
for(int i = 0; i < sendPipelineLength; i++) {
rpcGroup.freeBuffer(this, sendBufs[i]);
}
for(int i = 0; i < recvPipelineLength; i++) {
rpcGroup.freeBuffer(this, recvBufs[i]);
}
super.close();
deregisterMemory(dataMr);
}

}

public long getMessagesSent() {
return messagesSent.get();
}

public long getMessagesReceived() {
return messagesReceived.get();
}

protected boolean sendMessage(DaRPCMessage message, int ticket) throws IOException {
SVCPostSend postSend = freePostSend.poll();
if (postSend != null){
int index = (int) postSend.getWrMod(0).getWr_id();
sendBufs[index].putInt(0, ticket);
sendBufs[index].position(4);
int written = 4 + message.write(sendBufs[index]);
sendBufs[index].position(HEADERSIZE);
int written = HEADERSIZE + message.write(sendBufs[index]);
postSend.getWrMod(0).getSgeMod(0).setLength(written);
postSend.getWrMod(0).setSend_flags(IbvSendWR.IBV_SEND_SIGNALED);
if (written <= maxinline) {
postSend.getWrMod(0).setSend_flags(postSend.getWrMod(0).getSend_flags() | IbvSendWR.IBV_SEND_INLINE);
}
}
pendingPostSend.put(ticket, postSend);
postSend.execute();
messagesSent.incrementAndGet();
Expand All @@ -154,33 +143,33 @@ protected boolean sendMessage(DaRPCMessage message, int ticket) throws IOExcepti
return false;
}
}

protected void postRecv(int index) throws IOException {
recvCall[index].execute();
}
}

public void freeSend(int ticket) throws IOException {
SVCPostSend sendOperation = pendingPostSend.remove(ticket);
if (sendOperation == null) {
throw new IOException("no pending ticket " + ticket + ", current ticket count " + ticketCount.get());
}
this.freePostSend.add(sendOperation);
}
}

public void dispatchCqEvent(IbvWC wc) throws IOException {
if (wc.getStatus() == 5){
//flush
return;
} else if (wc.getStatus() != 0){
throw new IOException("Faulty operation! wc.status " + wc.getStatus());
}
}

if (wc.getOpcode() == 128){
//receiving a message
int index = (int) wc.getWr_id();
ByteBuffer recvBuffer = recvBufs[index];
int ticket = recvBuffer.getInt(0);
recvBuffer.position(4);
recvBuffer.position(HEADERSIZE);
dispatchReceive(recvBuffer, ticket, index);
} else if (wc.getOpcode() == 0) {
//send completion
Expand All @@ -190,17 +179,17 @@ public void dispatchCqEvent(IbvWC wc) throws IOException {
dispatchSend(ticket);
} else {
throw new IOException("Unkown opcode " + wc.getOpcode());
}
}
}
}

private SVCPostSend setupSendTask(int wrid) throws IOException {
ArrayList<IbvSendWR> sendWRs = new ArrayList<IbvSendWR>(1);
LinkedList<IbvSge> sgeList = new LinkedList<IbvSge>();

IbvSge sge = new IbvSge();
sge.setAddr(MemoryUtils.getAddress(sendBufs[wrid]));
sge.setLength(rawBufferSize);
sge.setLkey(dataMr.getLkey());
sge.setLkey(rpcGroup.getLKey(sendBufs[wrid]));
sgeList.add(sge);

IbvSendWR sendWR = new IbvSendWR();
Expand All @@ -220,7 +209,7 @@ private SVCPostRecv setupRecvTask(int wrid) throws IOException {
IbvSge sge = new IbvSge();
sge.setAddr(MemoryUtils.getAddress(recvBufs[wrid]));
sge.setLength(rawBufferSize);
sge.setLkey(dataMr.getLkey());
sge.setLkey(rpcGroup.getLKey(recvBufs[wrid]));
sgeList.add(sge);

IbvRecvWR recvWR = new IbvRecvWR();
Expand Down
Loading