Skip to content

Commit 757dd88

Browse files
authored
Merge branch 'main' into 199-fix-dependency-reconciliation
2 parents 99c5a2d + 157d3b7 commit 757dd88

File tree

5 files changed

+282
-16
lines changed

5 files changed

+282
-16
lines changed

.mise.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tools]
22
actionlint = "1.7.7"
3-
ginkgo = '2.23.4'
3+
ginkgo = '2.25.2'
44
golang = '1.24'
55
golangci-lint = "2.4.0"
66
"go:golang.org/x/tools/cmd/goimports" = "0.36.0"

api/v1/prefectworkpool_types.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ limitations under the License.
1717
package v1
1818

1919
import (
20-
"strings"
21-
2220
corev1 "k8s.io/api/core/v1"
2321
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2422
"k8s.io/apimachinery/pkg/util/intstr"
@@ -136,13 +134,9 @@ func (s *PrefectWorkPool) Image() string {
136134
}
137135

138136
func (s *PrefectWorkPool) EntrypointArguments() []string {
139-
workPoolName := s.Name
140-
if strings.HasPrefix(workPoolName, "prefect") {
141-
workPoolName = "pool-" + workPoolName
142-
}
143137
return []string{
144138
"prefect", "worker", "start",
145-
"--pool", workPoolName, "--type", s.Spec.Type,
139+
"--pool", s.Name, "--type", s.Spec.Type,
146140
"--with-healthcheck",
147141
}
148142
}

api/v1/prefectworkpool_types_test.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,101 @@ var _ = Describe("PrefectWorkPool type", func() {
4646
Expect(copied).To(Equal(original))
4747
Expect(copied).NotTo(BeIdenticalTo(original))
4848
})
49+
50+
Describe("EntrypointArguments", func() {
51+
It("should use exact work pool name for names starting with 'prefect'", func() {
52+
workPool := &PrefectWorkPool{
53+
ObjectMeta: metav1.ObjectMeta{
54+
Name: "prefect-work-pool",
55+
},
56+
Spec: PrefectWorkPoolSpec{
57+
Type: "kubernetes",
58+
},
59+
}
60+
61+
args := workPool.EntrypointArguments()
62+
Expect(args).To(ContainElement("prefect-work-pool"))
63+
Expect(args).To(Equal([]string{
64+
"prefect", "worker", "start",
65+
"--pool", "prefect-work-pool", "--type", "kubernetes",
66+
"--with-healthcheck",
67+
}))
68+
})
69+
70+
It("should use exact work pool name for names not starting with 'prefect'", func() {
71+
workPool := &PrefectWorkPool{
72+
ObjectMeta: metav1.ObjectMeta{
73+
Name: "my-work-pool",
74+
},
75+
Spec: PrefectWorkPoolSpec{
76+
Type: "process",
77+
},
78+
}
79+
80+
args := workPool.EntrypointArguments()
81+
Expect(args).To(ContainElement("my-work-pool"))
82+
Expect(args).To(Equal([]string{
83+
"prefect", "worker", "start",
84+
"--pool", "my-work-pool", "--type", "process",
85+
"--with-healthcheck",
86+
}))
87+
})
88+
89+
It("ensures naming consistency between controller and worker", func() {
90+
workPool := &PrefectWorkPool{
91+
ObjectMeta: metav1.ObjectMeta{
92+
Name: "prefect-production",
93+
},
94+
Spec: PrefectWorkPoolSpec{
95+
Type: "kubernetes",
96+
},
97+
}
98+
99+
controllerName := workPool.Name
100+
101+
workerArgs := workPool.EntrypointArguments()
102+
var workerPoolName string
103+
for i, arg := range workerArgs {
104+
if arg == "--pool" && i+1 < len(workerArgs) {
105+
workerPoolName = workerArgs[i+1]
106+
break
107+
}
108+
}
109+
110+
Expect(controllerName).To(Equal("prefect-production"))
111+
Expect(workerPoolName).To(Equal("prefect-production"))
112+
Expect(controllerName).To(Equal(workerPoolName))
113+
})
114+
115+
It("should work consistently for all naming patterns", func() {
116+
testCases := []struct {
117+
name string
118+
expectedPool string
119+
}{
120+
{"prefect", "prefect"},
121+
{"prefect-k8s", "prefect-k8s"},
122+
{"prefect123", "prefect123"},
123+
{"k8s-pool", "k8s-pool"},
124+
{"my-prefect-pool", "my-prefect-pool"},
125+
{"PREFECT-POOL", "PREFECT-POOL"},
126+
{"production-pool", "production-pool"},
127+
{"dev", "dev"},
128+
}
129+
130+
for _, tc := range testCases {
131+
workPool := &PrefectWorkPool{
132+
ObjectMeta: metav1.ObjectMeta{
133+
Name: tc.name,
134+
},
135+
Spec: PrefectWorkPoolSpec{
136+
Type: "process",
137+
},
138+
}
139+
140+
args := workPool.EntrypointArguments()
141+
Expect(args).To(ContainElement(tc.expectedPool),
142+
"Work pool %s should use pool name %s", tc.name, tc.expectedPool)
143+
}
144+
})
145+
})
49146
})

