-
Notifications
You must be signed in to change notification settings - Fork 17
Add memory pool and register memory regions only once. #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 3 commits
a7daf82
cc896b6
7e06fc2
d88de1f
e7a3c6d
bf52e02
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1,5 @@ | ||
| Patrick Stuedi <stu@zurich.ibm.com> | ||
| Adrian Schuepbach <dri@zurich.ibm.com> | ||
| Jonas Pfefferle <jpf@zurich.ibm.com> | ||
| Animesh Trivedi <atr@zurich.ibm.com> | ||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,13 +40,13 @@ | |
| public abstract class DaRPCEndpoint<R extends DaRPCMessage, T extends DaRPCMessage> extends RdmaEndpoint { | ||
| private static final Logger logger = LoggerFactory.getLogger("com.ibm.darpc"); | ||
| private static final int headerSize = 4; | ||
|
|
||
| public abstract void dispatchReceive(ByteBuffer buffer, int ticket, int recvIndex) throws IOException; | ||
| public abstract void dispatchSend(int ticket) throws IOException; | ||
|
|
||
| private DaRPCEndpointGroup<? extends DaRPCEndpoint<R,T>, R, T> rpcGroup; | ||
| private ByteBuffer dataBuffer; | ||
| private IbvMr dataMr; | ||
| private int lkey; | ||
| private ByteBuffer receiveBuffer; | ||
| private ByteBuffer sendBuffer; | ||
| private ByteBuffer[] recvBufs; | ||
|
|
@@ -56,96 +56,104 @@ public abstract class DaRPCEndpoint<R extends DaRPCMessage, T extends DaRPCMessa | |
| private ConcurrentHashMap<Integer, SVCPostSend> pendingPostSend; | ||
| private ArrayBlockingQueue<SVCPostSend> freePostSend; | ||
| private AtomicLong ticketCount; | ||
| private int pipelineLength; | ||
| final private int sendPipelineLength; | ||
| final private int recvPipelineLength; | ||
| private int payloadSize; | ||
| private int rawBufferSize; | ||
| private int maxinline; | ||
| private AtomicLong messagesSent; | ||
| private AtomicLong messagesReceived; | ||
|
|
||
|
|
||
| public DaRPCEndpoint(DaRPCEndpointGroup<? extends DaRPCEndpoint<R,T>, R, T> endpointGroup, RdmaCmId idPriv, boolean serverSide) throws IOException { | ||
| super(endpointGroup, idPriv, serverSide); | ||
| this.rpcGroup = endpointGroup; | ||
| this.maxinline = rpcGroup.getMaxInline(); | ||
| this.payloadSize = rpcGroup.getBufferSize(); | ||
| this.rawBufferSize = headerSize + this.payloadSize; | ||
| this.pipelineLength = rpcGroup.recvQueueSize(); | ||
| this.freePostSend = new ArrayBlockingQueue<SVCPostSend>(pipelineLength); | ||
| this.sendPipelineLength = rpcGroup.sendQueueSize(); | ||
| this.recvPipelineLength = rpcGroup.recvQueueSize(); | ||
| this.freePostSend = new ArrayBlockingQueue<SVCPostSend>(sendPipelineLength); | ||
| this.pendingPostSend = new ConcurrentHashMap<Integer, SVCPostSend>(); | ||
| this.recvBufs = new ByteBuffer[pipelineLength]; | ||
| this.sendBufs = new ByteBuffer[pipelineLength]; | ||
| this.recvCall = new SVCPostRecv[pipelineLength]; | ||
| this.sendCall = new SVCPostSend[pipelineLength]; | ||
| this.recvBufs = new ByteBuffer[recvPipelineLength]; | ||
| this.sendBufs = new ByteBuffer[sendPipelineLength]; | ||
| this.recvCall = new SVCPostRecv[recvPipelineLength]; | ||
| this.sendCall = new SVCPostSend[sendPipelineLength]; | ||
| this.ticketCount = new AtomicLong(0); | ||
| this.messagesSent = new AtomicLong(0); | ||
| this.messagesReceived = new AtomicLong(0); | ||
| logger.info("RPC client endpoint, with payload buffer size = " + payloadSize + ", pipeline " + pipelineLength); | ||
| logger.info("RPC client endpoint, with payload buffer size = " + payloadSize + ", send pipeline " | ||
| + sendPipelineLength + ", receive pipeline " + recvPipelineLength); | ||
| } | ||
|
|
||
| public void init() throws IOException { | ||
| int sendBufferOffset = pipelineLength * rawBufferSize; | ||
| int sendBufferOffset = recvPipelineLength * rawBufferSize; | ||
|
|
||
| /* Main data buffer for sends and receives. Will be split into two regions, | ||
| * one for sends and one for receives. | ||
| */ | ||
| dataBuffer = ByteBuffer.allocateDirect(pipelineLength * rawBufferSize * 2); | ||
| /* Only do one memory registration with the IB card. */ | ||
| dataMr = registerMemory(dataBuffer).execute().free().getMr(); | ||
| try { | ||
| dataBuffer = rpcGroup.getWRBuffer(this, sendPipelineLength * rawBufferSize + recvPipelineLength * rawBufferSize); | ||
| lkey = rpcGroup.getLKey(this, dataBuffer); | ||
| } catch (Exception e) { | ||
| throw new IOException(e); | ||
| } | ||
|
|
||
| /* Receive memory region is the first half of the main buffer. */ | ||
| dataBuffer.limit(dataBuffer.position() + sendBufferOffset); | ||
| receiveBuffer = dataBuffer.slice(); | ||
|
|
||
| /* Send memory region is the second half of the main buffer. */ | ||
| dataBuffer.position(sendBufferOffset); | ||
| dataBuffer.limit(dataBuffer.position() + sendBufferOffset); | ||
| dataBuffer.limit(dataBuffer.capacity()); | ||
|
||
| sendBuffer = dataBuffer.slice(); | ||
|
|
||
| for(int i = 0; i < pipelineLength; i++) { | ||
| /* Create single receive buffers within the receive region in form of slices. */ | ||
| receiveBuffer.position(i * rawBufferSize); | ||
| receiveBuffer.limit(receiveBuffer.position() + rawBufferSize); | ||
| recvBufs[i] = receiveBuffer.slice(); | ||
|
|
||
| for(int i = 0; i < sendPipelineLength; i++) { | ||
| /* Create single send buffers within the send region in form of slices. */ | ||
| sendBuffer.position(i * rawBufferSize); | ||
| sendBuffer.limit(sendBuffer.position() + rawBufferSize); | ||
| sendBufs[i] = sendBuffer.slice(); | ||
|
|
||
| this.recvCall[i] = setupRecvTask(i); | ||
| this.sendCall[i] = setupSendTask(i); | ||
| freePostSend.add(sendCall[i]); | ||
| } | ||
| for(int i = 0; i < recvPipelineLength; i++) { | ||
| /* Create single receive buffers within the receive region in form of slices. */ | ||
| receiveBuffer.position(i * rawBufferSize); | ||
| receiveBuffer.limit(receiveBuffer.position() + rawBufferSize); | ||
| recvBufs[i] = receiveBuffer.slice(); | ||
|
|
||
| this.recvCall[i] = setupRecvTask(i); | ||
| recvCall[i].execute(); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public synchronized void close() throws IOException, InterruptedException { | ||
| rpcGroup.freeBuffer(this, dataBuffer); | ||
| super.close(); | ||
| deregisterMemory(dataMr); | ||
| } | ||
|
|
||
| } | ||
|
|
||
| public long getMessagesSent() { | ||
| return messagesSent.get(); | ||
| } | ||
|
|
||
| public long getMessagesReceived() { | ||
| return messagesReceived.get(); | ||
| } | ||
|
|
||
| protected boolean sendMessage(DaRPCMessage message, int ticket) throws IOException { | ||
| SVCPostSend postSend = freePostSend.poll(); | ||
| if (postSend != null){ | ||
| int index = (int) postSend.getWrMod(0).getWr_id(); | ||
| sendBufs[index].putInt(0, ticket); | ||
| sendBufs[index].position(4); | ||
| int written = 4 + message.write(sendBufs[index]); | ||
| sendBufs[index].position(headerSize); | ||
| int written = headerSize + message.write(sendBufs[index]); | ||
| postSend.getWrMod(0).getSgeMod(0).setLength(written); | ||
| postSend.getWrMod(0).setSend_flags(IbvSendWR.IBV_SEND_SIGNALED); | ||
| if (written <= maxinline) { | ||
| postSend.getWrMod(0).setSend_flags(postSend.getWrMod(0).getSend_flags() | IbvSendWR.IBV_SEND_INLINE); | ||
| } | ||
| } | ||
| pendingPostSend.put(ticket, postSend); | ||
| postSend.execute(); | ||
| messagesSent.incrementAndGet(); | ||
|
|
@@ -154,33 +162,33 @@ protected boolean sendMessage(DaRPCMessage message, int ticket) throws IOExcepti | |
| return false; | ||
| } | ||
| } | ||
|
|
||
| protected void postRecv(int index) throws IOException { | ||
| recvCall[index].execute(); | ||
| } | ||
| } | ||
|
|
||
| public void freeSend(int ticket) throws IOException { | ||
| SVCPostSend sendOperation = pendingPostSend.remove(ticket); | ||
| if (sendOperation == null) { | ||
| throw new IOException("no pending ticket " + ticket + ", current ticket count " + ticketCount.get()); | ||
| } | ||
| this.freePostSend.add(sendOperation); | ||
| } | ||
| } | ||
|
|
||
| public void dispatchCqEvent(IbvWC wc) throws IOException { | ||
| if (wc.getStatus() == 5){ | ||
| //flush | ||
| return; | ||
| } else if (wc.getStatus() != 0){ | ||
| throw new IOException("Faulty operation! wc.status " + wc.getStatus()); | ||
| } | ||
| } | ||
|
|
||
| if (wc.getOpcode() == 128){ | ||
| //receiving a message | ||
| int index = (int) wc.getWr_id(); | ||
| ByteBuffer recvBuffer = recvBufs[index]; | ||
| int ticket = recvBuffer.getInt(0); | ||
| recvBuffer.position(4); | ||
| recvBuffer.position(headerSize); | ||
| dispatchReceive(recvBuffer, ticket, index); | ||
| } else if (wc.getOpcode() == 0) { | ||
| //send completion | ||
|
|
@@ -190,17 +198,17 @@ public void dispatchCqEvent(IbvWC wc) throws IOException { | |
| dispatchSend(ticket); | ||
| } else { | ||
| throw new IOException("Unkown opcode " + wc.getOpcode()); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private SVCPostSend setupSendTask(int wrid) throws IOException { | ||
| ArrayList<IbvSendWR> sendWRs = new ArrayList<IbvSendWR>(1); | ||
| LinkedList<IbvSge> sgeList = new LinkedList<IbvSge>(); | ||
|
|
||
| IbvSge sge = new IbvSge(); | ||
| sge.setAddr(MemoryUtils.getAddress(sendBufs[wrid])); | ||
| sge.setLength(rawBufferSize); | ||
| sge.setLkey(dataMr.getLkey()); | ||
| sge.setLkey(lkey); | ||
| sgeList.add(sge); | ||
|
|
||
| IbvSendWR sendWR = new IbvSendWR(); | ||
|
|
@@ -220,7 +228,7 @@ private SVCPostRecv setupRecvTask(int wrid) throws IOException { | |
| IbvSge sge = new IbvSge(); | ||
| sge.setAddr(MemoryUtils.getAddress(recvBufs[wrid])); | ||
| sge.setLength(rawBufferSize); | ||
| sge.setLkey(dataMr.getLkey()); | ||
| sge.setLkey(lkey); | ||
| sgeList.add(sge); | ||
|
|
||
| IbvRecvWR recvWR = new IbvRecvWR(); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,34 +24,36 @@ | |
| import java.io.IOException; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import java.nio.ByteBuffer; | ||
| import com.ibm.disni.rdma.verbs.*; | ||
| import com.ibm.disni.rdma.*; | ||
|
|
||
|
|
||
| public abstract class DaRPCEndpointGroup<E extends DaRPCEndpoint<R,T>, R extends DaRPCMessage, T extends DaRPCMessage> extends RdmaEndpointGroup<E> { | ||
| private static final Logger logger = LoggerFactory.getLogger("com.ibm.darpc"); | ||
| private static int DARPC_VERSION = 50; | ||
|
|
||
| private int recvQueueSize; | ||
| private int sendQueueSize; | ||
| private int timeout; | ||
| private int bufferSize; | ||
| private int maxInline; | ||
|
|
||
| private DaRPCMemPool memPool; | ||
|
|
||
| public static int getVersion(){ | ||
| return DARPC_VERSION; | ||
| } | ||
| protected DaRPCEndpointGroup(DaRPCProtocol<R,T> protocol, int timeout, int maxinline, int recvQueue, int sendQueue) throws Exception { | ||
| } | ||
|
|
||
| protected DaRPCEndpointGroup(DaRPCProtocol<R,T> protocol, DaRPCMemPool memPool, int timeout, int maxinline, int recvQueue, int sendQueue) throws Exception { | ||
|
||
| super(timeout); | ||
| this.recvQueueSize = recvQueue; | ||
| this.sendQueueSize = Math.max(recvQueue, sendQueue); | ||
| this.sendQueueSize = sendQueue; | ||
| this.timeout = timeout; | ||
| this.bufferSize = Math.max(protocol.createRequest().size(), protocol.createResponse().size()); | ||
| this.maxInline = maxinline; | ||
| } | ||
|
|
||
| this.memPool = memPool; | ||
| } | ||
|
|
||
| protected synchronized IbvQP createQP(RdmaCmId id, IbvPd pd, IbvCQ cq) throws IOException{ | ||
| IbvQPInitAttr attr = new IbvQPInitAttr(); | ||
| attr.cap().setMax_recv_wr(recvQueueSize); | ||
|
|
@@ -61,33 +63,47 @@ protected synchronized IbvQP createQP(RdmaCmId id, IbvPd pd, IbvCQ cq) throws IO | |
| attr.cap().setMax_inline_data(maxInline); | ||
| attr.setQp_type(IbvQP.IBV_QPT_RC); | ||
| attr.setRecv_cq(cq); | ||
| attr.setSend_cq(cq); | ||
| attr.setSend_cq(cq); | ||
| IbvQP qp = id.createQP(pd, attr); | ||
| return qp; | ||
| } | ||
|
|
||
| public int getTimeout() { | ||
| return timeout; | ||
| } | ||
|
|
||
| public int getBufferSize() { | ||
| return bufferSize; | ||
| } | ||
| } | ||
|
|
||
|
|
||
| public void close() throws IOException, InterruptedException { | ||
| super.close(); | ||
| memPool.close(); | ||
| logger.info("rpc group down"); | ||
| } | ||
| } | ||
|
|
||
| public int recvQueueSize() { | ||
| return recvQueueSize; | ||
| } | ||
|
|
||
| public int sendQueueSize() { | ||
| return sendQueueSize; | ||
| } | ||
| } | ||
|
|
||
| public int getMaxInline() { | ||
| return maxInline; | ||
| } | ||
|
|
||
| ByteBuffer getWRBuffer(RdmaEndpoint endpoint, int size) throws Exception { | ||
| return memPool.getBuffer(endpoint, size); | ||
| } | ||
|
|
||
| void freeBuffer(RdmaEndpoint endpoint, ByteBuffer b) throws IOException { | ||
| memPool.freeBuffer(endpoint, b); | ||
| } | ||
|
|
||
| int getLKey(RdmaEndpoint endpoint, ByteBuffer b) { | ||
| return memPool.getLKey(endpoint, b); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| package com.ibm.darpc; | ||
|
|
||
| import java.io.IOException; | ||
| import java.nio.ByteBuffer; | ||
|
|
||
| import com.ibm.disni.rdma.RdmaEndpoint; | ||
|
|
||
| public interface DaRPCMemPool { | ||
| void close() throws IOException; | ||
| ByteBuffer getBuffer(RdmaEndpoint endpoint, int size) throws IOException; | ||
| void freeBuffer(RdmaEndpoint endpoint, ByteBuffer b) throws IOException; | ||
| public int getLKey(RdmaEndpoint endpoint, ByteBuffer b) throws IllegalArgumentException; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand this. Why make the mempool so complicated but then only get one big chunk out of it and slice it up yourself?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I get that each connection might have different send/recv queue sizes, but at least within a group they are always the same, so it might make sense to push the mempool instantiation into the group and simplify it. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, the idea behind this implementation is that the memory pool registers chunks of memory with the card, and every endpoint/every QP allocates a piece of this already registered memory chunk.
The reason why it is not instantiated in the group is to let the application (Crail) read properties, instantiate and "configure" the mempool, and then pass it to the DaRPC library.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed this allocator. Instead we use a simple one with fixed blocksize.