22
33import com .google .cloud .ReadChannel ;
44import com .google .common .base .Optional ;
5+ import com .google .common .io .ByteSink ;
56import com .google .edwmigration .dbsync .common .storage .AbstractRemoteByteSource ;
67import com .google .edwmigration .dbsync .common .storage .Slice ;
78import com .google .cloud .storage .BlobId ;
@@ -26,23 +27,22 @@ public class GcsByteSource extends AbstractRemoteByteSource {
2627
2728 private final Storage storage ;
2829 private final BlobId blobId ;
29- private InputStream sourceStream ;
30-
31- private long currentSourceOffset = 0 ;
30+ private InputStreamCache inputStreamCache ;
3231
3332 private GcsByteSource (Storage storage , BlobId blobId , @ Nullable Slice slice ) {
3433 super (slice );
3534 this .storage = storage ;
3635 this .blobId = blobId ;
36+ this .inputStreamCache = new InputStreamCache (
37+ Channels .newInputStream (storage .reader (blobId )), 0 );
3738 }
3839
3940 private GcsByteSource (Storage storage , BlobId blobId , @ Nullable Slice slice ,
40- InputStream sourceStream , long currentSourceOffset ) {
41+ InputStreamCache inputStreamCache ) {
4142 super (slice );
4243 this .storage = storage ;
4344 this .blobId = blobId ;
44- this .sourceStream = sourceStream ;
45- this .currentSourceOffset = currentSourceOffset ;
45+ this .inputStreamCache = inputStreamCache ;
4646 }
4747
4848 public GcsByteSource (Storage storage , BlobId blobId ) {
@@ -51,7 +51,7 @@ public GcsByteSource(Storage storage, BlobId blobId) {
5151
5252 @ Override
5353 protected ByteSource slice (Slice slice ) {
54- return new GcsByteSource (storage , blobId , slice , sourceStream , currentSourceOffset );
54+ return new GcsByteSource (storage , blobId , slice , inputStreamCache );
5555 }
5656
5757 @ Override
@@ -60,11 +60,9 @@ public InputStream openStream() throws IOException {
6060 Slice slice = getSlice ();
6161 if (slice != null ) {
6262 channel .seek (slice .getOffset ());
63- channel = channel .limit (slice .getLength ());
63+ channel = channel .limit (slice .getLength () + slice . getOffset () );
6464 }
65- sourceStream = Channels .newInputStream (channel );
66- currentSourceOffset = slice .getOffset ();
67- return sourceStream ;
65+ return Channels .newInputStream (channel );
6866 }
6967
7068 @ Override
@@ -74,22 +72,27 @@ public long copyTo(OutputStream outputStream) throws IOException {
7472 return super .copyTo (outputStream );
7573 }
7674
77- if (slice .getOffset () < currentSourceOffset ) {
75+ if (slice .getOffset () < inputStreamCache . getCurrentOffset () ) {
7876 if (DEBUG ) {
7977 LOG .info (String .format ("Target offset %d smaller than current offset %d, reopen stream" ,
80- slice .getOffset (), currentSourceOffset ));
78+ slice .getOffset (), inputStreamCache . currentOffset ));
8179 }
82- sourceStream .close ();
83- sourceStream = Channels .newInputStream (storage .reader (blobId ));
84- currentSourceOffset = 0 ;
80+ inputStreamCache . getSourceStream () .close ();
81+ inputStreamCache . setSourceStream ( Channels .newInputStream (storage .reader (blobId ) ));
82+ inputStreamCache . setCurrentOffset ( 0 ) ;
8583 }
8684
8785 // Skip forward to the desired start.
88- long toSkip = slice .getOffset () - currentSourceOffset ;
86+ long toSkip = slice .getOffset () - inputStreamCache . getCurrentOffset () ;
8987 skip (toSkip );
9088 return copy (slice .getLength (), outputStream );
9189 }
9290
91+ @ Override
92+ public long copyTo (ByteSink byteSink ) throws IOException {
93+ return copyTo (byteSink .openBufferedStream ());
94+ }
95+
9396 @ Override
9497 protected MoreObjects .ToStringHelper toStringHelper (ToStringHelper helper ) {
9598 return super .toStringHelper (helper )
@@ -104,12 +107,12 @@ public Optional<Long> sizeIfKnown() {
104107 private void skip (long length ) throws IOException {
105108 long remaining = length ;
106109 while (remaining > 0 ) {
107- long skipped = sourceStream .skip (remaining );
108- if (skipped < remaining ) {
110+ long skipped = inputStreamCache . getSourceStream () .skip (remaining );
111+ if (skipped == 0 ) {
109112 throw new EOFException ("Unexpected end of stream while skipping." );
110113 }
111114 remaining -= skipped ;
112- currentSourceOffset += skipped ;
115+ inputStreamCache . setCurrentOffset ( inputStreamCache . getCurrentOffset () + skipped ) ;
113116 }
114117 }
115118
@@ -119,17 +122,49 @@ private long copy(long copyLength, OutputStream out) throws IOException {
119122 long remaining = copyLength ;
120123 while (remaining > 0 ) {
121124 int bytesToRead = (int ) Math .min (buffer .length , remaining );
122- int read = sourceStream .read (buffer , 0 , bytesToRead );
125+ int read = inputStreamCache . getSourceStream () .read (buffer , 0 , bytesToRead );
123126 if (read == -1 ) {
124127 throw new EOFException ("Unexpected end of stream while copying " + copyLength
125128 + " bytes starting at offset " + getSlice ().getOffset ());
126129 }
127130 out .write (buffer , 0 , read );
128131 remaining -= read ;
129- currentSourceOffset += read ;
132+ inputStreamCache . setCurrentOffset ( inputStreamCache . getCurrentOffset () + read ) ;
130133 }
131134
132135 // As we always read until there's no remaining bytes or throw.
133136 return copyLength ;
134137 }
138+
139+ /**
140+ * A cache with an {@link InputStream} and a mark of the current offset within the stream. This
141+ * cache will be shared by this {@link ByteSource} and the copies created by
142+ * {@link #slice(long, long)} to efficiently implement the {@link #copyTo(OutputStream)} method.
143+ */
144+ private static class InputStreamCache {
145+
146+ private InputStream sourceStream ;
147+ private long currentOffset ;
148+
149+ private InputStreamCache (InputStream inputStream , long currentOffSet ) {
150+ this .sourceStream = inputStream ;
151+ this .currentOffset = currentOffSet ;
152+ }
153+
154+ private InputStream getSourceStream () {
155+ return sourceStream ;
156+ }
157+
158+ public long getCurrentOffset () {
159+ return currentOffset ;
160+ }
161+
162+ public void setSourceStream (InputStream inputStream ) {
163+ this .sourceStream = inputStream ;
164+ }
165+
166+ public void setCurrentOffset (long currentOffset ) {
167+ this .currentOffset = currentOffset ;
168+ }
169+ }
135170}
0 commit comments