1111import java .util .Base64 ;
1212import java .util .List ;
1313import java .util .concurrent .ExecutionException ;
14+ import java .util .concurrent .Executors ;
1415import java .util .concurrent .Future ;
1516
16-
1717import org .apache .hadoop .conf .Configuration ;
1818import org .apache .hadoop .hbase .TableName ;
1919import org .apache .hadoop .hbase .wal .WAL .Entry ;
2626import com .amazonaws .services .kinesis .producer .KinesisProducer ;
2727import com .amazonaws .services .kinesis .producer .KinesisProducerConfiguration ;
2828import com .amazonaws .services .kinesis .producer .UserRecordResult ;
29+ import com .google .common .util .concurrent .FutureCallback ;
30+ import com .google .common .util .concurrent .ListenableFuture ;
31+ import com .google .common .util .concurrent .ListeningExecutorService ;
32+ import com .google .common .util .concurrent .MoreExecutors ;
33+ import com .google .common .util .concurrent .Futures ;
2934
3035public class KinesisDataSinkImpl extends DataSink {
3136 private MessageDigest md ;
@@ -35,13 +40,19 @@ public class KinesisDataSinkImpl extends DataSink {
3540 private KinesisProducer kinesis = null ;
3641
3742 private KinesisConfigurationUtil configUtil ;
43+
44+
45+ private FutureCallback <UserRecordResult > putRecordCallback ;
46+ private ListeningExecutorService executor ;
3847
3948 /**
4049 * Constructor
4150 * @param config
4251 */
4352 public KinesisDataSinkImpl (Configuration config ) {
4453 super (config );
54+ this .configUtil = this .getConfigurationUtil ();
55+
4556 try {
4657 md = MessageDigest .getInstance ("MD5" );
4758 } catch (NoSuchAlgorithmException e ) {
@@ -54,6 +65,21 @@ public KinesisDataSinkImpl(Configuration config) {
5465 + "" , e );
5566 e .printStackTrace ();
5667 }
68+ if ( this .configUtil .isSynchPutsEnabled () == false ) {
69+ LOG .info ("Initilizing Asynchronous putRecords. We wil skip failed PutRecords will be lost! " );
70+ putRecordCallback = new FutureCallback <UserRecordResult >() {
71+ @ Override
72+ public void onFailure (Throwable t ) {
73+ /* Analyze and respond to the failure */
74+ };
75+ @ Override
76+ public void onSuccess (UserRecordResult result ) {
77+ /* Respond to the success */
78+ };
79+ };
80+
81+ executor = MoreExecutors .listeningDecorator (Executors .newFixedThreadPool (50 ));
82+ }
5783 }
5884
5985 /**
@@ -75,7 +101,9 @@ public boolean isBlocking() {
75101 */
76102 public void putRecord (ByteBuffer buffer , String tablename ) throws IOException , InterruptedException , ExecutionException {
77103 String partition = UUIDHelper .getBase64UUID ();
78- LOG .debug ("Putting record in random partition: " + partition );
104+ if (LOG .isDebugEnabled ()) {
105+ LOG .debug ("Putting record in random partition: " + partition );
106+ }
79107 this .putRecord ( buffer , tablename ,partition );
80108 }
81109
@@ -91,34 +119,41 @@ public void putRecord(ByteBuffer buffer, String tablename) throws IOException, I
91119 */
92120 public void putRecord (ByteBuffer buffer , String tablename ,String partition ) throws IOException , InterruptedException , ExecutionException {
93121 if (kinesis == null ) { // creating the producer when there is a request.
94- KinesisProducerConfiguration config = this . getConfigurationUtil () .getKPLConfiguration ();
95- LOG .debug ("First Time ptoducer . endpoint " + config .getKinesisEndpoint () + " port: " + config .getKinesisPort () );
122+ KinesisProducerConfiguration config = configUtil .getKPLConfiguration ();
123+ LOG .debug ("First Time producer . endpoint " + config .getKinesisEndpoint () + " port: " + config .getKinesisPort () );
96124 this .kinesis = KinesisProducerFactory .getProducer (config );
97125 }
98-
99-
126+
100127 md .update (partition .getBytes ());
101128 String digest = Base64 .getEncoder ().encodeToString (md .digest ());
102- LOG .debug ("Putting record in digest partition: " + digest );
103129 String destination = this .getConfigurationUtil ().getStreamNameFromTableName (tablename );
104- Future <UserRecordResult > putFuture = (Future <UserRecordResult >) kinesis .addUserRecord (destination , digest , buffer );
130+
131+
105132 long time = System .currentTimeMillis ();
106- LOG .debug ("Starting a put " + time );
107- UserRecordResult result = putFuture .get (); // this does block
108- LOG .debug ("Out of Put " + System .currentTimeMillis ());
109133
110- if (result .isSuccessful ()) {
111- LOG .debug (
112- "Put record into shard= {} PartitionKey = {}, time={} "
113- , result .getShardId ()
114- , digest
115- , System .currentTimeMillis () - time );
134+ if (configUtil .isSynchPutsEnabled ()) {
135+ Future <UserRecordResult > putFuture = (Future <UserRecordResult >) kinesis .addUserRecord (destination , digest , buffer );
136+
137+ UserRecordResult result = putFuture .get (); // this does block
138+
139+ if (result .isSuccessful ()) {
140+ if (LOG .isDebugEnabled ()) {
141+ LOG .debug (
142+ "Put record into shard= {} PartitionKey = {}, time={} "
143+ , result .getShardId ()
144+ , digest
145+ , System .currentTimeMillis () - time );
146+ }
147+ } else {
148+ for (Attempt attempt : result .getAttempts ()) {
149+ LOG .error (attempt .getErrorMessage ());
150+ throw new IOException ("Record faild to replicate" );
151+ }
152+ }
116153 } else {
117- for (Attempt attempt : result .getAttempts ()) {
118- LOG .error (attempt .getErrorMessage ());
119- throw new IOException ("Record faild to replicate" );
120- }
121- }
154+ ListenableFuture <UserRecordResult > putFuture = kinesis .addUserRecord (destination , digest , buffer );
155+ Futures .addCallback (putFuture ,putRecordCallback , executor );
156+ }
122157 }
123158
124159 /**
@@ -168,26 +203,19 @@ public List<Entry> filter(final List<Entry> oldEntries) {
168203
169204 @ Override
170205 public boolean supportsTransaction () {
171- // TODO Auto-generated method stub
172206 return false ;
173207 }
174208
175209 @ Override
176210 public void beginTransaction () {
177- // TODO Auto-generated method stub
178-
179211 }
180212
181213 @ Override
182214 public void commitTransaction () {
183- // TODO Auto-generated method stub
184-
185215 }
186216
187217 @ Override
188218 public void abortTransaction () {
189- // TODO Auto-generated method stub
190-
191219 }
192220
193221}
0 commit comments