From c5362bd121cec9931d5c03023c2f741f7c06f133 Mon Sep 17 00:00:00 2001 From: qupeng Date: Mon, 9 Nov 2020 19:31:58 +0800 Subject: [PATCH 1/4] add a case for gc in compaction filter Signed-off-by: qupeng --- Makefile | 4 + cmd/gc-in-compaction-filter/cases.go | 289 +++++++++++++++++++++++++++ cmd/gc-in-compaction-filter/main.go | 172 ++++++++++++++++ go.mod | 10 +- go.sum | 8 +- 5 files changed, 478 insertions(+), 5 deletions(-) create mode 100644 cmd/gc-in-compaction-filter/cases.go create mode 100644 cmd/gc-in-compaction-filter/main.go diff --git a/Makefile b/Makefile index 8c988da0..86f07f31 100644 --- a/Makefile +++ b/Makefile @@ -119,6 +119,10 @@ titan: manager: $(GOBUILD) $(GOMOD) -o bin/manager cmd/cluster/manager/*.go +gc-in-compaction-filter: + $(GOBUILD) $(GOMOD) -o bin/gc-in-compaction-filter cmd/gc-in-compaction-filter/*.go + + fmt: groupimports go fmt ./... diff --git a/cmd/gc-in-compaction-filter/cases.go b/cmd/gc-in-compaction-filter/cases.go new file mode 100644 index 00000000..57f662b9 --- /dev/null +++ b/cmd/gc-in-compaction-filter/cases.go @@ -0,0 +1,289 @@ +package main + +import ( + "context" + "fmt" + "net/url" + "strconv" + "time" + + "github.com/ngaut/log" + "github.com/pingcap/kvproto/pkg/debugpb" + tidbcodec "github.com/pingcap/tidb/util/codec" + "github.com/tikv/client-go/rawkv" + "google.golang.org/grpc" +) + +const ( + DATA_FENSE = 1000 * 60 * 10 // A large gap(in ms) before safe point. +) + +func (c *client) mustGetSafePoint() uint64 { + for { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + defer cancel() + resp, err := c.etcdCli.Get(ctx, "/tidb/store/gcworker/saved_safe_point") + if err != nil { + log.Fatalf("[%s] get safe point error %v", caseLabel, err) + } + if len(resp.Kvs) == 0 { + log.Infof("[%s] no safe point in pd", caseLabel) + time.Sleep(10 * time.Second) + continue + } + value := string(resp.Kvs[0].Value) + t, err := strconv.ParseUint(value, 10, 64) + if err != nil { + log.Fatalf("[%s] parse safe point error %v", caseLabel, err) + } + log.Infof("[%s] load safe point %d", caseLabel, t) + return t + } +} + +func (c *client) mustGetTiKVCtlClient(kvAddr string) debugpb.DebugClient { + u, err := url.Parse(fmt.Sprintf("http://%s", kvAddr)) + if err != nil { + log.Fatalf("[%s] parse tikv addr error %v", caseLabel, err) + } + conn, err := grpc.DialContext(context.TODO(), u.Host, grpc.WithInsecure()) + if err != nil { + log.Fatalf("[%s] connect to tikv error %v", caseLabel, err) + } + return debugpb.NewDebugClient(conn) +} + +func GcByCompact(debugCli debugpb.DebugClient) { + req := &debugpb.CompactRequest{ + Db: debugpb.DB_KV, + Cf: "write", + Threads: 1, + // It's the default value used in TiKV internal compactions. + BottommostLevelCompaction: debugpb.BottommostLevelCompaction_Force, + } + if _, err := debugCli.Compact(context.TODO(), req); err != nil { + log.Fatalf("[%s] compact tikv error %v", caseLabel, err) + } + time.Sleep(5 * time.Second) +} + +func genMvccPut(key []byte, value []byte, startTs uint64, commitTs uint64) ([]byte, []byte) { + writeKey := tidbcodec.EncodeUintDesc(key, commitTs) + writeValue := make([]byte, 0, 256) + writeValue = append(writeValue, 'P') // type + writeValue = tidbcodec.EncodeUvarint(writeValue, startTs) + writeValue = append(writeValue, 'v') // short value prefix + writeValue = append(writeValue, byte(len(value))) + writeValue = append(writeValue, value...) + return writeKey, writeValue +} + +func genMvccDelete(key []byte, startTs uint64, commitTs uint64) ([]byte, []byte) { + writeKey := tidbcodec.EncodeUintDesc(key, commitTs) + writeValue := make([]byte, 0, 256) + writeValue = append(writeValue, 'D') // type + writeValue = tidbcodec.EncodeUvarint(writeValue, startTs) + return writeKey, writeValue +} + +// Case 1: test old versions before the safe point will be cleaned by compaction. +func (c *client) testGcStaleVersions() { + var err error + debugCli := c.mustGetTiKVCtlClient(c.kvAddrs[0]) + + key := []byte("test_gc_stale_versions") + value := []byte("test_gc_stale_versions") + + safepoint := c.mustGetSafePoint() + physical := safepoint >> 18 + oldStartTs := (physical - DATA_FENSE) << 18 + oldCommitTs := oldStartTs + 1000 + newStartTs := (physical - DATA_FENSE + 10) << 18 + newCommitTs := newStartTs + 1000 + + oldKey, oldValue := genMvccPut(key, value, oldStartTs, oldCommitTs) + err = c.rawKvCli.Put(context.TODO(), oldKey, oldValue, rawkv.PutOption{Cf: rawkv.CfWrite}) + if err != nil { + log.Fatalf("[%s] kv put error %v", caseLabel, err) + } + + newKey, newValue := genMvccPut(key, value, newStartTs, newCommitTs) + err = c.rawKvCli.Put(context.TODO(), newKey, newValue, rawkv.PutOption{Cf: rawkv.CfWrite}) + if err != nil { + log.Fatalf("[%s] kv put error %v", caseLabel, err) + } + + GcByCompact(debugCli) + value, err = c.rawKvCli.Get(context.TODO(), oldKey, rawkv.GetOption{Cf: rawkv.CfWrite}) + if err != nil { + log.Fatalf("[%s] kv get error %v", caseLabel, err) + } + if value != nil { + log.Fatalf("[%s] stale version is not cleaned", caseLabel) + } +} + +// Case 2: test the latest version before the safe point won't be cleaned incorrectly. +func (c *client) testGcLatestPutBeforeSafePoint() { + var err error + debugCli := c.mustGetTiKVCtlClient(c.kvAddrs[0]) + + key := []byte("test_gc_latest_put") + value := []byte("test_gc_latest_put") + + safepoint := c.mustGetSafePoint() + physical := safepoint >> 18 + oldStartTs := (physical - DATA_FENSE) << 18 + oldCommitTs := oldStartTs + 1000 + newStartTs := (physical + DATA_FENSE) << 18 + newCommitTs := newStartTs + 1000 + + oldKey, oldValue := genMvccPut(key, value, oldStartTs, oldCommitTs) + err = c.rawKvCli.Put(context.TODO(), oldKey, oldValue, rawkv.PutOption{Cf: rawkv.CfWrite}) + if err != nil { + log.Fatalf("[%s] kv put error %v", caseLabel, err) + } + + GcByCompact(debugCli) + value, err = c.rawKvCli.Get(context.TODO(), oldKey, rawkv.GetOption{Cf: rawkv.CfWrite}) + if err != nil { + log.Fatalf("[%s] kv get error %v", caseLabel, err) + } + if !sliceEqual(value, oldValue) { + log.Fatalf("[%s] the latest version before safe point is cleaned", caseLabel) + } + + newKey, newValue := genMvccPut(key, value, newStartTs, newCommitTs) + err = c.rawKvCli.Put(context.TODO(), newKey, newValue, rawkv.PutOption{Cf: rawkv.CfWrite}) + if err != nil { + log.Fatalf("[%s] kv put error %v", caseLabel, err) + } + + GcByCompact(debugCli) + value, err = c.rawKvCli.Get(context.TODO(), oldKey, rawkv.GetOption{Cf: rawkv.CfWrite}) + if err != nil { + log.Fatalf("[%s] kv get error %v", caseLabel, err) + } + if !sliceEqual(value, oldValue) { + log.Fatalf("[%s] the latest version before safe point is cleaned", caseLabel) + } +} + +// Case 3: test the latest delete mark before the safe point should be cleaned. +func (c *client) testGcLatestStaleDeleteMark(shouldGc bool) { + var err error + debugCli := c.mustGetTiKVCtlClient(c.kvAddrs[0]) + + key := []byte("test_gc_latest_stale_delete") + value := []byte("test_gc_latest_stale_delete") + + safepoint := c.mustGetSafePoint() + physical := safepoint >> 18 + oldStartTs := (physical - DATA_FENSE) << 18 + oldCommitTs := oldStartTs + 1000 + newStartTs := (physical - DATA_FENSE + 10) << 18 + newCommitTs := newStartTs + 1000 + + oldKey, oldValue := genMvccPut(key, value, oldStartTs, oldCommitTs) + err = c.rawKvCli.Put(context.TODO(), oldKey, oldValue, rawkv.PutOption{Cf: rawkv.CfWrite}) + if err != nil { + log.Fatalf("[%s] kv put error %v", caseLabel, err) + } + + newKey, newValue := genMvccDelete(key, newStartTs, newCommitTs) + err = c.rawKvCli.Put(context.TODO(), newKey, newValue, rawkv.PutOption{Cf: rawkv.CfWrite}) + if err != nil { + log.Fatalf("[%s] kv put error %v", caseLabel, err) + } + + GcByCompact(debugCli) + + type Pair struct { + key []byte + value []byte + } + + for _, pair := range []Pair{Pair{oldKey, oldValue}, Pair{newKey, newValue}} { + value, err = c.rawKvCli.Get(context.TODO(), pair.key, rawkv.GetOption{Cf: rawkv.CfWrite}) + if err != nil { + log.Fatalf("[%s] kv get error %v", caseLabel, err) + } + if shouldGc && value != nil { + log.Fatalf("[%s] the version should be cleaned", caseLabel) + } else if !shouldGc && !sliceEqual(pair.value, value) { + log.Fatalf("[%s] the version shouldn't be cleaned", caseLabel) + } + } +} + +func (c *client) testDynamicConfChange() { + var err error + + debugCli := c.mustGetTiKVCtlClient(c.kvAddrs[0]) + GcByCompact(debugCli) // Regenerate all sst files. + + // Change ratio-threshold to a large value, so that GC won't run in compaction filter. + log.Infof("[%s] Testing change gc.ratio-threshold to 10", caseLabel) + _, err = c.db.Exec("set config tikv gc.ratio-threshold = 10") + if err != nil { + log.Fatalf("[%s] change gc.ratio-threshold fail, error %v", caseLabel, err) + } + time.Sleep(5 * time.Second) // Sleep a while to wait TiKVs get the update. + c.testGcLatestStaleDeleteMark(false) + + log.Infof("[%s] Testing change gc.ratio-threshold to 0.9", caseLabel) + _, err = c.db.Exec("set config tikv gc.ratio-threshold = 0.9") + if err != nil { + log.Fatalf("[%s] change gc.ratio-threshold fail, error %v", caseLabel, err) + } + time.Sleep(5 * time.Second) // Sleep a while to wait TiKVs get the update. + c.testGcLatestStaleDeleteMark(true) + + if *clusterVersion == "4.x" { + // Change skip-version-check. + log.Infof("[%s] Testing change gc.compaction-filter-skip-version-check to false", caseLabel) + _, err = c.db.Exec("set config tikv gc.`compaction-filter-skip-version-check` = false") + if err != nil { + log.Fatalf("[%s] change gc.compaction-filter-skip-version-check fail, error %v", caseLabel, err) + } + time.Sleep(5 * time.Second) // Sleep a while to wait TiKVs get the update. + c.testGcLatestStaleDeleteMark(false) + + log.Infof("[%s] Testing change gc.compaction-filter-skip-version-check to true", caseLabel) + _, err = c.db.Exec("set config tikv gc.`compaction-filter-skip-version-check` = true") + if err != nil { + log.Fatalf("[%s] change gc.compaction-filter-skip-version-check fail, error %v", caseLabel, err) + } + time.Sleep(5 * time.Second) // Sleep a while to wait TiKVs get the update. + c.testGcLatestStaleDeleteMark(true) + } + + // Change enable-compaction-filter + log.Infof("[%s] Testing change gc.enable-compaction-filter", caseLabel) + _, err = c.db.Exec("set config tikv gc.enable-compaction-filter = false") + if err != nil { + log.Fatalf("[%s] change gc.enable-compaction-filter fail, error %v", caseLabel, err) + } + time.Sleep(5 * time.Second) // Sleep a while to wait TiKVs get the update. + c.testGcLatestStaleDeleteMark(false) + + log.Infof("[%s] Testing change gc.enable-compaction-filter", caseLabel) + _, err = c.db.Exec("set config tikv gc.enable-compaction-filter = true") + if err != nil { + log.Fatalf("[%s] change gc.enable-compaction-filter fail, error %v", caseLabel, err) + } + time.Sleep(5 * time.Second) // Sleep a while to wait TiKVs get the update. + c.testGcLatestStaleDeleteMark(true) +} + +func sliceEqual(a, b []byte) bool { + if len(a) != len(b) { + return false + } + for i, v := range a { + if v != b[i] { + return false + } + } + return true +} diff --git a/cmd/gc-in-compaction-filter/main.go b/cmd/gc-in-compaction-filter/main.go new file mode 100644 index 00000000..2d4c4b88 --- /dev/null +++ b/cmd/gc-in-compaction-filter/main.go @@ -0,0 +1,172 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "database/sql" + "flag" + "fmt" + "time" + + _ "github.com/go-sql-driver/mysql" + "github.com/ngaut/log" + + "github.com/tikv/client-go/config" + "github.com/tikv/client-go/rawkv" + etcd "go.etcd.io/etcd/clientv3" + + cmd_util "github.com/pingcap/tipocket/cmd/util" + "github.com/pingcap/tipocket/pkg/cluster" + "github.com/pingcap/tipocket/pkg/control" + "github.com/pingcap/tipocket/pkg/core" + test_infra "github.com/pingcap/tipocket/pkg/test-infra" + "github.com/pingcap/tipocket/pkg/test-infra/fixture" + "github.com/pingcap/tipocket/pkg/verify" + "github.com/pingcap/tipocket/util" +) + +const ( + caseLabel = "gc-in-compaction-filter" +) + +type clientCreator struct{} +type client struct { + pdAddrs []string + kvAddrs []string + rawKvCli *rawkv.Client + etcdCli *etcd.Client + db *sql.DB +} + +func (c clientCreator) Create(_ cluster.ClientNode) core.Client { + return &client{} +} + +func (c *client) String() string { + return caseLabel +} + +func (c *client) SetUp(ctx context.Context, nodes []cluster.Node, clientNodes []cluster.ClientNode, idx int) error { + var err error + + node := clientNodes[idx] + dsn := fmt.Sprintf("root@tcp(%s:%d)/", node.IP, node.Port) + + log.Infof("[%s] connnecting to tidb %s...", caseLabel, dsn) + c.db, err = util.OpenDB(dsn, 1) + if err != nil { + log.Fatalf("[%s] create db client error %v", caseLabel, err) + } + + _, err = c.db.Exec(`update mysql.tidb set VARIABLE_VALUE="1m" where VARIABLE_NAME="tikv_gc_run_interval";`) + if err != nil { + log.Fatalf("[%s] update gc interval error %v", caseLabel, err) + } + + for _, node := range nodes { + if node.Component == cluster.PD { + addr := fmt.Sprintf("%s:%d", node.IP, node.Port) + c.pdAddrs = append(c.pdAddrs, addr) + } else if node.Component == cluster.TiKV { + addr := fmt.Sprintf("%s:%d", node.IP, node.Port) + c.kvAddrs = append(c.kvAddrs, addr) + } + } + + c.rawKvCli, err = rawkv.NewClient(context.TODO(), c.pdAddrs, config.Default()) + if err != nil { + log.Fatalf("[%s] create rawkv client error %v", caseLabel, err) + } + + c.etcdCli, err = etcd.New(etcd.Config{Endpoints: c.pdAddrs}) + if err != nil { + log.Fatalf("[%s] create etcd client error %v", caseLabel, err) + } + + if *waitSafePoint { + // TiDB needs some time (about 3 minutes) to update a safe point to PD. + // And TiKV needs more time to load it. So, sleep a while. + log.Infof("[%s] sleep 5 minutes to wait TiDB updates safe point", caseLabel) + time.Sleep(time.Second * 300) + } + + return nil +} + +func (c *client) TearDown(_ context.Context, _ []cluster.ClientNode, _ int) error { + c.rawKvCli.Close() + c.etcdCli.Close() + return nil +} + +func (c *client) Invoke(_ context.Context, _ cluster.ClientNode, _ interface{}) core.UnknownResponse { + panic("implement me") + +} + +func (c *client) NextRequest() interface{} { + panic("implement me") +} + +func (c *client) DumpState(_ context.Context) (interface{}, error) { + panic("implement me") +} + +func (c *client) Start(ctx context.Context, cfg interface{}, clientNodes []cluster.ClientNode) error { + c.testGcStaleVersions() + c.testGcLatestPutBeforeSafePoint() + c.testGcLatestStaleDeleteMark(true) + c.testDynamicConfChange() + return nil +} + +var ( + clusterVersion = flag.String("cluster-version", "4.x", "support values: 4.x / 5.x, default value: 4.x") + localMode = flag.Bool("local-mode", false, "use local mode or not") + waitSafePoint = flag.Bool("wait-safe-point", false, "wait tidb to update safe point or not") +) + +func main() { + flag.Parse() + + cfg := control.Config{ + Mode: control.ModeSelfScheduled, + ClientCount: 1, + RunTime: fixture.Context.RunTime, + RunRound: 1, + } + + var provider cluster.Provider + if *localMode { + provider = cluster.NewLocalClusterProvisioner( + []string{"127.0.0.1:50011"}, // TiDBs + []string{"127.0.0.1:50001"}, // PDs + []string{"127.0.0.1:50003"}, // TiKVs + ) + } else { + provider = cluster.NewDefaultClusterProvider() + } + + suit := cmd_util.Suit{ + Config: &cfg, + Provider: provider, + ClientCreator: clientCreator{}, + NemesisGens: cmd_util.ParseNemesisGenerators(fixture.Context.Nemesis), + VerifySuit: verify.Suit{}, + ClusterDefs: test_infra.NewDefaultCluster(fixture.Context.Namespace, fixture.Context.Namespace, + fixture.Context.TiDBClusterConfig), + } + suit.Run(context.Background()) +} diff --git a/go.mod b/go.mod index 4441ee0b..5be15954 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce // indirect github.com/pingcap/go-tpc v0.0.0-20200229030315-98ee0f8f09d3 github.com/pingcap/kvproto v0.0.0-20200324130106-b8bc94dd8a36 - github.com/pingcap/log v0.0.0-20200511115504-543df19646ad + github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463 github.com/pingcap/parser v0.0.0-20200317021010-cd90cc2a7d87 github.com/pingcap/pd v2.1.17+incompatible github.com/pingcap/pd/v4 v4.0.0-beta.1.0.20200305072537-61d9f9cc35d3 @@ -54,14 +54,16 @@ require ( github.com/sergi/go-diff v1.1.0 // indirect github.com/spf13/cobra v1.0.0 github.com/stretchr/testify v1.5.1 - github.com/tikv/client-go v0.0.0-20200110101306-a3ebdb020c83 + github.com/tikv/client-go v0.0.0-20201015080021-528475568618 github.com/uber-go/atomic v1.5.0 - go.uber.org/zap v1.14.0 + go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738 + go.uber.org/zap v1.16.0 golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a // indirect golang.org/x/net v0.0.0-20200904194848-62affa334b73 golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 golang.org/x/sys v0.0.0-20200918174421-af09f7315aff // indirect golang.org/x/tools v0.0.0-20200921210052-fa0125251cc4 // indirect + google.golang.org/grpc v1.27.0 google.golang.org/protobuf v1.25.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 k8s.io/api v0.19.2 @@ -128,3 +130,5 @@ replace k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.0.0-20 replace github.com/Azure/go-autorest => github.com/Azure/go-autorest v12.2.0+incompatible replace golang.org/x/net v0.0.0-20190813000000-74dc4d7220e7 => golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7 + +replace github.com/tikv/client-go v0.0.0-20201015080021-528475568618 => github.com/cosven/client-go v0.0.0-20201029085241-63a1539d6469 diff --git a/go.sum b/go.sum index 824b73fa..1ba87a98 100644 --- a/go.sum +++ b/go.sum @@ -265,6 +265,8 @@ github.com/corona10/goimagehash v1.0.2 h1:pUfB0LnsJASMPGEZLj7tGY251vF+qLGqOgEP4r github.com/corona10/goimagehash v1.0.2/go.mod h1:/l9umBhvcHQXVtQO1V6Gp1yD20STawkhRnnX0D1bvVI= github.com/cortexproject/cortex v0.7.0-rc.0 h1:oa/RzR9E09/5AkmTPGk97ObbhZmB5TycFzL59inProQ= github.com/cortexproject/cortex v0.7.0-rc.0/go.mod h1:aiDfjSBZGE+q213mWACqjawNVN9CqFG4F+20TkeChA0= +github.com/cosven/client-go v0.0.0-20201029085241-63a1539d6469 h1:6rt45KPCIBSP4+9f6EV9l2bWND3VW8/IGYrvcfxVBuc= +github.com/cosven/client-go v0.0.0-20201029085241-63a1539d6469/go.mod h1:xH5dwesiR+e168duhosW9T+6ik46ll0utr5CabYPIfA= github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF22jk= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= @@ -1276,6 +1278,8 @@ github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfE github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad h1:SveG82rmu/GFxYanffxsSF503SiQV+2JLnWEiGiF+Tc= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= +github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463 h1:Jboj+s4jSCp5E1WDgmRUv5rIFKFHaaSWuSZ4wMwXIcc= +github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/parser v0.0.0-20200317021010-cd90cc2a7d87 h1:533jEUp3mtfWjk0el+awLbyGVxiHcUIGWcR1Y7gB+fg= github.com/pingcap/parser v0.0.0-20200317021010-cd90cc2a7d87/go.mod h1:9v0Edh8IbgjGYW2ArJr19E+bvL8zKahsFp+ixWeId+4= github.com/pingcap/pd/v4 v4.0.0-beta.1.0.20200305072537-61d9f9cc35d3 h1:Yrp99FnjHAEuDrSBql2l0IqCtJX7KwJbTsD5hIArkvk= @@ -1505,8 +1509,6 @@ github.com/tidwall/gjson v1.3.5/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJH github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E= github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= -github.com/tikv/client-go v0.0.0-20200110101306-a3ebdb020c83 h1:IfRSAmMavarPgSj0Rr4iUVJ+w6sxF1a6/cTUXZl5yZY= -github.com/tikv/client-go v0.0.0-20200110101306-a3ebdb020c83/go.mod h1:K0NcdVNrXDq92YPLytsrAwRMyuXi7GZCO6dXNH7OzQc= github.com/timakin/bodyclose v0.0.0-20190721030226-87058b9bfcec/go.mod h1:Qimiffbc6q9tBWlVV6x0P9sat/ao1xEkREYPPj9hphk= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= @@ -1620,6 +1622,8 @@ go.uber.org/zap v1.12.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= go.uber.org/zap v1.14.0 h1:/pduUoebOeeJzTDFuoMgC6nRkiasr1sBCIEorly7m4o= go.uber.org/zap v1.14.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= +go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM= +go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ= go4.org v0.0.0-20180809161055-417644f6feb5/go.mod h1:MkTOUMDaeVYJUOUsaDXIhWPZYa1yOyC1qaOBpL57BhE= gocloud.dev v0.18.0/go.mod h1:lhLOb91+9tKB8RnNlsx+weJGEd0AHM94huK1bmrhPwM= golang.org/x/build v0.0.0-20190927031335-2835ba2e683f/go.mod h1:fYw7AShPAhGMdXqA9gRadk/CcMsvLlClpE5oBwnS3dM= From 8a1f3d8131b74417d3b7ec299d5942cd1739b1eb Mon Sep 17 00:00:00 2001 From: qupeng Date: Thu, 12 Nov 2020 17:43:40 +0800 Subject: [PATCH 2/4] add argo files Signed-off-by: qupeng --- argo/template/gc-in-compaction-filter.yaml | 107 ++++++++++++++++++ .../gc-in-compaction-filter-basic.yaml | 40 +++++++ cmd/gc-in-compaction-filter/cases.go | 26 ++--- .../tikv/gc-in-compaction-filter-basic.toml | 6 + 4 files changed, 166 insertions(+), 13 deletions(-) create mode 100644 argo/template/gc-in-compaction-filter.yaml create mode 100644 argo/workflow/gc-in-compaction-filter-basic.yaml create mode 100644 config/tikv/gc-in-compaction-filter-basic.toml diff --git a/argo/template/gc-in-compaction-filter.yaml b/argo/template/gc-in-compaction-filter.yaml new file mode 100644 index 00000000..16481fc9 --- /dev/null +++ b/argo/template/gc-in-compaction-filter.yaml @@ -0,0 +1,107 @@ +metadata: + name: tipocket-gc-in-compaction-filter + namespace: argo +spec: + templates: + - name: tipocket-gc-in-compaction-filter + inputs: + parameters: + - name: ns + default: tipocket-gc-in-compaction-filter + - name: purge + default: "false" + - name: hub + default: "docker.io" + - name: repository + default: pingcap + - name: image-version + default: nightly + - name: tidb-image + default: "" + - name: tikv-image + default: "" + - name: pd-image + default: "" + - name: storage-class + default: local-storage + - name: nemesis + default: "" + - name: client + default: "5" + - name: request-count + default: "10000" + - name: round + default: "100" + - name: loki-addr + default: http://gateway.loki.svc + - name: loki-username + default: loki + - name: loki-password + default: admin + - name: tidb-config + default: "" + - name: tikv-config + default: "" + - name: pd-config + default: "" + - name: tikv-replicas + default: "4" + - name: matrix-config + default: "" + - name: matrix-tidb + default: "tidb.toml" + - name: matrix-tikv + default: "tikv.toml" + - name: matrix-pd + default: "pd.toml" + - name: matrix-sql + default: "mysql-system-vars.sql,tidb-system-vars.sql" + outputs: + artifacts: + - name: case-logs + archiveLogs: true + path: /logs + - name: tidb-logs + archiveLogs: true + path: /var/run/tipocket-logs + metadata: + labels: + ns: "{{inputs.parameters.ns}}" + container: + name: tipocket + image: 'pingcap/tipocket:latest' + imagePullPolicy: Always + workingDir: /logs + command: + - sh + - '-c' + - | + /bin/gc-in-compaction-filter \ + -namespace={{inputs.parameters.ns}} \ + -hub={{inputs.parameters.hub}} \ + -repository={{inputs.parameters.repository}} \ + -storage-class={{inputs.parameters.storage-class}} \ + -image-version={{inputs.parameters.image-version}} \ + -tidb-image={{inputs.parameters.tidb-image}} \ + -tikv-image={{inputs.parameters.tikv-image}} \ + -pd-image={{inputs.parameters.pd-image}} \ + -purge={{inputs.parameters.purge}} \ + -delNS=true \ + -case=bank \ + -client={{inputs.parameters.client}} \ + -tidb-config={{inputs.parameters.tidb-config}} \ + -tikv-config={{inputs.parameters.tikv-config}} \ + -pd-config={{inputs.parameters.pd-config}} \ + -tikv-replicas={{inputs.parameters.tikv-replicas}} \ + -nemesis={{inputs.parameters.nemesis}} \ + -round={{inputs.parameters.round}} \ + -request-count={{inputs.parameters.request-count}} \ + -loki-addr={{inputs.parameters.loki-addr}} \ + -loki-username={{inputs.parameters.loki-username}} \ + -loki-password={{inputs.parameters.loki-password}} \ + -matrix-config={{inputs.parameters.matrix-config}} \ + -matrix-tidb={{inputs.parameters.matrix-tidb}} \ + -matrix-tikv={{inputs.parameters.matrix-tikv}} \ + -matrix-pd={{inputs.parameters.matrix-pd}} \ + -matrix-sql={{inputs.parameters.matrix-sql}} \ + -wait-safe-point diff --git a/argo/workflow/gc-in-compaction-filter-basic.yaml b/argo/workflow/gc-in-compaction-filter-basic.yaml new file mode 100644 index 00000000..1fdfa05d --- /dev/null +++ b/argo/workflow/gc-in-compaction-filter-basic.yaml @@ -0,0 +1,40 @@ +metadata: + generateName: tipocket-gc-in-compaction-filter- + namespace: argo +spec: + entrypoint: call-tipocket-gc-in-compaction-filter + arguments: + parameters: + - name: ns + value: tipocket-gc-in-compaction-filter-basic + - name: purge + value: "true" + - name: image-version + value: nightly + - name: storage-class + value: local-storage + - name: nemesis + value: "" + - name: tikv-config + value: "/config/tikv/gc-in-compaction-filter-basic.toml" + templates: + - name: call-tipocket-gc-in-compaction-filter + steps: + - - name: call-tipocket-gc-in-compaction-filter + templateRef: + name: tipocket-gc-in-compaction-filter + template: tipocket-gc-in-compaction-filter + arguments: + parameters: + - name: ns + value: "{{workflow.parameters.ns}}" + - name: purge + value: "{{workflow.parameters.purge}}" + - name: image-version + value: "{{workflow.parameters.image-version}}" + - name: storage-class + value: "{{workflow.parameters.storage-class}}" + - name: nemesis + value: "{{workflow.parameters.nemesis}}" + - name: tikv-config + value: "{{workflow.parameters.tikv-config}}" diff --git a/cmd/gc-in-compaction-filter/cases.go b/cmd/gc-in-compaction-filter/cases.go index 57f662b9..1362bc39 100644 --- a/cmd/gc-in-compaction-filter/cases.go +++ b/cmd/gc-in-compaction-filter/cases.go @@ -15,7 +15,7 @@ import ( ) const ( - DATA_FENSE = 1000 * 60 * 10 // A large gap(in ms) before safe point. + timeFence = 1000 * 60 * 10 // A large gap(in ms) before safe point. ) func (c *client) mustGetSafePoint() uint64 { @@ -53,7 +53,7 @@ func (c *client) mustGetTiKVCtlClient(kvAddr string) debugpb.DebugClient { return debugpb.NewDebugClient(conn) } -func GcByCompact(debugCli debugpb.DebugClient) { +func gcByCompact(debugCli debugpb.DebugClient) { req := &debugpb.CompactRequest{ Db: debugpb.DB_KV, Cf: "write", @@ -96,9 +96,9 @@ func (c *client) testGcStaleVersions() { safepoint := c.mustGetSafePoint() physical := safepoint >> 18 - oldStartTs := (physical - DATA_FENSE) << 18 + oldStartTs := (physical - timeFence) << 18 oldCommitTs := oldStartTs + 1000 - newStartTs := (physical - DATA_FENSE + 10) << 18 + newStartTs := (physical - timeFence + 10) << 18 newCommitTs := newStartTs + 1000 oldKey, oldValue := genMvccPut(key, value, oldStartTs, oldCommitTs) @@ -113,7 +113,7 @@ func (c *client) testGcStaleVersions() { log.Fatalf("[%s] kv put error %v", caseLabel, err) } - GcByCompact(debugCli) + gcByCompact(debugCli) value, err = c.rawKvCli.Get(context.TODO(), oldKey, rawkv.GetOption{Cf: rawkv.CfWrite}) if err != nil { log.Fatalf("[%s] kv get error %v", caseLabel, err) @@ -133,9 +133,9 @@ func (c *client) testGcLatestPutBeforeSafePoint() { safepoint := c.mustGetSafePoint() physical := safepoint >> 18 - oldStartTs := (physical - DATA_FENSE) << 18 + oldStartTs := (physical - timeFence) << 18 oldCommitTs := oldStartTs + 1000 - newStartTs := (physical + DATA_FENSE) << 18 + newStartTs := (physical + timeFence) << 18 newCommitTs := newStartTs + 1000 oldKey, oldValue := genMvccPut(key, value, oldStartTs, oldCommitTs) @@ -144,7 +144,7 @@ func (c *client) testGcLatestPutBeforeSafePoint() { log.Fatalf("[%s] kv put error %v", caseLabel, err) } - GcByCompact(debugCli) + gcByCompact(debugCli) value, err = c.rawKvCli.Get(context.TODO(), oldKey, rawkv.GetOption{Cf: rawkv.CfWrite}) if err != nil { log.Fatalf("[%s] kv get error %v", caseLabel, err) @@ -159,7 +159,7 @@ func (c *client) testGcLatestPutBeforeSafePoint() { log.Fatalf("[%s] kv put error %v", caseLabel, err) } - GcByCompact(debugCli) + gcByCompact(debugCli) value, err = c.rawKvCli.Get(context.TODO(), oldKey, rawkv.GetOption{Cf: rawkv.CfWrite}) if err != nil { log.Fatalf("[%s] kv get error %v", caseLabel, err) @@ -179,9 +179,9 @@ func (c *client) testGcLatestStaleDeleteMark(shouldGc bool) { safepoint := c.mustGetSafePoint() physical := safepoint >> 18 - oldStartTs := (physical - DATA_FENSE) << 18 + oldStartTs := (physical - timeFence) << 18 oldCommitTs := oldStartTs + 1000 - newStartTs := (physical - DATA_FENSE + 10) << 18 + newStartTs := (physical - timeFence + 10) << 18 newCommitTs := newStartTs + 1000 oldKey, oldValue := genMvccPut(key, value, oldStartTs, oldCommitTs) @@ -196,7 +196,7 @@ func (c *client) testGcLatestStaleDeleteMark(shouldGc bool) { log.Fatalf("[%s] kv put error %v", caseLabel, err) } - GcByCompact(debugCli) + gcByCompact(debugCli) type Pair struct { key []byte @@ -220,7 +220,7 @@ func (c *client) testDynamicConfChange() { var err error debugCli := c.mustGetTiKVCtlClient(c.kvAddrs[0]) - GcByCompact(debugCli) // Regenerate all sst files. + gcByCompact(debugCli) // Regenerate all sst files. // Change ratio-threshold to a large value, so that GC won't run in compaction filter. log.Infof("[%s] Testing change gc.ratio-threshold to 10", caseLabel) diff --git a/config/tikv/gc-in-compaction-filter-basic.toml b/config/tikv/gc-in-compaction-filter-basic.toml new file mode 100644 index 00000000..c87962a2 --- /dev/null +++ b/config/tikv/gc-in-compaction-filter-basic.toml @@ -0,0 +1,6 @@ +# Used to test basic functions for gc-in-compaction-filter. + +[gc] +ratio-threshold = 0.9 +enable-compaction-filter = true +compaction-filter-skip-version-check = true From 5b38ec052dd53818b80726aafee5566a017edec3 Mon Sep 17 00:00:00 2001 From: qupeng Date: Fri, 25 Dec 2020 14:51:48 +0800 Subject: [PATCH 3/4] update Signed-off-by: qupeng --- cmd/gc-in-compaction-filter/cases.go | 19 ++++++++++--------- cmd/gc-in-compaction-filter/main.go | 2 +- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/cmd/gc-in-compaction-filter/cases.go b/cmd/gc-in-compaction-filter/cases.go index 1362bc39..11bc1cfa 100644 --- a/cmd/gc-in-compaction-filter/cases.go +++ b/cmd/gc-in-compaction-filter/cases.go @@ -91,6 +91,11 @@ func (c *client) testGcStaleVersions() { var err error debugCli := c.mustGetTiKVCtlClient(c.kvAddrs[0]) + _, err = c.db.Exec("set config tikv gc.ratio-threshold = 0.9") + if err != nil { + log.Fatalf("[%s] change gc.ratio-threshold fail, error %v", caseLabel, err) + } + key := []byte("test_gc_stale_versions") value := []byte("test_gc_stale_versions") @@ -128,6 +133,11 @@ func (c *client) testGcLatestPutBeforeSafePoint() { var err error debugCli := c.mustGetTiKVCtlClient(c.kvAddrs[0]) + _, err = c.db.Exec("set config tikv gc.ratio-threshold = 0.9") + if err != nil { + log.Fatalf("[%s] change gc.ratio-threshold fail, error %v", caseLabel, err) + } + key := []byte("test_gc_latest_put") value := []byte("test_gc_latest_put") @@ -222,15 +232,6 @@ func (c *client) testDynamicConfChange() { debugCli := c.mustGetTiKVCtlClient(c.kvAddrs[0]) gcByCompact(debugCli) // Regenerate all sst files. - // Change ratio-threshold to a large value, so that GC won't run in compaction filter. - log.Infof("[%s] Testing change gc.ratio-threshold to 10", caseLabel) - _, err = c.db.Exec("set config tikv gc.ratio-threshold = 10") - if err != nil { - log.Fatalf("[%s] change gc.ratio-threshold fail, error %v", caseLabel, err) - } - time.Sleep(5 * time.Second) // Sleep a while to wait TiKVs get the update. - c.testGcLatestStaleDeleteMark(false) - log.Infof("[%s] Testing change gc.ratio-threshold to 0.9", caseLabel) _, err = c.db.Exec("set config tikv gc.ratio-threshold = 0.9") if err != nil { diff --git a/cmd/gc-in-compaction-filter/main.go b/cmd/gc-in-compaction-filter/main.go index 2d4c4b88..671d15ad 100644 --- a/cmd/gc-in-compaction-filter/main.go +++ b/cmd/gc-in-compaction-filter/main.go @@ -133,7 +133,7 @@ func (c *client) Start(ctx context.Context, cfg interface{}, clientNodes []clust } var ( - clusterVersion = flag.String("cluster-version", "4.x", "support values: 4.x / 5.x, default value: 4.x") + clusterVersion = flag.String("cluster-version", "5.x", "support values: 4.x / 5.x, default value: 5.x") localMode = flag.Bool("local-mode", false, "use local mode or not") waitSafePoint = flag.Bool("wait-safe-point", false, "wait tidb to update safe point or not") ) From 18c810706d9e8a4ec6fd461eb878fff8dbf224a4 Mon Sep 17 00:00:00 2001 From: qupeng Date: Tue, 5 Jan 2021 17:13:58 +0800 Subject: [PATCH 4/4] append random bytes on keys Signed-off-by: qupeng --- cmd/gc-in-compaction-filter/cases.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/cmd/gc-in-compaction-filter/cases.go b/cmd/gc-in-compaction-filter/cases.go index 11bc1cfa..697912ee 100644 --- a/cmd/gc-in-compaction-filter/cases.go +++ b/cmd/gc-in-compaction-filter/cases.go @@ -2,6 +2,7 @@ package main import ( "context" + "crypto/rand" "fmt" "net/url" "strconv" @@ -86,6 +87,12 @@ func genMvccDelete(key []byte, startTs uint64, commitTs uint64) ([]byte, []byte) return writeKey, writeValue } +func genRandomBytes(size int) (blk []byte) { + blk = make([]byte, size) + _, _ = rand.Read(blk) + return +} + // Case 1: test old versions before the safe point will be cleaned by compaction. func (c *client) testGcStaleVersions() { var err error @@ -96,7 +103,7 @@ func (c *client) testGcStaleVersions() { log.Fatalf("[%s] change gc.ratio-threshold fail, error %v", caseLabel, err) } - key := []byte("test_gc_stale_versions") + key := append([]byte("test_gc_stale_versions"), genRandomBytes(8)...) value := []byte("test_gc_stale_versions") safepoint := c.mustGetSafePoint() @@ -138,7 +145,7 @@ func (c *client) testGcLatestPutBeforeSafePoint() { log.Fatalf("[%s] change gc.ratio-threshold fail, error %v", caseLabel, err) } - key := []byte("test_gc_latest_put") + key := append([]byte("test_gc_latest_put"), genRandomBytes(8)...) value := []byte("test_gc_latest_put") safepoint := c.mustGetSafePoint() @@ -184,7 +191,7 @@ func (c *client) testGcLatestStaleDeleteMark(shouldGc bool) { var err error debugCli := c.mustGetTiKVCtlClient(c.kvAddrs[0]) - key := []byte("test_gc_latest_stale_delete") + key := append([]byte("test_gc_latest_stale_delete"), genRandomBytes(8)...) value := []byte("test_gc_latest_stale_delete") safepoint := c.mustGetSafePoint()