Skip to content

Commit 43dba1c

Browse files
committed
support schema change
Signed-off-by: husharp <ihusharp@gmail.com>
1 parent 545685a commit 43dba1c

File tree

7 files changed

+447
-6
lines changed

7 files changed

+447
-6
lines changed

pkg/controller/pd_control.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,3 +119,11 @@ func NewFakePDClientWithAddress(pdControl *pdapi.FakePDControl, peerURL string)
119119
pdControl.SetPDClientWithAddress(peerURL, pdClient)
120120
return pdClient
121121
}
122+
123+
// NewFakeEtcdClient creates a fake etcdClient that is set as the etcd client
124+
func NewFakeEtcdClient(pdControl *pdapi.FakePDControl, tc *v1alpha1.TidbCluster) *pdapi.FakeEtcdClient {
125+
etcdClient := pdapi.NewFakeEtcdClient()
126+
pdControl.SetEtcdClient(pdapi.Namespace(tc.GetNamespace()), tc.GetName(), etcdClient)
127+
128+
return etcdClient
129+
}

pkg/manager/member/pd_upgrader.go

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@ package member
1515

1616
import (
1717
"fmt"
18+
"net/url"
1819

1920
"github.com/pingcap/advanced-statefulset/client/apis/apps/v1/helper"
21+
"github.com/pingcap/kvproto/pkg/pdpb"
2022
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
2123
"github.com/pingcap/tidb-operator/pkg/controller"
2224
mngerutils "github.com/pingcap/tidb-operator/pkg/manager/utils"
2325
"github.com/pingcap/tidb-operator/pkg/pdapi"
2426
"github.com/pingcap/tidb-operator/pkg/third_party/k8s"
25-
2627
apps "k8s.io/api/apps/v1"
2728
"k8s.io/klog/v2"
2829
)
@@ -154,6 +155,69 @@ func (u *pdUpgrader) upgradePDPod(tc *v1alpha1.TidbCluster, ordinal int32, newSe
154155
}
155156

156157
mngerutils.SetUpgradePartition(newSet, ordinal)
158+
return u.syncPDPeerUrls(tc, upgradePdName)
159+
}
160+
161+
func (u *pdUpgrader) syncPDPeerUrls(tc *v1alpha1.TidbCluster, pdName string) error {
162+
klog.Infof("pd upgrader: sync pd member: %s peer urls", pdName)
163+
var (
164+
tlsEnabled = tc.IsTLSClusterEnabled()
165+
pdCli = controller.GetPDClient(u.deps.PDControl, tc)
166+
member *pdpb.Member
167+
)
168+
members, err := pdCli.GetMembers()
169+
if err != nil {
170+
return err
171+
}
172+
for _, m := range members.Members {
173+
if m.Name == pdName {
174+
member = m
175+
break
176+
}
177+
}
178+
if member == nil {
179+
return fmt.Errorf("failed to find member %s in pd cluster", pdName)
180+
}
181+
182+
var (
183+
newPeers []string
184+
needSync bool
185+
)
186+
for _, peerUrl := range member.PeerUrls {
187+
u, err := url.Parse(peerUrl)
188+
if err != nil {
189+
return fmt.Errorf("failed to parse peer url %s, %v", peerUrl, err)
190+
}
191+
// check if peer url need to be updated
192+
if tlsEnabled != (u.Scheme == "https") {
193+
needSync = true
194+
if !tlsEnabled {
195+
u.Scheme = "http"
196+
} else {
197+
u.Scheme = "https"
198+
}
199+
newPeers = append(newPeers, u.String())
200+
} else {
201+
newPeers = append(newPeers, peerUrl)
202+
}
203+
}
204+
205+
if needSync {
206+
pdEtcdClient, err := u.deps.PDControl.GetPDEtcdClient(pdapi.Namespace(tc.Namespace), tc.Name,
207+
tc.IsTLSClusterEnabled(), pdapi.ClusterRef(tc.Spec.ClusterDomain))
208+
209+
if err != nil {
210+
return err
211+
}
212+
defer pdEtcdClient.Close()
213+
err = pdEtcdClient.UpdateMember(member.GetMemberId(), newPeers)
214+
if err != nil {
215+
return err
216+
}
217+
klog.Infof("pd upgrader: sync pd member: %s peer urls successfully, from %v to %v",
218+
pdName, member.PeerUrls, newPeers)
219+
}
220+
157221
return nil
158222
}
159223

