Skip to content

Commit 3733de6

Browse files
luis-garza-devtimm4205
authored and committed
feat: Set memory size limit in multi-sql statements with no ring buffer
1 parent 685d4a8 commit 3733de6

File tree

3 files changed

+118
-23
lines changed

3 files changed

+118
-23
lines changed

src/main/java/com/amazon/redshift/core/v3/QueryExecutorImpl.java

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,7 @@
77
package com.amazon.redshift.core.v3;
88

99
import com.amazon.redshift.RedshiftProperty;
10-
import com.amazon.redshift.copy.CopyIn;
1110
import com.amazon.redshift.copy.CopyOperation;
12-
import com.amazon.redshift.copy.CopyOut;
1311
import com.amazon.redshift.core.CommandCompleteParser;
1412
import com.amazon.redshift.core.Encoding;
1513
import com.amazon.redshift.core.EncodingPredictor;
@@ -71,7 +69,6 @@
7169
import java.util.Set;
7270
import java.util.TimeZone;
7371
import java.util.concurrent.TimeUnit;
74-
import java.util.concurrent.atomic.AtomicBoolean;
7572
import java.util.concurrent.locks.Lock;
7673
import java.util.concurrent.locks.ReentrantLock;
7774
import java.util.regex.Pattern;
@@ -158,6 +155,8 @@ private static boolean looksLikePrepare(String sql) {
158155
// Query or some execution on a socket in process
159156
private final Lock m_executingLock = new ReentrantLock();
160157

158+
private static final long INVALID_TUPLE_SIZE = -1L;
159+
161160
/**
162161
* {@code CommandComplete(B)} messages are quite common, so we reuse instance to parse those
163162
*/
@@ -1900,6 +1899,7 @@ private void processResultsOnThread(ResultHandler handler,
19001899
&& (!bothRowsAndStatus); // RETURNING clause
19011900

19021901
List<Tuple> tuples = null;
1902+
long totalTupleSize = 0;
19031903

19041904
int c;
19051905
boolean endQuery = false;
@@ -2269,8 +2269,15 @@ private void processResultsOnThread(ResultHandler handler,
22692269
tuples = new ArrayList<Tuple>();
22702270
}
22712271

2272-
if(!skipRow)
2272+
if (!skipRow) {
2273+
if (enableFetchRingBuffer) {
2274+
totalTupleSize = checkAndUpdateTupleSize(tuple, totalTupleSize, handler);
2275+
if (handler.getException() != null || totalTupleSize == INVALID_TUPLE_SIZE) {
2276+
return;
2277+
}
2278+
}
22732279
tuples.add(tuple);
2280+
}
22742281
}
22752282
}
22762283

@@ -2492,6 +2499,40 @@ private void processResultsOnThread(ResultHandler handler,
24922499

24932500
}
24942501

2502+
/**
2503+
* Checks and updates the tuple size ensuring it doesn't exceed buffer limits.
2504+
*
2505+
* @param tuple The tuple to check
2506+
* @param totalTupleSize Current total size of tuples
2507+
* @param handler Error handler for reporting issues
2508+
* @return Updated total size or `INVALID_TUPLE_SIZE` if an error occurred
2509+
*/
2510+
private long checkAndUpdateTupleSize(Tuple tuple, long totalTupleSize, ResultHandler handler) {
2511+
int nodeSize = RedshiftMemoryUtils.calculateNodeSize(tuple);
2512+
2513+
// Check for potential overflow before addition
2514+
if (Long.MAX_VALUE - totalTupleSize < nodeSize) {
2515+
handler.handleError(
2516+
new RedshiftException(
2517+
GT.tr("Buffer size calculation overflow. Current size: {0}, Adding: {1}",
2518+
totalTupleSize, nodeSize),
2519+
RedshiftState.OUT_OF_MEMORY));
2520+
return INVALID_TUPLE_SIZE;
2521+
}
2522+
2523+
long newTotalSize = totalTupleSize + nodeSize;
2524+
if (newTotalSize > this.fetchRingBufferSize) {
2525+
handler.handleError(
2526+
new RedshiftException(
2527+
GT.tr("Total result set size of {0} bytes would exceed maximum allowed size of {1} bytes",
2528+
newTotalSize, this.fetchRingBufferSize),
2529+
RedshiftState.OUT_OF_MEMORY));
2530+
return INVALID_TUPLE_SIZE;
2531+
}
2532+
2533+
return newTotalSize;
2534+
}
2535+
24952536
/**
24962537
* Ignore the response message by reading the message length and skipping over those bytes in the
24972538
* communication stream.
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package com.amazon.redshift.core.v3;
2+
3+
import com.amazon.redshift.core.Tuple;
4+
import com.amazon.redshift.jdbc.RedshiftConnectionImpl;
5+
6+
/**
7+
* Utility class for memory-related calculations in Redshift JDBC driver.
8+
* Provides methods to estimate memory usage of result set data structures.
9+
*/
10+
public final class RedshiftMemoryUtils {
11+
12+
/**
13+
* Scaling factor to account for JVM memory overhead and alignment.
14+
* The factor 1.2 adds a 20% safety margin to the calculated size.
15+
*/
16+
static final double MEMORY_ESTIMATE_SCALING_FACTOR = 1.2;
17+
18+
/**
19+
* Memory overhead for a node in a 64-bit JVM:
20+
* - 8 bytes: Object reference for Tuple
21+
* - 8 bytes: Reference for next pointer
22+
* - 16 bytes: Object header overhead
23+
*/
24+
private static final int NODE_OVERHEAD_64BIT = 32;
25+
26+
/**
27+
* Memory overhead for a node in a 32-bit JVM:
28+
* - 4 bytes: Object reference for Tuple
29+
* - 4 bytes: Reference for next pointer
30+
* - 8 bytes: Object header overhead
31+
*/
32+
private static final int NODE_OVERHEAD_32BIT = 16;
33+
34+
/**
35+
* JVM-specific memory overhead for node structures.
36+
*/
37+
private static final int NODE_OVERHEAD = RedshiftConnectionImpl.IS_64_BIT_JVM
38+
? NODE_OVERHEAD_64BIT
39+
: NODE_OVERHEAD_32BIT;
40+
41+
private RedshiftMemoryUtils() {
42+
throw new AssertionError("Utility class should not be instantiated");
43+
}
44+
45+
/**
46+
* Calculates the estimated memory size in bytes for a tuple node, including JVM overhead.
47+
* The calculation includes:
48+
* <ul>
49+
* <li>The actual tuple data size</li>
50+
* <li>JVM object overhead (32 bytes for 64-bit JVM, 16 bytes for 32-bit JVM)</li>
51+
* <li>A scaling factor to account for additional JVM overhead</li>
52+
* </ul>
53+
*
54+
* Note: We don't worry about overflow in this method because:
55+
* - MEMORY_ESTIMATE_SCALING_FACTOR is 1.2, so even with scaling we're only increasing values by 20%
56+
* - NODE_OVERHEAD is a modest constant value
57+
* The main overflow risk comes from accumulating multiple tuples' sizes.
58+
*
59+
* @param row The tuple to calculate size for, can be null
60+
* @return Estimated size in bytes, or 0 if the tuple is null
61+
*/
62+
public static int calculateNodeSize(Tuple row) {
63+
if (row == null) {
64+
return 0;
65+
}
66+
67+
int rawSize = row.getTupleSize() + NODE_OVERHEAD;
68+
return (int) (rawSize * MEMORY_ESTIMATE_SCALING_FACTOR);
69+
}
70+
}

