23
23
import java .io .EOFException ;
24
24
import java .io .IOException ;
25
25
import java .nio .ByteBuffer ;
26
+ import java .nio .channels .ReadableByteChannel ;
26
27
import java .nio .channels .ScatteringByteChannel ;
28
+ import java .util .concurrent .atomic .AtomicInteger ;
27
29
28
30
/**
29
- * A size delimited Receive that consists of a 4 byte network-ordered size N followed by N bytes of content
31
+ * A size delimited Receive that consists of a 4 byte network-ordered size N
32
+ * followed by N bytes of content.
30
33
*/
31
34
public class NetworkReceive implements Receive {
32
35
@@ -36,129 +39,189 @@ public class NetworkReceive implements Receive {
36
39
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer .allocate (0 );
37
40
38
41
private final String source ;
39
- private final ByteBuffer size ;
42
+ private final ByteBuffer sizeBuf ;
43
+ private final ByteBuffer minBuf ;
40
44
private final int maxSize ;
41
45
private final MemoryPool memoryPool ;
46
+ private final AtomicInteger byteCount ;
42
47
private int requestedBufferSize = -1 ;
43
- private ByteBuffer buffer ;
48
+ private ByteBuffer payloadBuffer = null ;
49
+ private volatile ReadState readState = ReadState .READ_SIZE ;
44
50
51
+ enum ReadState {
52
+ READ_SIZE , VALIDATE_SIZE , ALLOCATE_BUFFER , READ_PAYLOAD , COMPLETE
53
+ }
45
54
46
- public NetworkReceive (String source , ByteBuffer buffer ) {
47
- this .source = source ;
48
- this .buffer = buffer ;
49
- this .size = null ;
50
- this .maxSize = UNLIMITED ;
51
- this .memoryPool = MemoryPool .NONE ;
55
+ public NetworkReceive () {
56
+ this (UNKNOWN_SOURCE );
52
57
}
53
58
54
59
public NetworkReceive (String source ) {
55
- this .source = source ;
56
- this .size = ByteBuffer .allocate (4 );
57
- this .buffer = null ;
58
- this .maxSize = UNLIMITED ;
59
- this .memoryPool = MemoryPool .NONE ;
60
+ this (UNLIMITED , source );
61
+ }
62
+
63
+ public NetworkReceive (String source , ByteBuffer buffer ) {
64
+ this (source );
65
+ this .payloadBuffer = buffer ;
60
66
}
61
67
62
68
public NetworkReceive (int maxSize , String source ) {
63
- this .source = source ;
64
- this .size = ByteBuffer .allocate (4 );
65
- this .buffer = null ;
66
- this .maxSize = maxSize ;
67
- this .memoryPool = MemoryPool .NONE ;
69
+ this (maxSize , source , MemoryPool .NONE );
68
70
}
69
71
70
72
public NetworkReceive (int maxSize , String source , MemoryPool memoryPool ) {
71
73
this .source = source ;
72
- this .size = ByteBuffer .allocate (4 );
73
- this .buffer = null ;
74
74
this .maxSize = maxSize ;
75
75
this .memoryPool = memoryPool ;
76
- }
77
76
78
- public NetworkReceive () {
79
- this (UNKNOWN_SOURCE );
80
- }
81
-
82
- @ Override
83
- public String source () {
84
- return source ;
85
- }
86
-
87
- @ Override
88
- public boolean complete () {
89
- return !size .hasRemaining () && buffer != null && !buffer .hasRemaining ();
77
+ this .minBuf = (ByteBuffer ) ByteBuffer .allocate (SslUtils .SSL_RECORD_HEADER_LENGTH ).position (4 );
78
+ this .sizeBuf = (ByteBuffer ) this .minBuf .duplicate ().position (0 ).limit (4 );
79
+ this .byteCount = new AtomicInteger (0 );
90
80
}
91
81
82
+ @ SuppressWarnings ("fallthrough" )
92
83
public long readFrom (ScatteringByteChannel channel ) throws IOException {
93
84
int read = 0 ;
94
- if (size .hasRemaining ()) {
95
- int bytesRead = channel .read (size );
96
- if (bytesRead < 0 )
97
- throw new EOFException ();
98
- read += bytesRead ;
99
- if (!size .hasRemaining ()) {
100
- size .rewind ();
101
- int receiveSize = size .getInt ();
102
- if (receiveSize < 0 )
103
- throw new InvalidReceiveException ("Invalid receive (size = " + receiveSize + ")" );
104
- if (maxSize != UNLIMITED && receiveSize > maxSize )
105
- throw new InvalidReceiveException ("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")" );
106
- requestedBufferSize = receiveSize ; //may be 0 for some payloads (SASL)
107
- if (receiveSize == 0 ) {
108
- buffer = EMPTY_BUFFER ;
85
+
86
+ switch (readState ) {
87
+ case READ_SIZE :
88
+ read += readRequestedBufferSize (channel );
89
+ if (this .sizeBuf .hasRemaining ()) {
90
+ break ;
109
91
}
110
- }
92
+ this .readState = ReadState .VALIDATE_SIZE ;
93
+ /** FALLTHROUGH TO NEXT STATE */
94
+ case VALIDATE_SIZE :
95
+ if (this .requestedBufferSize != 0 ) {
96
+ read += validateRequestedBufferSize (channel );
97
+ if (this .minBuf .hasRemaining ()) {
98
+ break ;
99
+ }
100
+ }
101
+ this .readState = ReadState .ALLOCATE_BUFFER ;
102
+ /** FALLTHROUGH */
103
+ case ALLOCATE_BUFFER :
104
+ if (this .requestedBufferSize == 0 ) {
105
+ this .payloadBuffer = EMPTY_BUFFER ;
106
+ } else {
107
+ this .payloadBuffer = tryAllocateBuffer (this .requestedBufferSize );
108
+ if (this .payloadBuffer == null ) {
109
+ break ;
110
+ } else {
111
+ // Copy any bytes that were already consumed
112
+ this .minBuf .position (this .sizeBuf .limit ());
113
+ this .payloadBuffer .put (this .minBuf );
114
+ }
115
+ }
116
+ this .readState = ReadState .READ_PAYLOAD ;
117
+ /** FALLTHROUGH TO NEXT STATE */
118
+ case READ_PAYLOAD :
119
+ final int payloadRead = channel .read (payloadBuffer );
120
+ if (payloadRead < 0 )
121
+ throw new EOFException ();
122
+ read += payloadRead ;
123
+ if (!this .payloadBuffer .hasRemaining ()) {
124
+ this .readState = ReadState .COMPLETE ;
125
+ }
126
+ break ;
127
+ case COMPLETE :
128
+ break ;
111
129
}
112
- if (buffer == null && requestedBufferSize != -1 ) { //we know the size we want but havent been able to allocate it yet
113
- buffer = memoryPool .tryAllocate (requestedBufferSize );
114
- if (buffer == null )
115
- log .trace ("Broker low on memory - could not allocate buffer of size {} for source {}" , requestedBufferSize , source );
130
+
131
+ this .byteCount .addAndGet (read );
132
+
133
+ return read ;
134
+ }
135
+
136
+ private int validateRequestedBufferSize (final ScatteringByteChannel channel )
137
+ throws IOException {
138
+ int minRead = channel .read (this .minBuf );
139
+ if (minRead < 0 ) {
140
+ throw new EOFException ();
116
141
}
117
- if (buffer != null ) {
118
- int bytesRead = channel .read (buffer );
119
- if (bytesRead < 0 )
120
- throw new EOFException ();
121
- read += bytesRead ;
142
+ if (!this .minBuf .hasRemaining ()) {
143
+ final boolean isEncrypted =
144
+ SslUtils .isEncrypted ((ByteBuffer ) this .minBuf .duplicate ().rewind ());
145
+ if (isEncrypted ) {
146
+ throw new InvalidReceiveException (
147
+ "Recieved an unexpected SSL packet from the server. "
148
+ + "Please ensure the client is properly configured with SSL enabled." );
149
+ }
150
+ if (this .requestedBufferSize < 0 )
151
+ throw new InvalidReceiveException (
152
+ "Invalid receive (size = " + this .requestedBufferSize + ")" );
153
+ if (maxSize != UNLIMITED && this .requestedBufferSize > maxSize )
154
+ throw new InvalidReceiveException ("Invalid receive (size = "
155
+ + this .requestedBufferSize + " larger than " + maxSize + ")" );
122
156
}
123
157
124
- return read ;
158
+ return minRead ;
159
+ }
160
+
161
+ private ByteBuffer tryAllocateBuffer (final int bufSize ) {
162
+ final ByteBuffer bb = memoryPool .tryAllocate (bufSize );
163
+ if (bb == null ) {
164
+ log .trace ("Broker low on memory - could not allocate buffer of size {} for source {}" ,
165
+ requestedBufferSize , source );
166
+ }
167
+ return bb ;
168
+ }
169
+
170
+ private int readRequestedBufferSize (final ReadableByteChannel channel ) throws IOException {
171
+ final int sizeRead = channel .read (sizeBuf );
172
+ if (sizeRead < 0 ) {
173
+ throw new EOFException ();
174
+ }
175
+ if (sizeBuf .hasRemaining ()) {
176
+ return sizeRead ;
177
+ }
178
+ sizeBuf .rewind ();
179
+ this .requestedBufferSize = sizeBuf .getInt ();
180
+ return sizeRead ;
125
181
}
126
182
127
183
@ Override
128
184
public boolean requiredMemoryAmountKnown () {
129
- return requestedBufferSize != - 1 ;
185
+ return this . readState . ordinal () > ReadState . VALIDATE_SIZE . ordinal () ;
130
186
}
131
187
132
188
@ Override
133
189
public boolean memoryAllocated () {
134
- return buffer != null ;
190
+ return this . readState . ordinal () >= ReadState . READ_PAYLOAD . ordinal () ;
135
191
}
136
192
193
+ @ Override
194
+ public boolean complete () {
195
+ return this .readState == ReadState .COMPLETE ;
196
+ }
137
197
138
198
@ Override
139
199
public void close () throws IOException {
140
- if (buffer != null && buffer != EMPTY_BUFFER ) {
141
- memoryPool .release (buffer );
142
- buffer = null ;
200
+ if (payloadBuffer != null && payloadBuffer != EMPTY_BUFFER ) {
201
+ memoryPool .release (payloadBuffer );
202
+ payloadBuffer = null ;
143
203
}
144
204
}
145
205
206
+ @ Override
207
+ public String source () {
208
+ return source ;
209
+ }
210
+
146
211
public ByteBuffer payload () {
147
- return this .buffer ;
212
+ return this .payloadBuffer ;
148
213
}
149
214
150
215
public int bytesRead () {
151
- if (buffer == null )
152
- return size .position ();
153
- return buffer .position () + size .position ();
216
+ return this .byteCount .get ();
154
217
}
155
218
156
219
/**
157
220
* Returns the total size of the receive including payload and size buffer
158
221
* for use in metrics. This is consistent with {@link NetworkSend#size()}
159
222
*/
160
223
public int size () {
161
- return payload ().limit () + size .limit ();
224
+ return payload ().limit () + sizeBuf .limit ();
162
225
}
163
226
164
227
}
0 commit comments