Skip to content

Commit 95aff6e

Browse files
authored
Feat: Add instance fleet support to EMR resource (#150)
1 parent ec737cd commit 95aff6e

2 files changed

Lines changed: 77 additions & 29 deletions

File tree

orchard-provider-aws/src/main/scala/com/salesforce/mce/orchard/io/aws/resource/EmrResource.scala

Lines changed: 76 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ case class EmrResource(
5353

5454
val response = Client
5555
.emr()
56-
.runJobFlow{
56+
.runJobFlow {
5757
val req = loggingUriBase
5858
.foldLeft(
5959
RunJobFlowRequest
@@ -83,33 +83,62 @@ case class EmrResource(
8383
.instances {
8484
val builder = JobFlowInstancesConfig
8585
.builder()
86-
.ec2SubnetId(instancesConfig.subnetId)
8786
.keepJobFlowAliveWhenNoSteps(true)
8887

89-
instancesConfig.instanceGroupConfigs
90-
.foldLeft(builder) { case (b, instGroupConfigs) =>
91-
b.instanceGroups(
92-
instGroupConfigs.map { c =>
93-
val builder = InstanceGroupConfig
94-
.builder()
95-
.name(s"orchard-${c.instanceRoleType}".toLowerCase)
96-
.instanceRole(c.instanceRoleType)
97-
.instanceCount(c.instanceCount)
98-
.instanceType(c.instanceType)
99-
100-
c.instanceBidPrice
101-
.fold(builder.market(MarketType.ON_DEMAND))(p =>
102-
if (lastAttempt && useOnDemandOnLastAttempt) {
103-
builder.market(MarketType.ON_DEMAND)
104-
} else {
105-
builder.bidPrice(p).market(MarketType.SPOT)
106-
}
107-
)
108-
109-
builder.build()
110-
}: _*
111-
)
112-
}
88+
if (instancesConfig.instanceFleetConfigs.exists(_.nonEmpty)) {
89+
instancesConfig.subnetIds.foldLeft(builder)(_.ec2SubnetIds(_: _*))
90+
instancesConfig.instanceFleetConfigs
91+
.foldLeft(builder) { case (b, instFleetConfigs) =>
92+
b.instanceFleets(
93+
instFleetConfigs.map { c =>
94+
val builder = InstanceFleetConfig
95+
.builder()
96+
.name(s"orchard-instance-fleet-${c.instanceRoleType}".toLowerCase)
97+
.instanceFleetType(c.instanceRoleType)
98+
.instanceTypeConfigs(c.instanceConfigs.map { i =>
99+
val builder = InstanceTypeConfig
100+
.builder()
101+
.instanceType(i.instanceType)
102+
i.bidPrice.foldLeft(builder)(_.bidPrice(_))
103+
i.weightedCapacity.foldLeft(builder)(_.weightedCapacity(_))
104+
builder.build()
105+
}: _*)
106+
if (lastAttempt && useOnDemandOnLastAttempt || c.targetSpotCapacity.isEmpty) {
107+
builder.targetOnDemandCapacity(c.targetOnDemandCapacity)
108+
} else {
109+
c.targetSpotCapacity.foldLeft(builder)(_.targetSpotCapacity(_))
110+
}
111+
builder.build()
112+
}: _*
113+
)
114+
}
115+
} else {
116+
instancesConfig.subnetId.foldLeft(builder)(_.ec2SubnetId(_))
117+
instancesConfig.instanceGroupConfigs
118+
.foldLeft(builder) { case (b, instGroupConfigs) =>
119+
b.instanceGroups(
120+
instGroupConfigs.map { c =>
121+
val builder = InstanceGroupConfig
122+
.builder()
123+
.name(s"orchard-instance-group-${c.instanceRoleType}".toLowerCase)
124+
.instanceRole(c.instanceRoleType)
125+
.instanceCount(c.instanceCount)
126+
.instanceType(c.instanceType)
127+
128+
c.instanceBidPrice
129+
.fold(builder.market(MarketType.ON_DEMAND))(p =>
130+
if (lastAttempt && useOnDemandOnLastAttempt) {
131+
builder.market(MarketType.ON_DEMAND)
132+
} else {
133+
builder.bidPrice(p).market(MarketType.SPOT)
134+
}
135+
)
136+
137+
builder.build()
138+
}: _*
139+
)
140+
}
141+
}
113142

114143
instancesConfig.ec2KeyName
115144
.foldLeft(builder)(_.ec2KeyName(_))
@@ -130,7 +159,7 @@ case class EmrResource(
130159

131160
spec.customAmiId.foldLeft(req)(_.customAmiId(_))
132161
req.build()
133-
}
162+
}
134163
Json.toJson(EmrResource.InstSpec(response.jobFlowId()))
135164
}
136165

@@ -202,10 +231,29 @@ object EmrResource {
202231
implicit val instanceGroupConfigReads: Reads[InstanceGroupConfig] =
203232
Json.reads[InstanceGroupConfig]
204233

234+
case class InstanceTypeConfig(
235+
instanceType: String,
236+
bidPrice: Option[String],
237+
weightedCapacity: Option[Int]
238+
)
239+
implicit val instanceTypeConfigReads: Reads[InstanceTypeConfig] =
240+
Json.reads[InstanceTypeConfig]
241+
242+
case class InstanceFleetConfig(
243+
instanceRoleType: String,
244+
targetOnDemandCapacity: Int,
245+
targetSpotCapacity: Option[Int],
246+
instanceConfigs: Seq[InstanceTypeConfig]
247+
)
248+
implicit val instanceFleetConfigReads: Reads[InstanceFleetConfig] =
249+
Json.reads[InstanceFleetConfig]
250+
205251
case class InstancesConfig(
206-
subnetId: String,
252+
subnetId: Option[String],
253+
subnetIds: Option[Seq[String]],
207254
ec2KeyName: Option[String],
208255
instanceGroupConfigs: Option[Seq[InstanceGroupConfig]],
256+
instanceFleetConfigs: Option[Seq[InstanceFleetConfig]],
209257
emrManagedMasterSecurityGroup: Option[String],
210258
emrManagedSlaveSecurityGroup: Option[String],
211259
additionalMasterSecurityGroups: Option[Seq[String]],

version.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
ThisBuild / version := "0.27.1"
1+
ThisBuild / version := "0.28.0"

0 commit comments

Comments
 (0)