diff --git a/.gitignore b/.gitignore index ab567194a1..f066ba0931 100644 --- a/.gitignore +++ b/.gitignore @@ -74,3 +74,4 @@ pkg !codis/cmd/fe/assets/** tests/tmp +tools/pika_cdc/pika/proto \ No newline at end of file diff --git a/src/pika_inner_message.proto b/src/pika_inner_message.proto index 9e2a3ef04c..92619f651d 100644 --- a/src/pika_inner_message.proto +++ b/src/pika_inner_message.proto @@ -1,6 +1,8 @@ syntax = "proto2"; package InnerMessage; +option go_package = "./proto/inner"; + enum Type { kMetaSync = 1; kTrySync = 2; diff --git a/src/rsync_service.proto b/src/rsync_service.proto index ee23b3e8a4..abaf7671ae 100644 --- a/src/rsync_service.proto +++ b/src/rsync_service.proto @@ -1,6 +1,8 @@ syntax = "proto2"; package RsyncService; +option go_package = "./proto/rsync"; + enum Type { kRsyncMeta = 1; kRsyncFile = 2; diff --git a/third/blackwidow b/third/blackwidow new file mode 160000 index 0000000000..904475824b --- /dev/null +++ b/third/blackwidow @@ -0,0 +1 @@ +Subproject commit 904475824bc9d12b7bea4dc7ca30a00c7317d0f6 diff --git a/third/glog b/third/glog new file mode 160000 index 0000000000..ecdbd7cda6 --- /dev/null +++ b/third/glog @@ -0,0 +1 @@ +Subproject commit ecdbd7cda69e1ff304ac02f7f277715a162e1474 diff --git a/third/pink b/third/pink new file mode 160000 index 0000000000..60ac6c5677 --- /dev/null +++ b/third/pink @@ -0,0 +1 @@ +Subproject commit 60ac6c5677eb1dd51bb9b95e4c2f12a903633d0b diff --git a/tools/pika_cdc/Makefile b/tools/pika_cdc/Makefile new file mode 100644 index 0000000000..837d0cc1dc --- /dev/null +++ b/tools/pika_cdc/Makefile @@ -0,0 +1,26 @@ +PROTO_OUT = ./pika +PROTO_DIR = ../../src/ + +GO_FILES = $(shell find . -name '*.go') + +PROTO_FILES := $(wildcard $(PROTO_DIR)/*.proto) + +OUTPUT_BIN = pika_cdc + +PROTOC = protoc +GO_BUILD = go build +GO_CLEAN = go clean + +.PHONY: all proto build clean + +all: proto build + +proto: $(PROTO_FILES) + $(PROTOC) --proto_path=$(PROTO_DIR) --go_out=$(PROTO_OUT) $^ + +build: $(GO_FILES) + $(GO_BUILD) -o $(OUTPUT_BIN) + +clean: + $(GO_CLEAN) + rm -f $(OUTPUT_BIN) \ No newline at end of file diff --git a/tools/pika_cdc/README.md b/tools/pika_cdc/README.md new file mode 100644 index 0000000000..d2923924f0 --- /dev/null +++ b/tools/pika_cdc/README.md @@ -0,0 +1,24 @@ +# Pika cdc +**A tool for incremental synchronization of pika command** + +By imitating a pika slave + +# Build +**Make sure the system has protoc installed** +```bash +brew install protobuf +go install google.golang.org/protobuf/cmd/protoc-gen-go@latest +go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest +``` + +## Build pika cdc +```bash +make +``` + +## Todo: + +Consumer side: +- [x] **redis** +- [x] **kafka** Create a topic of the same name for each pika's DB +- [ ] **bifrost** \ No newline at end of file diff --git a/tools/pika_cdc/conf/cdc.yml b/tools/pika_cdc/conf/cdc.yml new file mode 100644 index 0000000000..be123c410f --- /dev/null +++ b/tools/pika_cdc/conf/cdc.yml @@ -0,0 +1,20 @@ +# pika_server, this port is pika replication port. +pika_repl_server : 127.0.0.1:11221 +# pika_server, this port is pika redis client port. +pika_client_server : 127.0.0.1:9221 +# For data from one DB of one pika, a separate MQ topic is created, +# and the name of the topic is the dbname of the pika +kafka_servers: + - 127.0.0.1:9092 +redis_servers: + - 127.0.0.1:6379 +# retry times while send message failed +retries : 0 +# retry interval while send message failed(ms) +retry_interval: 10 +parallel_thread_size: 1 +# the size of the cached channel in pika cdc +buffer_msg_numbers: 10 + + + diff --git a/tools/pika_cdc/conf/conf.go b/tools/pika_cdc/conf/conf.go new file mode 100644 index 0000000000..feef12a570 --- /dev/null +++ b/tools/pika_cdc/conf/conf.go @@ -0,0 +1,50 @@ +package conf + +import ( + "fmt" + "github.com/sirupsen/logrus" + "gopkg.in/yaml.v3" + "io/ioutil" + "log" + "os" + "path" + "path/filepath" + "runtime" +) + +type PikaCdcConfig struct { + PikaReplServer string `yaml:"pika_repl_server"` + PikaClientServer string `yaml:"pika_client_server"` + KafkaServers []string `yaml:"kafka_servers"` + RedisServers []string `yaml:"redis_servers"` + Retries int `yaml:"retries"` + RetryInterval int `yaml:"retry_interval"` + ParallelThreadSize int `yaml:"parallel_thread_size"` + BufferMsgNumbers int `yaml:"buffer_msg_numbers"` +} + +var ConfigInstance = PikaCdcConfig{} + +func init() { + _, filename, _, _ := runtime.Caller(0) + filename = filepath.Join(filepath.Dir(filename), "cdc.yml") + file, err := ioutil.ReadFile(filename) + if err != nil { + log.Fatal("fail to read file:", err) + } + + err = yaml.Unmarshal(file, &ConfigInstance) + if err != nil { + log.Fatal("fail to yaml unmarshal:", err) + } + + logrus.SetFormatter(&logrus.TextFormatter{ + FullTimestamp: true, + CallerPrettyfier: func(f *runtime.Frame) (string, string) { + return "", fmt.Sprintf("%s:%d", path.Base(f.File), f.Line) + }, + }) + + logrus.SetReportCaller(true) + logrus.SetOutput(os.Stdout) +} diff --git a/tools/pika_cdc/consumer/consumer.go b/tools/pika_cdc/consumer/consumer.go new file mode 100644 index 0000000000..fc102d637d --- /dev/null +++ b/tools/pika_cdc/consumer/consumer.go @@ -0,0 +1,32 @@ +package consumer + +import ( + "pika_cdc/conf" +) + +type Consumer interface { + SendCmdMessage(dbName string, msg []byte) error + Name() string + Close() error + Run() + Stop() +} + +type Factory struct{} + +func GenerateConsumers(config conf.PikaCdcConfig, msgChanns map[string]chan []byte) ([]Consumer, error) { + var consumers []Consumer + + // kafka + for _, k := range config.KafkaServers { + kafka, _ := NewKafka(k, config.Retries, msgChanns) + consumers = append(consumers, kafka) + } + + // redis + for _, r := range config.RedisServers { + newRedis, _ := NewRedis(r, msgChanns) + consumers = append(consumers, newRedis) + } + return consumers, nil +} diff --git a/tools/pika_cdc/consumer/kafka.go b/tools/pika_cdc/consumer/kafka.go new file mode 100644 index 0000000000..51740cd6fe --- /dev/null +++ b/tools/pika_cdc/consumer/kafka.go @@ -0,0 +1,76 @@ +package consumer + +import ( + "context" + "github.com/segmentio/kafka-go" + "sync" +) + +type Kafka struct { + servers string + topics []string + retries int + kafkaConns map[string]*kafka.Conn + wg sync.WaitGroup + msgChanns map[string]chan []byte + stopChan chan bool + protocol KafkaProtocol +} + +func (k *Kafka) SendCmdMessage(dbName string, msg []byte) error { + k.kafkaConns[dbName].Write(k.protocol.ToConsumer(msg)) + return nil +} + +func (k *Kafka) Name() string { + return "Kafka" +} + +func NewKafka(server string, retries int, msgChanns map[string]chan []byte) (*Kafka, error) { + k := &Kafka{} + k.protocol = KafkaProtocol{} + k.kafkaConns = make(map[string]*kafka.Conn) + k.msgChanns = make(map[string]chan []byte) + for dbname, chann := range msgChanns { + conn, err := kafka.DialLeader(context.Background(), "tcp", server, dbname, 0) + if err != nil { + return k, err + } + k.kafkaConns[dbname] = conn + k.msgChanns[dbname] = chann + } + k.stopChan = make(chan bool) + k.retries = retries + return k, nil +} + +func (k *Kafka) Close() error { + k.Stop() + for _, conn := range k.kafkaConns { + if err := conn.Close(); err != nil { + return err + } + } + return nil +} +func (k *Kafka) Run() { + var wg sync.WaitGroup + for dbName, chann := range k.msgChanns { + wg.Add(1) + go func(dbName string, ch chan []byte) { + defer wg.Done() + for { + select { + case msg := <-ch: + k.SendCmdMessage(dbName, msg) + case <-k.stopChan: + return + } + } + }(dbName, chann) + } + wg.Wait() +} +func (k *Kafka) Stop() { + k.stopChan <- true +} diff --git a/tools/pika_cdc/consumer/protocol.go b/tools/pika_cdc/consumer/protocol.go new file mode 100644 index 0000000000..2c77b8d15e --- /dev/null +++ b/tools/pika_cdc/consumer/protocol.go @@ -0,0 +1,27 @@ +package consumer + +import ( + "fmt" + "strconv" +) + +type Protocol interface { + ToConsumer(msg []byte) []byte +} +type RedisProtocol struct{} + +func (rp RedisProtocol) ToConsumer(msg []byte) []byte { + return msg +} +func (rp RedisProtocol) Select(dbName string) []byte { + db, _ := strconv.Atoi(dbName[len(dbName)-1:]) + dbStr := strconv.Itoa(db) + msg := fmt.Sprintf("*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", len(dbStr), dbStr) + return []byte(msg) +} + +type KafkaProtocol struct{} + +func (kp KafkaProtocol) ToConsumer(msg []byte) []byte { + return msg +} diff --git a/tools/pika_cdc/consumer/redis.go b/tools/pika_cdc/consumer/redis.go new file mode 100644 index 0000000000..ff50cdf17f --- /dev/null +++ b/tools/pika_cdc/consumer/redis.go @@ -0,0 +1,80 @@ +package consumer + +import ( + "bufio" + "fmt" + "net" + "sync" + "time" +) + +type Redis struct { + redisProtocol RedisProtocol + conns map[string]net.Conn + msgChanns map[string]chan []byte + stopChan chan bool +} + +func NewRedis(addr string, msgChanns map[string]chan []byte) (*Redis, error) { + r := &Redis{redisProtocol: RedisProtocol{}, conns: make(map[string]net.Conn), msgChanns: msgChanns, stopChan: make(chan bool)} + var err error + for dbName, _ := range msgChanns { + r.conns[dbName], err = net.Dial("tcp", addr) + if err != nil { + return nil, fmt.Errorf("failed to connect to Redis server: %v", err) + } + selectCmdBytes := r.redisProtocol.Select(dbName) + r.conns[dbName].Write(selectCmdBytes) + } + return r, nil +} + +func (r *Redis) SendCmdMessage(dbName string, msg []byte) error { + _, err := r.sendRedisData(dbName, msg) + return err +} + +func (r *Redis) Name() string { + return string("Redis") +} +func (r *Redis) Close() error { + for _, conn := range r.conns { + conn.Close() + } + return nil +} + +func (r *Redis) sendRedisData(dbName string, data []byte) (string, error) { + r.conns[dbName].SetDeadline(time.Now().Add(5 * time.Second)) + _, err := r.conns[dbName].Write(data) + if err != nil { + return "", fmt.Errorf("failed to send data to Redis server: %v", err) + } + reader := bufio.NewReader(r.conns[dbName]) + response, err := reader.ReadString('\n') + if err != nil { + return "", fmt.Errorf("failed to read response from Redis server: %v", err) + } + return response, nil +} +func (r *Redis) Run() { + var wg sync.WaitGroup + for dbName, chann := range r.msgChanns { + wg.Add(1) + go func(dbName string, ch chan []byte) { + defer wg.Done() + for { + select { + case msg := <-ch: + r.sendRedisData(dbName, msg) + case <-r.stopChan: + return + } + } + }(dbName, chann) + } + wg.Wait() +} +func (r *Redis) Stop() { + r.stopChan <- true +} diff --git a/tools/pika_cdc/go.mod b/tools/pika_cdc/go.mod new file mode 100644 index 0000000000..4fa1eb9066 --- /dev/null +++ b/tools/pika_cdc/go.mod @@ -0,0 +1,44 @@ +module pika_cdc + +go 1.20 + +require ( + github.com/gin-gonic/gin v1.10.0 + github.com/golang/protobuf v1.5.4 + github.com/redis/go-redis/v9 v9.6.0 + github.com/segmentio/kafka-go v0.4.47 + github.com/sirupsen/logrus v1.9.3 + google.golang.org/protobuf v1.34.2 + gopkg.in/yaml.v3 v3.0.1 +) + +require ( + github.com/bytedance/sonic v1.11.6 // indirect + github.com/bytedance/sonic/loader v0.1.1 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/cloudwego/base64x v0.1.4 // indirect + github.com/cloudwego/iasm v0.2.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/gabriel-vasile/mimetype v1.4.3 // indirect + github.com/gin-contrib/sse v0.1.0 // indirect + github.com/go-playground/locales v0.14.1 // indirect + github.com/go-playground/universal-translator v0.18.1 // indirect + github.com/go-playground/validator/v10 v10.20.0 // indirect + github.com/goccy/go-json v0.10.2 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.15.9 // indirect + github.com/klauspost/cpuid/v2 v2.2.7 // indirect + github.com/leodido/go-urn v1.4.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pelletier/go-toml/v2 v2.2.2 // indirect + github.com/pierrec/lz4/v4 v4.1.15 // indirect + github.com/twitchyliquid64/golang-asm v0.15.1 // indirect + github.com/ugorji/go/codec v1.2.12 // indirect + golang.org/x/arch v0.8.0 // indirect + golang.org/x/crypto v0.23.0 // indirect + golang.org/x/net v0.25.0 // indirect + golang.org/x/sys v0.20.0 // indirect + golang.org/x/text v0.15.0 // indirect +) diff --git a/tools/pika_cdc/go.sum b/tools/pika_cdc/go.sum new file mode 100644 index 0000000000..14162e1595 --- /dev/null +++ b/tools/pika_cdc/go.sum @@ -0,0 +1,150 @@ +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc0= +github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4= +github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM= +github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y= +github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= +github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg= +github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0= +github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk= +github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= +github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-gonic/gin v1.10.0 h1:nTuyha1TYqgedzytsKYqna+DfLos46nTv2ygFy86HFU= +github.com/gin-gonic/gin v1.10.0/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y= +github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= +github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= +github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= +github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= +github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= +github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBExVwjEviJTixqxL8= +github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM= +github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= +github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= +github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= +github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= +github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= +github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= +github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.6.0 h1:NLck+Rab3AOTHw21CGRpvQpgTrAU4sgdCswqGtlhGRA= +github.com/redis/go-redis/v9 v9.6.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= +github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0= +github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= +github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= +github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE= +github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= +golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc= +golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= +golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50= +rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/tools/pika_cdc/main.go b/tools/pika_cdc/main.go new file mode 100644 index 0000000000..043dc72271 --- /dev/null +++ b/tools/pika_cdc/main.go @@ -0,0 +1,23 @@ +package main + +import ( + "github.com/sirupsen/logrus" + "pika_cdc/conf" + "pika_cdc/consumer" + "pika_cdc/pika" +) + +func main() { + if pikaServer, err := pika.New(conf.ConfigInstance.PikaReplServer, conf.ConfigInstance.BufferMsgNumbers); err != nil { + logrus.Fatal("failed to connect pika server, {}", err) + } else { + if consumers, err := consumer.GenerateConsumers(conf.ConfigInstance, pikaServer.MsgChanns); err != nil { + logrus.Fatal("failed to generate consumers, {}", err) + } else { + for _, c := range consumers { + go c.Run() + } + } + pikaServer.Run() + } +} diff --git a/tools/pika_cdc/pika/cmd.go b/tools/pika_cdc/pika/cmd.go new file mode 100644 index 0000000000..aea5886bc0 --- /dev/null +++ b/tools/pika_cdc/pika/cmd.go @@ -0,0 +1,7 @@ +package pika + +type Cmd struct{} + +func (c *Cmd) Name() string { + return "unimplemented" +} diff --git a/tools/pika_cdc/pika/replprotocol.go b/tools/pika_cdc/pika/replprotocol.go new file mode 100644 index 0000000000..f2cc2d9545 --- /dev/null +++ b/tools/pika_cdc/pika/replprotocol.go @@ -0,0 +1,335 @@ +package pika + +import ( + "bufio" + "bytes" + "encoding/binary" + "fmt" + "github.com/golang/protobuf/proto" + "github.com/sirupsen/logrus" + "io" + "pika_cdc/pika/proto/inner" +) + +const HeaderLength = 4 + +type ReplProtocol struct { + writer *bufio.Writer + reader *bufio.Reader + binlogSyncInfos map[string]binlogSyncInfo + dbMetaInfo *inner.InnerResponse_MetaSync + ip string + port int32 +} + +type binlogSyncInfo struct { + binlogOffset *inner.BinlogOffset + fileNum uint32 + offset uint64 + sessionId int32 + isFirst bool +} + +func (repl *ReplProtocol) GetSyncWithPika() error { + if err := repl.sendMetaSyncRequest(); err != nil { + return err + } + metaResp, err := repl.getResponse() + if err != nil { + logrus.Fatal("Failed to get metaResp:", err) + } + repl.dbMetaInfo = metaResp.MetaSync + + trySyncType := inner.Type_kTrySync + binlogSyncType := inner.Type_kBinlogSync + + replDBs := metaResp.MetaSync.DbsInfo + var a uint64 = 0 + var b uint32 = 0 + repl.binlogSyncInfos = make(map[string]binlogSyncInfo) + for _, dbInfo := range replDBs { + newMetaInfo := binlogSyncInfo{ + binlogOffset: &inner.BinlogOffset{ + Filenum: nil, + Offset: nil, + Term: nil, + Index: nil, + }, + fileNum: 0, + offset: 0, + sessionId: 0, + } + newMetaInfo.binlogOffset.Offset = &a + newMetaInfo.binlogOffset.Filenum = &b + + slotId := uint32(*dbInfo.SlotNum) + trySync := &inner.InnerRequest{ + Type: &trySyncType, + TrySync: &inner.InnerRequest_TrySync{ + Node: &inner.Node{ + Ip: &repl.ip, + Port: &repl.port, + }, + Slot: &inner.Slot{ + DbName: dbInfo.DbName, + SlotId: &slotId, + }, + BinlogOffset: newMetaInfo.binlogOffset, + }, + ConsensusMeta: nil, + } + if err := repl.sendReplReq(trySync); err != nil { + return err + } + + trySyncResp, err := repl.getResponse() + if err != nil || trySyncResp == nil || *trySyncResp.Code != inner.StatusCode_kOk { + logrus.Fatal("Failed to get TrySync Response Msg") + } + startOffset := trySyncResp.TrySync.GetBinlogOffset() + trySync.TrySync.BinlogOffset = startOffset + // send twice to get session id + if err := repl.sendReplReq(trySync); err != nil { + return err + } + trySyncResp, err = repl.getResponse() + + newMetaInfo.binlogOffset = startOffset + newMetaInfo.sessionId = *trySyncResp.TrySync.SessionId + newMetaInfo.isFirst = true + repl.binlogSyncInfos[dbInfo.GetDbName()] = newMetaInfo + } + + // todo(leehao): Can find ways to optimize using coroutines here. May be use goroutine + for dbName, dbInfo := range repl.binlogSyncInfos { + var slotId uint32 = 0 + binlogSyncReq := &inner.InnerRequest{ + Type: &binlogSyncType, + MetaSync: nil, + TrySync: nil, + DbSync: nil, + BinlogSync: &inner.InnerRequest_BinlogSync{ + Node: &inner.Node{ + Ip: &repl.ip, + Port: &repl.port, + }, + DbName: &dbName, + SlotId: &slotId, + AckRangeStart: dbInfo.binlogOffset, + AckRangeEnd: dbInfo.binlogOffset, + SessionId: &dbInfo.sessionId, + FirstSend: &dbInfo.isFirst, + }, + RemoveSlaveNode: nil, + ConsensusMeta: nil, + } + if err := repl.sendReplReq(binlogSyncReq); err != nil { + return err + } + } + return nil +} + +func (repl *ReplProtocol) GetBinlogSync() (map[string][]byte, error) { + + binlogSyncType := inner.Type_kBinlogSync + // This is a collection of binlogs for all DB's + binlogBytes := make(map[string][]byte) + // todo(leehao): Receive multiple binlog sync responses simultaneously + binlogSyncResp, err := repl.getResponse() + if err != nil { + return nil, err + } + if binlogSyncResp == nil || *binlogSyncResp.Code != inner.StatusCode_kOk || + *binlogSyncResp.Type != inner.Type_kBinlogSync || binlogSyncResp.BinlogSync == nil { + logrus.Fatal("get binlog sync response failed") + } else { + for _, item := range binlogSyncResp.BinlogSync { + slotId := *item.Slot.SlotId + dbName := *item.Slot.DbName + binlogInfo := repl.binlogSyncInfos[dbName] + binlogInfo.isFirst = false + binlogOffset := item.BinlogOffset + if len(item.Binlog) == 0 || (*binlogOffset.Offset == binlogInfo.offset && *binlogOffset.Filenum == binlogInfo.fileNum) { + *binlogOffset.Filenum = 0 + *binlogOffset.Offset = 0 + } else { + binlogInfo.binlogOffset = binlogOffset + if binlogItem, err := repl.decodeBinlogItem(item.Binlog); err != nil { + logrus.Fatal(err) + } else { + logrus.Info("recv binlog db:", dbName, " ,size:", len(item.Binlog)) + binlogBytes[dbName] = binlogItem.Content + } + } + err := repl.sendReplReq(&inner.InnerRequest{ + Type: &binlogSyncType, + MetaSync: nil, + TrySync: nil, + DbSync: nil, + BinlogSync: &inner.InnerRequest_BinlogSync{ + Node: &inner.Node{ + Ip: &repl.ip, + Port: &repl.port, + }, + DbName: &dbName, + SlotId: &slotId, + AckRangeStart: binlogOffset, + AckRangeEnd: binlogOffset, + SessionId: &binlogInfo.sessionId, + FirstSend: &binlogInfo.isFirst, + }, + RemoveSlaveNode: nil, + ConsensusMeta: nil, + }) + repl.binlogSyncInfos[dbName] = binlogInfo + if err != nil { + logrus.Warn("Failed to send binlog sync, ", err) + return nil, err + } + } + } + return binlogBytes, nil +} + +func (repl *ReplProtocol) Ping() string { + _, err := repl.writer.WriteString("PING\r\n") + if err != nil { + logrus.Warn("Error writing to connection:", err) + return string("") + } + repl.writer.Flush() + + resp, err := repl.reader.ReadString('\n') + if err != nil { + logrus.Warn("Error reading from connection:", err) + return string("") + } + return resp +} + +func (repl *ReplProtocol) sendMetaSyncRequest() error { + metaSyncType := inner.Type_kMetaSync + request := &inner.InnerRequest{ + Type: &metaSyncType, + MetaSync: &inner.InnerRequest_MetaSync{ + Node: &inner.Node{ + Ip: &repl.ip, + Port: &repl.port, + }, + }, + } + return repl.sendReplReq(request) +} + +func (repl *ReplProtocol) sendReplReq(request *inner.InnerRequest) error { + msg, err := proto.Marshal(request) + if err != nil { + logrus.Fatal("Error Marshal:", err) + } + + pikaTag := []byte(repl.buildInternalTag(msg)) + allBytes := append(pikaTag, msg...) + _, err = repl.writer.Write(allBytes) + if err != nil { + logrus.Fatal("Error writing to server:", err) + } + repl.writer.Flush() + return nil +} + +func (repl *ReplProtocol) getResponse() (*inner.InnerResponse, error) { + header := make([]byte, HeaderLength) + _, err := repl.reader.Read(header) + if err != nil { + if err != io.EOF { + logrus.Fatal("Error reading header:", err) + } + return nil, err + } + + // Convert the header to an integer + var bodyLength uint32 + buffer := bytes.NewBuffer(header) + err = binary.Read(buffer, binary.BigEndian, &bodyLength) + if err != nil { + logrus.Fatal("Error converting header to integer:", err) + return nil, err + } + // Read the body + body := make([]byte, bodyLength) + _, err = repl.reader.Read(body) + if err != nil { + logrus.Fatal("Error reading body:", err) + return nil, err + } + + res := &inner.InnerResponse{} + err = proto.Unmarshal(body, res) + if err != nil { + logrus.Warn("Error Deserialization:", err) + return nil, err + } + return res, nil +} + +func (repl *ReplProtocol) buildInternalTag(resp []byte) (tag string) { + respSize := uint32(len(resp)) + buf := make([]byte, 4) + binary.BigEndian.PutUint32(buf, respSize) + return string(buf) +} + +type binlogItem struct { + Type uint16 + CreateTime uint32 + TermId uint32 + LogicId uint64 + FileNum uint32 + Offset uint64 + ContentLength uint32 + Content []byte +} + +func (repl *ReplProtocol) decodeBinlogItem(data []byte) (*binlogItem, error) { + if len(data) < 34 { + return nil, fmt.Errorf("data length is too short") + } + + reader := bytes.NewReader(data) + + binlogItem := &binlogItem{} + if err := binary.Read(reader, binary.LittleEndian, &binlogItem.Type); err != nil { + return nil, fmt.Errorf("failed to read Type: %v", err) + } + if err := binary.Read(reader, binary.LittleEndian, &binlogItem.CreateTime); err != nil { + return nil, fmt.Errorf("failed to read Create Time: %v", err) + } + if err := binary.Read(reader, binary.LittleEndian, &binlogItem.TermId); err != nil { + return nil, fmt.Errorf("failed to read Term Id: %v", err) + } + if err := binary.Read(reader, binary.LittleEndian, &binlogItem.LogicId); err != nil { + return nil, fmt.Errorf("failed to read Logic Id: %v", err) + } + if err := binary.Read(reader, binary.LittleEndian, &binlogItem.FileNum); err != nil { + return nil, fmt.Errorf("failed to read File Num: %v", err) + } + if err := binary.Read(reader, binary.LittleEndian, &binlogItem.Offset); err != nil { + return nil, fmt.Errorf("failed to read Offset: %v", err) + } + if err := binary.Read(reader, binary.LittleEndian, &binlogItem.ContentLength); err != nil { + return nil, fmt.Errorf("failed to read Content Length: %v", err) + } + + contentLength := int(binlogItem.ContentLength) + if len(data) < 34+contentLength { + return nil, fmt.Errorf("data length is too short for content") + } + + binlogItem.Content = make([]byte, contentLength) + if _, err := reader.Read(binlogItem.Content); err != nil { + return nil, fmt.Errorf("failed to read Content: %v", err) + } + + return binlogItem, nil +} diff --git a/tools/pika_cdc/pika/replprotocol_test.go b/tools/pika_cdc/pika/replprotocol_test.go new file mode 100644 index 0000000000..8c78a4f1fd --- /dev/null +++ b/tools/pika_cdc/pika/replprotocol_test.go @@ -0,0 +1,460 @@ +package pika + +import ( + "bufio" + "bytes" + "context" + "encoding/binary" + "fmt" + "github.com/golang/protobuf/proto" + "github.com/redis/go-redis/v9" + "github.com/sirupsen/logrus" + "io" + "net" + "os" + "pika_cdc/conf" + "pika_cdc/pika/proto/inner" + "testing" + "time" +) + +func TestConnect(t *testing.T) { + cxt := context.Background() + + addr := conf.ConfigInstance.PikaClientServer + client := redis.NewClient(&redis.Options{ + Addr: addr, + Password: "", // no password set + DB: 0, // use default DB + }) + fmt.Println(client.Get(cxt, "key")) +} + +func TestSendMetaSync(t *testing.T) { + ip := "127.0.0.1" + listener, e := net.Listen("tcp", ":0") + if e != nil { + os.Exit(1) + } + selfPort := getPort(listener.Addr().String()) + addr := conf.ConfigInstance.PikaReplServer + tt := inner.Type_kMetaSync + request := inner.InnerRequest{ + Type: &tt, + MetaSync: &inner.InnerRequest_MetaSync{ + Node: &inner.Node{ + Ip: &ip, + Port: &selfPort, + }, + }, + } + msg, err := proto.Marshal(&request) + conn, err := net.Dial("tcp", addr) + if err != nil { + fmt.Println("Error connecting:", err) + os.Exit(1) + } + defer conn.Close() + + pikaTag := []byte(BuildInternalTag(msg)) + allBytes := append(pikaTag, msg...) + _, err = conn.Write(allBytes) + if err != nil { + fmt.Println("Error writing to server:", err) + os.Exit(1) + } +} + +func receiveReplMsg(listener net.Listener) { + defer listener.Close() + fmt.Println("Listening on ", listener.Addr().String()) + for { + conn, err := listener.Accept() + fmt.Println(conn.LocalAddr().String() + " connect") + if err != nil { + fmt.Println("Error accepting connection:", err) + continue + } + //go handleConnection(conn) + } +} +func getResponse(conn net.Conn) *inner.InnerResponse { + // Read the header (length) + header := make([]byte, HeaderLength) + _, err := io.ReadFull(conn, header) + if err != nil { + if err != io.EOF { + fmt.Println("Error reading header:", err) + } + return nil + } + + // Convert the header to an integer + var bodyLength uint32 + buffer := bytes.NewBuffer(header) + err = binary.Read(buffer, binary.BigEndian, &bodyLength) + if err != nil { + logrus.Fatal("Error converting header to integer:", err) + return nil + } + // Read the body + body := make([]byte, bodyLength) + _, err = io.ReadFull(conn, body) + if err != nil { + logrus.Fatal("Error reading body:", err) + return nil + } + + res := &inner.InnerResponse{} + err = proto.Unmarshal(body, res) + if err != nil { + logrus.Fatal("Error Deserialization:", err) + } + return res +} + +func sendReplReq(conn net.Conn, request *inner.InnerRequest) (net.Conn, error) { + if conn == nil { + addr := conf.ConfigInstance.PikaReplServer + newConn, err := net.Dial("tcp", addr) + if err != nil { + return nil, err + } + conn = newConn + } + msg, err := proto.Marshal(request) + if err != nil { + logrus.Fatal("Error Marshal:", err) + } + + pikaTag := []byte(BuildInternalTag(msg)) + allBytes := append(pikaTag, msg...) + _, err = conn.Write(allBytes) + if err != nil { + logrus.Fatal("Error writing to server:", err) + } + return conn, nil +} + +func sendMetaSyncRequest(conn net.Conn) (net.Conn, error) { + if conn == nil { + addr := conf.ConfigInstance.PikaReplServer + newConn, err := net.Dial("tcp", addr) + if err != nil { + return nil, err + } + conn = newConn + } + metaSyncType := inner.Type_kMetaSync + port := getPort(conn.LocalAddr().String()) + ip := getIP(conn.LocalAddr().String()) + request := &inner.InnerRequest{ + Type: &metaSyncType, + MetaSync: &inner.InnerRequest_MetaSync{ + Node: &inner.Node{ + Ip: &ip, + Port: &port, + }, + }, + } + return sendReplReq(conn, request) +} + +func TestGetOffsetFromMaster(t *testing.T) { + ip := "127.0.0.1" + listener, e := net.Listen("tcp", ":0") + if e != nil { + os.Exit(1) + } + selfPort := getPort(listener.Addr().String()) + conn, err := sendMetaSyncRequest(nil) + if err != nil { + logrus.Fatal("Failed to sendMetaSyncRequest") + } + metaResp := getResponse(conn) + trySyncType := inner.Type_kTrySync + replDBs := metaResp.MetaSync.DbsInfo + var fileNum uint32 = 1 + var offset uint64 = 0 + for _, db := range replDBs { + slotId := uint32(*db.SlotNum) + trySync := &inner.InnerRequest{ + Type: &trySyncType, + TrySync: &inner.InnerRequest_TrySync{ + Node: &inner.Node{ + Ip: &ip, + Port: &selfPort, + }, + Slot: &inner.Slot{ + DbName: db.DbName, + SlotId: &slotId, + }, + BinlogOffset: &inner.BinlogOffset{ + Filenum: &fileNum, + Offset: &offset, + Term: nil, + Index: nil, + }, + }, + ConsensusMeta: nil, + } + _, err = sendReplReq(conn, trySync) + if err != nil { + logrus.Fatal("Failed to send TrySync Msg", err) + } + trySyncResp := getResponse(conn) + if trySyncResp == nil || *trySyncResp.Code != inner.StatusCode_kOk { + logrus.Fatal("Failed to get TrySync Response Msg", err) + } + trySync.TrySync.BinlogOffset = trySyncResp.TrySync.GetBinlogOffset() + logrus.Println("get offset:", trySync.TrySync.BinlogOffset) + } +} + +func TestSendDbSyncReqMsg(t *testing.T) { + ip := "127.0.0.1" + listener, e := net.Listen("tcp", ":0") + if e != nil { + os.Exit(1) + } + + selfPort := getPort(listener.Addr().String()) + + metaSyncType := inner.Type_kMetaSync + + request := &inner.InnerRequest{ + Type: &metaSyncType, + MetaSync: &inner.InnerRequest_MetaSync{ + Node: &inner.Node{ + Ip: &ip, + Port: &selfPort, + }, + }, + } + conn, err := sendReplReq(nil, request) + if err != nil { + os.Exit(1) + } + metaResp := getResponse(conn) + + dbSyncType := inner.Type_kDBSync + replDBs := metaResp.MetaSync.DbsInfo + for _, db := range replDBs { + var fileNum uint32 = 1 + var offset uint64 = 0 + slotId := uint32(*db.SlotNum) + dbSyncReq := &inner.InnerRequest{ + Type: &dbSyncType, + DbSync: &inner.InnerRequest_DBSync{ + Node: &inner.Node{ + Ip: &ip, + Port: &selfPort, + }, + Slot: &inner.Slot{ + DbName: db.DbName, + SlotId: &slotId, + }, + BinlogOffset: &inner.BinlogOffset{ + Filenum: &fileNum, + Offset: &offset, + Term: nil, + Index: nil, + }, + }, + ConsensusMeta: nil, + } + sendReplReq(conn, dbSyncReq) + } +} + +func BuildInternalTag(resp []byte) (tag string) { + respSize := uint32(len(resp)) + buf := make([]byte, 4) + binary.BigEndian.PutUint32(buf, respSize) + return string(buf) +} + +// Pika Binlog Item +type BinlogItem struct { + Type uint16 + CreateTime uint32 + TermId uint32 + LogicId uint64 + FileNum uint32 + Offset uint64 + ContentLength uint32 + Content []byte +} + +// Test incremental synchronization of data +// pika -> redis +func TestGetIncrementalSync(t *testing.T) { + conn, err := sendMetaSyncRequest(nil) + if err != nil { + logrus.Fatal(err) + } + metaResp := getResponse(conn) + if metaResp == nil { + logrus.Fatal("Failed to get metaResp") + } + trySyncType := inner.Type_kTrySync + binlogSyncType := inner.Type_kBinlogSync + replDBs := metaResp.MetaSync.DbsInfo + var fileNum uint32 = 1 + var offset uint64 = 0 + ip := getIP(conn.LocalAddr().String()) + port := getPort(conn.LocalAddr().String()) + + for _, db := range replDBs { + slotId := uint32(*db.SlotNum) + trySync := &inner.InnerRequest{ + Type: &trySyncType, + TrySync: &inner.InnerRequest_TrySync{ + Node: &inner.Node{ + Ip: &ip, + Port: &port, + }, + Slot: &inner.Slot{ + DbName: db.DbName, + SlotId: &slotId, + }, + BinlogOffset: &inner.BinlogOffset{ + Filenum: &fileNum, + Offset: &offset, + Term: nil, + Index: nil, + }, + }, + ConsensusMeta: nil, + } + _, err = sendReplReq(conn, trySync) + if err != nil { + logrus.Fatal("Failed to send TrySync Msg", err) + } + trySyncResp := getResponse(conn) + if trySyncResp == nil || *trySyncResp.Code != inner.StatusCode_kOk { + logrus.Fatal("Failed to get TrySync Response Msg", err) + } + startOffset := trySyncResp.TrySync.GetBinlogOffset() + trySync.TrySync.BinlogOffset = startOffset + // send twice to get session id + sendReplReq(conn, trySync) + trySyncResp = getResponse(conn) + + isFirst := true + binlogSyncReq := &inner.InnerRequest{ + Type: &binlogSyncType, + MetaSync: nil, + TrySync: nil, + DbSync: nil, + BinlogSync: &inner.InnerRequest_BinlogSync{ + Node: trySync.TrySync.Node, + DbName: db.DbName, + SlotId: &slotId, + AckRangeStart: startOffset, + AckRangeEnd: startOffset, + SessionId: trySyncResp.TrySync.SessionId, + FirstSend: &isFirst, + }, + RemoveSlaveNode: nil, + ConsensusMeta: nil, + } + sendReplReq(conn, binlogSyncReq) + go func() { + for { + binlogSyncResp := getResponse(conn) + if binlogSyncResp == nil || *binlogSyncResp.Code != inner.StatusCode_kOk || + *binlogSyncResp.Type != inner.Type_kBinlogSync || binlogSyncResp.BinlogSync == nil { + logrus.Fatal("get binlog sync response failed") + } else { + for _, item := range binlogSyncResp.BinlogSync { + *binlogSyncReq.BinlogSync.FirstSend = false + if len(item.Binlog) == 0 { + *binlogSyncReq.BinlogSync.AckRangeStart.Filenum = 0 + *binlogSyncReq.BinlogSync.AckRangeStart.Offset = 0 + logrus.Println("receive binlog response keep alive") + } else { + binlogSyncReq.BinlogSync.AckRangeStart = item.BinlogOffset + binlogSyncReq.BinlogSync.AckRangeEnd = item.BinlogOffset + if binlogItem, err := DecodeBinlogItem(item.Binlog); err != nil { + logrus.Fatal(err) + } else { + SendRedisData("127.0.0.1:6379", binlogItem.Content) + } + } + sendReplReq(conn, binlogSyncReq) + } + } + } + }() + } + // never stop as a backend service + for { + } +} + +func SendRedisData(addr string, data []byte) (string, error) { + conn, err := net.Dial("tcp", addr) + if err != nil { + return "", fmt.Errorf("failed to connect to Redis server: %v", err) + } + defer conn.Close() + + conn.SetDeadline(time.Now().Add(5 * time.Second)) + + _, err = conn.Write(data) + if err != nil { + return "", fmt.Errorf("failed to send data to Redis server: %v", err) + } + + reader := bufio.NewReader(conn) + response, err := reader.ReadString('\n') + if err != nil { + return "", fmt.Errorf("failed to read response from Redis server: %v", err) + } + + return response, nil +} + +func DecodeBinlogItem(data []byte) (*BinlogItem, error) { + if len(data) < 34 { + return nil, fmt.Errorf("data length is too short") + } + + reader := bytes.NewReader(data) + + binlogItem := &BinlogItem{} + if err := binary.Read(reader, binary.LittleEndian, &binlogItem.Type); err != nil { + return nil, fmt.Errorf("failed to read Type: %v", err) + } + if err := binary.Read(reader, binary.LittleEndian, &binlogItem.CreateTime); err != nil { + return nil, fmt.Errorf("failed to read Create Time: %v", err) + } + if err := binary.Read(reader, binary.LittleEndian, &binlogItem.TermId); err != nil { + return nil, fmt.Errorf("failed to read Term Id: %v", err) + } + if err := binary.Read(reader, binary.LittleEndian, &binlogItem.LogicId); err != nil { + return nil, fmt.Errorf("failed to read Logic Id: %v", err) + } + if err := binary.Read(reader, binary.LittleEndian, &binlogItem.FileNum); err != nil { + return nil, fmt.Errorf("failed to read File Num: %v", err) + } + if err := binary.Read(reader, binary.LittleEndian, &binlogItem.Offset); err != nil { + return nil, fmt.Errorf("failed to read Offset: %v", err) + } + if err := binary.Read(reader, binary.LittleEndian, &binlogItem.ContentLength); err != nil { + return nil, fmt.Errorf("failed to read Content Length: %v", err) + } + + contentLength := int(binlogItem.ContentLength) + if len(data) < 34+contentLength { + return nil, fmt.Errorf("data length is too short for content") + } + + binlogItem.Content = make([]byte, contentLength) + if _, err := reader.Read(binlogItem.Content); err != nil { + return nil, fmt.Errorf("failed to read Content: %v", err) + } + + return binlogItem, nil +} diff --git a/tools/pika_cdc/pika/server.go b/tools/pika_cdc/pika/server.go new file mode 100644 index 0000000000..280f356392 --- /dev/null +++ b/tools/pika_cdc/pika/server.go @@ -0,0 +1,100 @@ +package pika + +import ( + "bufio" + "github.com/sirupsen/logrus" + "net" + "strconv" + "strings" + "time" +) + +type Server struct { + stop chan bool + pikaConn net.Conn + pikaAddr string + bufferMsgNumber int + MsgChanns map[string]chan []byte + pikaReplProtocol ReplProtocol + writer *bufio.Writer + reader *bufio.Reader +} + +// Use todo(leehao): middleware can be added here in the future +func Use() { + +} +func getPort(addr string) int32 { + portStr := addr[strings.LastIndex(addr, ":")+1:] + port, _ := strconv.Atoi(portStr) + return int32(port) +} +func getIP(addr string) string { + index := strings.LastIndex(addr, ":") + if index == -1 { + return addr + } + return addr[:index] +} + +func New(s string, bufferMsgNumber int) (Server, error) { + server := Server{} + server.MsgChanns = make(map[string]chan []byte) + conn, err := net.Dial("tcp", s) + if err != nil { + logrus.Fatal("Error connecting to Pika server:", err) + } + server.bufferMsgNumber = bufferMsgNumber + server.pikaConn = conn + server.writer = bufio.NewWriter(server.pikaConn) + server.reader = bufio.NewReader(server.pikaConn) + server.pikaReplProtocol = ReplProtocol{ + writer: server.writer, + reader: server.reader, + ip: getIP(conn.LocalAddr().String()), + port: getPort(conn.LocalAddr().String()), + } + err = server.CreateSyncWithPika() + server.buildMsgChann() + return server, err +} +func (s *Server) buildMsgChann() { + dbMetaInfo := s.pikaReplProtocol.dbMetaInfo + for _, dbInfo := range dbMetaInfo.DbsInfo { + s.MsgChanns[*dbInfo.DbName] = make(chan []byte, s.bufferMsgNumber) + } +} + +// Run This method will block execution until an error occurs +func (s *Server) Run() { + for { + select { + case <-s.stop: + return + case <-time.After(100 * time.Millisecond): + binlogBytes, _ := s.pikaReplProtocol.GetBinlogSync() + if len(binlogBytes) != 0 { + for dbName, binlog := range binlogBytes { + chann, exists := s.MsgChanns[dbName] + if !exists { + chann = make(chan []byte, s.bufferMsgNumber) + s.MsgChanns[dbName] = chann + } + chann <- binlog + } + } + } + } +} + +func (s *Server) Exit() { + s.stop <- true + close(s.stop) + for _, chann := range s.MsgChanns { + close(chann) + } +} + +func (s *Server) CreateSyncWithPika() error { + return s.pikaReplProtocol.GetSyncWithPika() +}