src/main/java/com/amazon/redshift/core/v3/RedshiftRowsBlockingQueue.java

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@ public class RedshiftRowsBlockingQueue<E> extends LinkedBlockingQueue<E> {
3737
private RedshiftLogger logger;
3838

3939
private Portal currentSuspendedPortal;
40-
41-
private final double MEMORY_ESTIMATE_SCALING_FACTOR = 1.2;
4240

4341
public RedshiftRowsBlockingQueue(int capacity) {
4442
super(capacity);
@@ -108,7 +106,7 @@ public void put(E e) throws InterruptedException {
108106

109107
super.put(e);
110108

111-
currentBufSize = totalFetchRingBufferSize.addAndGet(getNodeSize(row));
109+
currentBufSize = totalFetchRingBufferSize.addAndGet(RedshiftMemoryUtils.calculateNodeSize(row));
112110

113111
if (currentBufSize < fetchRingBufferSizeCapacity)
114112
notFull.signal();
@@ -118,7 +116,7 @@ public void put(E e) throws InterruptedException {
118116
}
119117
else {
120118
super.put(e);
121-
totalFetchRingBufferSize.addAndGet(getNodeSize((Tuple)e));
119+
totalFetchRingBufferSize.addAndGet(RedshiftMemoryUtils.calculateNodeSize((Tuple)e));
122120
}
123121
}
124122
} // By size
@@ -135,7 +133,7 @@ public E take() throws InterruptedException {
135133
Tuple row = (Tuple)e;
136134
long currentBufSize;
137135
boolean bufWasFull = (totalFetchRingBufferSize.get() >= fetchRingBufferSizeCapacity);
138-
currentBufSize = totalFetchRingBufferSize.addAndGet(-getNodeSize(row));
136+
currentBufSize = totalFetchRingBufferSize.addAndGet(-RedshiftMemoryUtils.calculateNodeSize(row));
139137

140138
// Signal the waiters
141139
if (bufWasFull) {
@@ -238,18 +236,4 @@ private void signalNotFull() {
238236
putLock.unlock();
239237
}
240238
}
241-
242-
/**
243-
* Returns the size in bytes of an individual node of Ring buffer queue/linked list
244-
*/
245-
private int getNodeSize(Tuple row) {
246-
/**
247-
* Node overheads are 32 bytes for 64-bit JVM and 16 bytes for 32-bit JVM
248-
* For 64-bit JVM: (8 + 8 + 16) => 8 byte reference for Tuple object + 8 byte reference for next
249-
* + 16 byte Node object header overhead
250-
* Each of these are reduced to half in case of 32-bit JVM.
251-
*/
252-
int estimatedNodeSize = row.getTupleSize() + (RedshiftConnectionImpl.IS_64_BIT_JVM ? 32 : 16);
253-
return (int) (estimatedNodeSize * MEMORY_ESTIMATE_SCALING_FACTOR); // using a scaling factor for avoiding OOM errors
254-
}
255239
}

0 commit comments

Comments
 (0)