internal/prefect/client.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func NewClient(baseURL, apiKey string, log logr.Logger) *Client {
9191
}
9292

9393
// NewClientFromServerReference creates a new PrefectClient from a PrefectServerReference
94-
func NewClientFromServerReference(serverRef *prefectiov1.PrefectServerReference, apiKey string, log logr.Logger) (*Client, error) {
94+
func NewClientFromServerReference(serverRef *prefectiov1.PrefectServerReference, apiKey string, fallbackNamespace string, log logr.Logger) (*Client, error) {
9595
// Create a base client first to check if we're running in cluster
9696
baseClient := NewClient("", apiKey, log)
9797

@@ -105,12 +105,12 @@ func NewClientFromServerReference(serverRef *prefectiov1.PrefectServerReference,
105105
baseURL = "http://localhost:14200/api"
106106
log.V(1).Info("Using localhost for port-forwarding", "url", baseURL)
107107
} else {
108-
// Use the server's namespace as fallback if not specified
109-
fallbackNamespace := serverRef.Namespace
110-
if fallbackNamespace == "" {
111-
fallbackNamespace = "default" // Default to "default" namespace if not specified
108+
// Use the server's namespace, or fallback to the provided namespace
109+
namespace := serverRef.Namespace
110+
if namespace == "" {
111+
namespace = fallbackNamespace // Use provided fallback instead of hardcoded "default"
112112
}
113-
baseURL = serverRef.GetAPIURL(fallbackNamespace)
113+
baseURL = serverRef.GetAPIURL(namespace)
114114
log.V(1).Info("Using in-cluster URL", "url", baseURL)
115115
}
116116

@@ -150,8 +150,8 @@ func NewClientFromK8s(ctx context.Context, serverRef *prefectiov1.PrefectServerR
150150
return nil, fmt.Errorf("failed to get API key: %w", err)
151151
}
152152

153-
// Create client using the existing factory function
154-
return NewClientFromServerReference(serverRef, apiKey, log)
153+
// Create client using the existing factory function, passing the namespace as fallback
154+
return NewClientFromServerReference(serverRef, apiKey, namespace, log)
155155
}
156156

157157
// DeploymentSpec represents the request payload for creating/updating deployments

internal/prefect/client_test.go

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"testing"
2525
"time"
2626

27+
prefectiov1 "github.com/PrefectHQ/prefect-operator/api/v1"
2728
"github.com/go-logr/logr"
2829
. "github.com/onsi/ginkgo/v2"
2930
. "github.com/onsi/gomega"
@@ -806,4 +807,178 @@ var _ = Describe("Prefect HTTP Client", func() {
806807
})
807808
})
808809
})
810+
811+
Describe("Client Creation from ServerReference", func() {
812+
var (
813+
serverRef *prefectiov1.PrefectServerReference
814+
logger logr.Logger
815+
)
816+
817+
BeforeEach(func() {
818+
logger = logr.Discard()
819+
})
820+
821+
Describe("NewClientFromServerReference", func() {
822+
Context("URL generation behavior (focus on the bug)", func() {
823+
It("should demonstrate that function hardcodes 'default' namespace", func() {
824+
// Testing the URL generation logic by comparing what the function would produce
825+
// vs what it should produce if it accepted a fallback namespace parameter
826+
827+
serverRef := &prefectiov1.PrefectServerReference{
828+
Name: "prefect-server",
829+
// Namespace is empty
830+
}
831+
832+
// What GetAPIURL produces with different fallback namespaces
833+
urlWithDefault := serverRef.GetAPIURL("default")
834+
urlWithCustom := serverRef.GetAPIURL("deployment-namespace")
835+
836+
Expect(urlWithDefault).To(Equal("http://prefect-server.default.svc:4200/api"))
837+
Expect(urlWithCustom).To(Equal("http://prefect-server.deployment-namespace.svc:4200/api"))
838+
839+
// The bug: NewClientFromServerReference always uses "default" fallback
840+
// instead of accepting the fallback namespace as a parameter
841+
Expect(urlWithDefault).NotTo(Equal(urlWithCustom))
842+
})
843+
844+
It("should use provided fallback namespace when server namespace is empty (AFTER FIX)", func() {
845+
serverRef := &prefectiov1.PrefectServerReference{
846+
Name: "prefect-server",
847+
// Namespace is empty - should use fallback namespace parameter
848+
}
849+
850+
// This test verifies the fix works for remote servers (to avoid port-forwarding issues)
851+
serverRefWithRemote := &prefectiov1.PrefectServerReference{
852+
RemoteAPIURL: ptr.To("https://api.prefect.cloud"),
853+
}
854+
855+
client, err := NewClientFromServerReference(serverRefWithRemote, "test-key", "fallback-namespace", logger)
856+
857+
Expect(err).NotTo(HaveOccurred())
858+
Expect(client).NotTo(BeNil())
859+
// Remote URL should not be affected by namespace changes
860+
Expect(client.BaseURL).To(Equal("https://api.prefect.cloud/api"))
861+
862+
// Test the namespace behavior by checking what GetAPIURL would return
863+
expectedURL := serverRef.GetAPIURL("fallback-namespace")
864+
Expect(expectedURL).To(Equal("http://prefect-server.fallback-namespace.svc:4200/api"))
865+
})
866+
})
867+
868+
Context("when server reference has remote server", func() {
869+
It("should use remote API URL", func() {
870+
serverRef = &prefectiov1.PrefectServerReference{
871+
RemoteAPIURL: ptr.To("https://api.prefect.cloud"),
872+
}
873+
874+
client, err := NewClientFromServerReference(serverRef, "test-key", "test-namespace", logger)
875+
876+
Expect(err).NotTo(HaveOccurred())
877+
Expect(client).NotTo(BeNil())
878+
Expect(client.BaseURL).To(Equal("https://api.prefect.cloud/api"))
879+
})
880+
})
881+
882+
Context("URL generation for different scenarios", func() {
883+
It("should correctly generate in-cluster URLs when namespace is specified", func() {
884+
serverRef := &prefectiov1.PrefectServerReference{
885+
Name: "prefect-server",
886+
Namespace: "prefect-system",
887+
}
888+
889+
// Test the URL generation directly without creating client (avoids port-forwarding issues)
890+
expectedURL := serverRef.GetAPIURL("fallback-namespace")
891+
Expect(expectedURL).To(Equal("http://prefect-server.prefect-system.svc:4200/api"))
892+
893+
// Verify fallback namespace is ignored when server namespace is specified
894+
fallbackURL := serverRef.GetAPIURL("different-namespace")
895+
Expect(fallbackURL).To(Equal("http://prefect-server.prefect-system.svc:4200/api"))
896+
Expect(expectedURL).To(Equal(fallbackURL))
897+
})
898+
899+
It("should use fallback namespace when server namespace is empty", func() {
900+
serverRef := &prefectiov1.PrefectServerReference{
901+
Name: "prefect-server",
902+
// Namespace is empty - should use fallback
903+
}
904+
905+
// Test different fallback namespaces
906+
url1 := serverRef.GetAPIURL("namespace-1")
907+
url2 := serverRef.GetAPIURL("namespace-2")
908+
909+
Expect(url1).To(Equal("http://prefect-server.namespace-1.svc:4200/api"))
910+
Expect(url2).To(Equal("http://prefect-server.namespace-2.svc:4200/api"))
911+
Expect(url1).NotTo(Equal(url2))
912+
})
913+
})
914+
})
915+
916+
Describe("NewClientFromK8s", func() {
917+
var (
918+
ctx context.Context
919+
)
920+
921+
BeforeEach(func() {
922+
ctx = context.Background()
923+
})
924+
925+
Context("namespace fallback behavior (BUG DEMONSTRATION)", func() {
926+
It("should demonstrate the bug: NewClientFromK8s doesn't pass fallback namespace", func() {
927+
// Test the URL generation directly to show the bug without port-forwarding issues
928+
serverRef := &prefectiov1.PrefectServerReference{
929+
Name: "prefect-server",
930+
// Namespace is empty - should use deployment's namespace
931+
}
932+
933+
// Direct test of GetAPIURL method shows what SHOULD happen
934+
expectedURL := serverRef.GetAPIURL("deployment-namespace")
935+
Expect(expectedURL).To(Equal("http://prefect-server.deployment-namespace.svc:4200/api"))
936+
937+
// But the bug is in NewClientFromServerReference which ignores the fallback
938+
// and hardcodes "default" instead of using the passed namespace
939+
actualURL := serverRef.GetAPIURL("default") // This is what the buggy code does
940+
Expect(actualURL).To(Equal("http://prefect-server.default.svc:4200/api"))
941+
942+
// These URLs should be the same but they're different due to the bug
943+
Expect(expectedURL).NotTo(Equal(actualURL))
944+
})
945+
946+
It("should correctly use fallback namespace after fix (VERIFICATION)", func() {
947+
// Use remote server reference to test NewClientFromK8s without port-forwarding
948+
serverRef := &prefectiov1.PrefectServerReference{
949+
RemoteAPIURL: ptr.To("https://api.prefect.cloud"),
950+
}
951+
952+
client, err := NewClientFromK8s(ctx, serverRef, nil, "deployment-namespace", logger)
953+
954+
Expect(err).NotTo(HaveOccurred())
955+
Expect(client).NotTo(BeNil())
956+
Expect(client.BaseURL).To(Equal("https://api.prefect.cloud/api"))
957+
958+
// For in-cluster servers, verify the namespace fallback logic would work
959+
inClusterRef := &prefectiov1.PrefectServerReference{
960+
Name: "prefect-server",
961+
// Namespace is empty - should use deployment-namespace as fallback
962+
}
963+
964+
expectedURL := inClusterRef.GetAPIURL("deployment-namespace")
965+
Expect(expectedURL).To(Equal("http://prefect-server.deployment-namespace.svc:4200/api"))
966+
})
967+
})
968+
969+
Context("with remote server reference", func() {
970+
It("should use remote API URL regardless of namespace", func() {
971+
serverRef = &prefectiov1.PrefectServerReference{
972+
RemoteAPIURL: ptr.To("https://api.prefect.cloud"),
973+
}
974+
975+
client, err := NewClientFromK8s(ctx, serverRef, nil, "deployment-namespace", logger)
976+
977+
Expect(err).NotTo(HaveOccurred())
978+
Expect(client).NotTo(BeNil())
979+
Expect(client.BaseURL).To(Equal("https://api.prefect.cloud/api"))
980+
})
981+
})
982+
})
983+
})
809984
})

0 commit comments

Comments
 (0)