pkg/manager/member/pd_upgrader_test.go

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@ import (
1717
"fmt"
1818
"testing"
1919

20+
. "github.com/onsi/gomega"
21+
"github.com/pingcap/kvproto/pkg/pdpb"
2022
"github.com/pingcap/tidb-operator/pkg/apis/label"
2123
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
2224
"github.com/pingcap/tidb-operator/pkg/controller"
2325
mngerutils "github.com/pingcap/tidb-operator/pkg/manager/utils"
2426
"github.com/pingcap/tidb-operator/pkg/pdapi"
25-
26-
. "github.com/onsi/gomega"
2727
apps "k8s.io/api/apps/v1"
2828
corev1 "k8s.io/api/core/v1"
2929
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -51,6 +51,7 @@ func TestPDUpgraderUpgrade(t *testing.T) {
5151
upgrader, pdControl, _, podInformer := newPDUpgrader()
5252
tc := newTidbClusterForPDUpgrader()
5353
pdClient := controller.NewFakePDClient(pdControl, tc)
54+
etcdClient := controller.NewFakeEtcdClient(pdControl, tc)
5455

5556
if test.changeFn != nil {
5657
test.changeFn(tc)
@@ -95,6 +96,30 @@ func TestPDUpgraderUpgrade(t *testing.T) {
9596
return healthInfo, nil
9697
})
9798

99+
pdClient.AddReaction(pdapi.GetMembersActionType, func(action *pdapi.Action) (interface{}, error) {
100+
membersInfo := &pdapi.MembersInfo{
101+
Members: []*pdpb.Member{
102+
{
103+
Name: PdPodName(upgradeTcName, 0),
104+
MemberId: 111,
105+
PeerUrls: []string{"http://upgrader-pd-0:2380"},
106+
},
107+
{
108+
Name: PdPodName(upgradeTcName, 1),
109+
MemberId: 222,
110+
PeerUrls: []string{"http://upgrader-pd-1:2380"},
111+
},
112+
},
113+
}
114+
return membersInfo, nil
115+
})
116+
117+
pdClient.GetMembers()
118+
119+
etcdClient.AddReaction(pdapi.EtcdUpdateMemberActionType, func(action *pdapi.Action) (interface{}, error) {
120+
return nil, nil
121+
})
122+
98123
err := upgrader.Upgrade(tc, oldSet, newSet)
99124
test.errExpectFn(g, err)
100125
test.expectFn(g, tc, newSet)
@@ -314,12 +339,41 @@ func TestPDUpgraderUpgrade(t *testing.T) {
314339
g.Expect(newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(pointer.Int32Ptr(1)))
315340
},
316341
},
342+
{
343+
name: "upgrade from non-tls to tls",
344+
changeFn: func(tc *v1alpha1.TidbCluster) {
345+
tc.Status.PD.Synced = true
346+
tc.Spec = v1alpha1.TidbClusterSpec{
347+
PD: &v1alpha1.PDSpec{
348+
ComponentSpec: v1alpha1.ComponentSpec{
349+
Image: "pingcap/pd:v3.1.0",
350+
},
351+
},
352+
TiDB: &v1alpha1.TiDBSpec{
353+
TLSClient: &v1alpha1.TiDBTLSClient{
354+
Enabled: true,
355+
},
356+
},
357+
TiKV: &v1alpha1.TiKVSpec{},
358+
TLSCluster: &v1alpha1.TLSCluster{Enabled: true},
359+
}
360+
},
361+
changePods: nil,
362+
changeOldSet: nil,
363+
transferLeaderErr: false,
364+
errExpectFn: func(g *GomegaWithT, err error) {
365+
g.Expect(err).NotTo(HaveOccurred())
366+
},
367+
expectFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet) {
368+
g.Expect(tc.Status.PD.Phase).To(Equal(v1alpha1.UpgradePhase))
369+
g.Expect(newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(pointer.Int32Ptr(1)))
370+
},
371+
},
317372
}
318373

