Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
Patrick Stuedi <stu@zurich.ibm.com>
Adrian Schuepbach <dri@zurich.ibm.com>
Jonas Pfefferle <jpf@zurich.ibm.com>
Animesh Trivedi <atr@zurich.ibm.com>

8 changes: 6 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<groupId>com.ibm.darpc</groupId>
<artifactId>darpc</artifactId>
<packaging>jar</packaging>
<version>1.3</version>
<version>1.4</version>
<name>darpc</name>
<description>DaRPC (Data Center RPC) is a Java library for low latency Remote Procedure Call (RPC)</description>
<url>http://github.com/zrlio/darpc</url>
Expand All @@ -30,7 +30,11 @@
<name>Animesh Trivedi</name>
<email>atr@zurich.ibm.com</email>
</developer>
</developers>
<developer>
<name>Adrian Schuepbach</name>
<email>dri@zurich.ibm.com</email>
</developer>
</developers>

<scm>
<connection>scm:git:git://github.com/zrlio/darpc.git</connection>
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/com/ibm/darpc/DaRPCClientGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@
import com.ibm.disni.rdma.verbs.RdmaCmId;

public class DaRPCClientGroup<R extends DaRPCMessage, T extends DaRPCMessage> extends DaRPCEndpointGroup<DaRPCClientEndpoint<R,T>, R, T> {
public static <R extends DaRPCMessage, T extends DaRPCMessage> DaRPCClientGroup<R, T> createClientGroup(DaRPCProtocol<R, T> protocol, int timeout, int maxinline, int recvQueue, int sendQueue) throws Exception {
DaRPCClientGroup<R,T> group = new DaRPCClientGroup<R,T>(protocol, timeout, maxinline, recvQueue, sendQueue);
public static <R extends DaRPCMessage, T extends DaRPCMessage> DaRPCClientGroup<R, T> createClientGroup(DaRPCProtocol<R, T> protocol, DaRPCMemPool memPool, int timeout, int maxinline, int recvQueue, int sendQueue) throws Exception {
DaRPCClientGroup<R,T> group = new DaRPCClientGroup<R,T>(protocol, memPool, timeout, maxinline, recvQueue, sendQueue);
group.init(new RpcClientFactory<R,T>(group));
return group;
}

private DaRPCClientGroup(DaRPCProtocol<R, T> protocol, int timeout, int maxinline, int recvQueue, int sendQueue)
private DaRPCClientGroup(DaRPCProtocol<R, T> protocol, DaRPCMemPool memPool, int timeout, int maxinline, int recvQueue, int sendQueue)
throws Exception {
super(protocol, timeout, maxinline, recvQueue, sendQueue);
super(protocol, memPool, timeout, maxinline, recvQueue, sendQueue);
}


Expand Down
102 changes: 55 additions & 47 deletions src/main/java/com/ibm/darpc/DaRPCEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@
public abstract class DaRPCEndpoint<R extends DaRPCMessage, T extends DaRPCMessage> extends RdmaEndpoint {
private static final Logger logger = LoggerFactory.getLogger("com.ibm.darpc");
private static final int headerSize = 4;

public abstract void dispatchReceive(ByteBuffer buffer, int ticket, int recvIndex) throws IOException;
public abstract void dispatchSend(int ticket) throws IOException;

private DaRPCEndpointGroup<? extends DaRPCEndpoint<R,T>, R, T> rpcGroup;
private ByteBuffer dataBuffer;
private IbvMr dataMr;
private int lkey;
private ByteBuffer receiveBuffer;
private ByteBuffer sendBuffer;
private ByteBuffer[] recvBufs;
Expand All @@ -56,96 +56,104 @@ public abstract class DaRPCEndpoint<R extends DaRPCMessage, T extends DaRPCMessa
private ConcurrentHashMap<Integer, SVCPostSend> pendingPostSend;
private ArrayBlockingQueue<SVCPostSend> freePostSend;
private AtomicLong ticketCount;
private int pipelineLength;
final private int sendPipelineLength;
final private int recvPipelineLength;
private int payloadSize;
private int rawBufferSize;
private int maxinline;
private AtomicLong messagesSent;
private AtomicLong messagesReceived;


public DaRPCEndpoint(DaRPCEndpointGroup<? extends DaRPCEndpoint<R,T>, R, T> endpointGroup, RdmaCmId idPriv, boolean serverSide) throws IOException {
super(endpointGroup, idPriv, serverSide);
this.rpcGroup = endpointGroup;
this.maxinline = rpcGroup.getMaxInline();
this.payloadSize = rpcGroup.getBufferSize();
this.rawBufferSize = headerSize + this.payloadSize;
this.pipelineLength = rpcGroup.recvQueueSize();
this.freePostSend = new ArrayBlockingQueue<SVCPostSend>(pipelineLength);
this.sendPipelineLength = rpcGroup.sendQueueSize();
this.recvPipelineLength = rpcGroup.recvQueueSize();
this.freePostSend = new ArrayBlockingQueue<SVCPostSend>(sendPipelineLength);
this.pendingPostSend = new ConcurrentHashMap<Integer, SVCPostSend>();
this.recvBufs = new ByteBuffer[pipelineLength];
this.sendBufs = new ByteBuffer[pipelineLength];
this.recvCall = new SVCPostRecv[pipelineLength];
this.sendCall = new SVCPostSend[pipelineLength];
this.recvBufs = new ByteBuffer[recvPipelineLength];
this.sendBufs = new ByteBuffer[sendPipelineLength];
this.recvCall = new SVCPostRecv[recvPipelineLength];
this.sendCall = new SVCPostSend[sendPipelineLength];
this.ticketCount = new AtomicLong(0);
this.messagesSent = new AtomicLong(0);
this.messagesReceived = new AtomicLong(0);
logger.info("RPC client endpoint, with payload buffer size = " + payloadSize + ", pipeline " + pipelineLength);
logger.info("RPC client endpoint, with payload buffer size = " + payloadSize + ", send pipeline "
+ sendPipelineLength + ", receive pipeline " + recvPipelineLength);
}

public void init() throws IOException {
int sendBufferOffset = pipelineLength * rawBufferSize;
int sendBufferOffset = recvPipelineLength * rawBufferSize;

/* Main data buffer for sends and receives. Will be split into two regions,
* one for sends and one for receives.
*/
dataBuffer = ByteBuffer.allocateDirect(pipelineLength * rawBufferSize * 2);
/* Only do one memory registration with the IB card. */
dataMr = registerMemory(dataBuffer).execute().free().getMr();
try {
dataBuffer = rpcGroup.getWRBuffer(this, sendPipelineLength * rawBufferSize + recvPipelineLength * rawBufferSize);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this. Why make the mempool so complicated but than only get one big chunk out of it and slice them yourself?

Copy link
Contributor

@PepperJo PepperJo Nov 28, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I get that each connection might have different send/recv queue size, but at least in the group they always have the same so it might make sense to push the mempool instanciation to the group and simplify it. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, the idea behind this implementation is that the memory pool registers chunks of memory with the card end every endpoint/every QP allocates a pieces of this already registered memory chunk.

The reason, why it is not instantiated in the group is to make the application (Crail) able to read properties, instantiate and "configure" the mempool and then pass to the DaRPC library.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed this allocator. Instead we use a simple one with fixed blocksize.

lkey = rpcGroup.getLKey(this, dataBuffer);
} catch (Exception e) {
throw new IOException(e);
}

/* Receive memory region is the first half of the main buffer. */
dataBuffer.limit(dataBuffer.position() + sendBufferOffset);
receiveBuffer = dataBuffer.slice();

/* Send memory region is the second half of the main buffer. */
dataBuffer.position(sendBufferOffset);
dataBuffer.limit(dataBuffer.position() + sendBufferOffset);
dataBuffer.limit(dataBuffer.capacity());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is dataBuffer.capacity always == position + sendBufferOffset? Or could getWRBuffer return a buffer with higher capacity and just limit set to something lower

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is the same.

sendBuffer = dataBuffer.slice();

for(int i = 0; i < pipelineLength; i++) {
/* Create single receive buffers within the receive region in form of slices. */
receiveBuffer.position(i * rawBufferSize);
receiveBuffer.limit(receiveBuffer.position() + rawBufferSize);
recvBufs[i] = receiveBuffer.slice();

for(int i = 0; i < sendPipelineLength; i++) {
/* Create single send buffers within the send region in form of slices. */
sendBuffer.position(i * rawBufferSize);
sendBuffer.limit(sendBuffer.position() + rawBufferSize);
sendBufs[i] = sendBuffer.slice();

this.recvCall[i] = setupRecvTask(i);
this.sendCall[i] = setupSendTask(i);
freePostSend.add(sendCall[i]);
}
for(int i = 0; i < recvPipelineLength; i++) {
/* Create single receive buffers within the receive region in form of slices. */
receiveBuffer.position(i * rawBufferSize);
receiveBuffer.limit(receiveBuffer.position() + rawBufferSize);
recvBufs[i] = receiveBuffer.slice();

this.recvCall[i] = setupRecvTask(i);
recvCall[i].execute();
}
}

@Override
public synchronized void close() throws IOException, InterruptedException {
rpcGroup.freeBuffer(this, dataBuffer);
super.close();
deregisterMemory(dataMr);
}

}

public long getMessagesSent() {
return messagesSent.get();
}

public long getMessagesReceived() {
return messagesReceived.get();
}

protected boolean sendMessage(DaRPCMessage message, int ticket) throws IOException {
SVCPostSend postSend = freePostSend.poll();
if (postSend != null){
int index = (int) postSend.getWrMod(0).getWr_id();
sendBufs[index].putInt(0, ticket);
sendBufs[index].position(4);
int written = 4 + message.write(sendBufs[index]);
sendBufs[index].position(headerSize);
int written = headerSize + message.write(sendBufs[index]);
postSend.getWrMod(0).getSgeMod(0).setLength(written);
postSend.getWrMod(0).setSend_flags(IbvSendWR.IBV_SEND_SIGNALED);
if (written <= maxinline) {
postSend.getWrMod(0).setSend_flags(postSend.getWrMod(0).getSend_flags() | IbvSendWR.IBV_SEND_INLINE);
}
}
pendingPostSend.put(ticket, postSend);
postSend.execute();
messagesSent.incrementAndGet();
Expand All @@ -154,33 +162,33 @@ protected boolean sendMessage(DaRPCMessage message, int ticket) throws IOExcepti
return false;
}
}

protected void postRecv(int index) throws IOException {
recvCall[index].execute();
}
}

public void freeSend(int ticket) throws IOException {
SVCPostSend sendOperation = pendingPostSend.remove(ticket);
if (sendOperation == null) {
throw new IOException("no pending ticket " + ticket + ", current ticket count " + ticketCount.get());
}
this.freePostSend.add(sendOperation);
}
}

