Skip to content

Define SparkApplicationSubmitter interface to allow customizing submitting mechanism #2500

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/operator/controller/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,16 @@ func start() {
}
}

sparkSubmitter := &sparkapplication.SparkSubmitter{}

// Setup controller for SparkApplication.
if err = sparkapplication.NewReconciler(
mgr,
mgr.GetScheme(),
mgr.GetClient(),
mgr.GetEventRecorderFor("spark-application-controller"),
registry,
sparkSubmitter,
newSparkApplicationReconcilerOptions(),
).SetupWithManager(mgr, newControllerOptions()); err != nil {
logger.Error(err, "Failed to create controller", "controller", "SparkApplication")
Expand Down
47 changes: 22 additions & 25 deletions internal/controller/sparkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,13 @@ type Options struct {

// Reconciler reconciles a SparkApplication object.
type Reconciler struct {
manager ctrl.Manager
scheme *runtime.Scheme
client client.Client
recorder record.EventRecorder
options Options
registry *scheduler.Registry
manager ctrl.Manager
scheme *runtime.Scheme
client client.Client
recorder record.EventRecorder
registry *scheduler.Registry
submitter SparkApplicationSubmitter
options Options
}

// Reconciler implements reconcile.Reconciler.
Expand All @@ -93,15 +94,17 @@ func NewReconciler(
client client.Client,
recorder record.EventRecorder,
registry *scheduler.Registry,
submitter SparkApplicationSubmitter,
options Options,
) *Reconciler {
return &Reconciler{
manager: manager,
scheme: scheme,
client: client,
recorder: recorder,
registry: registry,
options: options,
manager: manager,
scheme: scheme,
client: client,
recorder: recorder,
registry: registry,
submitter: submitter,
options: options,
}
}

Expand Down Expand Up @@ -263,7 +266,7 @@ func (r *Reconciler) reconcileNewSparkApplication(ctx context.Context, req ctrl.
}
app := old.DeepCopy()

_ = r.submitSparkApplication(app)
_ = r.submitSparkApplication(ctx, app)
if err := r.updateSparkApplicationStatus(ctx, app); err != nil {
return err
}
Expand Down Expand Up @@ -331,7 +334,7 @@ func (r *Reconciler) reconcileFailedSubmissionSparkApplication(ctx context.Conte
}
if timeUntilNextRetryDue <= 0 {
if r.validateSparkResourceDeletion(ctx, app) {
_ = r.submitSparkApplication(app)
_ = r.submitSparkApplication(ctx, app)
} else {
if err := r.deleteSparkResources(ctx, app); err != nil {
logger.Error(err, "failed to delete resources associated with SparkApplication", "name", app.Name, "namespace", app.Namespace)
Expand Down Expand Up @@ -412,7 +415,7 @@ func (r *Reconciler) reconcilePendingRerunSparkApplication(ctx context.Context,
logger.Info("Successfully deleted resources associated with SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State)
r.recordSparkApplicationEvent(app)
r.resetSparkApplicationStatus(app)
_ = r.submitSparkApplication(app)
_ = r.submitSparkApplication(ctx, app)
}
if err := r.updateSparkApplicationStatus(ctx, app); err != nil {
return err
Expand Down Expand Up @@ -644,7 +647,7 @@ func (r *Reconciler) getSparkApplication(ctx context.Context, key types.Namespac
}

// submitSparkApplication creates a new submission for the given SparkApplication and submits it using spark-submit.
func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (submitErr error) {
func (r *Reconciler) submitSparkApplication(ctx context.Context, app *v1beta2.SparkApplication) (submitErr error) {
logger.Info("Submitting SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State)

// SubmissionID must be set before creating any resources to ensure all the resources are labeled.
Expand Down Expand Up @@ -747,17 +750,11 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (subm
}
}()

sparkSubmitArgs, err := buildSparkSubmitArgs(app)
if err != nil {
return fmt.Errorf("failed to build spark-submit arguments: %v", err)
}

// Try submitting the application by running spark-submit.
logger.Info("Running spark-submit for SparkApplication", "name", app.Name, "namespace", app.Namespace, "arguments", sparkSubmitArgs)
if err := runSparkSubmit(newSubmission(sparkSubmitArgs, app)); err != nil {
if err := r.submitter.Submit(ctx, app); err != nil {
r.recordSparkApplicationEvent(app)
return fmt.Errorf("failed to run spark-submit: %v", err)
return fmt.Errorf("failed to submit spark application: %v", err)
}

return nil
}

Expand Down
9 changes: 9 additions & 0 deletions internal/controller/sparkapplication/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ var _ = Describe("SparkApplication Controller", func() {
k8sClient,
nil,
nil,
&sparkapplication.SparkSubmitter{},
sparkapplication.Options{Namespaces: []string{appNamespace}, DriverPodCreationGracePeriod: 10 * time.Second},
)
_, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
Expand All @@ -139,6 +140,7 @@ var _ = Describe("SparkApplication Controller", func() {
k8sClient,
nil,
nil,
&sparkapplication.SparkSubmitter{},
sparkapplication.Options{Namespaces: []string{appNamespace}, DriverPodCreationGracePeriod: 0 * time.Second},
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
Expand Down Expand Up @@ -205,6 +207,7 @@ var _ = Describe("SparkApplication Controller", func() {
k8sClient,
nil,
nil,
&sparkapplication.SparkSubmitter{},
sparkapplication.Options{Namespaces: []string{appNamespace}, DriverPodCreationGracePeriod: 0 * time.Second},
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
Expand Down Expand Up @@ -266,6 +269,7 @@ var _ = Describe("SparkApplication Controller", func() {
k8sClient,
nil,
nil,
&sparkapplication.SparkSubmitter{},
sparkapplication.Options{Namespaces: []string{appNamespace}},
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
Expand Down Expand Up @@ -325,6 +329,7 @@ var _ = Describe("SparkApplication Controller", func() {
k8sClient,
nil,
nil,
&sparkapplication.SparkSubmitter{},
sparkapplication.Options{Namespaces: []string{appNamespace}},
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
Expand Down Expand Up @@ -379,6 +384,7 @@ var _ = Describe("SparkApplication Controller", func() {
k8sClient,
nil,
nil,
&sparkapplication.SparkSubmitter{},
sparkapplication.Options{Namespaces: []string{appNamespace}},
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
Expand Down Expand Up @@ -438,6 +444,7 @@ var _ = Describe("SparkApplication Controller", func() {
k8sClient,
nil,
nil,
&sparkapplication.SparkSubmitter{},
sparkapplication.Options{Namespaces: []string{appNamespace}},
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
Expand Down Expand Up @@ -520,6 +527,7 @@ var _ = Describe("SparkApplication Controller", func() {
k8sClient,
record.NewFakeRecorder(3),
nil,
&sparkapplication.SparkSubmitter{},
sparkapplication.Options{Namespaces: []string{appNamespace}, MaxTrackedExecutorPerApp: 10},
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
Expand All @@ -539,6 +547,7 @@ var _ = Describe("SparkApplication Controller", func() {
k8sClient,
record.NewFakeRecorder(3),
nil,
&sparkapplication.SparkSubmitter{},
sparkapplication.Options{Namespaces: []string{appNamespace}, MaxTrackedExecutorPerApp: 1},
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
Expand Down
42 changes: 30 additions & 12 deletions internal/controller/sparkapplication/submission.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package sparkapplication

import (
"context"
"fmt"
"os"
"os/exec"
Expand All @@ -26,30 +27,47 @@ import (
"github.com/kubeflow/spark-operator/api/v1beta2"
"github.com/kubeflow/spark-operator/pkg/common"
"github.com/kubeflow/spark-operator/pkg/util"
"sigs.k8s.io/controller-runtime/pkg/log"
)

// submission includes information of a Spark application to be submitted.
type submission struct {
namespace string
name string
args []string
// SparkApplicationSubmitter is the interface for submitting a SparkApplication.
type SparkApplicationSubmitter interface {
Submit(ctx context.Context, app *v1beta2.SparkApplication) error
}

func newSubmission(args []string, app *v1beta2.SparkApplication) *submission {
return &submission{
namespace: app.Namespace,
name: app.Name,
args: args,
// SparkSubmitter submits a SparkApplication by calling spark-submit.
type SparkSubmitter struct {
}

// SparkSubmitter implements SparkApplicationSubmitter interface.
// This interface is highly experimental and may go under significant changes or removed in the future.
var _ SparkApplicationSubmitter = &SparkSubmitter{}

// Submit implements SparkApplicationSubmitter interface.
func (*SparkSubmitter) Submit(ctx context.Context, app *v1beta2.SparkApplication) error {
logger := log.FromContext(ctx)

args, err := buildSparkSubmitArgs(app)
if err != nil {
return fmt.Errorf("failed to build spark-submit arguments: %v", err)
}

// Try submitting the application by running spark-submit.
logger.Info("Running spark-submit", "arguments", args)
if err := runSparkSubmit(args); err != nil {
return fmt.Errorf("failed to run spark-submit: %v", err)
}

return nil
}

func runSparkSubmit(submission *submission) error {
func runSparkSubmit(args []string) error {
sparkHome, present := os.LookupEnv(common.EnvSparkHome)
if !present {
return fmt.Errorf("env %s is not specified", common.EnvSparkHome)
}
command := filepath.Join(sparkHome, "bin", "spark-submit")
cmd := exec.Command(command, submission.args...)
cmd := exec.Command(command, args...)
_, err := cmd.Output()
if err != nil {
var errorMsg string
Expand Down