Skip to content

Commit

Permalink
support tso service
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Feb 8, 2024
1 parent 38b6c96 commit 67a77d7
Show file tree
Hide file tree
Showing 21 changed files with 615 additions and 11 deletions.
6 changes: 4 additions & 2 deletions components/cluster/command/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
func newUpgradeCmd() *cobra.Command {
offlineMode := false
ignoreVersionCheck := false
var tidbVer, tikvVer, pdVer, tiflashVer, kvcdcVer, dashboardVer, cdcVer, alertmanagerVer, nodeExporterVer, blackboxExporterVer, tiproxyVer string
var tidbVer, tikvVer, pdVer, tsoVer, tiflashVer, kvcdcVer, dashboardVer, cdcVer, alertmanagerVer, nodeExporterVer, blackboxExporterVer, tiproxyVer string

cmd := &cobra.Command{
Use: "upgrade <cluster-name> <version>",
Expand All @@ -47,6 +47,7 @@ func newUpgradeCmd() *cobra.Command {
spec.ComponentTiDB: tidbVer,
spec.ComponentTiKV: tikvVer,
spec.ComponentPD: pdVer,
spec.ComponentTSO: tsoVer,
spec.ComponentTiFlash: tiflashVer,
spec.ComponentTiKVCDC: kvcdcVer,
spec.ComponentCDC: cdcVer,
Expand Down Expand Up @@ -76,7 +77,8 @@ func newUpgradeCmd() *cobra.Command {

// cmd.Flags().StringVar(&tidbVer, "tidb-version", "", "Fix the version of tidb and no longer follows the cluster version.")
cmd.Flags().StringVar(&tikvVer, "tikv-version", "", "Fix the version of tikv and no longer follows the cluster version.")
cmd.Flags().StringVar(&pdVer, "pd-version", "", "Fix the version of pv and no longer follows the cluster version.")
cmd.Flags().StringVar(&pdVer, "pd-version", "", "Fix the version of pd and no longer follows the cluster version.")
cmd.Flags().StringVar(&tsoVer, "tso-version", "", "Fix the version of tso and no longer follows the cluster version.")
cmd.Flags().StringVar(&tiflashVer, "tiflash-version", "", "Fix the version of tiflash and no longer follows the cluster version.")
cmd.Flags().StringVar(&dashboardVer, "tidb-dashboard-version", "", "Fix the version of tidb-dashboard and no longer follows the cluster version.")
cmd.Flags().StringVar(&cdcVer, "cdc-version", "", "Fix the version of cdc and no longer follows the cluster version.")
Expand Down
1 change: 1 addition & 0 deletions doc/rfcs/0001-separate-component-version-in-cluster.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ ComponentVersions struct {
TiKV string `yaml:"tikv,omitempty"`
TiFlash string `yaml:"tiflash,omitempty"`
PD string `yaml:"pd,omitempty"`
TSO string `yaml:"tso,omitempty"`
Dashboard string `yaml:"tidb_dashboard,omitempty"`
Pump string `yaml:"pump,omitempty"`
Drainer string `yaml:"drainer,omitempty"`
Expand Down
15 changes: 15 additions & 0 deletions embed/templates/config/prometheus.yml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,21 @@ scrape_configs:
- targets:
{{- range .PDAddrs}}
- '{{.}}'
{{- end}}
- job_name: "tso"
honor_labels: true # don't overwrite job & instance labels
{{- if .TLSEnabled}}
scheme: https
tls_config:
insecure_skip_verify: false
ca_file: ../tls/ca.crt
cert_file: ../tls/prometheus.crt
key_file: ../tls/prometheus.pem
{{- end}}
static_configs:
- targets:
{{- range .TSOAddrs}}
- '{{.}}'
{{- end}}
{{- if .TiFlashStatusAddrs}}
- job_name: "tiflash"
Expand Down
10 changes: 7 additions & 3 deletions embed/templates/scripts/run_pd.sh.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ DEPLOY_DIR={{.DeployDir}}

cd "${DEPLOY_DIR}" || exit 1

exec \
{{- if .NumaNode}}
exec numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} env GODEBUG=madvdontneed=1 bin/pd-server \
{{- else}}
exec env GODEBUG=madvdontneed=1 bin/pd-server \
numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} \
{{- end}}
env GODEBUG=madvdontneed=1 \
{{- if .MSMode}}
PD_SERVICE_MODE=api \
{{- end}}
bin/pd-server \
--name="{{.Name}}" \
--client-urls="{{.ClientURL}}" \
--advertise-client-urls="{{.AdvertiseClientURL}}" \
Expand Down
10 changes: 7 additions & 3 deletions embed/templates/scripts/run_pd_scale.sh.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ DEPLOY_DIR={{.DeployDir}}

cd "${DEPLOY_DIR}" || exit 1

exec \
{{- if .NumaNode}}
exec numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} env GODEBUG=madvdontneed=1 bin/pd-server \
{{- else}}
exec env GODEBUG=madvdontneed=1 bin/pd-server \
numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} \
{{- end}}
env GODEBUG=madvdontneed=1 \
{{- if .MSMode}}
PD_SERVICE_MODE=api \
{{- end}}
bin/pd-server \
--name="{{.Name}}" \
--client-urls="{{.ClientURL}}" \
--advertise-client-urls="{{.AdvertiseClientURL}}" \
Expand Down
19 changes: 19 additions & 0 deletions embed/templates/scripts/run_tso.sh.tpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/bin/bash
set -e

# WARNING: This file was auto-generated. Do not edit!
# All your edit might be overwritten!
DEPLOY_DIR={{.DeployDir}}

cd "${DEPLOY_DIR}" || exit 1

{{- if .NumaNode}}
exec numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} env GODEBUG=madvdontneed=1 bin/pd-server services tso\
{{- else}}
exec env GODEBUG=madvdontneed=1 bin/pd-server services tso \
{{- end}}
--backend-endpoints="{{.BackendEndpoints}}" \
--listen-addr="{{.ListenURL}}" \
--advertise-listen-addr="{{.AdvertiseListenURL}}" \
--config=conf/tso.toml \
--log-file="{{.LogDir}}/tso.log" 2>> "{{.LogDir}}/tso_stderr.log"
4 changes: 3 additions & 1 deletion pkg/cluster/ansible/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ server_configs:
tidb:
binlog.enable: true
tikv: {}
pd: {}
pd: {}
tso: {}
tidb_dashboard: {}
tiflash: {}
tiproxy: {}
Expand All @@ -138,6 +139,7 @@ tikv_servers: []
tiflash_servers: []
tiproxy_servers: []
pd_servers: []
tso_servers: []
monitoring_servers: []
`)

Expand Down
2 changes: 2 additions & 0 deletions pkg/cluster/ansible/test-data/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ topology:
binlog.enable: true
tikv: {}
pd: {}
tso: {}
tidb_dashboard: {}
tiflash: {}
tiproxy: {}
Expand Down Expand Up @@ -195,3 +196,4 @@ topology:
arch: amd64
os: linux
tiproxy_servers: []
tso_servers: []
116 changes: 116 additions & 0 deletions pkg/cluster/api/pdapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ var (
pdStoresURI = "pd/api/v1/stores"
pdStoresLimitURI = "pd/api/v1/stores/limit"
pdRegionsCheckURI = "pd/api/v1/regions/check"
pdServicePrimaryURI = "pd/api/v2/ms/primary"
)

func tryURLs(endpoints []string, f func(endpoint string) ([]byte, error)) ([]byte, error) {
Expand Down Expand Up @@ -978,3 +979,118 @@ func (pc *PDClient) SetAllStoreLimits(value int) error {
pc.l().Debugf("setting store limit: %d", value)
return pc.updateConfig(pdStoresLimitURI, bytes.NewBuffer(body))
}

// GetServicePrimary queries for the primary of a service
func (pc *PDClient) GetServicePrimary(service string) (string, error) {
endpoints := pc.getEndpoints(fmt.Sprintf("%s/%s", pdServicePrimaryURI, service))

var primary string
_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, err := pc.httpClient.Get(pc.ctx, endpoint)
if err != nil {
return body, err
}

return body, json.Unmarshal(body, &primary)
})
return primary, err
}

const (
tsoStatusURI = "status"
)

// TSOClient is an HTTP client of the TSO server
type TSOClient struct {
version string
addrs []string
tlsEnabled bool
httpClient *utils.HTTPClient
ctx context.Context
}

// NewTSOClient returns a new TSOClient, the context must have
// a *logprinter.Logger as value of "logger"
func NewTSOClient(
ctx context.Context,
addrs []string,
timeout time.Duration,
tlsConfig *tls.Config,
) *TSOClient {
enableTLS := false
if tlsConfig != nil {
enableTLS = true
}

if _, ok := ctx.Value(logprinter.ContextKeyLogger).(*logprinter.Logger); !ok {
panic("the context must have logger inside")
}

cli := &TSOClient{
addrs: addrs,
tlsEnabled: enableTLS,
httpClient: utils.NewHTTPClient(timeout, tlsConfig),
ctx: ctx,
}

cli.tryIdentifyVersion()
return cli
}

// func (tc *TSOClient) l() *logprinter.Logger {
// return tc.ctx.Value(logprinter.ContextKeyLogger).(*logprinter.Logger)
// }

func (tc *TSOClient) tryIdentifyVersion() {
endpoints := tc.getEndpoints(tsoStatusURI)
response := map[string]string{}
_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, err := tc.httpClient.Get(tc.ctx, endpoint)
if err != nil {
return body, err
}

return body, json.Unmarshal(body, &response)
})
if err == nil {
tc.version = response["version"]
}
}

// GetURL builds the client URL of PDClient
func (tc *TSOClient) GetURL(addr string) string {
httpPrefix := "http"
if tc.tlsEnabled {
httpPrefix = "https"
}
return fmt.Sprintf("%s://%s", httpPrefix, addr)
}

func (tc *TSOClient) getEndpoints(uri string) (endpoints []string) {
for _, addr := range tc.addrs {
endpoint := fmt.Sprintf("%s/%s", tc.GetURL(addr), uri)
endpoints = append(endpoints, endpoint)
}

return
}

// CheckHealth checks the health of TSO node.
func (tc *TSOClient) CheckHealth() error {
endpoints := tc.getEndpoints(tsoStatusURI)

_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, err := tc.httpClient.Get(tc.ctx, endpoint)
if err != nil {
return body, err
}

return body, nil
})

if err != nil {
return err
}

return nil
}
3 changes: 3 additions & 0 deletions pkg/cluster/clusterutil/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ func (r *repositoryT) VerifyComponent(comp, version, target string) error {
}

func (r *repositoryT) ComponentBinEntry(comp, version string) (string, error) {
if comp == "tso" {
comp = "pd"
}
versionItem, err := r.repo.ComponentVersion(comp, version, true)
if err != nil {
return "", err
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/executor/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (e *EasySSHExecutor) Execute(ctx context.Context, cmd string, sudo bool, ti

// set a basic PATH in case it's empty on login
cmd = fmt.Sprintf("PATH=$PATH:/bin:/sbin:/usr/bin:/usr/sbin; %s", cmd)

if e.Locale != "" {
cmd = fmt.Sprintf("export LANG=%s; %s", e.Locale, cmd)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/cluster/spec/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
ComponentTiDB = "tidb"
ComponentTiKV = "tikv"
ComponentPD = "pd"
ComponentTSO = "tso"
ComponentTiFlash = "tiflash"
ComponentTiProxy = "tiproxy"
ComponentGrafana = "grafana"
Expand Down
7 changes: 7 additions & 0 deletions pkg/cluster/spec/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,13 @@ func (i *MonitorInstance) InitConfig(
cfig.AddPD(pd.Host, uint64(pd.ClientPort))
}
}
if servers, found := topoHasField("TSOServers"); found {
for i := 0; i < servers.Len(); i++ {
tso := servers.Index(i).Interface().(*TSOSpec)
uniqueHosts.Insert(tso.Host)
cfig.AddTSO(tso.Host, uint64(tso.Port))
}
}
if servers, found := topoHasField("TiKVServers"); found {
for i := 0; i < servers.Len(); i++ {
kv := servers.Index(i).Interface().(*TiKVSpec)
Expand Down
3 changes: 3 additions & 0 deletions pkg/cluster/spec/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type PDSpec struct {
LogDir string `yaml:"log_dir,omitempty"`
Source string `yaml:"source,omitempty" validate:"source:editable"`
NumaNode string `yaml:"numa_node,omitempty" validate:"numa_node:editable"`
MSMode bool `yaml:"ms_mode,omitempty"`
Config map[string]any `yaml:"config,omitempty" validate:"config:ignore"`
ResourceControl meta.ResourceControl `yaml:"resource_control,omitempty" validate:"resource_control:editable"`
Arch string `yaml:"arch,omitempty"`
Expand Down Expand Up @@ -254,6 +255,7 @@ func (i *PDInstance) InitConfig(
LogDir: paths.Log,
InitialCluster: strings.Join(initialCluster, ","),
NumaNode: spec.NumaNode,
MSMode: spec.MSMode,
}

fp := filepath.Join(paths.Cache, fmt.Sprintf("run_pd_%s_%d.sh", i.GetHost(), i.GetPort()))
Expand Down Expand Up @@ -378,6 +380,7 @@ func (i *PDInstance) ScaleConfig(
LogDir: paths.Log,
InitialCluster: strings.Join(initialCluster, ","),
NumaNode: spec.NumaNode,
MSMode: spec.MSMode,
}

join := []string{}
Expand Down
Loading

0 comments on commit 67a77d7

Please sign in to comment.