public void dispatchCqEvent(IbvWC wc) throws IOException {
if (wc.getStatus() == 5){
//flush
return;
} else if (wc.getStatus() != 0){
throw new IOException("Faulty operation! wc.status " + wc.getStatus());
}
}

if (wc.getOpcode() == 128){
//receiving a message
int index = (int) wc.getWr_id();
ByteBuffer recvBuffer = recvBufs[index];
int ticket = recvBuffer.getInt(0);
recvBuffer.position(4);
recvBuffer.position(headerSize);
dispatchReceive(recvBuffer, ticket, index);
} else if (wc.getOpcode() == 0) {
//send completion
Expand All @@ -190,17 +198,17 @@ public void dispatchCqEvent(IbvWC wc) throws IOException {
dispatchSend(ticket);
} else {
throw new IOException("Unkown opcode " + wc.getOpcode());
}
}
}
}

private SVCPostSend setupSendTask(int wrid) throws IOException {
ArrayList<IbvSendWR> sendWRs = new ArrayList<IbvSendWR>(1);
LinkedList<IbvSge> sgeList = new LinkedList<IbvSge>();

IbvSge sge = new IbvSge();
sge.setAddr(MemoryUtils.getAddress(sendBufs[wrid]));
sge.setLength(rawBufferSize);
sge.setLkey(dataMr.getLkey());
sge.setLkey(lkey);
sgeList.add(sge);

IbvSendWR sendWR = new IbvSendWR();
Expand All @@ -220,7 +228,7 @@ private SVCPostRecv setupRecvTask(int wrid) throws IOException {
IbvSge sge = new IbvSge();
sge.setAddr(MemoryUtils.getAddress(recvBufs[wrid]));
sge.setLength(rawBufferSize);
sge.setLkey(dataMr.getLkey());
sge.setLkey(lkey);
sgeList.add(sge);

IbvRecvWR recvWR = new IbvRecvWR();
Expand Down
52 changes: 34 additions & 18 deletions src/main/java/com/ibm/darpc/DaRPCEndpointGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,36 @@
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import com.ibm.disni.rdma.verbs.*;
import com.ibm.disni.rdma.*;


public abstract class DaRPCEndpointGroup<E extends DaRPCEndpoint<R,T>, R extends DaRPCMessage, T extends DaRPCMessage> extends RdmaEndpointGroup<E> {
private static final Logger logger = LoggerFactory.getLogger("com.ibm.darpc");
private static int DARPC_VERSION = 50;

private int recvQueueSize;
private int sendQueueSize;
private int timeout;
private int bufferSize;
private int maxInline;

private DaRPCMemPool memPool;

public static int getVersion(){
return DARPC_VERSION;
}
protected DaRPCEndpointGroup(DaRPCProtocol<R,T> protocol, int timeout, int maxinline, int recvQueue, int sendQueue) throws Exception {
}

protected DaRPCEndpointGroup(DaRPCProtocol<R,T> protocol, DaRPCMemPool memPool, int timeout, int maxinline, int recvQueue, int sendQueue) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The part I'm probably missing here, but why don't we have a simple interface defining a DaRPCMemPool, and then have applications pass any implementation of that interface while also providing a default implementation within DaRPC. Currently it looks like DaRPCMemPool is both the new MemPool type and the only implementation. Or I'm I missing something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. I changed it. There is an interface now and the application can instantiate a specific implementation of this interface.

super(timeout);
this.recvQueueSize = recvQueue;
this.sendQueueSize = Math.max(recvQueue, sendQueue);
this.sendQueueSize = sendQueue;
this.timeout = timeout;
this.bufferSize = Math.max(protocol.createRequest().size(), protocol.createResponse().size());
this.maxInline = maxinline;
}

this.memPool = memPool;
}

protected synchronized IbvQP createQP(RdmaCmId id, IbvPd pd, IbvCQ cq) throws IOException{
IbvQPInitAttr attr = new IbvQPInitAttr();
attr.cap().setMax_recv_wr(recvQueueSize);
Expand All @@ -61,33 +63,47 @@ protected synchronized IbvQP createQP(RdmaCmId id, IbvPd pd, IbvCQ cq) throws IO
attr.cap().setMax_inline_data(maxInline);
attr.setQp_type(IbvQP.IBV_QPT_RC);
attr.setRecv_cq(cq);
attr.setSend_cq(cq);
attr.setSend_cq(cq);
IbvQP qp = id.createQP(pd, attr);
return qp;
}

public int getTimeout() {
return timeout;
}

public int getBufferSize() {
return bufferSize;
}
}


