Skip to content

Commit cefcf04

Browse files
committed
add tls
Signed-off-by: husharp <jinhao.hu@pingcap.com>
1 parent dbe75c0 commit cefcf04

File tree

9 files changed

+466
-20
lines changed

9 files changed

+466
-20
lines changed

pkg/controller/pd_control.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,3 +133,11 @@ func NewFakePDClientWithAddress(pdControl *pdapi.FakePDControl, peerURL string)
133133
pdControl.SetPDClientWithAddress(peerURL, pdClient)
134134
return pdClient
135135
}
136+
137+
// NewFakeEtcdClient creates a fake etcdClient that is set as the etcd client
138+
func NewFakeEtcdClient(pdControl *pdapi.FakePDControl, tc *v1alpha1.TidbCluster) *pdapi.FakeEtcdClient {
139+
etcdClient := pdapi.NewFakeEtcdClient()
140+
pdControl.SetEtcdClient(pdapi.Namespace(tc.GetNamespace()), tc.GetName(), etcdClient)
141+
142+
return etcdClient
143+
}

pkg/manager/member/pd_member_manager.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,12 @@ func (m *pdMemberManager) syncTidbClusterStatus(tc *v1alpha1.TidbCluster, set *a
357357
return err
358358
}
359359

360+
// check pd member's peer urls
361+
upgradePdName := PdName(tcName, 0, tc.Namespace, tc.Spec.ClusterDomain, tc.Spec.AcrossK8s)
362+
if err := m.upgrader.(*pdUpgrader).SyncPDPeerUrls(tc, upgradePdName); err != nil {
363+
return fmt.Errorf("syncTidbClusterStatus: failed to sync pd peer urls for cluster %s/%s, err: %s", ns, tcName, err)
364+
}
365+
360366
cluster, err := pdClient.GetCluster()
361367
if err != nil {
362368
tc.Status.PD.Synced = false

pkg/manager/member/pd_ms_upgrader.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func (u *pdMSUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.St
5050
}
5151

5252
curService := controller.PDMSTrimName(newSet.Name)
53-
klog.Infof("TidbCluster: [%s/%s]' gracefulUpgrade pdMS trim name, componentName: %s", ns, tcName, curService)
53+
klog.Infof("tidbCluster: [%s/%s]' gracefulUpgrade pdMS trim name, componentName: %s", ns, tcName, curService)
5454
if tc.Status.PDMS[curService] == nil {
5555
tc.Status.PDMS[curService] = &v1alpha1.PDMSStatus{Name: curService}
5656
return fmt.Errorf("tidbcluster: [%s/%s]'s pdMS component is nil, can not to be upgraded, component: %s", ns, tcName, curService)
@@ -62,9 +62,9 @@ func (u *pdMSUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.St
6262
if oldTrimName != curService {
6363
return fmt.Errorf("tidbcluster: [%s/%s]'s pdMS oldTrimName is %s, not equal to componentName: %s", ns, tcName, oldTrimName, curService)
6464
}
65-
klog.Infof("TidbCluster: [%s/%s]' gracefulUpgrade pdMS trim name, oldTrimName: %s", ns, tcName, oldTrimName)
65+
klog.Infof("tidbCluster: [%s/%s]' gracefulUpgrade pdMS trim name, oldTrimName: %s", ns, tcName, oldTrimName)
6666
if tc.PDMSScaling(oldTrimName) {
67-
klog.Infof("TidbCluster: [%s/%s]'s pdMS status is %v, can not upgrade pdMS",
67+
klog.Infof("tidbCluster: [%s/%s]'s pdMS status is %v, can not upgrade pdMS",
6868
ns, tcName, tc.Status.PDMS[curService].Phase)
6969
_, podSpec, err := GetLastAppliedConfig(oldSet)
7070
if err != nil {
@@ -141,7 +141,7 @@ func (u *pdMSUpgrader) upgradePDMSPod(tc *v1alpha1.TidbCluster, ordinal int32, n
141141
return err
142142
}
143143

144-
klog.Infof("TidbCluster: [%s/%s]' pdms upgrader: check primary: %s, upgradePDMSName: %s, upgradePodName: %s", ns, tcName,
144+
klog.Infof("tidbCluster: [%s/%s]' pdms upgrader: check primary: %s, upgradePDMSName: %s, upgradePodName: %s", ns, tcName,
145145
primary, upgradePDMSName, upgradePodName)
146146
// If current pdms is primary, transfer primary to other pdms pod
147147
if strings.Contains(primary, upgradePodName) || strings.Contains(primary, upgradePDMSName) {
@@ -152,15 +152,15 @@ func (u *pdMSUpgrader) upgradePDMSPod(tc *v1alpha1.TidbCluster, ordinal int32, n
152152
}
153153

154154
if targetName != "" {
155-
klog.Infof("TidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s", ns, tcName, targetName)
155+
klog.Infof("tidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s", ns, tcName, targetName)
156156
err := controller.GetPDMSClient(u.deps.PDControl, tc, curService).TransferPrimary(targetName)
157157
if err != nil {
158-
klog.Errorf("TidbCluster: [%s/%s]' pdms upgrader: failed to transfer pdms primary to: %s, %v", ns, tcName, targetName, err)
158+
klog.Errorf("tidbCluster: [%s/%s]' pdms upgrader: failed to transfer pdms primary to: %s, %v", ns, tcName, targetName, err)
159159
return err
160160
}
161-
klog.Infof("TidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s successfully", ns, tcName, targetName)
161+
klog.Infof("tidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s successfully", ns, tcName, targetName)
162162
} else {
163-
klog.Warningf("TidbCluster: [%s/%s]' pdms upgrader: skip to transfer pdms primary, because can not find a suitable pd", ns, tcName)
163+
klog.Warningf("tidbCluster: [%s/%s]' pdms upgrader: skip to transfer pdms primary, because can not find a suitable pd", ns, tcName)
164164
}
165165
}
166166
}
@@ -177,7 +177,7 @@ func (u *pdMSUpgrader) upgradePDMSPod(tc *v1alpha1.TidbCluster, ordinal int32, n
177177
func choosePDMSToTransferFromMembers(tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet, ordinal int32) string {
178178
ns := tc.GetNamespace()
179179
tcName := tc.GetName()
180-
klog.Infof("Tidbcluster: [%s/%s]' pdms upgrader: start to choose pdms to transfer primary from members", ns, tcName)
180+
klog.Infof("tidbCluster: [%s/%s]' pdms upgrader: start to choose pdms to transfer primary from members", ns, tcName)
181181
ordinals := helper.GetPodOrdinals(*newSet.Spec.Replicas, newSet)
182182

183183
// set ordinal to max ordinal if ordinal isn't exist
@@ -202,7 +202,7 @@ func choosePDMSToTransferFromMembers(tc *v1alpha1.TidbCluster, newSet *apps.Stat
202202
targetName = PDMSPodName(tcName, list[0], controller.PDMSTrimName(newSet.Name))
203203
}
204204

205-
klog.Infof("Tidbcluster: [%s/%s]' pdms upgrader: choose pdms to transfer primary from members, targetName: %s", ns, tcName, targetName)
205+
klog.Infof("tidbCluster: [%s/%s]' pdms upgrader: choose pdms to transfer primary from members, targetName: %s", ns, tcName, targetName)
206206
return targetName
207207
}
208208

pkg/manager/member/pd_upgrader.go

Lines changed: 71 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@ package member
1515

1616
import (
1717
"fmt"
18+
"net/url"
1819

1920
"github.com/pingcap/advanced-statefulset/client/apis/apps/v1/helper"
21+
"github.com/pingcap/kvproto/pkg/pdpb"
2022
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
2123
"github.com/pingcap/tidb-operator/pkg/controller"
2224
mngerutils "github.com/pingcap/tidb-operator/pkg/manager/utils"
2325
"github.com/pingcap/tidb-operator/pkg/pdapi"
2426
"github.com/pingcap/tidb-operator/pkg/third_party/k8s"
25-
2627
apps "k8s.io/api/apps/v1"
2728
"k8s.io/klog/v2"
2829
)
@@ -54,7 +55,7 @@ func (u *pdUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.Stat
5455
return fmt.Errorf("tidbcluster: [%s/%s]'s pd status sync failed, can not to be upgraded", ns, tcName)
5556
}
5657
if tc.PDScaling() {
57-
klog.Infof("TidbCluster: [%s/%s]'s pd status is %v, can not upgrade pd",
58+
klog.Infof("tidbCluster: [%s/%s]'s pd status is %v, can not upgrade pd",
5859
ns, tcName, tc.Status.PD.Phase)
5960
_, podSpec, err := GetLastAppliedConfig(oldSet)
6061
if err != nil {
@@ -143,17 +144,82 @@ func (u *pdUpgrader) upgradePDPod(tc *v1alpha1.TidbCluster, ordinal int32, newSe
143144
if targetName != "" {
144145
err := u.transferPDLeaderTo(tc, targetName)
145146
if err != nil {
146-
klog.Errorf("pd upgrader: failed to transfer pd leader to: %s, %v", targetName, err)
147+
klog.Errorf("tidbcluster: [%s/%s] pd upgrader: failed to transfer pd leader to: %s, %v", ns, tcName, targetName, err)
147148
return err
148149
}
149-
klog.Infof("pd upgrader: transfer pd leader to: %s successfully", targetName)
150+
klog.Infof("tidbcluster: [%s/%s] pd upgrader: transfer pd leader to: %s successfully", ns, tcName, targetName)
150151
return controller.RequeueErrorf("tidbcluster: [%s/%s]'s pd member: [%s] is transferring leader to pd member: [%s]", ns, tcName, upgradePdName, targetName)
151152
} else {
152-
klog.Warningf("pd upgrader: skip to transfer pd leader, because can not find a suitable pd")
153+
klog.Warningf("tidbcluster: [%s/%s] pd upgrader: skip to transfer pd leader, because can not find a suitable pd", ns, tcName)
153154
}
154155
}
155156

156157
mngerutils.SetUpgradePartition(newSet, ordinal)
158+
return u.SyncPDPeerUrls(tc, upgradePdName)
159+
}
160+
161+
func (u *pdUpgrader) SyncPDPeerUrls(tc *v1alpha1.TidbCluster, pdName string) error {
162+
ns := tc.GetNamespace()
163+
tcName := tc.GetName()
164+
165+
var (
166+
tlsEnabled = tc.IsTLSClusterEnabled()
167+
pdCli = controller.GetPDClient(u.deps.PDControl, tc)
168+
member *pdpb.Member
169+
)
170+
members, err := pdCli.GetMembers()
171+
if err != nil {
172+
return err
173+
}
174+
for _, m := range members.Members {
175+
if m.Name == pdName {
176+
member = m
177+
break
178+
}
179+
}
180+
if member == nil {
181+
return fmt.Errorf("tidbcluster: [%s/%s] failed to find member %s in pd cluster", ns, tcName, pdName)
182+
}
183+
184+
var (
185+
newPeers []string
186+
needSync bool
187+
)
188+
for _, peerUrl := range member.PeerUrls {
189+
u, err := url.Parse(peerUrl)
190+
if err != nil {
191+
return fmt.Errorf("tidbcluster: [%s/%s] failed to parse peer url %s, %v", ns, tcName, peerUrl, err)
192+
}
193+
// check if peer url need to be updated
194+
if tlsEnabled != (u.Scheme == "https") {
195+
needSync = true
196+
if !tlsEnabled {
197+
u.Scheme = "http"
198+
} else {
199+
u.Scheme = "https"
200+
}
201+
newPeers = append(newPeers, u.String())
202+
} else {
203+
newPeers = append(newPeers, peerUrl)
204+
}
205+
}
206+
207+
if needSync {
208+
pdEtcdClient, err := u.deps.PDControl.GetPDEtcdClient(pdapi.Namespace(tc.Namespace), tc.Name,
209+
tc.IsTLSClusterEnabled(), pdapi.ClusterRef(tc.Spec.ClusterDomain))
210+
211+
if err != nil {
212+
return fmt.Errorf("tidbcluster: [%s/%s] failed to create pd etcd client, %v", ns, tcName, err)
213+
}
214+
defer pdEtcdClient.Close()
215+
err = pdEtcdClient.UpdateMember(member.GetMemberId(), newPeers)
216+
if err != nil {
217+
return fmt.Errorf("tidbcluster: [%s/%s] failed to update pd member: %s peer urls, %v", ns, tcName, pdName, err)
218+
}
219+
klog.Infof("tidbcluster: [%s/%s] pd upgrader: sync pd member: %s peer urls successfully, from %v to %v",
220+
ns, tcName, pdName, member.PeerUrls, newPeers)
221+
}
222+
157223
return nil
158224
}
159225

pkg/manager/member/pd_upgrader_test.go

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@ import (
1717
"fmt"
1818
"testing"
1919

20+
. "github.com/onsi/gomega"
21+
"github.com/pingcap/kvproto/pkg/pdpb"
2022
"github.com/pingcap/tidb-operator/pkg/apis/label"
2123
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
2224
"github.com/pingcap/tidb-operator/pkg/controller"
2325
mngerutils "github.com/pingcap/tidb-operator/pkg/manager/utils"
2426
"github.com/pingcap/tidb-operator/pkg/pdapi"
25-
26-
. "github.com/onsi/gomega"
2727
apps "k8s.io/api/apps/v1"
2828
corev1 "k8s.io/api/core/v1"
2929
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -51,6 +51,7 @@ func TestPDUpgraderUpgrade(t *testing.T) {
5151
upgrader, pdControl, _, podInformer := newPDUpgrader()
5252
tc := newTidbClusterForPDUpgrader()
5353
pdClient := controller.NewFakePDClient(pdControl, tc)
54+
etcdClient := controller.NewFakeEtcdClient(pdControl, tc)
5455

5556
if test.changeFn != nil {
5657
test.changeFn(tc)
@@ -95,6 +96,27 @@ func TestPDUpgraderUpgrade(t *testing.T) {
9596
return healthInfo, nil
9697
})
9798

99+
pdClient.AddReaction(pdapi.GetMembersActionType, func(action *pdapi.Action) (interface{}, error) {
100+
membersInfo := &pdapi.MembersInfo{
101+
Members: []*pdpb.Member{
102+
{
103+
Name: PdPodName(upgradeTcName, 0),
104+
MemberId: 111,
105+
PeerUrls: []string{"http://upgrader-pd-0:2380"},
106+
},
107+
{
108+
Name: PdPodName(upgradeTcName, 1),
109+
MemberId: 222,
110+
PeerUrls: []string{"http://upgrader-pd-1:2380"},
111+
},
112+
},
113+
}
114+
return membersInfo, nil
115+
})
116+
etcdClient.AddReaction(pdapi.EtcdUpdateMemberActionType, func(action *pdapi.Action) (interface{}, error) {
117+
return nil, nil
118+
})
119+
98120
err := upgrader.Upgrade(tc, oldSet, newSet)
99121
test.errExpectFn(g, err)
100122
test.expectFn(g, tc, newSet)
@@ -314,12 +336,41 @@ func TestPDUpgraderUpgrade(t *testing.T) {
314336
g.Expect(newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(pointer.Int32Ptr(1)))
315337
},
316338
},
339+
{
340+
name: "upgrade from non-tls to tls",
341+
changeFn: func(tc *v1alpha1.TidbCluster) {
342+
tc.Status.PD.Synced = true
343+
tc.Spec = v1alpha1.TidbClusterSpec{
344+
PD: &v1alpha1.PDSpec{
345+
ComponentSpec: v1alpha1.ComponentSpec{
346+
Image: "pingcap/pd:v3.1.0",
347+
},
348+
},
349+
TiDB: &v1alpha1.TiDBSpec{
350+
TLSClient: &v1alpha1.TiDBTLSClient{
351+
Enabled: true,
352+
},
353+
},
354+
TiKV: &v1alpha1.TiKVSpec{},
355+
TLSCluster: &v1alpha1.TLSCluster{Enabled: true},
356+
}
357+
},
358+
changePods: nil,
359+
changeOldSet: nil,
360+
transferLeaderErr: false,
361+
errExpectFn: func(g *GomegaWithT, err error) {
362+
g.Expect(err).NotTo(HaveOccurred())
363+
},
364+
expectFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet) {
365+
g.Expect(tc.Status.PD.Phase).To(Equal(v1alpha1.UpgradePhase))
366+
g.Expect(newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(pointer.Int32Ptr(1)))
367+
},
368+
},
317369
}
318370

319371
for i := range tests {
320372
testFn(&tests[i])
321373
}
322-
323374
}
324375

325376
func TestChoosePDToTransferFromMembers(t *testing.T) {
@@ -447,6 +498,16 @@ func TestChoosePDToTransferFromMembers(t *testing.T) {
447498

448499
func newPDUpgrader() (Upgrader, *pdapi.FakePDControl, *controller.FakePodControl, podinformers.PodInformer) {
449500
fakeDeps := controller.NewFakeDependencies()
501+
502+
informer := fakeDeps.KubeInformerFactory
503+
informer.Core().V1().Secrets().Informer().GetIndexer().Add(&corev1.Secret{
504+
ObjectMeta: metav1.ObjectMeta{
505+
Name: "upgrader-cluster-client-secret",
506+
Namespace: corev1.NamespaceDefault,
507+
},
508+
})
509+
fakeDeps.PDControl = pdapi.NewFakePDControl(informer.Core().V1().Secrets().Lister())
510+
450511
pdUpgrader := &pdUpgrader{deps: fakeDeps}
451512
pdControl := fakeDeps.PDControl.(*pdapi.FakePDControl)
452513
podControl := fakeDeps.PodControl.(*controller.FakePodControl)

pkg/pdapi/fake_pdapi.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,3 +339,75 @@ func (c *FakePDMSClient) TransferPrimary(newPrimary string) error {
339339
_, err := c.fakeAPI(PDMSTransferPrimaryActionType, action)
340340
return err
341341
}
342+
343+
const (
344+
EtcdGetActionType ActionType = "Get"
345+
EtcdPutActionType ActionType = "Put"
346+
EtcdDeleteActionType ActionType = "Delete"
347+
EtcdUpdateMemberActionType ActionType = "UpdateMember"
348+
EtcdCloseActionType ActionType = "Close"
349+
)
350+
351+
// FakeEtcdClient implements a fake version of EtcdClient.
352+
type FakeEtcdClient struct {
353+
reactions map[ActionType]Reaction
354+
}
355+
356+
func NewFakeEtcdClient() *FakeEtcdClient {
357+
return &FakeEtcdClient{reactions: map[ActionType]Reaction{}}
358+
}
359+
360+
func (c *FakeEtcdClient) AddReaction(actionType ActionType, reaction Reaction) {
361+
c.reactions[actionType] = reaction
362+
}
363+
364+
// fakeAPI is a small helper for fake API calls
365+
func (c *FakeEtcdClient) fakeAPI(actionType ActionType, action *Action) (interface{}, error) {
366+
if reaction, ok := c.reactions[actionType]; ok {
367+
result, err := reaction(action)
368+
if err != nil {
369+
return nil, err
370+
}
371+
return result, nil
372+
}
373+
return nil, &NotFoundReaction{actionType}
374+
}
375+
376+
func (c *FakeEtcdClient) Get(_ string, _ bool) (kvs []*KeyValue, err error) {
377+
action := &Action{}
378+
nil, err := c.fakeAPI(EtcdGetActionType, action)
379+
if err != nil {
380+
return
381+
}
382+
return
383+
}
384+
385+
func (c *FakeEtcdClient) PutKey(_, _ string) error {
386+
action := &Action{}
387+
_, err := c.fakeAPI(EtcdPutActionType, action)
388+
return err
389+
}
390+
391+
func (c *FakeEtcdClient) PutTTLKey(_, _ string, _ int64) error {
392+
action := &Action{}
393+
_, err := c.fakeAPI(EtcdPutActionType, action)
394+
return err
395+
}
396+
397+
func (c *FakeEtcdClient) DeleteKey(_ string) error {
398+
action := &Action{}
399+
_, err := c.fakeAPI(EtcdDeleteActionType, action)
400+
return err
401+
}
402+
403+
func (c *FakeEtcdClient) UpdateMember(_ uint64, _ []string) error {
404+
action := &Action{}
405+
_, err := c.fakeAPI(EtcdUpdateMemberActionType, action)
406+
return err
407+
}
408+
409+
func (c *FakeEtcdClient) Close() error {
410+
action := &Action{}
411+
_, err := c.fakeAPI(EtcdCloseActionType, action)
412+
return err
413+
}

0 commit comments

Comments
 (0)