319374
for i := range tests {
320375
testFn(&tests[i])
321376
}
322-
323377
}
324378

325379
func TestChoosePDToTransferFromMembers(t *testing.T) {
@@ -447,6 +501,16 @@ func TestChoosePDToTransferFromMembers(t *testing.T) {
447501

448502
func newPDUpgrader() (Upgrader, *pdapi.FakePDControl, *controller.FakePodControl, podinformers.PodInformer) {
449503
fakeDeps := controller.NewFakeDependencies()
504+
505+
informer := fakeDeps.KubeInformerFactory
506+
informer.Core().V1().Secrets().Informer().GetIndexer().Add(&corev1.Secret{
507+
ObjectMeta: metav1.ObjectMeta{
508+
Name: "upgrader-cluster-client-secret",
509+
Namespace: corev1.NamespaceDefault,
510+
},
511+
})
512+
fakeDeps.PDControl = pdapi.NewFakePDControl(informer.Core().V1().Secrets().Lister())
513+
450514
pdUpgrader := &pdUpgrader{deps: fakeDeps}
451515
pdControl := fakeDeps.PDControl.(*pdapi.FakePDControl)
452516
podControl := fakeDeps.PodControl.(*controller.FakePodControl)

pkg/pdapi/fake_pdapi.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,3 +291,75 @@ func (c *FakePDClient) GetRecoveringMark() (bool, error) {
291291

292292
return true, nil
293293
}
294+
295+
const (
296+
EtcdGetActionType ActionType = "Get"
297+
EtcdPutActionType ActionType = "Put"
298+
EtcdDeleteActionType ActionType = "Delete"
299+
EtcdUpdateMemberActionType ActionType = "UpdateMember"
300+
EtcdCloseActionType ActionType = "Close"
301+
)
302+
303+
// FakeEtcdClient implements a fake version of EtcdClient.
304+
type FakeEtcdClient struct {
305+
reactions map[ActionType]Reaction
306+
}
307+
308+
func NewFakeEtcdClient() *FakeEtcdClient {
309+
return &FakeEtcdClient{reactions: map[ActionType]Reaction{}}
310+
}
311+
312+
func (c *FakeEtcdClient) AddReaction(actionType ActionType, reaction Reaction) {
313+
c.reactions[actionType] = reaction
314+
}
315+
316+
// fakeAPI is a small helper for fake API calls
317+
func (c *FakeEtcdClient) fakeAPI(actionType ActionType, action *Action) (interface{}, error) {
318+
if reaction, ok := c.reactions[actionType]; ok {
319+
result, err := reaction(action)
320+
if err != nil {
321+
return nil, err
322+
}
323+
return result, nil
324+
}
325+
return nil, &NotFoundReaction{actionType}
326+
}
327+
328+
func (c *FakeEtcdClient) Get(_ string, _ bool) (kvs []*KeyValue, err error) {
329+
action := &Action{}
330+
nil, err := c.fakeAPI(EtcdGetActionType, action)
331+
if err != nil {
332+
return
333+
}
334+
return
335+
}
336+
337+
func (c *FakeEtcdClient) PutKey(_, _ string) error {
338+
action := &Action{}
339+
_, err := c.fakeAPI(EtcdPutActionType, action)
340+
return err
341+
}
342+
343+
func (c *FakeEtcdClient) PutTTLKey(_, _ string, _ int64) error {
344+
action := &Action{}
345+
_, err := c.fakeAPI(EtcdPutActionType, action)
346+
return err
347+
}
348+
349+
func (c *FakeEtcdClient) DeleteKey(_ string) error {
350+
action := &Action{}
351+
_, err := c.fakeAPI(EtcdDeleteActionType, action)
352+
return err
353+
}
354+
355+
func (c *FakeEtcdClient) UpdateMember(_ uint64, _ []string) error {
356+
action := &Action{}
357+
_, err := c.fakeAPI(EtcdUpdateMemberActionType, action)
358+
return err
359+
}
360+
361+
func (c *FakeEtcdClient) Close() error {
362+
action := &Action{}
363+
_, err := c.fakeAPI(EtcdCloseActionType, action)
364+
return err
365+
}

pkg/pdapi/pd_control.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,10 +337,24 @@ type FakePDControl struct {
337337

338338
func NewFakePDControl(secretLister corelisterv1.SecretLister) *FakePDControl {
339339
return &FakePDControl{
340-
defaultPDControl{secretLister: secretLister, pdClients: map[string]PDClient{}},
340+
defaultPDControl{secretLister: secretLister, pdClients: map[string]PDClient{}, pdEtcdClients: map[string]PDEtcdClient{}},
341341
}
342342
}
343343

344+
func (fpc *FakePDControl) GetPDClient(namespace Namespace, tcName string, tlsEnabled bool, opts ...Option) PDClient {
345+
if tlsEnabled {
346+
return fpc.defaultPDControl.pdClients[genClientKey("http", namespace, tcName, "")]
347+
}
348+
return fpc.defaultPDControl.GetPDClient(namespace, tcName, tlsEnabled, opts...)
349+
}
350+
351+
func (fpc *FakePDControl) GetPDEtcdClient(namespace Namespace, tcName string, tlsEnabled bool, opts ...Option) (PDEtcdClient, error) {
352+
if tlsEnabled {
353+
return fpc.defaultPDControl.pdEtcdClients[genClientKey("http", namespace, tcName, "")], nil
354+
}
355+
return fpc.defaultPDControl.GetPDEtcdClient(namespace, tcName, tlsEnabled, opts...)
356+
}
357+
344358
func (fpc *FakePDControl) SetPDClient(namespace Namespace, tcName string, pdclient PDClient) {
345359
fpc.defaultPDControl.pdClients[genClientKey("http", namespace, tcName, "")] = pdclient
346360
}
@@ -352,3 +366,7 @@ func (fpc *FakePDControl) SetPDClientWithClusterDomain(namespace Namespace, tcNa
352366
func (fpc *FakePDControl) SetPDClientWithAddress(peerURL string, pdclient PDClient) {
353367
fpc.defaultPDControl.pdClients[peerURL] = pdclient
354368
}
369+
370+
func (fpc *FakePDControl) SetEtcdClient(namespace Namespace, tcName string, etcdClient PDEtcdClient) {
371+
fpc.defaultPDControl.pdEtcdClients[genClientKey("http", namespace, tcName, "")] = etcdClient
372+
}

pkg/pdapi/pdetcd.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,12 @@ type PDEtcdClient interface {
3333
Get(key string, prefix bool) (kvs []*KeyValue, err error)
3434
// PutKey will put key to the target pd etcd cluster
3535
PutKey(key, value string) error
36-
// PutKey will put key with ttl to the target pd etcd cluster
36+
// PutTTLKey will put key with ttl to the target pd etcd cluster
3737
PutTTLKey(key, value string, ttl int64) error
3838
// DeleteKey will delete key from the target pd etcd cluster
3939
DeleteKey(key string) error
40+
// UpdateMember will update the member list of the target pd etcd cluster
41+
UpdateMember(id uint64, peerAddrs []string) error
4042
// Close will close the etcd connection
4143
Close() error
4244
}
@@ -121,3 +123,15 @@ func (c *pdEtcdClient) DeleteKey(key string) error {
121123
}
122124
return nil
123125
}
126+
127+
func (c *pdEtcdClient) UpdateMember(id uint64, peerAddrs []string) error {
128+
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
129+
defer cancel()
130+
131+
_, err := c.etcdClient.MemberUpdate(ctx, id, peerAddrs)
132+
if err != nil {
133+
return err
134+
}
135+
136+
return nil
137+
}

0 commit comments

Comments
 (0)