public void close() throws IOException, InterruptedException {
super.close();
memPool.close();
logger.info("rpc group down");
}
}

public int recvQueueSize() {
return recvQueueSize;
}

public int sendQueueSize() {
return sendQueueSize;
}
}

public int getMaxInline() {
return maxInline;
}

ByteBuffer getWRBuffer(RdmaEndpoint endpoint, int size) throws Exception {
return memPool.getBuffer(endpoint, size);
}

void freeBuffer(RdmaEndpoint endpoint, ByteBuffer b) throws IOException {
memPool.freeBuffer(endpoint, b);
}

int getLKey(RdmaEndpoint endpoint, ByteBuffer b) {
return memPool.getLKey(endpoint, b);
}
}
14 changes: 14 additions & 0 deletions src/main/java/com/ibm/darpc/DaRPCMemPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.ibm.darpc;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.NoSuchElementException;

import com.ibm.disni.rdma.RdmaEndpoint;

public interface DaRPCMemPool {
void close() throws IOException;
ByteBuffer getBuffer(RdmaEndpoint endpoint, int size) throws IOException, NoSuchElementException;
void freeBuffer(RdmaEndpoint endpoint, ByteBuffer b) throws IOException;
public int getLKey(RdmaEndpoint endpoint, ByteBuffer b) throws IllegalArgumentException;
}
Loading