11package com .scylladb .migrator
22
3+ import com .scylladb .alternator .AlternatorEndpointProvider
34import com .scylladb .migrator .config .{ DynamoDBEndpoint , SourceSettings , TargetSettings }
4- import org .apache .hadoop .dynamodb . DynamoDBConstants
5+ import org .apache .hadoop .conf .{ Configurable , Configuration }
56import org .apache .hadoop .dynamodb .read .DynamoDBInputFormat
7+ import org .apache .hadoop .dynamodb .write .DynamoDBOutputFormat
8+ import org .apache .hadoop .dynamodb .{ DynamoDBConstants , DynamoDbClientBuilderTransformer }
69import org .apache .hadoop .mapred .JobConf
710import org .apache .log4j .LogManager
811import software .amazon .awssdk .auth .credentials .{
912 AwsCredentials ,
1013 AwsCredentialsProvider ,
1114 ProfileCredentialsProvider
1215}
13- import software .amazon .awssdk .services .dynamodb .DynamoDbClient
16+ import software .amazon .awssdk .services .dynamodb .{ DynamoDbClient , DynamoDbClientBuilder }
1417import software .amazon .awssdk .services .dynamodb .model .{
1518 BillingMode ,
1619 CreateTableRequest ,
@@ -30,6 +33,7 @@ import software.amazon.awssdk.services.dynamodb.model.{
3033import software .amazon .awssdk .services .dynamodb .streams .DynamoDbStreamsClient
3134
3235import java .util .stream .Collectors
36+ import java .net .URI
3337import scala .util .{ Failure , Success , Try }
3438import scala .jdk .OptionConverters ._
3539
@@ -230,11 +234,13 @@ object DynamoUtils {
230234 jobConf.set(DynamoDBConstants .YARN_RESOURCE_MANAGER_ENABLED , " false" )
231235
232236 jobConf.set(
233- DynamoDBConstants .CUSTOM_CREDENTIALS_PROVIDER_CONF ,
234- " com.scylladb.migrator.DynamoUtils$ProfileCredentialsProvider" )
237+ DynamoDBConstants .CUSTOM_CLIENT_BUILDER_TRANSFORMER ,
238+ classOf [AlternatorLoadBalancingEnabler ].getName)
239+
235240 jobConf.set(
236- " mapred.output.format.class" ,
237- " org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat" )
241+ DynamoDBConstants .CUSTOM_CREDENTIALS_PROVIDER_CONF ,
242+ classOf [ProfileCredentialsProvider ].getName)
243+ jobConf.set(" mapred.output.format.class" , classOf [DynamoDBOutputFormat ].getName)
238244 jobConf.set(" mapred.input.format.class" , classOf [DynamoDBInputFormat ].getName)
239245 }
240246
@@ -284,4 +290,17 @@ object DynamoUtils {
284290 def resolveCredentials (): AwsCredentials = delegate.resolveCredentials()
285291 }
286292
293+ class AlternatorLoadBalancingEnabler extends DynamoDbClientBuilderTransformer with Configurable {
294+ private var conf : Configuration = null
295+
296+ override def apply (builder : DynamoDbClientBuilder ): DynamoDbClientBuilder =
297+ builder.endpointProvider(
298+ new AlternatorEndpointProvider (URI .create(conf.get(DynamoDBConstants .ENDPOINT )))
299+ )
300+
301+ override def setConf (configuration : Configuration ): Unit =
302+ conf = configuration
303+ override def getConf : Configuration = conf
304+ }
305+
287306}
0 commit comments