From a7daf8280cb752097218f9c2665cb3d8edce4772 Mon Sep 17 00:00:00 2001 From: Adrian Schuepbach Date: Wed, 20 Sep 2017 15:57:38 +0200 Subject: [PATCH 1/6] Add memory pool and register memory regions only once. Allow independent send/receive queue lengths. --- AUTHORS | 4 + pom.xml | 8 +- .../java/com/ibm/darpc/DaRPCClientGroup.java | 8 +- .../java/com/ibm/darpc/DaRPCEndpoint.java | 62 +-- .../com/ibm/darpc/DaRPCEndpointGroup.java | 21 +- src/main/java/com/ibm/darpc/DaRPCMemPool.java | 435 ++++++++++++++++++ .../com/ibm/darpc/DaRPCServerEndpoint.java | 5 +- .../java/com/ibm/darpc/DaRPCServerGroup.java | 11 +- .../darpc/examples/client/DaRPCClient.java | 4 +- .../darpc/examples/server/DaRPCServer.java | 4 +- 10 files changed, 519 insertions(+), 43 deletions(-) create mode 100644 src/main/java/com/ibm/darpc/DaRPCMemPool.java diff --git a/AUTHORS b/AUTHORS index d86fd74..23a03fe 100644 --- a/AUTHORS +++ b/AUTHORS @@ -1 +1,5 @@ Patrick Stuedi +Adrian Schuepbach +Jonas Pfefferle +Animesh Trivedi + diff --git a/pom.xml b/pom.xml index cca234f..6b8f28a 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ com.ibm.darpc darpc jar - 1.3 + 1.4 darpc DaRPC (Data Center RPC) is a Java library for low latency Remote Procedure Call (RPC) http://github.com/zrlio/darpc @@ -30,7 +30,11 @@ Animesh Trivedi atr@zurich.ibm.com - + + Adrian Schuepbach + dri@zurich.ibm.com + + scm:git:git://github.com/zrlio/darpc.git diff --git a/src/main/java/com/ibm/darpc/DaRPCClientGroup.java b/src/main/java/com/ibm/darpc/DaRPCClientGroup.java index aa4652b..efcf5e9 100644 --- a/src/main/java/com/ibm/darpc/DaRPCClientGroup.java +++ b/src/main/java/com/ibm/darpc/DaRPCClientGroup.java @@ -9,15 +9,15 @@ import com.ibm.disni.rdma.verbs.RdmaCmId; public class DaRPCClientGroup extends DaRPCEndpointGroup, R, T> { - public static DaRPCClientGroup createClientGroup(DaRPCProtocol protocol, int timeout, int maxinline, int recvQueue, int sendQueue) throws Exception { - DaRPCClientGroup group = new DaRPCClientGroup(protocol, timeout, maxinline, recvQueue, sendQueue); + public static DaRPCClientGroup createClientGroup(DaRPCProtocol protocol, DaRPCMemPool memPool, int timeout, int maxinline, int recvQueue, int sendQueue) throws Exception { + DaRPCClientGroup group = new DaRPCClientGroup(protocol, memPool, timeout, maxinline, recvQueue, sendQueue); group.init(new RpcClientFactory(group)); return group; } - private DaRPCClientGroup(DaRPCProtocol protocol, int timeout, int maxinline, int recvQueue, int sendQueue) + private DaRPCClientGroup(DaRPCProtocol protocol, DaRPCMemPool memPool, int timeout, int maxinline, int recvQueue, int sendQueue) throws Exception { - super(protocol, timeout, maxinline, recvQueue, sendQueue); + super(protocol, memPool, timeout, maxinline, recvQueue, sendQueue); } diff --git a/src/main/java/com/ibm/darpc/DaRPCEndpoint.java b/src/main/java/com/ibm/darpc/DaRPCEndpoint.java index eabb4e7..d432a55 100644 --- a/src/main/java/com/ibm/darpc/DaRPCEndpoint.java +++ b/src/main/java/com/ibm/darpc/DaRPCEndpoint.java @@ -46,7 +46,7 @@ public abstract class DaRPCEndpoint, R, T> rpcGroup; private ByteBuffer dataBuffer; - private IbvMr dataMr; + private int lkey; private ByteBuffer receiveBuffer; private ByteBuffer sendBuffer; private ByteBuffer[] recvBufs; @@ -56,7 +56,8 @@ public abstract class DaRPCEndpoint pendingPostSend; private ArrayBlockingQueue freePostSend; private AtomicLong ticketCount; - private int pipelineLength; + private int sendPipelineLength; + private int recvPipelineLength; private int payloadSize; private int rawBufferSize; private int maxinline; @@ -70,28 +71,33 @@ public DaRPCEndpoint(DaRPCEndpointGroup, R, T> endp this.maxinline = rpcGroup.getMaxInline(); this.payloadSize = rpcGroup.getBufferSize(); this.rawBufferSize = headerSize + this.payloadSize; - this.pipelineLength = rpcGroup.recvQueueSize(); - this.freePostSend = new ArrayBlockingQueue(pipelineLength); + this.sendPipelineLength = rpcGroup.sendQueueSize(); + this.recvPipelineLength = rpcGroup.recvQueueSize(); + this.freePostSend = new ArrayBlockingQueue(sendPipelineLength); this.pendingPostSend = new ConcurrentHashMap(); - this.recvBufs = new ByteBuffer[pipelineLength]; - this.sendBufs = new ByteBuffer[pipelineLength]; - this.recvCall = new SVCPostRecv[pipelineLength]; - this.sendCall = new SVCPostSend[pipelineLength]; + this.recvBufs = new ByteBuffer[recvPipelineLength]; + this.sendBufs = new ByteBuffer[sendPipelineLength]; + this.recvCall = new SVCPostRecv[recvPipelineLength]; + this.sendCall = new SVCPostSend[sendPipelineLength]; this.ticketCount = new AtomicLong(0); this.messagesSent = new AtomicLong(0); this.messagesReceived = new AtomicLong(0); - logger.info("RPC client endpoint, with payload buffer size = " + payloadSize + ", pipeline " + pipelineLength); + logger.info("RPC client endpoint, with payload buffer size = " + payloadSize + ", send pipeline " + + sendPipelineLength + ", receive pipeline " + recvPipelineLength); } public void init() throws IOException { - int sendBufferOffset = pipelineLength * rawBufferSize; + int sendBufferOffset = recvPipelineLength * rawBufferSize; /* Main data buffer for sends and receives. Will be split into two regions, * one for sends and one for receives. */ - dataBuffer = ByteBuffer.allocateDirect(pipelineLength * rawBufferSize * 2); - /* Only do one memory registration with the IB card. */ - dataMr = registerMemory(dataBuffer).execute().free().getMr(); + try { + dataBuffer = rpcGroup.getWRBuffer(this, sendPipelineLength * rawBufferSize + recvPipelineLength * rawBufferSize); + lkey = rpcGroup.getLKey(this, dataBuffer); + } catch (Exception e) { + throw new IOException(e); + } /* Receive memory region is the first half of the main buffer. */ dataBuffer.limit(dataBuffer.position() + sendBufferOffset); @@ -99,31 +105,33 @@ public void init() throws IOException { /* Send memory region is the second half of the main buffer. */ dataBuffer.position(sendBufferOffset); - dataBuffer.limit(dataBuffer.position() + sendBufferOffset); + dataBuffer.limit(dataBuffer.capacity()); sendBuffer = dataBuffer.slice(); - for(int i = 0; i < pipelineLength; i++) { - /* Create single receive buffers within the receive region in form of slices. */ - receiveBuffer.position(i * rawBufferSize); - receiveBuffer.limit(receiveBuffer.position() + rawBufferSize); - recvBufs[i] = receiveBuffer.slice(); - + for(int i = 0; i < sendPipelineLength; i++) { /* Create single send buffers within the send region in form of slices. */ sendBuffer.position(i * rawBufferSize); sendBuffer.limit(sendBuffer.position() + rawBufferSize); sendBufs[i] = sendBuffer.slice(); - this.recvCall[i] = setupRecvTask(i); this.sendCall[i] = setupSendTask(i); freePostSend.add(sendCall[i]); + } + for(int i = 0; i < recvPipelineLength; i++) { + /* Create single receive buffers within the receive region in form of slices. */ + receiveBuffer.position(i * rawBufferSize); + receiveBuffer.limit(receiveBuffer.position() + rawBufferSize); + recvBufs[i] = receiveBuffer.slice(); + + this.recvCall[i] = setupRecvTask(i); recvCall[i].execute(); } } @Override public synchronized void close() throws IOException, InterruptedException { + rpcGroup.freeBuffer(this, dataBuffer); super.close(); - deregisterMemory(dataMr); } public long getMessagesSent() { @@ -139,8 +147,8 @@ protected boolean sendMessage(DaRPCMessage message, int ticket) throws IOExcepti if (postSend != null){ int index = (int) postSend.getWrMod(0).getWr_id(); sendBufs[index].putInt(0, ticket); - sendBufs[index].position(4); - int written = 4 + message.write(sendBufs[index]); + sendBufs[index].position(headerSize); + int written = headerSize + message.write(sendBufs[index]); postSend.getWrMod(0).getSgeMod(0).setLength(written); postSend.getWrMod(0).setSend_flags(IbvSendWR.IBV_SEND_SIGNALED); if (written <= maxinline) { @@ -180,7 +188,7 @@ public void dispatchCqEvent(IbvWC wc) throws IOException { int index = (int) wc.getWr_id(); ByteBuffer recvBuffer = recvBufs[index]; int ticket = recvBuffer.getInt(0); - recvBuffer.position(4); + recvBuffer.position(headerSize); dispatchReceive(recvBuffer, ticket, index); } else if (wc.getOpcode() == 0) { //send completion @@ -200,7 +208,7 @@ private SVCPostSend setupSendTask(int wrid) throws IOException { IbvSge sge = new IbvSge(); sge.setAddr(MemoryUtils.getAddress(sendBufs[wrid])); sge.setLength(rawBufferSize); - sge.setLkey(dataMr.getLkey()); + sge.setLkey(lkey); sgeList.add(sge); IbvSendWR sendWR = new IbvSendWR(); @@ -220,7 +228,7 @@ private SVCPostRecv setupRecvTask(int wrid) throws IOException { IbvSge sge = new IbvSge(); sge.setAddr(MemoryUtils.getAddress(recvBufs[wrid])); sge.setLength(rawBufferSize); - sge.setLkey(dataMr.getLkey()); + sge.setLkey(lkey); sgeList.add(sge); IbvRecvWR recvWR = new IbvRecvWR(); diff --git a/src/main/java/com/ibm/darpc/DaRPCEndpointGroup.java b/src/main/java/com/ibm/darpc/DaRPCEndpointGroup.java index d95f746..45048f3 100644 --- a/src/main/java/com/ibm/darpc/DaRPCEndpointGroup.java +++ b/src/main/java/com/ibm/darpc/DaRPCEndpointGroup.java @@ -24,7 +24,7 @@ import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import java.nio.ByteBuffer; import com.ibm.disni.rdma.verbs.*; import com.ibm.disni.rdma.*; @@ -38,18 +38,20 @@ public abstract class DaRPCEndpointGroup, R extends private int timeout; private int bufferSize; private int maxInline; + private DaRPCMemPool memPool; public static int getVersion(){ return DARPC_VERSION; } - protected DaRPCEndpointGroup(DaRPCProtocol protocol, int timeout, int maxinline, int recvQueue, int sendQueue) throws Exception { + protected DaRPCEndpointGroup(DaRPCProtocol protocol, DaRPCMemPool memPool, int timeout, int maxinline, int recvQueue, int sendQueue) throws Exception { super(timeout); this.recvQueueSize = recvQueue; - this.sendQueueSize = Math.max(recvQueue, sendQueue); + this.sendQueueSize = sendQueue; this.timeout = timeout; this.bufferSize = Math.max(protocol.createRequest().size(), protocol.createResponse().size()); this.maxInline = maxinline; + this.memPool = memPool; } protected synchronized IbvQP createQP(RdmaCmId id, IbvPd pd, IbvCQ cq) throws IOException{ @@ -76,6 +78,7 @@ public int getBufferSize() { public void close() throws IOException, InterruptedException { super.close(); + memPool.close(); logger.info("rpc group down"); } @@ -90,4 +93,16 @@ public int sendQueueSize() { public int getMaxInline() { return maxInline; } + + ByteBuffer getWRBuffer(RdmaEndpoint endpoint, int size) throws Exception { + return memPool.getBuffer(endpoint, size); + } + + void freeBuffer(RdmaEndpoint endpoint, ByteBuffer b) { + memPool.freeBuffer(endpoint, b); + } + + int getLKey(RdmaEndpoint endpoint, ByteBuffer b) { + return memPool.getLKey(endpoint, b); + } } diff --git a/src/main/java/com/ibm/darpc/DaRPCMemPool.java b/src/main/java/com/ibm/darpc/DaRPCMemPool.java new file mode 100644 index 0000000..8272c03 --- /dev/null +++ b/src/main/java/com/ibm/darpc/DaRPCMemPool.java @@ -0,0 +1,435 @@ +package com.ibm.darpc; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; + +import com.ibm.disni.rdma.RdmaEndpoint; +import com.ibm.disni.rdma.verbs.IbvMr; +import com.ibm.disni.rdma.verbs.IbvPd; +import com.ibm.disni.util.MemoryUtils; + +public class DaRPCMemPool { + final int defaultAllocationSize = 16 * 1024 * 1024; // 16MB + final int defaultMinAllocationSize = 4 * 1024; // 4KB + final int defaultAlignmentSize = 4 * 1024; // 4KB + final int defaultHugePageLimit = 0; // no huge pages by default + + private HashMap pdMemPool; // One buddy allocator per protection domain + private LinkedList mrs; + private boolean isOpen; + + long currentRegion = 0; + private File dir; + protected int access; // RDMA access flags to use when registering memory regions + private long allocatedHugePageMemory; + + // Configurable values + String hugePagePath = null; + protected int allocationSize; + protected int minAllocationSize; + protected int alignmentSize; + protected int hugePageLimit; + + + + + + public DaRPCMemPool(String hugePagePath, int hugePageLimit, int allocationSize, int minAllocationSize, int alignmentSize) { + isOpen = false; + this.allocationSize = defaultAllocationSize; + this.minAllocationSize = defaultMinAllocationSize; + this.alignmentSize = defaultAlignmentSize; + this.hugePageLimit = defaultHugePageLimit; + this.access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ; + + this.hugePagePath = hugePagePath; + + allocatedHugePageMemory = 0; + + if (allocationSize >= 0) { + this.allocationSize = allocationSize; + } + if (minAllocationSize >= 0) { + this.minAllocationSize = minAllocationSize; + } + if (alignmentSize >= 0) { + this.alignmentSize = alignmentSize; + } + if (hugePageLimit >= 0) { + this.hugePageLimit = hugePageLimit; + } + if ((hugePagePath != null) && (!hugePagePath.equals("")) && (this.hugePageLimit > 0)) { + dir = new File(hugePagePath); + if (!dir.exists()){ + dir.mkdirs(); + } + for (File child : dir.listFiles()) { + child.delete(); + } + } else { + this.hugePageLimit = 0; + } + pdMemPool = new HashMap(); + mrs = new LinkedList(); + isOpen = true; + } + + public synchronized void close() { + if (isOpen) { + isOpen = false; + pdMemPool = null; + for (Iterator it = mrs.iterator(); it.hasNext(); ) { + IbvMr m = it.next(); + try { + m.deregMr().execute().free(); + } catch (IOException e) { + System.out.println("Could not unregister memory region."); + e.printStackTrace(); + } + } + mrs = null; + } + + // If hugepages were used, clean and delete the created files and the directory. + if (hugePageLimit > 0) { + if (dir.exists()) { + for (File child : dir.listFiles()) { + child.delete(); + } + dir.delete(); + } + } + } + + public void finalize() { + // Just in case the user did not do that. + close(); + } + + + // the next two methods allocate a buffer from the OS. The first one + // allocates from the regular heap and the second one from huge pages. + // These are two alternatives. If hugepages is configured by the user, + // memory will first be allocated from huge pages and after reacing the limit, + // more memory will be allocated from the regular heap. + + // allocate a buffer from the regular heap + ByteBuffer allocateHeapBuffer() { + ByteBuffer byteBuffer; + + if (alignmentSize > 1) { + ByteBuffer rawBuffer = ByteBuffer.allocateDirect(allocationSize + alignmentSize); + long rawBufferAddress = ((sun.nio.ch.DirectBuffer)rawBuffer).address(); + long alignmentOffset = rawBufferAddress % alignmentSize; + if (alignmentOffset != 0) { + rawBuffer.position(alignmentSize - (int)alignmentOffset); + } + byteBuffer = rawBuffer.slice(); + + } else { + byteBuffer = ByteBuffer.allocateDirect(allocationSize); + } + return (byteBuffer); + } + + // allocate a buffer from hugepages + ByteBuffer allocateHugePageBuffer() throws IOException { + String path = hugePagePath + "/" + currentRegion++ + ".mem"; + RandomAccessFile randomFile = null; + try { + randomFile = new RandomAccessFile(path, "rw"); + } catch (FileNotFoundException e) { + System.out.println("Path " + path + " to huge page directory not found."); + throw e; + } + try { + randomFile.setLength(allocationSize); + } catch (IOException e) { + System.out.println("Coult not set allocation length of mapped random access file on huge page directory."); + randomFile.close(); + throw e; + } + FileChannel channel = randomFile.getChannel(); + MappedByteBuffer mappedBuffer = null; + try { + mappedBuffer = channel.map(MapMode.READ_WRITE, 0, + allocationSize); + } catch (IOException e) { + System.out.println("Could not map the huge page file on path " + path); + randomFile.close(); + throw e; + } + randomFile.close(); + allocatedHugePageMemory += allocationSize; + return (mappedBuffer); + } + + + // Add a new chunk and register it with the IB device. + // This adds a new "root" to the buddy tree. + protected void addNewBuddy(PdMemPool pdm) throws IOException { + BuddyInfo bi = new BuddyInfo(); + + if ((allocatedHugePageMemory + allocationSize) < hugePageLimit) { + bi.buffer = allocateHugePageBuffer(); + } else { + bi.buffer = allocateHeapBuffer(); + } + // Register buffer with IB card + IbvMr mr = pdm.pd.regMr(bi.buffer, access).execute().free().getMr(); + mrs.addLast(mr); + + bi.s = state.FREE; + bi.size = allocationSize; + bi.parent = null; + bi.sibling = null; + bi.lkey = mr.getLkey(); + + if (pdm.freeBuddies.get(allocationSize) == null) { + pdm.freeBuddies.put(allocationSize, new LinkedList()); + } + pdm.freeBuddies.get(allocationSize).add(bi); + } + + protected boolean split(PdMemPool pdm, int size) { + if (size > allocationSize) { + return false; + } + if (!pdm.freeBuddies.containsKey(size)) { + if (!split(pdm, size << 1)) { + // no free buddy, which could be split + return false; + } + } + LinkedList l = pdm.freeBuddies.get(size); + if (l == null) { + return false; + } + BuddyInfo bi = l.removeFirst(); + if (l.size() == 0) { + pdm.freeBuddies.remove(size); + } + bi.s = state.SPLIT; + bi.buffer.position(0); + bi.buffer.limit(size >> 1); + ByteBuffer b1 = bi.buffer.slice(); + bi.buffer.position(size >> 1); + bi.buffer.limit(size); + ByteBuffer b2 = bi.buffer.slice(); + + BuddyInfo bi1 = new BuddyInfo(); + BuddyInfo bi2 = new BuddyInfo(); + bi1.buffer = b1; + bi1.s = state.FREE; + bi1.size = (size >> 1); + bi1.parent = bi; + bi1.sibling = bi2; + bi1.lkey = bi.lkey; + + bi2.buffer = b2; + bi2.s = state.FREE; + bi2.size = (size >> 1); + bi2.parent = bi; + bi2.sibling = bi1; + bi2.lkey = bi.lkey; + + if (pdm.freeBuddies.get(size >> 1) == null) { + pdm.freeBuddies.put(size >> 1, new LinkedList()); + } + pdm.freeBuddies.get(size >> 1).add(bi1); + pdm.freeBuddies.get(size >> 1).add(bi2); + + return true; + } + + + protected ByteBuffer getPower2Buffer(PdMemPool pdm, int size) { + if (!pdm.freeBuddies.containsKey(size)) { + if (!split(pdm, size << 1)) { + // no free buddy, which could be split + return null; + } + } + LinkedList l = pdm.freeBuddies.get(size); + if (l == null) { + return null; + } + BuddyInfo bi = l.removeFirst(); + if (l.size() == 0) { + pdm.freeBuddies.remove(size); + } + bi.s = state.USED; + pdm.usedBuddies.put(MemoryUtils.getAddress(bi.buffer), bi); + return bi.buffer; + } + + synchronized ByteBuffer getBuffer(RdmaEndpoint endpoint, int size) throws IOException { + int i = minAllocationSize; + + if (!pdMemPool.containsKey(endpoint.getPd())) { + pdMemPool.put(endpoint.getPd(), new PdMemPool(endpoint.getPd())); + } + PdMemPool pdm = pdMemPool.get(endpoint.getPd()); + + while(size > i) { + i <<= 1; + } + + ByteBuffer b = getPower2Buffer(pdm, i); + if (b == null) { + addNewBuddy(pdm); + b = getPower2Buffer(pdm, i); + } + return (b); + } + + protected void merge(PdMemPool pdm, BuddyInfo bi) { + if (bi.sibling != null) { + if (bi.sibling.s == state.FREE) { + BuddyInfo parent = bi.parent; + parent.s = state.FREE; + if (pdm.freeBuddies.get(parent.size) == null) { + pdm.freeBuddies.put(parent.size, new LinkedList()); + } + pdm.freeBuddies.get(parent.size).add(parent); + pdm.freeBuddies.get(bi.size).remove(bi.sibling); + pdm.freeBuddies.get(bi.size).remove(bi); + if (pdm.freeBuddies.get(bi.size).size() == 0) { + pdm.freeBuddies.remove(bi.size); + } + merge(pdm, parent); + } + } + } + synchronized void freeBuffer(RdmaEndpoint endpoint, ByteBuffer b) { + if (b == null) { + return; + } + PdMemPool pdm = pdMemPool.get(endpoint.getPd()); + BuddyInfo bi = pdm.usedBuddies.remove(MemoryUtils.getAddress(b)); + // Buffer is not in the used list. Cannot free. + if (bi == null) { + return; + } + bi.s = state.FREE; + if (pdm.freeBuddies.get(bi.size) == null) { + pdm.freeBuddies.put(bi.size, new LinkedList()); + } + pdm.freeBuddies.get(bi.size).add(bi); + merge(pdm, bi); + } + + int getLKey(RdmaEndpoint endpoint, ByteBuffer b) throws IllegalArgumentException { + if (b == null) { + System.out.println("getLKey(): Argument buffer is null. Cannot return lkey."); + throw new IllegalArgumentException("getLKey(): Argument buffer is null. Cannot return lkey."); + } + PdMemPool pdm = pdMemPool.get(endpoint.getPd()); + BuddyInfo bi = pdm.usedBuddies.get(MemoryUtils.getAddress(b)); + if (bi != null) { + return bi.lkey; + } else { + System.out.println("getLKey(): This buffer is not allocated. Cannot return lkey."); + throw new IllegalArgumentException("getLKey(): This buffer is not allocated. Cannot return lkey."); + } + } + + void setAllocationSize(int size) { + allocationSize = size; + } + + void setMinAllocationSize(int size) { + minAllocationSize = size; + } + + void setAlignment(int size) { + alignmentSize = size; + } + + int getAllocationSize() { + return allocationSize; + } + + int getMinAllocationSize(int size) { + return minAllocationSize; + } + + int getAlignment(int size) { + return alignmentSize; + } + + void printBuddies() { + System.out.println("Free buddies:\n============"); + for (Iterator itpd = pdMemPool.values().iterator(); itpd.hasNext(); ) { + PdMemPool pdm = itpd.next(); + for (Iterator it = pdm.freeBuddies.keySet().iterator(); it.hasNext(); ) { + Integer size = it.next(); + System.out.println("Size: " + size); + LinkedList l = pdm.freeBuddies.get(size); + if (l != null) { + for (Iterator it2 = l.iterator(); it2.hasNext(); ) { + BuddyInfo bi = it2.next(); + System.out.println(bi); + } + } + } + } + System.out.println("============\n"); + + System.out.println("Used buddies:\n============"); + for (Iterator itpd = pdMemPool.values().iterator(); itpd.hasNext(); ) { + PdMemPool pdm = itpd.next(); + for (Iterator it = pdm.usedBuddies.values().iterator(); it.hasNext(); ) { + System.out.println(it.next()); + } + } + System.out.println("============\n"); + } + + + + // Internally used + enum state { + FREE, + USED, + SPLIT + } + + // Internally used to keep track of buffer state + class BuddyInfo { + ByteBuffer buffer; + BuddyInfo parent; + BuddyInfo sibling; + state s; + int size; + int lkey; + public String toString() { + return new String("Size= " + size + ", state = " + + (s == state.FREE ? "FREE": s == state.USED ? "USED" : "SPLIT") + + ", address = " + MemoryUtils.getAddress(buffer) + + ", capacity = " + buffer.capacity() + + ", lkey = " + lkey); + } + } + + class PdMemPool { + HashMap> freeBuddies; + HashMap usedBuddies; + IbvPd pd; + + PdMemPool(IbvPd pd) { + this.pd = pd; + freeBuddies = new HashMap>(); + usedBuddies = new HashMap(); + } + } +} diff --git a/src/main/java/com/ibm/darpc/DaRPCServerEndpoint.java b/src/main/java/com/ibm/darpc/DaRPCServerEndpoint.java index 06fd8e6..b9989af 100644 --- a/src/main/java/com/ibm/darpc/DaRPCServerEndpoint.java +++ b/src/main/java/com/ibm/darpc/DaRPCServerEndpoint.java @@ -14,6 +14,7 @@ public class DaRPCServerEndpoint private static final Logger logger = LoggerFactory.getLogger("com.ibm.darpc"); private DaRPCServerGroup group; + private int eventPoolSize; private ArrayBlockingQueue> eventPool; private ArrayBlockingQueue> lazyEvents; private int getClusterId; @@ -22,13 +23,15 @@ public DaRPCServerEndpoint(DaRPCServerGroup group, RdmaCmId idPriv, boolea super(group, idPriv, serverSide); this.group = group; this.getClusterId = group.newClusterId(); + this.eventPoolSize = Math.max(group.recvQueueSize(), group.sendQueueSize()); this.eventPool = new ArrayBlockingQueue>(group.recvQueueSize()); this.lazyEvents = new ArrayBlockingQueue>(group.recvQueueSize()); + } public void init() throws IOException { super.init(); - for(int i = 0; i < group.recvQueueSize(); i++){ + for(int i = 0; i < this.eventPoolSize; i++){ DaRPCServerEvent event = new DaRPCServerEvent(this, group.createRequest(), group.createResponse()); this.eventPool.add(event); diff --git a/src/main/java/com/ibm/darpc/DaRPCServerGroup.java b/src/main/java/com/ibm/darpc/DaRPCServerGroup.java index 8b77a52..d0a0f47 100644 --- a/src/main/java/com/ibm/darpc/DaRPCServerGroup.java +++ b/src/main/java/com/ibm/darpc/DaRPCServerGroup.java @@ -27,14 +27,17 @@ public class DaRPCServerGroup ex private int pollSize; private int clusterSize; - public static DaRPCServerGroup createServerGroup(DaRPCService rpcService, long[] clusterAffinities, int timeout, int maxinline, boolean polling, int recvQueue, int sendQueue, int pollSize, int clusterSize) throws Exception { - DaRPCServerGroup group = new DaRPCServerGroup(rpcService, clusterAffinities, timeout, maxinline, polling, recvQueue, sendQueue, pollSize, clusterSize); + public static DaRPCServerGroup createServerGroup(DaRPCService rpcService, DaRPCMemPool memPool, long[] clusterAffinities, int timeout, int maxinline, boolean polling, + int recvQueue, int sendQueue, int pollSize, int clusterSize) throws Exception { + DaRPCServerGroup group = new DaRPCServerGroup(rpcService, memPool, clusterAffinities, timeout, maxinline, polling, + recvQueue, sendQueue, pollSize, clusterSize); group.init(new RpcServerFactory(group)); return group; } - private DaRPCServerGroup(DaRPCService rpcService, long[] clusterAffinities, int timeout, int maxinline, boolean polling, int recvQueue, int sendQueue, int pollSize, int clusterSize) throws Exception { - super(rpcService, timeout, maxinline, recvQueue, sendQueue); + private DaRPCServerGroup(DaRPCService rpcService, DaRPCMemPool memPool,long[] clusterAffinities, int timeout, int maxinline, + boolean polling, int recvQueue, int sendQueue, int pollSize, int clusterSize) throws Exception { + super(rpcService, memPool, timeout, maxinline, recvQueue, sendQueue); this.rpcService = rpcService; deviceInstance = new ConcurrentHashMap>(); diff --git a/src/test/java/com/ibm/darpc/examples/client/DaRPCClient.java b/src/test/java/com/ibm/darpc/examples/client/DaRPCClient.java index fb5a8c3..8b049c7 100644 --- a/src/test/java/com/ibm/darpc/examples/client/DaRPCClient.java +++ b/src/test/java/com/ibm/darpc/examples/client/DaRPCClient.java @@ -42,6 +42,7 @@ import com.ibm.darpc.DaRPCClientGroup; import com.ibm.darpc.DaRPCEndpoint; import com.ibm.darpc.DaRPCFuture; +import com.ibm.darpc.DaRPCMemPool; import com.ibm.darpc.DaRPCStream; import com.ibm.darpc.examples.protocol.RdmaRpcProtocol; import com.ibm.darpc.examples.protocol.RdmaRpcRequest; @@ -280,8 +281,9 @@ public void launch(String[] args) throws Exception { ClientThread[] benchmarkTask = new ClientThread[threadCount]; RdmaRpcProtocol rpcProtocol = new RdmaRpcProtocol(); + DaRPCMemPool memPool = new DaRPCMemPool("", 0, -1, -1, -1); System.out.println("starting.. threads " + threadCount + ", connections " + connections + ", server " + ipAddress + ", recvQueue " + recvQueue + ", sendQueue" + sendQueue + ", batchSize " + batchSize + ", mode " + mode); - DaRPCClientGroup group = DaRPCClientGroup.createClientGroup(rpcProtocol, 100, maxinline, recvQueue, sendQueue); + DaRPCClientGroup group = DaRPCClientGroup.createClientGroup(rpcProtocol, memPool, 100, maxinline, recvQueue, sendQueue); URI uri = URI.create("rdma://" + ipAddress + ":" + 1919); int k = 0; for (int i = 0; i < rpcConnections.length; i++){ diff --git a/src/test/java/com/ibm/darpc/examples/server/DaRPCServer.java b/src/test/java/com/ibm/darpc/examples/server/DaRPCServer.java index 57efef4..6ee41a3 100644 --- a/src/test/java/com/ibm/darpc/examples/server/DaRPCServer.java +++ b/src/test/java/com/ibm/darpc/examples/server/DaRPCServer.java @@ -33,6 +33,7 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import com.ibm.darpc.DaRPCMemPool; import com.ibm.darpc.DaRPCServerEndpoint; import com.ibm.darpc.DaRPCServerGroup; import com.ibm.darpc.examples.protocol.RdmaRpcRequest; @@ -58,7 +59,8 @@ public void run() throws Exception{ } System.out.println("running...server " + ipAddress + ", poolsize " + poolsize + ", maxinline " + maxinline + ", polling " + polling + ", recvQueue " + recvQueue + ", sendQueue " + sendQueue + ", wqSize " + wqSize + ", rpcservice-timeout " + servicetimeout); RdmaRpcService rpcService = new RdmaRpcService(servicetimeout); - DaRPCServerGroup group = DaRPCServerGroup.createServerGroup(rpcService, clusterAffinities, -1, maxinline, polling, recvQueue, sendQueue, wqSize, 32); + DaRPCMemPool memPool = new DaRPCMemPool(null, 0, -1, -1, -1); + DaRPCServerGroup group = DaRPCServerGroup.createServerGroup(rpcService, memPool, clusterAffinities, -1, maxinline, polling, recvQueue, sendQueue, wqSize, 32); RdmaServerEndpoint> serverEp = group.createServerEndpoint(); URI uri = URI.create("rdma://" + ipAddress + ":" + 1919); serverEp.bind(uri); From cc896b6fc0d9e06761079b4dd1a50d2c46e3b31d Mon Sep 17 00:00:00 2001 From: Adrian Schuepbach Date: Mon, 11 Dec 2017 13:06:12 +0100 Subject: [PATCH 2/6] Added MemPool interface and added a simple version of the mempool. --- .../java/com/ibm/darpc/DaRPCEndpoint.java | 44 +- .../com/ibm/darpc/DaRPCEndpointGroup.java | 33 +- src/main/java/com/ibm/darpc/DaRPCMemPool.java | 432 +--------------- .../com/ibm/darpc/DaRPCMemPoolImplBuddy.java | 472 ++++++++++++++++++ .../com/ibm/darpc/DaRPCMemPoolImplSimple.java | 97 ++++ .../com/ibm/darpc/DaRPCServerEndpoint.java | 30 +- .../darpc/examples/client/DaRPCClient.java | 61 +-- .../darpc/examples/server/DaRPCServer.java | 19 +- 8 files changed, 671 insertions(+), 517 deletions(-) create mode 100644 src/main/java/com/ibm/darpc/DaRPCMemPoolImplBuddy.java create mode 100644 src/main/java/com/ibm/darpc/DaRPCMemPoolImplSimple.java diff --git a/src/main/java/com/ibm/darpc/DaRPCEndpoint.java b/src/main/java/com/ibm/darpc/DaRPCEndpoint.java index d432a55..4198df7 100644 --- a/src/main/java/com/ibm/darpc/DaRPCEndpoint.java +++ b/src/main/java/com/ibm/darpc/DaRPCEndpoint.java @@ -40,10 +40,10 @@ public abstract class DaRPCEndpoint extends RdmaEndpoint { private static final Logger logger = LoggerFactory.getLogger("com.ibm.darpc"); private static final int headerSize = 4; - + public abstract void dispatchReceive(ByteBuffer buffer, int ticket, int recvIndex) throws IOException; public abstract void dispatchSend(int ticket) throws IOException; - + private DaRPCEndpointGroup, R, T> rpcGroup; private ByteBuffer dataBuffer; private int lkey; @@ -56,15 +56,15 @@ public abstract class DaRPCEndpoint pendingPostSend; private ArrayBlockingQueue freePostSend; private AtomicLong ticketCount; - private int sendPipelineLength; - private int recvPipelineLength; + final private int sendPipelineLength; + final private int recvPipelineLength; private int payloadSize; private int rawBufferSize; private int maxinline; private AtomicLong messagesSent; private AtomicLong messagesReceived; - - + + public DaRPCEndpoint(DaRPCEndpointGroup, R, T> endpointGroup, RdmaCmId idPriv, boolean serverSide) throws IOException { super(endpointGroup, idPriv, serverSide); this.rpcGroup = endpointGroup; @@ -85,7 +85,7 @@ public DaRPCEndpoint(DaRPCEndpointGroup, R, T> endp logger.info("RPC client endpoint, with payload buffer size = " + payloadSize + ", send pipeline " + sendPipelineLength + ", receive pipeline " + recvPipelineLength); } - + public void init() throws IOException { int sendBufferOffset = recvPipelineLength * rawBufferSize; @@ -132,16 +132,16 @@ public void init() throws IOException { public synchronized void close() throws IOException, InterruptedException { rpcGroup.freeBuffer(this, dataBuffer); super.close(); - } - + } + public long getMessagesSent() { return messagesSent.get(); } - + public long getMessagesReceived() { return messagesReceived.get(); } - + protected boolean sendMessage(DaRPCMessage message, int ticket) throws IOException { SVCPostSend postSend = freePostSend.poll(); if (postSend != null){ @@ -153,7 +153,7 @@ protected boolean sendMessage(DaRPCMessage message, int ticket) throws IOExcepti postSend.getWrMod(0).setSend_flags(IbvSendWR.IBV_SEND_SIGNALED); if (written <= maxinline) { postSend.getWrMod(0).setSend_flags(postSend.getWrMod(0).getSend_flags() | IbvSendWR.IBV_SEND_INLINE); - } + } pendingPostSend.put(ticket, postSend); postSend.execute(); messagesSent.incrementAndGet(); @@ -162,27 +162,27 @@ protected boolean sendMessage(DaRPCMessage message, int ticket) throws IOExcepti return false; } } - + protected void postRecv(int index) throws IOException { recvCall[index].execute(); - } - + } + public void freeSend(int ticket) throws IOException { SVCPostSend sendOperation = pendingPostSend.remove(ticket); if (sendOperation == null) { throw new IOException("no pending ticket " + ticket + ", current ticket count " + ticketCount.get()); } this.freePostSend.add(sendOperation); - } - + } + public void dispatchCqEvent(IbvWC wc) throws IOException { if (wc.getStatus() == 5){ //flush return; } else if (wc.getStatus() != 0){ throw new IOException("Faulty operation! wc.status " + wc.getStatus()); - } - + } + if (wc.getOpcode() == 128){ //receiving a message int index = (int) wc.getWr_id(); @@ -198,9 +198,9 @@ public void dispatchCqEvent(IbvWC wc) throws IOException { dispatchSend(ticket); } else { throw new IOException("Unkown opcode " + wc.getOpcode()); - } - } - + } + } + private SVCPostSend setupSendTask(int wrid) throws IOException { ArrayList sendWRs = new ArrayList(1); LinkedList sgeList = new LinkedList(); diff --git a/src/main/java/com/ibm/darpc/DaRPCEndpointGroup.java b/src/main/java/com/ibm/darpc/DaRPCEndpointGroup.java index 45048f3..01429f9 100644 --- a/src/main/java/com/ibm/darpc/DaRPCEndpointGroup.java +++ b/src/main/java/com/ibm/darpc/DaRPCEndpointGroup.java @@ -32,18 +32,18 @@ public abstract class DaRPCEndpointGroup, R extends DaRPCMessage, T extends DaRPCMessage> extends RdmaEndpointGroup { private static final Logger logger = LoggerFactory.getLogger("com.ibm.darpc"); private static int DARPC_VERSION = 50; - + private int recvQueueSize; private int sendQueueSize; private int timeout; private int bufferSize; private int maxInline; private DaRPCMemPool memPool; - + public static int getVersion(){ return DARPC_VERSION; - } - + } + protected DaRPCEndpointGroup(DaRPCProtocol protocol, DaRPCMemPool memPool, int timeout, int maxinline, int recvQueue, int sendQueue) throws Exception { super(timeout); this.recvQueueSize = recvQueue; @@ -52,8 +52,8 @@ protected DaRPCEndpointGroup(DaRPCProtocol protocol, DaRPCMemPool memPool, this.bufferSize = Math.max(protocol.createRequest().size(), protocol.createResponse().size()); this.maxInline = maxinline; this.memPool = memPool; - } - + } + protected synchronized IbvQP createQP(RdmaCmId id, IbvPd pd, IbvCQ cq) throws IOException{ IbvQPInitAttr attr = new IbvQPInitAttr(); attr.cap().setMax_recv_wr(recvQueueSize); @@ -63,33 +63,34 @@ protected synchronized IbvQP createQP(RdmaCmId id, IbvPd pd, IbvCQ cq) throws IO attr.cap().setMax_inline_data(maxInline); attr.setQp_type(IbvQP.IBV_QPT_RC); attr.setRecv_cq(cq); - attr.setSend_cq(cq); + attr.setSend_cq(cq); IbvQP qp = id.createQP(pd, attr); return qp; } - + public int getTimeout() { return timeout; } - + public int getBufferSize() { return bufferSize; - } + } + public void close() throws IOException, InterruptedException { super.close(); memPool.close(); logger.info("rpc group down"); - } - + } + public int recvQueueSize() { return recvQueueSize; } - + public int sendQueueSize() { return sendQueueSize; - } - + } + public int getMaxInline() { return maxInline; } @@ -98,7 +99,7 @@ ByteBuffer getWRBuffer(RdmaEndpoint endpoint, int size) throws Exception { return memPool.getBuffer(endpoint, size); } - void freeBuffer(RdmaEndpoint endpoint, ByteBuffer b) { + void freeBuffer(RdmaEndpoint endpoint, ByteBuffer b) throws IOException { memPool.freeBuffer(endpoint, b); } diff --git a/src/main/java/com/ibm/darpc/DaRPCMemPool.java b/src/main/java/com/ibm/darpc/DaRPCMemPool.java index 8272c03..e7d6ea1 100644 --- a/src/main/java/com/ibm/darpc/DaRPCMemPool.java +++ b/src/main/java/com/ibm/darpc/DaRPCMemPool.java @@ -1,435 +1,13 @@ package com.ibm.darpc; -import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; -import java.io.RandomAccessFile; import java.nio.ByteBuffer; -import java.nio.MappedByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.channels.FileChannel.MapMode; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; import com.ibm.disni.rdma.RdmaEndpoint; -import com.ibm.disni.rdma.verbs.IbvMr; -import com.ibm.disni.rdma.verbs.IbvPd; -import com.ibm.disni.util.MemoryUtils; -public class DaRPCMemPool { - final int defaultAllocationSize = 16 * 1024 * 1024; // 16MB - final int defaultMinAllocationSize = 4 * 1024; // 4KB - final int defaultAlignmentSize = 4 * 1024; // 4KB - final int defaultHugePageLimit = 0; // no huge pages by default - - private HashMap pdMemPool; // One buddy allocator per protection domain - private LinkedList mrs; - private boolean isOpen; - - long currentRegion = 0; - private File dir; - protected int access; // RDMA access flags to use when registering memory regions - private long allocatedHugePageMemory; - - // Configurable values - String hugePagePath = null; - protected int allocationSize; - protected int minAllocationSize; - protected int alignmentSize; - protected int hugePageLimit; - - - - - - public DaRPCMemPool(String hugePagePath, int hugePageLimit, int allocationSize, int minAllocationSize, int alignmentSize) { - isOpen = false; - this.allocationSize = defaultAllocationSize; - this.minAllocationSize = defaultMinAllocationSize; - this.alignmentSize = defaultAlignmentSize; - this.hugePageLimit = defaultHugePageLimit; - this.access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ; - - this.hugePagePath = hugePagePath; - - allocatedHugePageMemory = 0; - - if (allocationSize >= 0) { - this.allocationSize = allocationSize; - } - if (minAllocationSize >= 0) { - this.minAllocationSize = minAllocationSize; - } - if (alignmentSize >= 0) { - this.alignmentSize = alignmentSize; - } - if (hugePageLimit >= 0) { - this.hugePageLimit = hugePageLimit; - } - if ((hugePagePath != null) && (!hugePagePath.equals("")) && (this.hugePageLimit > 0)) { - dir = new File(hugePagePath); - if (!dir.exists()){ - dir.mkdirs(); - } - for (File child : dir.listFiles()) { - child.delete(); - } - } else { - this.hugePageLimit = 0; - } - pdMemPool = new HashMap(); - mrs = new LinkedList(); - isOpen = true; - } - - public synchronized void close() { - if (isOpen) { - isOpen = false; - pdMemPool = null; - for (Iterator it = mrs.iterator(); it.hasNext(); ) { - IbvMr m = it.next(); - try { - m.deregMr().execute().free(); - } catch (IOException e) { - System.out.println("Could not unregister memory region."); - e.printStackTrace(); - } - } - mrs = null; - } - - // If hugepages were used, clean and delete the created files and the directory. - if (hugePageLimit > 0) { - if (dir.exists()) { - for (File child : dir.listFiles()) { - child.delete(); - } - dir.delete(); - } - } - } - - public void finalize() { - // Just in case the user did not do that. - close(); - } - - - // the next two methods allocate a buffer from the OS. The first one - // allocates from the regular heap and the second one from huge pages. - // These are two alternatives. If hugepages is configured by the user, - // memory will first be allocated from huge pages and after reacing the limit, - // more memory will be allocated from the regular heap. - - // allocate a buffer from the regular heap - ByteBuffer allocateHeapBuffer() { - ByteBuffer byteBuffer; - - if (alignmentSize > 1) { - ByteBuffer rawBuffer = ByteBuffer.allocateDirect(allocationSize + alignmentSize); - long rawBufferAddress = ((sun.nio.ch.DirectBuffer)rawBuffer).address(); - long alignmentOffset = rawBufferAddress % alignmentSize; - if (alignmentOffset != 0) { - rawBuffer.position(alignmentSize - (int)alignmentOffset); - } - byteBuffer = rawBuffer.slice(); - - } else { - byteBuffer = ByteBuffer.allocateDirect(allocationSize); - } - return (byteBuffer); - } - - // allocate a buffer from hugepages - ByteBuffer allocateHugePageBuffer() throws IOException { - String path = hugePagePath + "/" + currentRegion++ + ".mem"; - RandomAccessFile randomFile = null; - try { - randomFile = new RandomAccessFile(path, "rw"); - } catch (FileNotFoundException e) { - System.out.println("Path " + path + " to huge page directory not found."); - throw e; - } - try { - randomFile.setLength(allocationSize); - } catch (IOException e) { - System.out.println("Coult not set allocation length of mapped random access file on huge page directory."); - randomFile.close(); - throw e; - } - FileChannel channel = randomFile.getChannel(); - MappedByteBuffer mappedBuffer = null; - try { - mappedBuffer = channel.map(MapMode.READ_WRITE, 0, - allocationSize); - } catch (IOException e) { - System.out.println("Could not map the huge page file on path " + path); - randomFile.close(); - throw e; - } - randomFile.close(); - allocatedHugePageMemory += allocationSize; - return (mappedBuffer); - } - - - // Add a new chunk and register it with the IB device. - // This adds a new "root" to the buddy tree. - protected void addNewBuddy(PdMemPool pdm) throws IOException { - BuddyInfo bi = new BuddyInfo(); - - if ((allocatedHugePageMemory + allocationSize) < hugePageLimit) { - bi.buffer = allocateHugePageBuffer(); - } else { - bi.buffer = allocateHeapBuffer(); - } - // Register buffer with IB card - IbvMr mr = pdm.pd.regMr(bi.buffer, access).execute().free().getMr(); - mrs.addLast(mr); - - bi.s = state.FREE; - bi.size = allocationSize; - bi.parent = null; - bi.sibling = null; - bi.lkey = mr.getLkey(); - - if (pdm.freeBuddies.get(allocationSize) == null) { - pdm.freeBuddies.put(allocationSize, new LinkedList()); - } - pdm.freeBuddies.get(allocationSize).add(bi); - } - - protected boolean split(PdMemPool pdm, int size) { - if (size > allocationSize) { - return false; - } - if (!pdm.freeBuddies.containsKey(size)) { - if (!split(pdm, size << 1)) { - // no free buddy, which could be split - return false; - } - } - LinkedList l = pdm.freeBuddies.get(size); - if (l == null) { - return false; - } - BuddyInfo bi = l.removeFirst(); - if (l.size() == 0) { - pdm.freeBuddies.remove(size); - } - bi.s = state.SPLIT; - bi.buffer.position(0); - bi.buffer.limit(size >> 1); - ByteBuffer b1 = bi.buffer.slice(); - bi.buffer.position(size >> 1); - bi.buffer.limit(size); - ByteBuffer b2 = bi.buffer.slice(); - - BuddyInfo bi1 = new BuddyInfo(); - BuddyInfo bi2 = new BuddyInfo(); - bi1.buffer = b1; - bi1.s = state.FREE; - bi1.size = (size >> 1); - bi1.parent = bi; - bi1.sibling = bi2; - bi1.lkey = bi.lkey; - - bi2.buffer = b2; - bi2.s = state.FREE; - bi2.size = (size >> 1); - bi2.parent = bi; - bi2.sibling = bi1; - bi2.lkey = bi.lkey; - - if (pdm.freeBuddies.get(size >> 1) == null) { - pdm.freeBuddies.put(size >> 1, new LinkedList()); - } - pdm.freeBuddies.get(size >> 1).add(bi1); - pdm.freeBuddies.get(size >> 1).add(bi2); - - return true; - } - - - protected ByteBuffer getPower2Buffer(PdMemPool pdm, int size) { - if (!pdm.freeBuddies.containsKey(size)) { - if (!split(pdm, size << 1)) { - // no free buddy, which could be split - return null; - } - } - LinkedList l = pdm.freeBuddies.get(size); - if (l == null) { - return null; - } - BuddyInfo bi = l.removeFirst(); - if (l.size() == 0) { - pdm.freeBuddies.remove(size); - } - bi.s = state.USED; - pdm.usedBuddies.put(MemoryUtils.getAddress(bi.buffer), bi); - return bi.buffer; - } - - synchronized ByteBuffer getBuffer(RdmaEndpoint endpoint, int size) throws IOException { - int i = minAllocationSize; - - if (!pdMemPool.containsKey(endpoint.getPd())) { - pdMemPool.put(endpoint.getPd(), new PdMemPool(endpoint.getPd())); - } - PdMemPool pdm = pdMemPool.get(endpoint.getPd()); - - while(size > i) { - i <<= 1; - } - - ByteBuffer b = getPower2Buffer(pdm, i); - if (b == null) { - addNewBuddy(pdm); - b = getPower2Buffer(pdm, i); - } - return (b); - } - - protected void merge(PdMemPool pdm, BuddyInfo bi) { - if (bi.sibling != null) { - if (bi.sibling.s == state.FREE) { - BuddyInfo parent = bi.parent; - parent.s = state.FREE; - if (pdm.freeBuddies.get(parent.size) == null) { - pdm.freeBuddies.put(parent.size, new LinkedList()); - } - pdm.freeBuddies.get(parent.size).add(parent); - pdm.freeBuddies.get(bi.size).remove(bi.sibling); - pdm.freeBuddies.get(bi.size).remove(bi); - if (pdm.freeBuddies.get(bi.size).size() == 0) { - pdm.freeBuddies.remove(bi.size); - } - merge(pdm, parent); - } - } - } - synchronized void freeBuffer(RdmaEndpoint endpoint, ByteBuffer b) { - if (b == null) { - return; - } - PdMemPool pdm = pdMemPool.get(endpoint.getPd()); - BuddyInfo bi = pdm.usedBuddies.remove(MemoryUtils.getAddress(b)); - // Buffer is not in the used list. Cannot free. - if (bi == null) { - return; - } - bi.s = state.FREE; - if (pdm.freeBuddies.get(bi.size) == null) { - pdm.freeBuddies.put(bi.size, new LinkedList()); - } - pdm.freeBuddies.get(bi.size).add(bi); - merge(pdm, bi); - } - - int getLKey(RdmaEndpoint endpoint, ByteBuffer b) throws IllegalArgumentException { - if (b == null) { - System.out.println("getLKey(): Argument buffer is null. Cannot return lkey."); - throw new IllegalArgumentException("getLKey(): Argument buffer is null. Cannot return lkey."); - } - PdMemPool pdm = pdMemPool.get(endpoint.getPd()); - BuddyInfo bi = pdm.usedBuddies.get(MemoryUtils.getAddress(b)); - if (bi != null) { - return bi.lkey; - } else { - System.out.println("getLKey(): This buffer is not allocated. Cannot return lkey."); - throw new IllegalArgumentException("getLKey(): This buffer is not allocated. Cannot return lkey."); - } - } - - void setAllocationSize(int size) { - allocationSize = size; - } - - void setMinAllocationSize(int size) { - minAllocationSize = size; - } - - void setAlignment(int size) { - alignmentSize = size; - } - - int getAllocationSize() { - return allocationSize; - } - - int getMinAllocationSize(int size) { - return minAllocationSize; - } - - int getAlignment(int size) { - return alignmentSize; - } - - void printBuddies() { - System.out.println("Free buddies:\n============"); - for (Iterator itpd = pdMemPool.values().iterator(); itpd.hasNext(); ) { - PdMemPool pdm = itpd.next(); - for (Iterator it = pdm.freeBuddies.keySet().iterator(); it.hasNext(); ) { - Integer size = it.next(); - System.out.println("Size: " + size); - LinkedList l = pdm.freeBuddies.get(size); - if (l != null) { - for (Iterator it2 = l.iterator(); it2.hasNext(); ) { - BuddyInfo bi = it2.next(); - System.out.println(bi); - } - } - } - } - System.out.println("============\n"); - - System.out.println("Used buddies:\n============"); - for (Iterator itpd = pdMemPool.values().iterator(); itpd.hasNext(); ) { - PdMemPool pdm = itpd.next(); - for (Iterator it = pdm.usedBuddies.values().iterator(); it.hasNext(); ) { - System.out.println(it.next()); - } - } - System.out.println("============\n"); - } - - - - // Internally used - enum state { - FREE, - USED, - SPLIT - } - - // Internally used to keep track of buffer state - class BuddyInfo { - ByteBuffer buffer; - BuddyInfo parent; - BuddyInfo sibling; - state s; - int size; - int lkey; - public String toString() { - return new String("Size= " + size + ", state = " - + (s == state.FREE ? "FREE": s == state.USED ? "USED" : "SPLIT") - + ", address = " + MemoryUtils.getAddress(buffer) - + ", capacity = " + buffer.capacity() - + ", lkey = " + lkey); - } - } - - class PdMemPool { - HashMap> freeBuddies; - HashMap usedBuddies; - IbvPd pd; - - PdMemPool(IbvPd pd) { - this.pd = pd; - freeBuddies = new HashMap>(); - usedBuddies = new HashMap(); - } - } +public interface DaRPCMemPool { + void close() throws IOException; + ByteBuffer getBuffer(RdmaEndpoint endpoint, int size) throws IOException; + void freeBuffer(RdmaEndpoint endpoint, ByteBuffer b) throws IOException; + public int getLKey(RdmaEndpoint endpoint, ByteBuffer b) throws IllegalArgumentException; } diff --git a/src/main/java/com/ibm/darpc/DaRPCMemPoolImplBuddy.java b/src/main/java/com/ibm/darpc/DaRPCMemPoolImplBuddy.java new file mode 100644 index 0000000..8d6b0c8 --- /dev/null +++ b/src/main/java/com/ibm/darpc/DaRPCMemPoolImplBuddy.java @@ -0,0 +1,472 @@ +package com.ibm.darpc; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import com.ibm.disni.rdma.RdmaEndpoint; +import com.ibm.disni.rdma.verbs.IbvMr; +import com.ibm.disni.rdma.verbs.IbvPd; +import com.ibm.disni.util.MemoryUtils; + +public class DaRPCMemPoolImplBuddy implements DaRPCMemPool { + final static int defaultAllocationSize = 16 * 1024 * 1024; // 16MB + final static int defaultMinAllocationSize = 4 * 1024; // 4KB + final static int defaultAlignmentSize = 4 * 1024; // 4KB + final static int defaultHugePageLimit = 0; // no huge pages by default + + private HashMap pdMemPool; // One buddy allocator per protection domain + private List mrs; + private boolean isOpen; + + long currentRegion = 0; + private File dir; + protected int access; // RDMA access flags to use when registering memory regions + private long allocatedHugePageMemory; + + // Configurable values + String hugePagePath = null; + protected int allocationSize; + protected int minAllocationSize; + protected int alignmentSize; + protected int hugePageLimit; + + + + + public DaRPCMemPoolImplBuddy() throws IOException { + isOpen = false; + this.allocationSize = defaultAllocationSize; + this.minAllocationSize = defaultMinAllocationSize; + this.alignmentSize = defaultAlignmentSize; + this.hugePageLimit = defaultHugePageLimit; + + init(); + } + + public DaRPCMemPoolImplBuddy(String hugePagePath, int hugePageLimit, int allocationSize, int minAllocationSize, int alignmentSize) throws IOException { + isOpen = false; + this.hugePagePath = hugePagePath; + this.allocationSize = allocationSize; + this.minAllocationSize = minAllocationSize; + this.alignmentSize = alignmentSize; + this.hugePageLimit = hugePageLimit; + + init(); + } + + protected void init() throws IOException { + allocatedHugePageMemory = 0; + this.access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ; + + if ((this.hugePageLimit > 0) && (hugePagePath == null)) { + throw new IOException("hugePageLimit is > 0 (" + this.hugePageLimit + "), but no hugepage path given."); + } + + if (hugePagePath != null) { + dir = new File(hugePagePath); + if (!dir.exists()){ + dir.mkdirs(); + } + for (File child : dir.listFiles()) { + child.delete(); + } + } + + pdMemPool = new HashMap(); + mrs = new ArrayList(); + isOpen = true; + } + + +// public API + @Override + public void close() throws IOException { + cleanup(); + } + @Override + public ByteBuffer getBuffer(RdmaEndpoint endpoint, int size) throws IOException { + return getBufferImpl(endpoint, size); + } + @Override + public void freeBuffer(RdmaEndpoint endpoint, ByteBuffer b) { + freeBufferImpl(endpoint, b); + } + @Override + public int getLKey(RdmaEndpoint endpoint, ByteBuffer b) throws IllegalArgumentException { + if (b == null) { + System.out.println("getLKey(): Argument buffer is null. Cannot return lkey."); + throw new IllegalArgumentException("getLKey(): Argument buffer is null. Cannot return lkey."); + } + PdMemPool pdm = pdMemPool.get(endpoint.getPd()); + BuddyInfo bi = pdm.usedBuddies.get(MemoryUtils.getAddress(b)); + if (bi != null) { + return bi.lkey; + } else { + System.out.println("getLKey(): This buffer is not allocated. Cannot return lkey."); + throw new IllegalArgumentException("getLKey(): This buffer is not allocated. Cannot return lkey."); + } + } + + + + + synchronized void cleanup() { + if (isOpen) { + isOpen = false; + pdMemPool = null; + for (IbvMr m : mrs) { + try { + m.deregMr().execute().free(); + } catch (IOException e) { + System.out.println("Could not unregister memory region."); + e.printStackTrace(); + } + } + mrs = null; + } + + // If hugepages were used, clean and delete the created files and the directory. + if (hugePageLimit > 0) { + if (dir.exists()) { + for (File child : dir.listFiles()) { + child.delete(); + } + dir.delete(); + } + } + } + + @Override + public void finalize() { + // Just in case the user did not do that. + try { + close(); + } catch (Exception e) { + System.out.println("MemoryPoolImplBuddy: Could not finalize() the memory pool"); + e.printStackTrace(); + } + } + + + // the next two methods allocate a buffer from the OS. The first one + // allocates from the regular heap and the second one from huge pages. + // These are two alternatives. If hugepages is configured by the user, + // memory will first be allocated from huge pages and after reacing the limit, + // more memory will be allocated from the regular heap. + + // allocate a buffer from the regular heap + ByteBuffer allocateHeapBuffer() { + ByteBuffer byteBuffer; + + if (alignmentSize > 1) { + ByteBuffer rawBuffer = ByteBuffer.allocateDirect(allocationSize + alignmentSize); + long rawBufferAddress = MemoryUtils.getAddress(rawBuffer); + long alignmentOffset = rawBufferAddress % alignmentSize; + if (alignmentOffset != 0) { + rawBuffer.position(alignmentSize - (int)alignmentOffset); + } + byteBuffer = rawBuffer.slice(); + + } else { + byteBuffer = ByteBuffer.allocateDirect(allocationSize); + } + return (byteBuffer); + } + + // allocate a buffer from hugepages + ByteBuffer allocateHugePageBuffer() throws IOException { + String path = hugePagePath + "/" + currentRegion++ + ".mem"; + RandomAccessFile randomFile = null; + try { + randomFile = new RandomAccessFile(path, "rw"); + } catch (FileNotFoundException e) { + System.out.println("Path " + path + " to huge page directory not found."); + throw e; + } + try { + randomFile.setLength(allocationSize + alignmentSize); + } catch (IOException e) { + System.out.println("Coult not set allocation length of mapped random access file on huge page directory."); + randomFile.close(); + throw e; + } + FileChannel channel = randomFile.getChannel(); + MappedByteBuffer mappedBuffer = null; + try { + mappedBuffer = channel.map(MapMode.READ_WRITE, 0, + allocationSize); + } catch (IOException e) { + System.out.println("Could not map the huge page file on path " + path); + randomFile.close(); + throw e; + } + randomFile.close(); + allocatedHugePageMemory += (allocationSize + alignmentSize); + + long rawBufferAddress = MemoryUtils.getAddress(mappedBuffer); + long alignmentOffset = rawBufferAddress % alignmentSize; + if (alignmentOffset != 0) { + mappedBuffer.position(alignmentSize - (int)alignmentOffset); + } + ByteBuffer b = mappedBuffer.slice(); + return (b); + } + + + // Add a new chunk and register it with the IB device. + // This adds a new "root" to the buddy tree. + protected void addNewBuddy(PdMemPool pdm) throws IOException { + BuddyInfo bi = new BuddyInfo(); + + if ((allocatedHugePageMemory + allocationSize) < hugePageLimit) { + bi.buffer = allocateHugePageBuffer(); + } else { + bi.buffer = allocateHeapBuffer(); + } + // Register buffer with IB card + IbvMr mr = pdm.pd.regMr(bi.buffer, access).execute().free().getMr(); + mrs.add(mr); + + bi.s = state.FREE; + bi.size = allocationSize; + bi.parent = null; + bi.sibling = null; + bi.lkey = mr.getLkey(); + + if (pdm.freeBuddies.get(allocationSize) == null) { + pdm.freeBuddies.put(allocationSize, new LinkedList()); + } + pdm.freeBuddies.get(allocationSize).add(bi); + } + + protected boolean split(PdMemPool pdm, int size) { + if (size > allocationSize) { + return false; + } + if (!pdm.freeBuddies.containsKey(size)) { + if (!split(pdm, size << 1)) { + // no free buddy, which could be split + return false; + } + } + LinkedList l = pdm.freeBuddies.get(size); + if (l == null) { + return false; + } + BuddyInfo bi = l.removeFirst(); + if (l.size() == 0) { + pdm.freeBuddies.remove(size); + } + bi.s = state.SPLIT; + bi.buffer.position(0); + bi.buffer.limit(size >> 1); + ByteBuffer b1 = bi.buffer.slice(); + bi.buffer.position(size >> 1); + bi.buffer.limit(size); + ByteBuffer b2 = bi.buffer.slice(); + + BuddyInfo bi1 = new BuddyInfo(); + BuddyInfo bi2 = new BuddyInfo(); + bi1.buffer = b1; + bi1.s = state.FREE; + bi1.size = (size >> 1); + bi1.parent = bi; + bi1.sibling = bi2; + bi1.lkey = bi.lkey; + + bi2.buffer = b2; + bi2.s = state.FREE; + bi2.size = (size >> 1); + bi2.parent = bi; + bi2.sibling = bi1; + bi2.lkey = bi.lkey; + + if (pdm.freeBuddies.get(size >> 1) == null) { + pdm.freeBuddies.put(size >> 1, new LinkedList()); + } + pdm.freeBuddies.get(size >> 1).add(bi1); + pdm.freeBuddies.get(size >> 1).add(bi2); + + return true; + } + + + protected ByteBuffer getPower2Buffer(PdMemPool pdm, int size) { + if (!pdm.freeBuddies.containsKey(size)) { + if (!split(pdm, size << 1)) { + // no free buddy, which could be split + return null; + } + } + LinkedList l = pdm.freeBuddies.get(size); + if (l == null) { + return null; + } + BuddyInfo bi = l.removeFirst(); + if (l.size() == 0) { + pdm.freeBuddies.remove(size); + } + bi.s = state.USED; + pdm.usedBuddies.put(MemoryUtils.getAddress(bi.buffer), bi); + return bi.buffer; + } + + synchronized ByteBuffer getBufferImpl(RdmaEndpoint endpoint, int size) throws IOException { + int i = minAllocationSize; + + if (!pdMemPool.containsKey(endpoint.getPd())) { + pdMemPool.put(endpoint.getPd(), new PdMemPool(endpoint.getPd())); + } + PdMemPool pdm = pdMemPool.get(endpoint.getPd()); + + while(size > i) { + i <<= 1; + } + + ByteBuffer b = getPower2Buffer(pdm, i); + if (b == null) { + addNewBuddy(pdm); + b = getPower2Buffer(pdm, i); + } + b.clear(); + return (b); + } + + protected void merge(PdMemPool pdm, BuddyInfo bi) { + if (bi.sibling != null) { + if (bi.sibling.s == state.FREE) { + BuddyInfo parent = bi.parent; + parent.s = state.FREE; + if (pdm.freeBuddies.get(parent.size) == null) { + pdm.freeBuddies.put(parent.size, new LinkedList()); + } + pdm.freeBuddies.get(parent.size).add(parent); + pdm.freeBuddies.get(bi.size).remove(bi.sibling); + pdm.freeBuddies.get(bi.size).remove(bi); + if (pdm.freeBuddies.get(bi.size).size() == 0) { + pdm.freeBuddies.remove(bi.size); + } + merge(pdm, parent); + } + } + } + synchronized void freeBufferImpl(RdmaEndpoint endpoint, ByteBuffer b) { + if (b == null) { + return; + } + PdMemPool pdm = pdMemPool.get(endpoint.getPd()); + BuddyInfo bi = pdm.usedBuddies.remove(MemoryUtils.getAddress(b)); + // Buffer is not in the used list. Cannot free. + if (bi == null) { + return; + } + bi.s = state.FREE; + if (pdm.freeBuddies.get(bi.size) == null) { + pdm.freeBuddies.put(bi.size, new LinkedList()); + } + pdm.freeBuddies.get(bi.size).add(bi); + merge(pdm, bi); + } + + void setAllocationSize(int size) { + allocationSize = size; + } + + void setMinAllocationSize(int size) { + minAllocationSize = size; + } + + void setAlignment(int size) { + alignmentSize = size; + } + + int getAllocationSize() { + return allocationSize; + } + + int getMinAllocationSize(int size) { + return minAllocationSize; + } + + int getAlignment(int size) { + return alignmentSize; + } + + void printBuddies() { + System.out.println("Free buddies:\n============"); + for (Iterator itpd = pdMemPool.values().iterator(); itpd.hasNext(); ) { + PdMemPool pdm = itpd.next(); + for (Iterator it = pdm.freeBuddies.keySet().iterator(); it.hasNext(); ) { + Integer size = it.next(); + System.out.println("Size: " + size); + LinkedList l = pdm.freeBuddies.get(size); + if (l != null) { + for (Iterator it2 = l.iterator(); it2.hasNext(); ) { + BuddyInfo bi = it2.next(); + System.out.println(bi); + } + } + } + } + System.out.println("============\n"); + + System.out.println("Used buddies:\n============"); + for (Iterator itpd = pdMemPool.values().iterator(); itpd.hasNext(); ) { + PdMemPool pdm = itpd.next(); + for (Iterator it = pdm.usedBuddies.values().iterator(); it.hasNext(); ) { + System.out.println(it.next()); + } + } + System.out.println("============\n"); + } + + + + // Internally used + enum state { + FREE, + USED, + SPLIT + } + + // Internally used to keep track of buffer state + class BuddyInfo { + ByteBuffer buffer; + BuddyInfo parent; + BuddyInfo sibling; + state s; + int size; + int lkey; + @Override + public String toString() { + return new String("Size= " + size + ", state = " + + (s == state.FREE ? "FREE": s == state.USED ? "USED" : "SPLIT") + + ", address = " + MemoryUtils.getAddress(buffer) + + ", capacity = " + buffer.capacity() + + ", lkey = " + lkey); + } + } + + class PdMemPool { + HashMap> freeBuddies; + HashMap usedBuddies; + IbvPd pd; + + PdMemPool(IbvPd pd) { + this.pd = pd; + freeBuddies = new HashMap>(); + usedBuddies = new HashMap(); + } + } +} diff --git a/src/main/java/com/ibm/darpc/DaRPCMemPoolImplSimple.java b/src/main/java/com/ibm/darpc/DaRPCMemPoolImplSimple.java new file mode 100644 index 0000000..0465d8a --- /dev/null +++ b/src/main/java/com/ibm/darpc/DaRPCMemPoolImplSimple.java @@ -0,0 +1,97 @@ +package com.ibm.darpc; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedList; + +import com.ibm.disni.rdma.RdmaEndpoint; +import com.ibm.disni.rdma.verbs.IbvMr; +import com.ibm.disni.rdma.verbs.IbvPd; +import com.ibm.disni.util.MemoryUtils; + +public class DaRPCMemPoolImplSimple implements DaRPCMemPool { + final int allocationSize; + final int alignmentSize; + int offset; + ByteBuffer byteBuffer; + IbvPd pd; + IbvMr mr; + int access; + LinkedList freeList; + + public DaRPCMemPoolImplSimple(int allocationSize, int alignmentSize) { + this.allocationSize = allocationSize; + this.alignmentSize = alignmentSize; + + ByteBuffer rawBuffer = ByteBuffer.allocateDirect(allocationSize + alignmentSize); + long rawBufferAddress = MemoryUtils.getAddress(rawBuffer); + long alignmentOffset = rawBufferAddress % alignmentSize; + if (alignmentOffset != 0) { + rawBuffer.position(alignmentSize - (int)alignmentOffset); + } + byteBuffer = rawBuffer.slice(); + this.access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ; + } + + @Override + public void close() throws IOException { + synchronized(this) { + try { + mr.deregMr().execute().free(); + } catch (IOException e) { + System.out.println("Could not unregister memory region."); + e.printStackTrace(); + } + } + } + + @Override + public ByteBuffer getBuffer(RdmaEndpoint endpoint, int size) throws IOException { + ByteBuffer r = null; + + synchronized(this) { + if (pd == null) { + pd = endpoint.getPd(); + } else if (!pd.equals(endpoint.getPd())) { + throw new IOException("No support for more than one PD"); + } + if (mr == null) { + mr = pd.regMr(byteBuffer, access).execute().free().getMr(); + } + + if (freeList == null) { + offset = size; + freeList = new LinkedList(); + int i = 0; + while ((i * offset + offset) < byteBuffer.capacity()) { + byteBuffer.position(i * offset); + byteBuffer.limit(i * offset + offset); + ByteBuffer b = byteBuffer.slice(); + freeList.addLast(b); + i++; + } + } + else + { + if (size != offset) { + throw new IOException("Requested size does not match block size managed by memory pool."); + } + } + r = freeList.removeFirst(); + r.clear(); + } + return r; + } + + @Override + public void freeBuffer(RdmaEndpoint endpoint, ByteBuffer b) { + synchronized(this) { + freeList.addLast(b); + } + } + + @Override + public int getLKey(RdmaEndpoint endpoint, ByteBuffer b) throws IllegalArgumentException { + return mr.getLkey(); + } +} diff --git a/src/main/java/com/ibm/darpc/DaRPCServerEndpoint.java b/src/main/java/com/ibm/darpc/DaRPCServerEndpoint.java index b9989af..14ea773 100644 --- a/src/main/java/com/ibm/darpc/DaRPCServerEndpoint.java +++ b/src/main/java/com/ibm/darpc/DaRPCServerEndpoint.java @@ -12,13 +12,13 @@ public class DaRPCServerEndpoint extends DaRPCEndpoint { private static final Logger logger = LoggerFactory.getLogger("com.ibm.darpc"); - + private DaRPCServerGroup group; - private int eventPoolSize; + final private int eventPoolSize; private ArrayBlockingQueue> eventPool; private ArrayBlockingQueue> lazyEvents; private int getClusterId; - + public DaRPCServerEndpoint(DaRPCServerGroup group, RdmaCmId idPriv, boolean serverSide) throws IOException { super(group, idPriv, serverSide); this.group = group; @@ -29,23 +29,25 @@ public DaRPCServerEndpoint(DaRPCServerGroup group, RdmaCmId idPriv, boolea } + public void init() throws IOException { super.init(); for(int i = 0; i < this.eventPoolSize; i++){ DaRPCServerEvent event = new DaRPCServerEvent(this, group.createRequest(), group.createResponse()); this.eventPool.add(event); - + } } - + void sendResponse(DaRPCServerEvent event) throws IOException { if (sendMessage(event.getSendMessage(), event.getTicket())){ eventPool.add(event); } else { lazyEvents.add(event); } - } - + } + + public synchronized void dispatchCmEvent(RdmaCmEvent cmEvent) throws IOException { super.dispatchCmEvent(cmEvent); try { @@ -56,16 +58,17 @@ public synchronized void dispatchCmEvent(RdmaCmEvent cmEvent) throws IOException } else if (eventType == RdmaCmEvent.EventType.RDMA_CM_EVENT_DISCONNECTED.ordinal()) { logger.info("RPC disconnection, eid " + this.getEndpointId()); group.close(this); - } + } } catch (Exception e) { e.printStackTrace(); } - } + } public int clusterId() { return getClusterId; } - + + public void dispatchReceive(ByteBuffer recvBuffer, int ticket, int recvIndex) throws IOException { DaRPCServerEvent event = eventPool.poll(); if (event == null){ @@ -75,11 +78,12 @@ public void dispatchReceive(ByteBuffer recvBuffer, int ticket, int recvIndex) th event.getReceiveMessage().update(recvBuffer); event.stamp(ticket); postRecv(recvIndex); - group.processServerEvent(event); + group.processServerEvent(event); } - + + public void dispatchSend(int ticket) throws IOException { - freeSend(ticket); + freeSend(ticket); DaRPCServerEvent event = lazyEvents.poll(); if (event != null){ sendResponse(event); diff --git a/src/test/java/com/ibm/darpc/examples/client/DaRPCClient.java b/src/test/java/com/ibm/darpc/examples/client/DaRPCClient.java index 8b049c7..268a406 100644 --- a/src/test/java/com/ibm/darpc/examples/client/DaRPCClient.java +++ b/src/test/java/com/ibm/darpc/examples/client/DaRPCClient.java @@ -43,6 +43,7 @@ import com.ibm.darpc.DaRPCEndpoint; import com.ibm.darpc.DaRPCFuture; import com.ibm.darpc.DaRPCMemPool; +import com.ibm.darpc.DaRPCMemPoolImplBuddy; import com.ibm.darpc.DaRPCStream; import com.ibm.darpc.examples.protocol.RdmaRpcProtocol; import com.ibm.darpc.examples.protocol.RdmaRpcRequest; @@ -52,8 +53,8 @@ public class DaRPCClient { public static enum BenchmarkType { UNDEFINED - }; - + }; + public static class ClientThread implements Runnable { public static final int FUTURE_POLL = 0; public static final int STREAM_POLL = 1; @@ -61,19 +62,19 @@ public static class ClientThread implements Runnable { public static final int STREAM_TAKE = 3; public static final int BATCH_STREAM_TAKE = 4; public static final int BATCH_STREAM_POLL = 5; - + private DaRPCClientEndpoint clientEp; private int loop; private int queryMode; private int clienttimeout; private ArrayBlockingQueue freeResponses; - + protected double throughput; protected double latency; protected double readOps; protected double writeOps; - protected double errorOps; - + protected double errorOps; + public ClientThread(DaRPCClientEndpoint clientEp, int loop, URI uri, int mode, int rpcpipeline, int clienttimeout){ this.clientEp = clientEp; this.loop = loop; @@ -83,9 +84,9 @@ public ClientThread(DaRPCClientEndpoint clientE for (int i = 0; i < rpcpipeline; i++){ RdmaRpcResponse response = new RdmaRpcResponse(); freeResponses.add(response); - } + } } - + @Override public void run() { try { @@ -98,15 +99,15 @@ public void run() { while(freeResponses.isEmpty()){ DaRPCFuture future = stream.poll(); if (future != null){ - freeResponses.add(future.getReceiveMessage()); + freeResponses.add(future.getReceiveMessage()); consumed++; } } - + request.setParam(issued); RdmaRpcResponse response = freeResponses.poll(); DaRPCFuture future = stream.request(request, response, streamMode); - + switch (queryMode) { case FUTURE_POLL: while (!future.isDone()) { @@ -122,12 +123,12 @@ public void run() { } consumed++; freeResponses.add(future.getReceiveMessage()); - break; + break; case FUTURE_TAKE: future.get(clienttimeout, TimeUnit.MILLISECONDS); consumed++; freeResponses.add(future.getReceiveMessage()); - break; + break; case STREAM_TAKE: future = stream.take(clienttimeout); consumed++; @@ -136,7 +137,7 @@ public void run() { case BATCH_STREAM_TAKE: break; case BATCH_STREAM_POLL: - break; + break; } } while (consumed < issued){ @@ -153,7 +154,7 @@ public void run() { public void close() throws Exception { clientEp.close(); } - + public double getThroughput() { return throughput; } @@ -172,15 +173,15 @@ public double getWriteOps() { public double getErrorOps() { return this.errorOps; - } - + } + public double getOps(){ return loop; - } + } } - + public void launch(String[] args) throws Exception { - String ipAddress = ""; + String ipAddress = ""; int size = 24; int loop = 100; int threadCount = 1; @@ -274,14 +275,14 @@ public void launch(String[] args) throws Exception { if ((threadCount % connections) != 0){ throw new Exception("thread count needs to be a multiple of connections"); } - + int threadsperconnection = threadCount / connections; DaRPCEndpoint[] rpcConnections = new DaRPCEndpoint[connections]; Thread[] workers = new Thread[threadCount]; ClientThread[] benchmarkTask = new ClientThread[threadCount]; - + RdmaRpcProtocol rpcProtocol = new RdmaRpcProtocol(); - DaRPCMemPool memPool = new DaRPCMemPool("", 0, -1, -1, -1); + DaRPCMemPool memPool = new DaRPCMemPoolImplBuddy(); System.out.println("starting.. threads " + threadCount + ", connections " + connections + ", server " + ipAddress + ", recvQueue " + recvQueue + ", sendQueue" + sendQueue + ", batchSize " + batchSize + ", mode " + mode); DaRPCClientGroup group = DaRPCClientGroup.createClientGroup(rpcProtocol, memPool, 100, maxinline, recvQueue, sendQueue); URI uri = URI.create("rdma://" + ipAddress + ":" + 1919); @@ -297,7 +298,7 @@ public void launch(String[] args) throws Exception { } StopWatch stopWatchThroughput = new StopWatch(); - stopWatchThroughput.start(); + stopWatchThroughput.start(); for(int i = 0; i < threadCount;i++){ workers[i] = new Thread(benchmarkTask[i]); workers[i].start(); @@ -321,7 +322,7 @@ public void launch(String[] args) throws Exception { double throughputperclient = throughput / _threadcount; double norm = 1.0; latency = norm / throughputperclient * 1000000.0; - } + } System.out.println("throughput " + throughput); String dataFilename = "datalog-client.dat"; @@ -338,18 +339,18 @@ public void launch(String[] args) throws Exception { + "\n"; ByteBuffer buffer = ByteBuffer.wrap(logdata.getBytes()); dataChannel.write(buffer); - dataChannel.close(); + dataChannel.close(); dataStream.close(); - + for (int i = 0; i < rpcConnections.length; i++){ rpcConnections[i].close(); } group.close(); } - - public static void main(String[] args) throws Exception { + + public static void main(String[] args) throws Exception { DaRPCClient rpcClient = new DaRPCClient(); - rpcClient.launch(args); + rpcClient.launch(args); System.exit(0); } } diff --git a/src/test/java/com/ibm/darpc/examples/server/DaRPCServer.java b/src/test/java/com/ibm/darpc/examples/server/DaRPCServer.java index 6ee41a3..37e11fa 100644 --- a/src/test/java/com/ibm/darpc/examples/server/DaRPCServer.java +++ b/src/test/java/com/ibm/darpc/examples/server/DaRPCServer.java @@ -34,6 +34,7 @@ import org.apache.commons.cli.ParseException; import com.ibm.darpc.DaRPCMemPool; +import com.ibm.darpc.DaRPCMemPoolImplBuddy; import com.ibm.darpc.DaRPCServerEndpoint; import com.ibm.darpc.DaRPCServerGroup; import com.ibm.darpc.examples.protocol.RdmaRpcRequest; @@ -41,7 +42,7 @@ import com.ibm.disni.rdma.*; public class DaRPCServer { - private String ipAddress; + private String ipAddress; private int poolsize = 3; private int recvQueue = 16; private int sendQueue = 16; @@ -50,7 +51,7 @@ public class DaRPCServer { private boolean polling = false; private int maxinline = 0; private int connections = 16; - + public void run() throws Exception{ long[] clusterAffinities = new long[poolsize]; for (int i = 0; i < poolsize; i++){ @@ -59,16 +60,16 @@ public void run() throws Exception{ } System.out.println("running...server " + ipAddress + ", poolsize " + poolsize + ", maxinline " + maxinline + ", polling " + polling + ", recvQueue " + recvQueue + ", sendQueue " + sendQueue + ", wqSize " + wqSize + ", rpcservice-timeout " + servicetimeout); RdmaRpcService rpcService = new RdmaRpcService(servicetimeout); - DaRPCMemPool memPool = new DaRPCMemPool(null, 0, -1, -1, -1); + DaRPCMemPool memPool = new DaRPCMemPoolImplBuddy(); DaRPCServerGroup group = DaRPCServerGroup.createServerGroup(rpcService, memPool, clusterAffinities, -1, maxinline, polling, recvQueue, sendQueue, wqSize, 32); RdmaServerEndpoint> serverEp = group.createServerEndpoint(); URI uri = URI.create("rdma://" + ipAddress + ":" + 1919); serverEp.bind(uri); while(true){ serverEp.accept(); - } + } } - + public void launch(String[] args) throws Exception { Option addressOption = Option.builder("a").required().desc("server address").hasArg().build(); Option poolsizeOption = Option.builder("p").desc("pool size").hasArg().build(); @@ -132,9 +133,9 @@ public void launch(String[] args) throws Exception { } this.run(); } - - public static void main(String[] args) throws Exception { + + public static void main(String[] args) throws Exception { DaRPCServer rpcServer = new DaRPCServer(); - rpcServer.launch(args); - } + rpcServer.launch(args); + } } From 7e06fc2118b3fed8e28b60af6868f41d218b4328 Mon Sep 17 00:00:00 2001 From: Adrian Schuepbach Date: Wed, 13 Dec 2017 17:38:26 +0100 Subject: [PATCH 3/6] Allocate always from hugepages. --- .../java/com/ibm/darpc/DaRPCMemPoolImpl.java | 159 ++++++ .../com/ibm/darpc/DaRPCMemPoolImplBuddy.java | 472 ------------------ .../com/ibm/darpc/DaRPCMemPoolImplSimple.java | 97 ---- .../darpc/examples/client/DaRPCClient.java | 11 +- .../darpc/examples/server/DaRPCServer.java | 9 +- 5 files changed, 174 insertions(+), 574 deletions(-) create mode 100644 src/main/java/com/ibm/darpc/DaRPCMemPoolImpl.java delete mode 100644 src/main/java/com/ibm/darpc/DaRPCMemPoolImplBuddy.java delete mode 100644 src/main/java/com/ibm/darpc/DaRPCMemPoolImplSimple.java diff --git a/src/main/java/com/ibm/darpc/DaRPCMemPoolImpl.java b/src/main/java/com/ibm/darpc/DaRPCMemPoolImpl.java new file mode 100644 index 0000000..e9c57b2 --- /dev/null +++ b/src/main/java/com/ibm/darpc/DaRPCMemPoolImpl.java @@ -0,0 +1,159 @@ +package com.ibm.darpc; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; +import java.util.LinkedList; + +import com.ibm.disni.rdma.RdmaEndpoint; +import com.ibm.disni.rdma.verbs.IbvMr; +import com.ibm.disni.rdma.verbs.IbvPd; +import com.ibm.disni.util.MemoryUtils; + +public class DaRPCMemPoolImpl implements DaRPCMemPool { + private static final int defaultAllocationSize = 16 * 1024 * 1024; // 16MB + private final int allocationSize; + private final int alignmentSize; + private final String hugePagePath; + private String hugePageFile; + int offset; + ByteBuffer byteBuffer; + IbvPd pd; + IbvMr mr; + int access; + LinkedList freeList; + + public DaRPCMemPoolImpl(String hugePagePath, int allocationSize, int alignmentSize) { + this.allocationSize = allocationSize; + this.alignmentSize = alignmentSize; + this.hugePagePath = hugePagePath; + + this.access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ; + } + + public DaRPCMemPoolImpl(String hugePagePath) { + this.allocationSize = defaultAllocationSize; + this.alignmentSize = 0; + this.hugePagePath = hugePagePath; + + this.access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ; + } + + // allocate a buffer from hugepages + ByteBuffer allocateHugePageBuffer() throws IOException { + if (hugePagePath == null) { + System.out.println("Hugepage path must be set"); + throw new IOException("Hugepage path must be set"); + } + + hugePageFile = hugePagePath + "/darpcmempoolimpl.mem"; + RandomAccessFile randomFile = null; + try { + randomFile = new RandomAccessFile(hugePageFile, "rw"); + } catch (FileNotFoundException e) { + System.out.println("Path " + hugePageFile + " to huge page directory not found."); + throw e; + } + try { + randomFile.setLength(allocationSize + alignmentSize); + } catch (IOException e) { + System.out.println("Coult not set allocation length of mapped random access file on huge page directory."); + System.out.println("allocaiton size = " + allocationSize + " , alignment size = " + alignmentSize); + System.out.println("allocation size and alignment must be a multiple of the hugepage size."); + randomFile.close(); + throw e; + } + FileChannel channel = randomFile.getChannel(); + MappedByteBuffer mappedBuffer = null; + try { + mappedBuffer = channel.map(MapMode.READ_WRITE, 0, + allocationSize + alignmentSize); + } catch (IOException e) { + System.out.println("Could not map the huge page file on path " + hugePageFile); + randomFile.close(); + throw e; + } + randomFile.close(); + + long rawBufferAddress = MemoryUtils.getAddress(mappedBuffer); + if (alignmentSize > 0) { + long alignmentOffset = rawBufferAddress % alignmentSize; + if (alignmentOffset != 0) { + mappedBuffer.position(alignmentSize - (int)alignmentOffset); + } + } + ByteBuffer b = mappedBuffer.slice(); + return (b); + } + + + @Override + public void close() throws IOException { + synchronized(this) { + try { + mr.deregMr().execute().free(); + } catch (IOException e) { + System.out.println("Could not unregister memory region."); + e.printStackTrace(); + } + File f = new File(hugePageFile); + f.delete(); + } + } + + @Override + public ByteBuffer getBuffer(RdmaEndpoint endpoint, int size) throws IOException { + ByteBuffer r = null; + + synchronized(this) { + if (pd == null) { + pd = endpoint.getPd(); + } else if (!pd.equals(endpoint.getPd())) { + throw new IOException("No support for more than one PD"); + } + if (mr == null) { + byteBuffer = allocateHugePageBuffer(); + mr = pd.regMr(byteBuffer, access).execute().free().getMr(); + } + + if (freeList == null) { + offset = size; + freeList = new LinkedList(); + int i = 0; + while ((i * offset + offset) < byteBuffer.capacity()) { + byteBuffer.position(i * offset); + byteBuffer.limit(i * offset + offset); + ByteBuffer b = byteBuffer.slice(); + freeList.addLast(b); + i++; + } + } + else + { + if (size != offset) { + throw new IOException("Requested size does not match block size managed by memory pool."); + } + } + r = freeList.removeFirst(); + r.clear(); + } + return r; + } + + @Override + public void freeBuffer(RdmaEndpoint endpoint, ByteBuffer b) { + synchronized(this) { + freeList.addLast(b); + } + } + + @Override + public int getLKey(RdmaEndpoint endpoint, ByteBuffer b) throws IllegalArgumentException { + return mr.getLkey(); + } +} diff --git a/src/main/java/com/ibm/darpc/DaRPCMemPoolImplBuddy.java b/src/main/java/com/ibm/darpc/DaRPCMemPoolImplBuddy.java deleted file mode 100644 index 8d6b0c8..0000000 --- a/src/main/java/com/ibm/darpc/DaRPCMemPoolImplBuddy.java +++ /dev/null @@ -1,472 +0,0 @@ -package com.ibm.darpc; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; -import java.nio.MappedByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.channels.FileChannel.MapMode; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; - -import com.ibm.disni.rdma.RdmaEndpoint; -import com.ibm.disni.rdma.verbs.IbvMr; -import com.ibm.disni.rdma.verbs.IbvPd; -import com.ibm.disni.util.MemoryUtils; - -public class DaRPCMemPoolImplBuddy implements DaRPCMemPool { - final static int defaultAllocationSize = 16 * 1024 * 1024; // 16MB - final static int defaultMinAllocationSize = 4 * 1024; // 4KB - final static int defaultAlignmentSize = 4 * 1024; // 4KB - final static int defaultHugePageLimit = 0; // no huge pages by default - - private HashMap pdMemPool; // One buddy allocator per protection domain - private List mrs; - private boolean isOpen; - - long currentRegion = 0; - private File dir; - protected int access; // RDMA access flags to use when registering memory regions - private long allocatedHugePageMemory; - - // Configurable values - String hugePagePath = null; - protected int allocationSize; - protected int minAllocationSize; - protected int alignmentSize; - protected int hugePageLimit; - - - - - public DaRPCMemPoolImplBuddy() throws IOException { - isOpen = false; - this.allocationSize = defaultAllocationSize; - this.minAllocationSize = defaultMinAllocationSize; - this.alignmentSize = defaultAlignmentSize; - this.hugePageLimit = defaultHugePageLimit; - - init(); - } - - public DaRPCMemPoolImplBuddy(String hugePagePath, int hugePageLimit, int allocationSize, int minAllocationSize, int alignmentSize) throws IOException { - isOpen = false; - this.hugePagePath = hugePagePath; - this.allocationSize = allocationSize; - this.minAllocationSize = minAllocationSize; - this.alignmentSize = alignmentSize; - this.hugePageLimit = hugePageLimit; - - init(); - } - - protected void init() throws IOException { - allocatedHugePageMemory = 0; - this.access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ; - - if ((this.hugePageLimit > 0) && (hugePagePath == null)) { - throw new IOException("hugePageLimit is > 0 (" + this.hugePageLimit + "), but no hugepage path given."); - } - - if (hugePagePath != null) { - dir = new File(hugePagePath); - if (!dir.exists()){ - dir.mkdirs(); - } - for (File child : dir.listFiles()) { - child.delete(); - } - } - - pdMemPool = new HashMap(); - mrs = new ArrayList(); - isOpen = true; - } - - -// public API - @Override - public void close() throws IOException { - cleanup(); - } - @Override - public ByteBuffer getBuffer(RdmaEndpoint endpoint, int size) throws IOException { - return getBufferImpl(endpoint, size); - } - @Override - public void freeBuffer(RdmaEndpoint endpoint, ByteBuffer b) { - freeBufferImpl(endpoint, b); - } - @Override - public int getLKey(RdmaEndpoint endpoint, ByteBuffer b) throws IllegalArgumentException { - if (b == null) { - System.out.println("getLKey(): Argument buffer is null. Cannot return lkey."); - throw new IllegalArgumentException("getLKey(): Argument buffer is null. Cannot return lkey."); - } - PdMemPool pdm = pdMemPool.get(endpoint.getPd()); - BuddyInfo bi = pdm.usedBuddies.get(MemoryUtils.getAddress(b)); - if (bi != null) { - return bi.lkey; - } else { - System.out.println("getLKey(): This buffer is not allocated. Cannot return lkey."); - throw new IllegalArgumentException("getLKey(): This buffer is not allocated. Cannot return lkey."); - } - } - - - - - synchronized void cleanup() { - if (isOpen) { - isOpen = false; - pdMemPool = null; - for (IbvMr m : mrs) { - try { - m.deregMr().execute().free(); - } catch (IOException e) { - System.out.println("Could not unregister memory region."); - e.printStackTrace(); - } - } - mrs = null; - } - - // If hugepages were used, clean and delete the created files and the directory. - if (hugePageLimit > 0) { - if (dir.exists()) { - for (File child : dir.listFiles()) { - child.delete(); - } - dir.delete(); - } - } - } - - @Override - public void finalize() { - // Just in case the user did not do that. - try { - close(); - } catch (Exception e) { - System.out.println("MemoryPoolImplBuddy: Could not finalize() the memory pool"); - e.printStackTrace(); - } - } - - - // the next two methods allocate a buffer from the OS. The first one - // allocates from the regular heap and the second one from huge pages. - // These are two alternatives. If hugepages is configured by the user, - // memory will first be allocated from huge pages and after reacing the limit, - // more memory will be allocated from the regular heap. - - // allocate a buffer from the regular heap - ByteBuffer allocateHeapBuffer() { - ByteBuffer byteBuffer; - - if (alignmentSize > 1) { - ByteBuffer rawBuffer = ByteBuffer.allocateDirect(allocationSize + alignmentSize); - long rawBufferAddress = MemoryUtils.getAddress(rawBuffer); - long alignmentOffset = rawBufferAddress % alignmentSize; - if (alignmentOffset != 0) { - rawBuffer.position(alignmentSize - (int)alignmentOffset); - } - byteBuffer = rawBuffer.slice(); - - } else { - byteBuffer = ByteBuffer.allocateDirect(allocationSize); - } - return (byteBuffer); - } - - // allocate a buffer from hugepages - ByteBuffer allocateHugePageBuffer() throws IOException { - String path = hugePagePath + "/" + currentRegion++ + ".mem"; - RandomAccessFile randomFile = null; - try { - randomFile = new RandomAccessFile(path, "rw"); - } catch (FileNotFoundException e) { - System.out.println("Path " + path + " to huge page directory not found."); - throw e; - } - try { - randomFile.setLength(allocationSize + alignmentSize); - } catch (IOException e) { - System.out.println("Coult not set allocation length of mapped random access file on huge page directory."); - randomFile.close(); - throw e; - } - FileChannel channel = randomFile.getChannel(); - MappedByteBuffer mappedBuffer = null; - try { - mappedBuffer = channel.map(MapMode.READ_WRITE, 0, - allocationSize); - } catch (IOException e) { - System.out.println("Could not map the huge page file on path " + path); - randomFile.close(); - throw e; - } - randomFile.close(); - allocatedHugePageMemory += (allocationSize + alignmentSize); - - long rawBufferAddress = MemoryUtils.getAddress(mappedBuffer); - long alignmentOffset = rawBufferAddress % alignmentSize; - if (alignmentOffset != 0) { - mappedBuffer.position(alignmentSize - (int)alignmentOffset); - } - ByteBuffer b = mappedBuffer.slice(); - return (b); - } - - - // Add a new chunk and register it with the IB device. - // This adds a new "root" to the buddy tree. - protected void addNewBuddy(PdMemPool pdm) throws IOException { - BuddyInfo bi = new BuddyInfo(); - - if ((allocatedHugePageMemory + allocationSize) < hugePageLimit) { - bi.buffer = allocateHugePageBuffer(); - } else { - bi.buffer = allocateHeapBuffer(); - } - // Register buffer with IB card - IbvMr mr = pdm.pd.regMr(bi.buffer, access).execute().free().getMr(); - mrs.add(mr); - - bi.s = state.FREE; - bi.size = allocationSize; - bi.parent = null; - bi.sibling = null; - bi.lkey = mr.getLkey(); - - if (pdm.freeBuddies.get(allocationSize) == null) { - pdm.freeBuddies.put(allocationSize, new LinkedList()); - } - pdm.freeBuddies.get(allocationSize).add(bi); - } - - protected boolean split(PdMemPool pdm, int size) { - if (size > allocationSize) { - return false; - } - if (!pdm.freeBuddies.containsKey(size)) { - if (!split(pdm, size << 1)) { - // no free buddy, which could be split - return false; - } - } - LinkedList l = pdm.freeBuddies.get(size); - if (l == null) { - return false; - } - BuddyInfo bi = l.removeFirst(); - if (l.size() == 0) { - pdm.freeBuddies.remove(size); - } - bi.s = state.SPLIT; - bi.buffer.position(0); - bi.buffer.limit(size >> 1); - ByteBuffer b1 = bi.buffer.slice(); - bi.buffer.position(size >> 1); - bi.buffer.limit(size); - ByteBuffer b2 = bi.buffer.slice(); - - BuddyInfo bi1 = new BuddyInfo(); - BuddyInfo bi2 = new BuddyInfo(); - bi1.buffer = b1; - bi1.s = state.FREE; - bi1.size = (size >> 1); - bi1.parent = bi; - bi1.sibling = bi2; - bi1.lkey = bi.lkey; - - bi2.buffer = b2; - bi2.s = state.FREE; - bi2.size = (size >> 1); - bi2.parent = bi; - bi2.sibling = bi1; - bi2.lkey = bi.lkey; - - if (pdm.freeBuddies.get(size >> 1) == null) { - pdm.freeBuddies.put(size >> 1, new LinkedList()); - } - pdm.freeBuddies.get(size >> 1).add(bi1); - pdm.freeBuddies.get(size >> 1).add(bi2); - - return true; - } - - - protected ByteBuffer getPower2Buffer(PdMemPool pdm, int size) { - if (!pdm.freeBuddies.containsKey(size)) { - if (!split(pdm, size << 1)) { - // no free buddy, which could be split - return null; - } - } - LinkedList l = pdm.freeBuddies.get(size); - if (l == null) { - return null; - } - BuddyInfo bi = l.removeFirst(); - if (l.size() == 0) { - pdm.freeBuddies.remove(size); - } - bi.s = state.USED; - pdm.usedBuddies.put(MemoryUtils.getAddress(bi.buffer), bi); - return bi.buffer; - } - - synchronized ByteBuffer getBufferImpl(RdmaEndpoint endpoint, int size) throws IOException { - int i = minAllocationSize; - - if (!pdMemPool.containsKey(endpoint.getPd())) { - pdMemPool.put(endpoint.getPd(), new PdMemPool(endpoint.getPd())); - } - PdMemPool pdm = pdMemPool.get(endpoint.getPd()); - - while(size > i) { - i <<= 1; - } - - ByteBuffer b = getPower2Buffer(pdm, i); - if (b == null) { - addNewBuddy(pdm); - b = getPower2Buffer(pdm, i); - } - b.clear(); - return (b); - } - - protected void merge(PdMemPool pdm, BuddyInfo bi) { - if (bi.sibling != null) { - if (bi.sibling.s == state.FREE) { - BuddyInfo parent = bi.parent; - parent.s = state.FREE; - if (pdm.freeBuddies.get(parent.size) == null) { - pdm.freeBuddies.put(parent.size, new LinkedList()); - } - pdm.freeBuddies.get(parent.size).add(parent); - pdm.freeBuddies.get(bi.size).remove(bi.sibling); - pdm.freeBuddies.get(bi.size).remove(bi); - if (pdm.freeBuddies.get(bi.size).size() == 0) { - pdm.freeBuddies.remove(bi.size); - } - merge(pdm, parent); - } - } - } - synchronized void freeBufferImpl(RdmaEndpoint endpoint, ByteBuffer b) { - if (b == null) { - return; - } - PdMemPool pdm = pdMemPool.get(endpoint.getPd()); - BuddyInfo bi = pdm.usedBuddies.remove(MemoryUtils.getAddress(b)); - // Buffer is not in the used list. Cannot free. - if (bi == null) { - return; - } - bi.s = state.FREE; - if (pdm.freeBuddies.get(bi.size) == null) { - pdm.freeBuddies.put(bi.size, new LinkedList()); - } - pdm.freeBuddies.get(bi.size).add(bi); - merge(pdm, bi); - } - - void setAllocationSize(int size) { - allocationSize = size; - } - - void setMinAllocationSize(int size) { - minAllocationSize = size; - } - - void setAlignment(int size) { - alignmentSize = size; - } - - int getAllocationSize() { - return allocationSize; - } - - int getMinAllocationSize(int size) { - return minAllocationSize; - } - - int getAlignment(int size) { - return alignmentSize; - } - - void printBuddies() { - System.out.println("Free buddies:\n============"); - for (Iterator itpd = pdMemPool.values().iterator(); itpd.hasNext(); ) { - PdMemPool pdm = itpd.next(); - for (Iterator it = pdm.freeBuddies.keySet().iterator(); it.hasNext(); ) { - Integer size = it.next(); - System.out.println("Size: " + size); - LinkedList l = pdm.freeBuddies.get(size); - if (l != null) { - for (Iterator it2 = l.iterator(); it2.hasNext(); ) { - BuddyInfo bi = it2.next(); - System.out.println(bi); - } - } - } - } - System.out.println("============\n"); - - System.out.println("Used buddies:\n============"); - for (Iterator itpd = pdMemPool.values().iterator(); itpd.hasNext(); ) { - PdMemPool pdm = itpd.next(); - for (Iterator it = pdm.usedBuddies.values().iterator(); it.hasNext(); ) { - System.out.println(it.next()); - } - } - System.out.println("============\n"); - } - - - - // Internally used - enum state { - FREE, - USED, - SPLIT - } - - // Internally used to keep track of buffer state - class BuddyInfo { - ByteBuffer buffer; - BuddyInfo parent; - BuddyInfo sibling; - state s; - int size; - int lkey; - @Override - public String toString() { - return new String("Size= " + size + ", state = " - + (s == state.FREE ? "FREE": s == state.USED ? "USED" : "SPLIT") - + ", address = " + MemoryUtils.getAddress(buffer) - + ", capacity = " + buffer.capacity() - + ", lkey = " + lkey); - } - } - - class PdMemPool { - HashMap> freeBuddies; - HashMap usedBuddies; - IbvPd pd; - - PdMemPool(IbvPd pd) { - this.pd = pd; - freeBuddies = new HashMap>(); - usedBuddies = new HashMap(); - } - } -} diff --git a/src/main/java/com/ibm/darpc/DaRPCMemPoolImplSimple.java b/src/main/java/com/ibm/darpc/DaRPCMemPoolImplSimple.java deleted file mode 100644 index 0465d8a..0000000 --- a/src/main/java/com/ibm/darpc/DaRPCMemPoolImplSimple.java +++ /dev/null @@ -1,97 +0,0 @@ -package com.ibm.darpc; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.LinkedList; - -import com.ibm.disni.rdma.RdmaEndpoint; -import com.ibm.disni.rdma.verbs.IbvMr; -import com.ibm.disni.rdma.verbs.IbvPd; -import com.ibm.disni.util.MemoryUtils; - -public class DaRPCMemPoolImplSimple implements DaRPCMemPool { - final int allocationSize; - final int alignmentSize; - int offset; - ByteBuffer byteBuffer; - IbvPd pd; - IbvMr mr; - int access; - LinkedList freeList; - - public DaRPCMemPoolImplSimple(int allocationSize, int alignmentSize) { - this.allocationSize = allocationSize; - this.alignmentSize = alignmentSize; - - ByteBuffer rawBuffer = ByteBuffer.allocateDirect(allocationSize + alignmentSize); - long rawBufferAddress = MemoryUtils.getAddress(rawBuffer); - long alignmentOffset = rawBufferAddress % alignmentSize; - if (alignmentOffset != 0) { - rawBuffer.position(alignmentSize - (int)alignmentOffset); - } - byteBuffer = rawBuffer.slice(); - this.access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ; - } - - @Override - public void close() throws IOException { - synchronized(this) { - try { - mr.deregMr().execute().free(); - } catch (IOException e) { - System.out.println("Could not unregister memory region."); - e.printStackTrace(); - } - } - } - - @Override - public ByteBuffer getBuffer(RdmaEndpoint endpoint, int size) throws IOException { - ByteBuffer r = null; - - synchronized(this) { - if (pd == null) { - pd = endpoint.getPd(); - } else if (!pd.equals(endpoint.getPd())) { - throw new IOException("No support for more than one PD"); - } - if (mr == null) { - mr = pd.regMr(byteBuffer, access).execute().free().getMr(); - } - - if (freeList == null) { - offset = size; - freeList = new LinkedList(); - int i = 0; - while ((i * offset + offset) < byteBuffer.capacity()) { - byteBuffer.position(i * offset); - byteBuffer.limit(i * offset + offset); - ByteBuffer b = byteBuffer.slice(); - freeList.addLast(b); - i++; - } - } - else - { - if (size != offset) { - throw new IOException("Requested size does not match block size managed by memory pool."); - } - } - r = freeList.removeFirst(); - r.clear(); - } - return r; - } - - @Override - public void freeBuffer(RdmaEndpoint endpoint, ByteBuffer b) { - synchronized(this) { - freeList.addLast(b); - } - } - - @Override - public int getLKey(RdmaEndpoint endpoint, ByteBuffer b) throws IllegalArgumentException { - return mr.getLkey(); - } -} diff --git a/src/test/java/com/ibm/darpc/examples/client/DaRPCClient.java b/src/test/java/com/ibm/darpc/examples/client/DaRPCClient.java index 268a406..8ca510d 100644 --- a/src/test/java/com/ibm/darpc/examples/client/DaRPCClient.java +++ b/src/test/java/com/ibm/darpc/examples/client/DaRPCClient.java @@ -43,7 +43,7 @@ import com.ibm.darpc.DaRPCEndpoint; import com.ibm.darpc.DaRPCFuture; import com.ibm.darpc.DaRPCMemPool; -import com.ibm.darpc.DaRPCMemPoolImplBuddy; +import com.ibm.darpc.DaRPCMemPoolImpl; import com.ibm.darpc.DaRPCStream; import com.ibm.darpc.examples.protocol.RdmaRpcProtocol; import com.ibm.darpc.examples.protocol.RdmaRpcRequest; @@ -192,6 +192,7 @@ public void launch(String[] args) throws Exception { int maxinline = 0; int recvQueue = batchSize; int sendQueue = batchSize; + String hugePagePath = null; Option addressOption = Option.builder("a").required().desc("server address").hasArg().build(); Option loopOption = Option.builder("k").desc("loop count").hasArg().build(); @@ -204,6 +205,7 @@ public void launch(String[] args) throws Exception { Option sendQueueOption = Option.builder("s").desc("send queue").hasArg().build(); Option recvQueueOption = Option.builder("r").desc("receive queue").hasArg().build(); Option serializedSizeOption = Option.builder("l").desc("serialized size").hasArg().build(); + Option hugepagePathOption = Option.builder("h").required().desc("memory pool hugepage path").hasArg().build(); Options options = new Options(); options.addOption(addressOption); options.addOption(loopOption); @@ -216,12 +218,15 @@ public void launch(String[] args) throws Exception { options.addOption(sendQueueOption); options.addOption(recvQueueOption); options.addOption(serializedSizeOption); + options.addOption(hugepagePathOption); CommandLineParser parser = new DefaultParser(); - + try { CommandLine line = parser.parse(options, args); ipAddress = line.getOptionValue(addressOption.getOpt()); + hugePagePath = line.getOptionValue(hugepagePathOption.getOpt()); + if (line.hasOption(loopOption.getOpt())) { loop = Integer.parseInt(line.getOptionValue(loopOption.getOpt())); } @@ -282,7 +287,7 @@ public void launch(String[] args) throws Exception { ClientThread[] benchmarkTask = new ClientThread[threadCount]; RdmaRpcProtocol rpcProtocol = new RdmaRpcProtocol(); - DaRPCMemPool memPool = new DaRPCMemPoolImplBuddy(); + DaRPCMemPool memPool = new DaRPCMemPoolImpl(hugePagePath); System.out.println("starting.. threads " + threadCount + ", connections " + connections + ", server " + ipAddress + ", recvQueue " + recvQueue + ", sendQueue" + sendQueue + ", batchSize " + batchSize + ", mode " + mode); DaRPCClientGroup group = DaRPCClientGroup.createClientGroup(rpcProtocol, memPool, 100, maxinline, recvQueue, sendQueue); URI uri = URI.create("rdma://" + ipAddress + ":" + 1919); diff --git a/src/test/java/com/ibm/darpc/examples/server/DaRPCServer.java b/src/test/java/com/ibm/darpc/examples/server/DaRPCServer.java index 37e11fa..d3f2e2d 100644 --- a/src/test/java/com/ibm/darpc/examples/server/DaRPCServer.java +++ b/src/test/java/com/ibm/darpc/examples/server/DaRPCServer.java @@ -34,7 +34,7 @@ import org.apache.commons.cli.ParseException; import com.ibm.darpc.DaRPCMemPool; -import com.ibm.darpc.DaRPCMemPoolImplBuddy; +import com.ibm.darpc.DaRPCMemPoolImpl; import com.ibm.darpc.DaRPCServerEndpoint; import com.ibm.darpc.DaRPCServerGroup; import com.ibm.darpc.examples.protocol.RdmaRpcRequest; @@ -51,6 +51,7 @@ public class DaRPCServer { private boolean polling = false; private int maxinline = 0; private int connections = 16; + String hugePagePath = null; public void run() throws Exception{ long[] clusterAffinities = new long[poolsize]; @@ -60,7 +61,7 @@ public void run() throws Exception{ } System.out.println("running...server " + ipAddress + ", poolsize " + poolsize + ", maxinline " + maxinline + ", polling " + polling + ", recvQueue " + recvQueue + ", sendQueue " + sendQueue + ", wqSize " + wqSize + ", rpcservice-timeout " + servicetimeout); RdmaRpcService rpcService = new RdmaRpcService(servicetimeout); - DaRPCMemPool memPool = new DaRPCMemPoolImplBuddy(); + DaRPCMemPool memPool = new DaRPCMemPoolImpl(hugePagePath); DaRPCServerGroup group = DaRPCServerGroup.createServerGroup(rpcService, memPool, clusterAffinities, -1, maxinline, polling, recvQueue, sendQueue, wqSize, 32); RdmaServerEndpoint> serverEp = group.createServerEndpoint(); URI uri = URI.create("rdma://" + ipAddress + ":" + 1919); @@ -81,6 +82,7 @@ public void launch(String[] args) throws Exception { Option recvQueueOption = Option.builder("r").desc("receive queue").hasArg().build(); Option sendQueueOption = Option.builder("s").desc("send queue").hasArg().build(); Option serializedSizeOption = Option.builder("l").desc("serialized size").hasArg().build(); + Option hugepagePathOption = Option.builder("h").required().desc("memory pool hugepage path").hasArg().build(); Options options = new Options(); options.addOption(addressOption); options.addOption(poolsizeOption); @@ -92,12 +94,15 @@ public void launch(String[] args) throws Exception { options.addOption(recvQueueOption); options.addOption(sendQueueOption); options.addOption(serializedSizeOption); + options.addOption(hugepagePathOption); CommandLineParser parser = new DefaultParser(); try { CommandLine line = parser.parse(options, args); ipAddress = line.getOptionValue(addressOption.getOpt()); + hugePagePath = line.getOptionValue(hugepagePathOption.getOpt()); + if (line.hasOption(poolsizeOption.getOpt())) { poolsize = Integer.parseInt(line.getOptionValue(poolsizeOption.getOpt())); } From d88de1fe208fd058fb254b2ad9e2866411439424 Mon Sep 17 00:00:00 2001 From: Adrian Schuepbach Date: Thu, 14 Dec 2017 11:53:52 +0100 Subject: [PATCH 4/6] Check for hugepage path in constructor, remove duplicated code form constructors and throw NoSuchElementException, if no more memory blocks are available. --- src/main/java/com/ibm/darpc/DaRPCMemPool.java | 3 +- .../java/com/ibm/darpc/DaRPCMemPoolImpl.java | 29 ++++++++----------- 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/src/main/java/com/ibm/darpc/DaRPCMemPool.java b/src/main/java/com/ibm/darpc/DaRPCMemPool.java index e7d6ea1..5516c06 100644 --- a/src/main/java/com/ibm/darpc/DaRPCMemPool.java +++ b/src/main/java/com/ibm/darpc/DaRPCMemPool.java @@ -2,12 +2,13 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.NoSuchElementException; import com.ibm.disni.rdma.RdmaEndpoint; public interface DaRPCMemPool { void close() throws IOException; - ByteBuffer getBuffer(RdmaEndpoint endpoint, int size) throws IOException; + ByteBuffer getBuffer(RdmaEndpoint endpoint, int size) throws IOException, NoSuchElementException; void freeBuffer(RdmaEndpoint endpoint, ByteBuffer b) throws IOException; public int getLKey(RdmaEndpoint endpoint, ByteBuffer b) throws IllegalArgumentException; } diff --git a/src/main/java/com/ibm/darpc/DaRPCMemPoolImpl.java b/src/main/java/com/ibm/darpc/DaRPCMemPoolImpl.java index e9c57b2..656f3cc 100644 --- a/src/main/java/com/ibm/darpc/DaRPCMemPoolImpl.java +++ b/src/main/java/com/ibm/darpc/DaRPCMemPoolImpl.java @@ -9,6 +9,7 @@ import java.nio.channels.FileChannel; import java.nio.channels.FileChannel.MapMode; import java.util.LinkedList; +import java.util.NoSuchElementException; import com.ibm.disni.rdma.RdmaEndpoint; import com.ibm.disni.rdma.verbs.IbvMr; @@ -19,7 +20,6 @@ public class DaRPCMemPoolImpl implements DaRPCMemPool { private static final int defaultAllocationSize = 16 * 1024 * 1024; // 16MB private final int allocationSize; private final int alignmentSize; - private final String hugePagePath; private String hugePageFile; int offset; ByteBuffer byteBuffer; @@ -28,35 +28,30 @@ public class DaRPCMemPoolImpl implements DaRPCMemPool { int access; LinkedList freeList; - public DaRPCMemPoolImpl(String hugePagePath, int allocationSize, int alignmentSize) { + public DaRPCMemPoolImpl(String hugePagePath, int allocationSize, int alignmentSize) throws IllegalArgumentException { + if (hugePagePath == null) { + System.out.println("Hugepage path must be set"); + throw new IllegalArgumentException("Hugepage path must be set"); + } + this.allocationSize = allocationSize; this.alignmentSize = alignmentSize; - this.hugePagePath = hugePagePath; + hugePageFile = hugePagePath + "/darpcmempoolimpl.mem"; this.access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ; } - public DaRPCMemPoolImpl(String hugePagePath) { - this.allocationSize = defaultAllocationSize; - this.alignmentSize = 0; - this.hugePagePath = hugePagePath; - - this.access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ; + public DaRPCMemPoolImpl(String hugePagePath) throws IllegalArgumentException { + this(hugePagePath, defaultAllocationSize, 0); } // allocate a buffer from hugepages ByteBuffer allocateHugePageBuffer() throws IOException { - if (hugePagePath == null) { - System.out.println("Hugepage path must be set"); - throw new IOException("Hugepage path must be set"); - } - - hugePageFile = hugePagePath + "/darpcmempoolimpl.mem"; RandomAccessFile randomFile = null; try { randomFile = new RandomAccessFile(hugePageFile, "rw"); } catch (FileNotFoundException e) { - System.out.println("Path " + hugePageFile + " to huge page directory not found."); + System.out.println("Path " + hugePageFile + " to huge page path/file cannot be accessed."); throw e; } try { @@ -107,7 +102,7 @@ public void close() throws IOException { } @Override - public ByteBuffer getBuffer(RdmaEndpoint endpoint, int size) throws IOException { + public ByteBuffer getBuffer(RdmaEndpoint endpoint, int size) throws IOException, NoSuchElementException { ByteBuffer r = null; synchronized(this) { From e7a3c6d7010d303e4e61b39096e3a8d1b1012896 Mon Sep 17 00:00:00 2001 From: Adrian Schuepbach Date: Thu, 11 Jan 2018 11:07:23 +0100 Subject: [PATCH 5/6] Simplified MemPoolImpl.java --- .../java/com/ibm/darpc/DaRPCClientGroup.java | 18 +- .../java/com/ibm/darpc/DaRPCEndpoint.java | 15 +- .../com/ibm/darpc/DaRPCEndpointGroup.java | 18 +- src/main/java/com/ibm/darpc/DaRPCMemPool.java | 9 +- .../java/com/ibm/darpc/DaRPCMemPoolImpl.java | 204 ++++++++++-------- .../java/com/ibm/darpc/DaRPCServerGroup.java | 56 ++--- .../darpc/examples/client/DaRPCClient.java | 4 +- .../darpc/examples/server/DaRPCServer.java | 4 +- 8 files changed, 180 insertions(+), 148 deletions(-) diff --git a/src/main/java/com/ibm/darpc/DaRPCClientGroup.java b/src/main/java/com/ibm/darpc/DaRPCClientGroup.java index efcf5e9..e06e54a 100644 --- a/src/main/java/com/ibm/darpc/DaRPCClientGroup.java +++ b/src/main/java/com/ibm/darpc/DaRPCClientGroup.java @@ -9,17 +9,17 @@ import com.ibm.disni.rdma.verbs.RdmaCmId; public class DaRPCClientGroup extends DaRPCEndpointGroup, R, T> { - public static DaRPCClientGroup createClientGroup(DaRPCProtocol protocol, DaRPCMemPool memPool, int timeout, int maxinline, int recvQueue, int sendQueue) throws Exception { + public static DaRPCClientGroup createClientGroup(DaRPCProtocol protocol, DaRPCMemPool, R, T> memPool, int timeout, int maxinline, int recvQueue, int sendQueue) throws Exception { DaRPCClientGroup group = new DaRPCClientGroup(protocol, memPool, timeout, maxinline, recvQueue, sendQueue); group.init(new RpcClientFactory(group)); return group; - } - - private DaRPCClientGroup(DaRPCProtocol protocol, DaRPCMemPool memPool, int timeout, int maxinline, int recvQueue, int sendQueue) + } + + private DaRPCClientGroup(DaRPCProtocol protocol, DaRPCMemPool, R, T> memPool, int timeout, int maxinline, int recvQueue, int sendQueue) throws Exception { super(protocol, memPool, timeout, maxinline, recvQueue, sendQueue); } - + @Override public void allocateResources(DaRPCClientEndpoint endpoint) throws Exception { @@ -38,18 +38,18 @@ public IbvQP createQpProvider(DaRPCClientEndpoint endpoint) throws IOExcep IbvQP qp = this.createQP(endpoint.getIdPriv(), endpoint.getPd(), cq); return qp; } - + public static class RpcClientFactory implements RdmaEndpointFactory> { private DaRPCClientGroup group; - + public RpcClientFactory(DaRPCClientGroup group){ this.group = group; } - + @Override public DaRPCClientEndpoint createEndpoint(RdmaCmId id, boolean serverSide) throws IOException { return new DaRPCClientEndpoint(group, id, serverSide); } - } + } } diff --git a/src/main/java/com/ibm/darpc/DaRPCEndpoint.java b/src/main/java/com/ibm/darpc/DaRPCEndpoint.java index 4198df7..759a93a 100644 --- a/src/main/java/com/ibm/darpc/DaRPCEndpoint.java +++ b/src/main/java/com/ibm/darpc/DaRPCEndpoint.java @@ -27,7 +27,6 @@ import java.util.LinkedList; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; @@ -39,7 +38,7 @@ public abstract class DaRPCEndpoint extends RdmaEndpoint { private static final Logger logger = LoggerFactory.getLogger("com.ibm.darpc"); - private static final int headerSize = 4; + static final int HEADERSIZE = 4; //size of the ticket public abstract void dispatchReceive(ByteBuffer buffer, int ticket, int recvIndex) throws IOException; public abstract void dispatchSend(int ticket) throws IOException; @@ -70,7 +69,7 @@ public DaRPCEndpoint(DaRPCEndpointGroup, R, T> endp this.rpcGroup = endpointGroup; this.maxinline = rpcGroup.getMaxInline(); this.payloadSize = rpcGroup.getBufferSize(); - this.rawBufferSize = headerSize + this.payloadSize; + this.rawBufferSize = HEADERSIZE + this.payloadSize; this.sendPipelineLength = rpcGroup.sendQueueSize(); this.recvPipelineLength = rpcGroup.recvQueueSize(); this.freePostSend = new ArrayBlockingQueue(sendPipelineLength); @@ -93,8 +92,8 @@ public void init() throws IOException { * one for sends and one for receives. */ try { - dataBuffer = rpcGroup.getWRBuffer(this, sendPipelineLength * rawBufferSize + recvPipelineLength * rawBufferSize); - lkey = rpcGroup.getLKey(this, dataBuffer); + dataBuffer = rpcGroup.getWRBuffer(this); + lkey = rpcGroup.getLKey(dataBuffer); } catch (Exception e) { throw new IOException(e); } @@ -147,8 +146,8 @@ protected boolean sendMessage(DaRPCMessage message, int ticket) throws IOExcepti if (postSend != null){ int index = (int) postSend.getWrMod(0).getWr_id(); sendBufs[index].putInt(0, ticket); - sendBufs[index].position(headerSize); - int written = headerSize + message.write(sendBufs[index]); + sendBufs[index].position(HEADERSIZE); + int written = HEADERSIZE + message.write(sendBufs[index]); postSend.getWrMod(0).getSgeMod(0).setLength(written); postSend.getWrMod(0).setSend_flags(IbvSendWR.IBV_SEND_SIGNALED); if (written <= maxinline) { @@ -188,7 +187,7 @@ public void dispatchCqEvent(IbvWC wc) throws IOException { int index = (int) wc.getWr_id(); ByteBuffer recvBuffer = recvBufs[index]; int ticket = recvBuffer.getInt(0); - recvBuffer.position(headerSize); + recvBuffer.position(HEADERSIZE); dispatchReceive(recvBuffer, ticket, index); } else if (wc.getOpcode() == 0) { //send completion diff --git a/src/main/java/com/ibm/darpc/DaRPCEndpointGroup.java b/src/main/java/com/ibm/darpc/DaRPCEndpointGroup.java index 01429f9..acff4b7 100644 --- a/src/main/java/com/ibm/darpc/DaRPCEndpointGroup.java +++ b/src/main/java/com/ibm/darpc/DaRPCEndpointGroup.java @@ -38,13 +38,13 @@ public abstract class DaRPCEndpointGroup, R extends private int timeout; private int bufferSize; private int maxInline; - private DaRPCMemPool memPool; + private DaRPCMemPool memPool; public static int getVersion(){ return DARPC_VERSION; } - protected DaRPCEndpointGroup(DaRPCProtocol protocol, DaRPCMemPool memPool, int timeout, int maxinline, int recvQueue, int sendQueue) throws Exception { + protected DaRPCEndpointGroup(DaRPCProtocol protocol, DaRPCMemPool memPool, int timeout, int maxinline, int recvQueue, int sendQueue) throws Exception { super(timeout); this.recvQueueSize = recvQueue; this.sendQueueSize = sendQueue; @@ -54,6 +54,12 @@ protected DaRPCEndpointGroup(DaRPCProtocol protocol, DaRPCMemPool memPool, this.memPool = memPool; } + @Override + public void init(RdmaEndpointFactory factory) { + super.init(factory); + memPool.init(this); + } + protected synchronized IbvQP createQP(RdmaCmId id, IbvPd pd, IbvCQ cq) throws IOException{ IbvQPInitAttr attr = new IbvQPInitAttr(); attr.cap().setMax_recv_wr(recvQueueSize); @@ -95,15 +101,15 @@ public int getMaxInline() { return maxInline; } - ByteBuffer getWRBuffer(RdmaEndpoint endpoint, int size) throws Exception { - return memPool.getBuffer(endpoint, size); + ByteBuffer getWRBuffer(RdmaEndpoint endpoint) throws Exception { + return memPool.getBuffer(endpoint); } void freeBuffer(RdmaEndpoint endpoint, ByteBuffer b) throws IOException { memPool.freeBuffer(endpoint, b); } - int getLKey(RdmaEndpoint endpoint, ByteBuffer b) { - return memPool.getLKey(endpoint, b); + int getLKey(ByteBuffer b) { + return memPool.getLKey(b); } } diff --git a/src/main/java/com/ibm/darpc/DaRPCMemPool.java b/src/main/java/com/ibm/darpc/DaRPCMemPool.java index 5516c06..9d30255 100644 --- a/src/main/java/com/ibm/darpc/DaRPCMemPool.java +++ b/src/main/java/com/ibm/darpc/DaRPCMemPool.java @@ -6,9 +6,10 @@ import com.ibm.disni.rdma.RdmaEndpoint; -public interface DaRPCMemPool { +public interface DaRPCMemPool, R extends DaRPCMessage, T extends DaRPCMessage> { + void init(DaRPCEndpointGroup endpointGroup); void close() throws IOException; - ByteBuffer getBuffer(RdmaEndpoint endpoint, int size) throws IOException, NoSuchElementException; - void freeBuffer(RdmaEndpoint endpoint, ByteBuffer b) throws IOException; - public int getLKey(RdmaEndpoint endpoint, ByteBuffer b) throws IllegalArgumentException; + ByteBuffer getBuffer(RdmaEndpoint endpoint) throws IOException, NoSuchElementException; + void freeBuffer(RdmaEndpoint endpoint, ByteBuffer buffer) throws IOException; + public int getLKey(ByteBuffer b) throws IllegalArgumentException; } diff --git a/src/main/java/com/ibm/darpc/DaRPCMemPoolImpl.java b/src/main/java/com/ibm/darpc/DaRPCMemPoolImpl.java index 656f3cc..fad0d79 100644 --- a/src/main/java/com/ibm/darpc/DaRPCMemPoolImpl.java +++ b/src/main/java/com/ibm/darpc/DaRPCMemPoolImpl.java @@ -9,57 +9,138 @@ import java.nio.channels.FileChannel; import java.nio.channels.FileChannel.MapMode; import java.util.LinkedList; +import java.util.List; import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.ibm.disni.rdma.RdmaEndpoint; import com.ibm.disni.rdma.verbs.IbvMr; import com.ibm.disni.rdma.verbs.IbvPd; import com.ibm.disni.util.MemoryUtils; -public class DaRPCMemPoolImpl implements DaRPCMemPool { +public class DaRPCMemPoolImpl, R extends DaRPCMessage, T extends DaRPCMessage> implements DaRPCMemPool { + private static final Logger logger = LoggerFactory.getLogger("com.ibm.darpc"); private static final int defaultAllocationSize = 16 * 1024 * 1024; // 16MB private final int allocationSize; private final int alignmentSize; + private final int allocationLimit; + private int currentAllocationSize; private String hugePageFile; - int offset; - ByteBuffer byteBuffer; - IbvPd pd; - IbvMr mr; - int access; - LinkedList freeList; - - public DaRPCMemPoolImpl(String hugePagePath, int allocationSize, int alignmentSize) throws IllegalArgumentException { + private ConcurrentHashMap memoryRegions; + private int access; + private DaRPCEndpointGroup endpointGroup; + private ConcurrentHashMap> pdMap; + private List mrs; + + public DaRPCMemPoolImpl(String hugePagePath, int allocationSize, int alignmentSize, int allocationLimit) throws IllegalArgumentException { if (hugePagePath == null) { - System.out.println("Hugepage path must be set"); + logger.error("Hugepage path must be set"); throw new IllegalArgumentException("Hugepage path must be set"); } - this.allocationSize = allocationSize; this.alignmentSize = alignmentSize; + this.allocationLimit = allocationLimit; hugePageFile = hugePagePath + "/darpcmempoolimpl.mem"; - + this.currentAllocationSize = 0; this.access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ; + this.pdMap = new ConcurrentHashMap>(); + this.mrs = new LinkedList(); } public DaRPCMemPoolImpl(String hugePagePath) throws IllegalArgumentException { - this(hugePagePath, defaultAllocationSize, 0); + this(hugePagePath, defaultAllocationSize, 0, 16 * defaultAllocationSize); + } + + @Override + public void init(DaRPCEndpointGroup endpointGroup) { + this.endpointGroup = endpointGroup; + } + + @Override + public void close() throws IOException { + synchronized(this) { + for (IbvMr m : mrs) { + try { + m.deregMr().execute().free(); + } catch (IOException e) { + System.out.println("Could not unregister memory region."); + e.printStackTrace(); + } + } + mrs = null; + File f = new File(hugePageFile); + f.delete(); + } + } + + @Override + public ByteBuffer getBuffer(RdmaEndpoint endpoint) throws IOException, NoSuchElementException { + LinkedBlockingQueue freeList = pdMap.get(endpoint.getPd()); + + if (freeList == null) { + synchronized(this) { + freeList = pdMap.get(endpoint.getPd()); + if (freeList == null) { + freeList = new LinkedBlockingQueue(); + pdMap.put(endpoint.getPd(), freeList); + } + } + } + + ByteBuffer r = freeList.poll(); + + if (r == null) { + synchronized(this) { + r = freeList.poll(); + if (r == null) { + allocateHugePageBuffer(freeList, endpoint.getPd()); + } + r = freeList.poll(); + if (r == null) { + logger.error("Failed to allocate more buffers."); + throw new NoSuchElementException("Failed to allocate more buffers."); + } + } + } + r.clear(); + return r; + } + + @Override + public void freeBuffer(RdmaEndpoint endpoint, ByteBuffer buffer) { + LinkedBlockingQueue freeList = pdMap.get(endpoint.getPd()); + freeList.add(buffer); + } + + @Override + public int getLKey(ByteBuffer buffer) throws IllegalArgumentException { + return memoryRegions.get(MemoryUtils.getAddress(buffer)).getLkey(); } // allocate a buffer from hugepages - ByteBuffer allocateHugePageBuffer() throws IOException { + private void allocateHugePageBuffer(LinkedBlockingQueue freeList, IbvPd pd) throws IOException { + int totalAllocationSize = allocationSize + alignmentSize; + if ((currentAllocationSize + totalAllocationSize) > allocationLimit) { + logger.error("Out of memory. Cannot allocate more buffers from hugepages."); + throw new IOException("Out of memory. Cannot allocate more buffers from hugepages."); + } RandomAccessFile randomFile = null; try { randomFile = new RandomAccessFile(hugePageFile, "rw"); } catch (FileNotFoundException e) { - System.out.println("Path " + hugePageFile + " to huge page path/file cannot be accessed."); + logger.error("Path " + hugePageFile + " to huge page path/file cannot be accessed."); throw e; } try { - randomFile.setLength(allocationSize + alignmentSize); + randomFile.setLength(totalAllocationSize); } catch (IOException e) { - System.out.println("Coult not set allocation length of mapped random access file on huge page directory."); - System.out.println("allocaiton size = " + allocationSize + " , alignment size = " + alignmentSize); - System.out.println("allocation size and alignment must be a multiple of the hugepage size."); + logger.error("Could not set allocation length of mapped random access file on huge page directory."); + logger.error("allocaiton size = " + allocationSize + " , alignment size = " + alignmentSize); + logger.error("allocation size and alignment must be a multiple of the hugepage size."); randomFile.close(); throw e; } @@ -67,14 +148,16 @@ ByteBuffer allocateHugePageBuffer() throws IOException { MappedByteBuffer mappedBuffer = null; try { mappedBuffer = channel.map(MapMode.READ_WRITE, 0, - allocationSize + alignmentSize); + totalAllocationSize); } catch (IOException e) { - System.out.println("Could not map the huge page file on path " + hugePageFile); + logger.error("Could not map the huge page file on path " + hugePageFile); randomFile.close(); throw e; } randomFile.close(); + currentAllocationSize += totalAllocationSize; + long rawBufferAddress = MemoryUtils.getAddress(mappedBuffer); if (alignmentSize > 0) { long alignmentOffset = rawBufferAddress % alignmentSize; @@ -82,73 +165,20 @@ ByteBuffer allocateHugePageBuffer() throws IOException { mappedBuffer.position(alignmentSize - (int)alignmentOffset); } } - ByteBuffer b = mappedBuffer.slice(); - return (b); - } - - - @Override - public void close() throws IOException { - synchronized(this) { - try { - mr.deregMr().execute().free(); - } catch (IOException e) { - System.out.println("Could not unregister memory region."); - e.printStackTrace(); - } - File f = new File(hugePageFile); - f.delete(); - } - } - - @Override - public ByteBuffer getBuffer(RdmaEndpoint endpoint, int size) throws IOException, NoSuchElementException { - ByteBuffer r = null; - - synchronized(this) { - if (pd == null) { - pd = endpoint.getPd(); - } else if (!pd.equals(endpoint.getPd())) { - throw new IOException("No support for more than one PD"); - } - if (mr == null) { - byteBuffer = allocateHugePageBuffer(); - mr = pd.regMr(byteBuffer, access).execute().free().getMr(); - } - - if (freeList == null) { - offset = size; - freeList = new LinkedList(); - int i = 0; - while ((i * offset + offset) < byteBuffer.capacity()) { - byteBuffer.position(i * offset); - byteBuffer.limit(i * offset + offset); - ByteBuffer b = byteBuffer.slice(); - freeList.addLast(b); - i++; - } - } - else - { - if (size != offset) { - throw new IOException("Requested size does not match block size managed by memory pool."); - } - } - r = freeList.removeFirst(); - r.clear(); - } - return r; - } - @Override - public void freeBuffer(RdmaEndpoint endpoint, ByteBuffer b) { - synchronized(this) { - freeList.addLast(b); + ByteBuffer alignedBuffer = mappedBuffer.slice(); + + IbvMr mr = pd.regMr(alignedBuffer, access).execute().free().getMr(); + mrs.add(mr); + int sliceSize = endpointGroup.getBufferSize() + DaRPCEndpoint.HEADERSIZE; + int i = 0; + while ((i * sliceSize + sliceSize) < alignedBuffer.capacity()) { + alignedBuffer.position(i * sliceSize); + alignedBuffer.limit(i * sliceSize + sliceSize); + ByteBuffer buffer = alignedBuffer.slice(); + freeList.add(buffer); + memoryRegions.put(MemoryUtils.getAddress(buffer), mr); + i++; } } - - @Override - public int getLKey(RdmaEndpoint endpoint, ByteBuffer b) throws IllegalArgumentException { - return mr.getLkey(); - } } diff --git a/src/main/java/com/ibm/darpc/DaRPCServerGroup.java b/src/main/java/com/ibm/darpc/DaRPCServerGroup.java index d0a0f47..47dca76 100644 --- a/src/main/java/com/ibm/darpc/DaRPCServerGroup.java +++ b/src/main/java/com/ibm/darpc/DaRPCServerGroup.java @@ -15,7 +15,7 @@ public class DaRPCServerGroup extends DaRPCEndpointGroup, R, T> { private static final Logger logger = LoggerFactory.getLogger("com.ibm.darpc"); - + private ConcurrentHashMap> deviceInstance; private DaRPCResourceManager resourceManager; private long[] computeAffinities; @@ -26,8 +26,8 @@ public class DaRPCServerGroup ex private boolean polling; private int pollSize; private int clusterSize; - - public static DaRPCServerGroup createServerGroup(DaRPCService rpcService, DaRPCMemPool memPool, long[] clusterAffinities, int timeout, int maxinline, boolean polling, + + public static DaRPCServerGroup createServerGroup(DaRPCService rpcService, DaRPCMemPool, R, T> memPool, long[] clusterAffinities, int timeout, int maxinline, boolean polling, int recvQueue, int sendQueue, int pollSize, int clusterSize) throws Exception { DaRPCServerGroup group = new DaRPCServerGroup(rpcService, memPool, clusterAffinities, timeout, maxinline, polling, recvQueue, sendQueue, pollSize, clusterSize); @@ -35,14 +35,14 @@ public static DaRPCServerGroup< return group; } - private DaRPCServerGroup(DaRPCService rpcService, DaRPCMemPool memPool,long[] clusterAffinities, int timeout, int maxinline, + private DaRPCServerGroup(DaRPCService rpcService, DaRPCMemPool, R, T> memPool,long[] clusterAffinities, int timeout, int maxinline, boolean polling, int recvQueue, int sendQueue, int pollSize, int clusterSize) throws Exception { super(rpcService, memPool, timeout, maxinline, recvQueue, sendQueue); - + this.rpcService = rpcService; deviceInstance = new ConcurrentHashMap>(); this.computeAffinities = clusterAffinities; - this.resourceAffinities = clusterAffinities; + this.resourceAffinities = clusterAffinities; this.nbrOfClusters = computeAffinities.length; this.currentCluster = 0; resourceManager = new DaRPCResourceManager(resourceAffinities, timeout); @@ -50,7 +50,7 @@ private DaRPCServerGroup(DaRPCService rpcService, DaRPCMemPool memPool,lon this.pollSize = pollSize; this.clusterSize = clusterSize; } - + public RdmaCqProvider createCqProvider(DaRPCServerEndpoint endpoint) throws IOException { logger.info("setting up cq processor (multicore)"); IbvContext context = endpoint.getIdPriv().getVerbs(); @@ -67,27 +67,27 @@ public RdmaCqProvider createCqProvider(DaRPCServerEndpoint endpoint) throws rpcInstance = deviceInstance.get(context.getCmd_fd()); DaRPCCluster cqProcessor = rpcInstance.getProcessor(endpoint.clusterId()); return cqProcessor; - } - + } + public IbvQP createQpProvider(DaRPCServerEndpoint endpoint) throws IOException{ logger.info("setting up QP"); DaRPCCluster cqProcessor = this.lookupCqProcessor(endpoint); IbvCQ cq = cqProcessor.getCQ(); - IbvQP qp = this.createQP(endpoint.getIdPriv(), endpoint.getPd(), cq); + IbvQP qp = this.createQP(endpoint.getIdPriv(), endpoint.getPd(), cq); cqProcessor.registerQP(qp.getQp_num(), endpoint); return qp; - } - + } + public void allocateResources(DaRPCServerEndpoint endpoint) throws Exception { resourceManager.allocateResources(endpoint); } - + synchronized int newClusterId() { int newClusterId = currentCluster; currentCluster = (currentCluster + 1) % nbrOfClusters; return newClusterId; } - + protected synchronized DaRPCCluster lookupCqProcessor(DaRPCServerEndpoint endpoint) throws IOException{ IbvContext context = endpoint.getIdPriv().getVerbs(); if (context == null) { @@ -102,17 +102,17 @@ protected synchronized DaRPCCluster lookupCqProcessor(DaRPCServerEndpoint cqProcessor = rpcInstance.getProcessor(endpoint.clusterId()); return cqProcessor; } - } - + } + public void close() throws IOException, InterruptedException { super.close(); for (DaRPCInstance rpcInstance : deviceInstance.values()){ rpcInstance.close(); - } + } resourceManager.close(); logger.info("rpc group down"); - } - + } + public R createRequest() { return rpcService.createRequest(); } @@ -120,33 +120,33 @@ public R createRequest() { public T createResponse() { return rpcService.createResponse(); } - + public void processServerEvent(DaRPCServerEvent event) throws IOException { rpcService.processServerEvent(event); } - + public void open(DaRPCServerEndpoint endpoint){ rpcService.open(endpoint); - } - + } + public void close(DaRPCServerEndpoint endpoint){ rpcService.close(endpoint); } - + public DaRPCService getRpcService() { return rpcService; } - + public static class RpcServerFactory implements RdmaEndpointFactory> { private DaRPCServerGroup group; - + public RpcServerFactory(DaRPCServerGroup group){ this.group = group; } - + @Override public DaRPCServerEndpoint createEndpoint(RdmaCmId id, boolean serverSide) throws IOException { return new DaRPCServerEndpoint(group, id, serverSide); } - } + } } diff --git a/src/test/java/com/ibm/darpc/examples/client/DaRPCClient.java b/src/test/java/com/ibm/darpc/examples/client/DaRPCClient.java index 8ca510d..283aa2f 100644 --- a/src/test/java/com/ibm/darpc/examples/client/DaRPCClient.java +++ b/src/test/java/com/ibm/darpc/examples/client/DaRPCClient.java @@ -22,8 +22,6 @@ package com.ibm.darpc.examples.client; import java.io.FileOutputStream; -import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.URI; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; @@ -287,7 +285,7 @@ public void launch(String[] args) throws Exception { ClientThread[] benchmarkTask = new ClientThread[threadCount]; RdmaRpcProtocol rpcProtocol = new RdmaRpcProtocol(); - DaRPCMemPool memPool = new DaRPCMemPoolImpl(hugePagePath); + DaRPCMemPool, RdmaRpcRequest, RdmaRpcResponse> memPool = new DaRPCMemPoolImpl, RdmaRpcRequest, RdmaRpcResponse>(hugePagePath); System.out.println("starting.. threads " + threadCount + ", connections " + connections + ", server " + ipAddress + ", recvQueue " + recvQueue + ", sendQueue" + sendQueue + ", batchSize " + batchSize + ", mode " + mode); DaRPCClientGroup group = DaRPCClientGroup.createClientGroup(rpcProtocol, memPool, 100, maxinline, recvQueue, sendQueue); URI uri = URI.create("rdma://" + ipAddress + ":" + 1919); diff --git a/src/test/java/com/ibm/darpc/examples/server/DaRPCServer.java b/src/test/java/com/ibm/darpc/examples/server/DaRPCServer.java index d3f2e2d..58dc4a8 100644 --- a/src/test/java/com/ibm/darpc/examples/server/DaRPCServer.java +++ b/src/test/java/com/ibm/darpc/examples/server/DaRPCServer.java @@ -21,8 +21,6 @@ package com.ibm.darpc.examples.server; -import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.URI; import org.apache.commons.cli.CommandLine; @@ -61,7 +59,7 @@ public void run() throws Exception{ } System.out.println("running...server " + ipAddress + ", poolsize " + poolsize + ", maxinline " + maxinline + ", polling " + polling + ", recvQueue " + recvQueue + ", sendQueue " + sendQueue + ", wqSize " + wqSize + ", rpcservice-timeout " + servicetimeout); RdmaRpcService rpcService = new RdmaRpcService(servicetimeout); - DaRPCMemPool memPool = new DaRPCMemPoolImpl(hugePagePath); + DaRPCMemPool, RdmaRpcRequest, RdmaRpcResponse> memPool = new DaRPCMemPoolImpl, RdmaRpcRequest, RdmaRpcResponse>(hugePagePath); DaRPCServerGroup group = DaRPCServerGroup.createServerGroup(rpcService, memPool, clusterAffinities, -1, maxinline, polling, recvQueue, sendQueue, wqSize, 32); RdmaServerEndpoint> serverEp = group.createServerEndpoint(); URI uri = URI.create("rdma://" + ipAddress + ":" + 1919); From bf52e02e8d4548f3b772e4d486dbe2c83e5e8b9a Mon Sep 17 00:00:00 2001 From: Adrian Schuepbach Date: Thu, 18 Jan 2018 00:38:13 +0100 Subject: [PATCH 6/6] Simplified mempool --- .../java/com/ibm/darpc/DaRPCEndpoint.java | 54 +++++++------------ .../java/com/ibm/darpc/DaRPCMemPoolImpl.java | 38 +++++++++---- 2 files changed, 46 insertions(+), 46 deletions(-) diff --git a/src/main/java/com/ibm/darpc/DaRPCEndpoint.java b/src/main/java/com/ibm/darpc/DaRPCEndpoint.java index 759a93a..01ed96c 100644 --- a/src/main/java/com/ibm/darpc/DaRPCEndpoint.java +++ b/src/main/java/com/ibm/darpc/DaRPCEndpoint.java @@ -44,10 +44,6 @@ public abstract class DaRPCEndpoint, R, T> rpcGroup; - private ByteBuffer dataBuffer; - private int lkey; - private ByteBuffer receiveBuffer; - private ByteBuffer sendBuffer; private ByteBuffer[] recvBufs; private ByteBuffer[] sendBufs; private SVCPostRecv[] recvCall; @@ -86,41 +82,22 @@ public DaRPCEndpoint(DaRPCEndpointGroup, R, T> endp } public void init() throws IOException { - int sendBufferOffset = recvPipelineLength * rawBufferSize; - - /* Main data buffer for sends and receives. Will be split into two regions, - * one for sends and one for receives. - */ - try { - dataBuffer = rpcGroup.getWRBuffer(this); - lkey = rpcGroup.getLKey(dataBuffer); - } catch (Exception e) { - throw new IOException(e); - } - - /* Receive memory region is the first half of the main buffer. */ - dataBuffer.limit(dataBuffer.position() + sendBufferOffset); - receiveBuffer = dataBuffer.slice(); - - /* Send memory region is the second half of the main buffer. */ - dataBuffer.position(sendBufferOffset); - dataBuffer.limit(dataBuffer.capacity()); - sendBuffer = dataBuffer.slice(); - for(int i = 0; i < sendPipelineLength; i++) { - /* Create single send buffers within the send region in form of slices. */ - sendBuffer.position(i * rawBufferSize); - sendBuffer.limit(sendBuffer.position() + rawBufferSize); - sendBufs[i] = sendBuffer.slice(); + try { + sendBufs[i] = rpcGroup.getWRBuffer(this); + } catch (Exception e) { + throw new IOException(e); + } this.sendCall[i] = setupSendTask(i); freePostSend.add(sendCall[i]); } for(int i = 0; i < recvPipelineLength; i++) { - /* Create single receive buffers within the receive region in form of slices. */ - receiveBuffer.position(i * rawBufferSize); - receiveBuffer.limit(receiveBuffer.position() + rawBufferSize); - recvBufs[i] = receiveBuffer.slice(); + try { + recvBufs[i] = rpcGroup.getWRBuffer(this); + } catch (Exception e) { + throw new IOException(e); + } this.recvCall[i] = setupRecvTask(i); recvCall[i].execute(); @@ -129,7 +106,12 @@ public void init() throws IOException { @Override public synchronized void close() throws IOException, InterruptedException { - rpcGroup.freeBuffer(this, dataBuffer); + for(int i = 0; i < sendPipelineLength; i++) { + rpcGroup.freeBuffer(this, sendBufs[i]); + } + for(int i = 0; i < recvPipelineLength; i++) { + rpcGroup.freeBuffer(this, recvBufs[i]); + } super.close(); } @@ -207,7 +189,7 @@ private SVCPostSend setupSendTask(int wrid) throws IOException { IbvSge sge = new IbvSge(); sge.setAddr(MemoryUtils.getAddress(sendBufs[wrid])); sge.setLength(rawBufferSize); - sge.setLkey(lkey); + sge.setLkey(rpcGroup.getLKey(sendBufs[wrid])); sgeList.add(sge); IbvSendWR sendWR = new IbvSendWR(); @@ -227,7 +209,7 @@ private SVCPostRecv setupRecvTask(int wrid) throws IOException { IbvSge sge = new IbvSge(); sge.setAddr(MemoryUtils.getAddress(recvBufs[wrid])); sge.setLength(rawBufferSize); - sge.setLkey(lkey); + sge.setLkey(rpcGroup.getLKey(recvBufs[wrid])); sgeList.add(sge); IbvRecvWR recvWR = new IbvRecvWR(); diff --git a/src/main/java/com/ibm/darpc/DaRPCMemPoolImpl.java b/src/main/java/com/ibm/darpc/DaRPCMemPoolImpl.java index fad0d79..20aa5c4 100644 --- a/src/main/java/com/ibm/darpc/DaRPCMemPoolImpl.java +++ b/src/main/java/com/ibm/darpc/DaRPCMemPoolImpl.java @@ -25,30 +25,34 @@ public class DaRPCMemPoolImpl, R extends DaRPCMessage, T extends DaRPCMessage> implements DaRPCMemPool { private static final Logger logger = LoggerFactory.getLogger("com.ibm.darpc"); private static final int defaultAllocationSize = 16 * 1024 * 1024; // 16MB + private static final String hugePageFileName = "/darpcmempoolimpl.mem"; private final int allocationSize; private final int alignmentSize; private final int allocationLimit; private int currentAllocationSize; - private String hugePageFile; + private final String hugePagePath; private ConcurrentHashMap memoryRegions; private int access; private DaRPCEndpointGroup endpointGroup; private ConcurrentHashMap> pdMap; private List mrs; + private List hugePageFiles; public DaRPCMemPoolImpl(String hugePagePath, int allocationSize, int alignmentSize, int allocationLimit) throws IllegalArgumentException { if (hugePagePath == null) { logger.error("Hugepage path must be set"); throw new IllegalArgumentException("Hugepage path must be set"); } + this.hugePagePath = hugePagePath; this.allocationSize = allocationSize; this.alignmentSize = alignmentSize; this.allocationLimit = allocationLimit; - hugePageFile = hugePagePath + "/darpcmempoolimpl.mem"; this.currentAllocationSize = 0; this.access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ; this.pdMap = new ConcurrentHashMap>(); this.mrs = new LinkedList(); + memoryRegions = new ConcurrentHashMap(); + hugePageFiles = new LinkedList(); } public DaRPCMemPoolImpl(String hugePagePath) throws IllegalArgumentException { @@ -67,13 +71,16 @@ public void close() throws IOException { try { m.deregMr().execute().free(); } catch (IOException e) { - System.out.println("Could not unregister memory region."); + logger.error("Could not unregister memory region."); e.printStackTrace(); } } mrs = null; - File f = new File(hugePageFile); - f.delete(); + for (String fileName : hugePageFiles) { + File f = new File(fileName); + f.delete(); + } + hugePageFiles = null; } } @@ -125,16 +132,27 @@ public int getLKey(ByteBuffer buffer) throws IllegalArgumentException { private void allocateHugePageBuffer(LinkedBlockingQueue freeList, IbvPd pd) throws IOException { int totalAllocationSize = allocationSize + alignmentSize; if ((currentAllocationSize + totalAllocationSize) > allocationLimit) { - logger.error("Out of memory. Cannot allocate more buffers from hugepages."); - throw new IOException("Out of memory. Cannot allocate more buffers from hugepages."); + logger.error("Out of memory. Cannot allocate more buffers from hugepages. " + + "allocationSize = " + allocationSize + + ", alignmentSize = " + alignmentSize + + ", currentAllocationSize = " + currentAllocationSize + + ", allocationLimit = " + allocationLimit); + throw new IOException("Out of memory. Cannot allocate more buffers from hugepages." + + "allocationSize = " + allocationSize + + ", alignmentSize = " + alignmentSize + + ", currentAllocationSize = " + currentAllocationSize + + ", allocationLimit = " + allocationLimit); + } + String newFile = this.hugePagePath + hugePageFileName + System.currentTimeMillis(); RandomAccessFile randomFile = null; try { - randomFile = new RandomAccessFile(hugePageFile, "rw"); + randomFile = new RandomAccessFile(newFile, "rw"); } catch (FileNotFoundException e) { - logger.error("Path " + hugePageFile + " to huge page path/file cannot be accessed."); + logger.error("Path " + newFile + " to huge page path/file cannot be accessed."); throw e; } + hugePageFiles.add(newFile); try { randomFile.setLength(totalAllocationSize); } catch (IOException e) { @@ -150,7 +168,7 @@ private void allocateHugePageBuffer(LinkedBlockingQueue freeList, Ib mappedBuffer = channel.map(MapMode.READ_WRITE, 0, totalAllocationSize); } catch (IOException e) { - logger.error("Could not map the huge page file on path " + hugePageFile); + logger.error("Could not map the huge page file on path " + newFile); randomFile.close(); throw e; }