Skip to content

Commit

Permalink
feat: pika cdc for incremental synchronization (ospp 2024) (#2855)
Browse files Browse the repository at this point in the history
* feat-cdc:initial docking Kafka

Signed-off-by: LeeHao <1838249551@qq.com>

* feat-cdc:add pika repl send proxy test

Signed-off-by: LeeHao <1838249551@qq.com>

feat-cdc:add pika repl send proxy test

Signed-off-by: LeeHao <1838249551@qq.com>

feat-cdc:add pika repl send proxy test

Signed-off-by: LeeHao <1838249551@qq.com>

* feat-cdc:add send metaSync test

Signed-off-by: LeeHao <1838249551@qq.com>

* feat-cdc:add send binlogSync test

Basic incremental synchronization to redis has been completed

Signed-off-by: LeeHao <1838249551@qq.com>

* feat-cdc:Basic incremental synchronization to redis has been completed

Signed-off-by: LeeHao <1838249551@qq.com>

* feat-cdc:Use Makefile to build pika cdc

Signed-off-by: LeeHao <1838249551@qq.com>

a

Signed-off-by: LeeHao <1838249551@qq.com>

feat-cdc:Use Makefile to build pika cdc

Signed-off-by: LeeHao <1838249551@qq.com>

* feat-cdc:remove track of pb file

Signed-off-by: LeeHao <1838249551@qq.com>

* feat-cdc:use multi chann communication between consumer and pika master

Signed-off-by: LeeHao <1838249551@qq.com>

* feat-cdc:add consumer kafka

Signed-off-by: LeeHao <1838249551@qq.com>

* chore(pika_cdc): remove some redundant code

Signed-off-by: LeeHao <1838249551@qq.com>

* chore(pika_cdc): correct the code format

Signed-off-by: LeeHao <1838249551@qq.com>

---------

Signed-off-by: LeeHao <1838249551@qq.com>
  • Loading branch information
ForestLH authored Oct 26, 2024
1 parent 218b68b commit 995cd77
Show file tree
Hide file tree
Showing 21 changed files with 1,462 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,4 @@ pkg
!codis/cmd/fe/assets/**

tests/tmp
tools/pika_cdc/pika/proto
2 changes: 2 additions & 0 deletions src/pika_inner_message.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
syntax = "proto2";
package InnerMessage;

option go_package = "./proto/inner";

enum Type {
kMetaSync = 1;
kTrySync = 2;
Expand Down
2 changes: 2 additions & 0 deletions src/rsync_service.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
syntax = "proto2";
package RsyncService;

option go_package = "./proto/rsync";

enum Type {
kRsyncMeta = 1;
kRsyncFile = 2;
Expand Down
1 change: 1 addition & 0 deletions third/blackwidow
Submodule blackwidow added at 904475
1 change: 1 addition & 0 deletions third/glog
Submodule glog added at ecdbd7
1 change: 1 addition & 0 deletions third/pink
Submodule pink added at 60ac6c
26 changes: 26 additions & 0 deletions tools/pika_cdc/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
PROTO_OUT = ./pika
PROTO_DIR = ../../src/

GO_FILES = $(shell find . -name '*.go')

PROTO_FILES := $(wildcard $(PROTO_DIR)/*.proto)

OUTPUT_BIN = pika_cdc

PROTOC = protoc
GO_BUILD = go build
GO_CLEAN = go clean

.PHONY: all proto build clean

all: proto build

proto: $(PROTO_FILES)
$(PROTOC) --proto_path=$(PROTO_DIR) --go_out=$(PROTO_OUT) $^

build: $(GO_FILES)
$(GO_BUILD) -o $(OUTPUT_BIN)

clean:
$(GO_CLEAN)
rm -f $(OUTPUT_BIN)
24 changes: 24 additions & 0 deletions tools/pika_cdc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Pika cdc
**A tool for incremental synchronization of pika command**

By imitating a pika slave

# Build
**Make sure the system has protoc installed**
```bash
brew install protobuf
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
```

## Build pika cdc
```bash
make
```

## Todo:

Consumer side:
- [x] **redis**
- [x] **kafka** Create a topic of the same name for each pika's DB
- [ ] **bifrost**
20 changes: 20 additions & 0 deletions tools/pika_cdc/conf/cdc.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# pika_server, this port is pika replication port.
pika_repl_server : 127.0.0.1:11221
# pika_server, this port is pika redis client port.
pika_client_server : 127.0.0.1:9221
# For data from one DB of one pika, a separate MQ topic is created,
# and the name of the topic is the dbname of the pika
kafka_servers:
- 127.0.0.1:9092
redis_servers:
- 127.0.0.1:6379
# retry times while send message failed
retries : 0
# retry interval while send message failed(ms)
retry_interval: 10
parallel_thread_size: 1
# the size of the cached channel in pika cdc
buffer_msg_numbers: 10



50 changes: 50 additions & 0 deletions tools/pika_cdc/conf/conf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package conf

import (
"fmt"
"github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"
"io/ioutil"
"log"
"os"
"path"
"path/filepath"
"runtime"
)

type PikaCdcConfig struct {
PikaReplServer string `yaml:"pika_repl_server"`
PikaClientServer string `yaml:"pika_client_server"`
KafkaServers []string `yaml:"kafka_servers"`
RedisServers []string `yaml:"redis_servers"`
Retries int `yaml:"retries"`
RetryInterval int `yaml:"retry_interval"`
ParallelThreadSize int `yaml:"parallel_thread_size"`
BufferMsgNumbers int `yaml:"buffer_msg_numbers"`
}

var ConfigInstance = PikaCdcConfig{}

func init() {
_, filename, _, _ := runtime.Caller(0)
filename = filepath.Join(filepath.Dir(filename), "cdc.yml")
file, err := ioutil.ReadFile(filename)
if err != nil {
log.Fatal("fail to read file:", err)
}

err = yaml.Unmarshal(file, &ConfigInstance)
if err != nil {
log.Fatal("fail to yaml unmarshal:", err)
}

logrus.SetFormatter(&logrus.TextFormatter{
FullTimestamp: true,
CallerPrettyfier: func(f *runtime.Frame) (string, string) {
return "", fmt.Sprintf("%s:%d", path.Base(f.File), f.Line)
},
})

logrus.SetReportCaller(true)
logrus.SetOutput(os.Stdout)
}
32 changes: 32 additions & 0 deletions tools/pika_cdc/consumer/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package consumer

import (
"pika_cdc/conf"
)

type Consumer interface {
SendCmdMessage(dbName string, msg []byte) error
Name() string
Close() error
Run()
Stop()
}

type Factory struct{}

func GenerateConsumers(config conf.PikaCdcConfig, msgChanns map[string]chan []byte) ([]Consumer, error) {
var consumers []Consumer

// kafka
for _, k := range config.KafkaServers {
kafka, _ := NewKafka(k, config.Retries, msgChanns)
consumers = append(consumers, kafka)
}

// redis
for _, r := range config.RedisServers {
newRedis, _ := NewRedis(r, msgChanns)
consumers = append(consumers, newRedis)
}
return consumers, nil
}
76 changes: 76 additions & 0 deletions tools/pika_cdc/consumer/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package consumer

import (
"context"
"github.com/segmentio/kafka-go"
"sync"
)

type Kafka struct {
servers string
topics []string
retries int
kafkaConns map[string]*kafka.Conn
wg sync.WaitGroup
msgChanns map[string]chan []byte
stopChan chan bool
protocol KafkaProtocol
}

func (k *Kafka) SendCmdMessage(dbName string, msg []byte) error {
k.kafkaConns[dbName].Write(k.protocol.ToConsumer(msg))
return nil
}

func (k *Kafka) Name() string {
return "Kafka"
}

func NewKafka(server string, retries int, msgChanns map[string]chan []byte) (*Kafka, error) {
k := &Kafka{}
k.protocol = KafkaProtocol{}
k.kafkaConns = make(map[string]*kafka.Conn)
k.msgChanns = make(map[string]chan []byte)
for dbname, chann := range msgChanns {
conn, err := kafka.DialLeader(context.Background(), "tcp", server, dbname, 0)
if err != nil {
return k, err
}
k.kafkaConns[dbname] = conn
k.msgChanns[dbname] = chann
}
k.stopChan = make(chan bool)
k.retries = retries
return k, nil
}

func (k *Kafka) Close() error {
k.Stop()
for _, conn := range k.kafkaConns {
if err := conn.Close(); err != nil {
return err
}
}
return nil
}
func (k *Kafka) Run() {
var wg sync.WaitGroup
for dbName, chann := range k.msgChanns {
wg.Add(1)
go func(dbName string, ch chan []byte) {
defer wg.Done()
for {
select {
case msg := <-ch:
k.SendCmdMessage(dbName, msg)
case <-k.stopChan:
return
}
}
}(dbName, chann)
}
wg.Wait()
}
func (k *Kafka) Stop() {
k.stopChan <- true
}
27 changes: 27 additions & 0 deletions tools/pika_cdc/consumer/protocol.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package consumer

import (
"fmt"
"strconv"
)

type Protocol interface {
ToConsumer(msg []byte) []byte
}
type RedisProtocol struct{}

func (rp RedisProtocol) ToConsumer(msg []byte) []byte {
return msg
}
func (rp RedisProtocol) Select(dbName string) []byte {
db, _ := strconv.Atoi(dbName[len(dbName)-1:])
dbStr := strconv.Itoa(db)
msg := fmt.Sprintf("*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", len(dbStr), dbStr)
return []byte(msg)
}

type KafkaProtocol struct{}

func (kp KafkaProtocol) ToConsumer(msg []byte) []byte {
return msg
}
80 changes: 80 additions & 0 deletions tools/pika_cdc/consumer/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package consumer

import (
"bufio"
"fmt"
"net"
"sync"
"time"
)

type Redis struct {
redisProtocol RedisProtocol
conns map[string]net.Conn
msgChanns map[string]chan []byte
stopChan chan bool
}

func NewRedis(addr string, msgChanns map[string]chan []byte) (*Redis, error) {
r := &Redis{redisProtocol: RedisProtocol{}, conns: make(map[string]net.Conn), msgChanns: msgChanns, stopChan: make(chan bool)}
var err error
for dbName, _ := range msgChanns {
r.conns[dbName], err = net.Dial("tcp", addr)
if err != nil {
return nil, fmt.Errorf("failed to connect to Redis server: %v", err)
}
selectCmdBytes := r.redisProtocol.Select(dbName)
r.conns[dbName].Write(selectCmdBytes)
}
return r, nil
}

func (r *Redis) SendCmdMessage(dbName string, msg []byte) error {
_, err := r.sendRedisData(dbName, msg)
return err
}

func (r *Redis) Name() string {
return string("Redis")
}
func (r *Redis) Close() error {
for _, conn := range r.conns {
conn.Close()
}
return nil
}

func (r *Redis) sendRedisData(dbName string, data []byte) (string, error) {
r.conns[dbName].SetDeadline(time.Now().Add(5 * time.Second))
_, err := r.conns[dbName].Write(data)
if err != nil {
return "", fmt.Errorf("failed to send data to Redis server: %v", err)
}
reader := bufio.NewReader(r.conns[dbName])
response, err := reader.ReadString('\n')
if err != nil {
return "", fmt.Errorf("failed to read response from Redis server: %v", err)
}
return response, nil
}
func (r *Redis) Run() {
var wg sync.WaitGroup
for dbName, chann := range r.msgChanns {
wg.Add(1)
go func(dbName string, ch chan []byte) {
defer wg.Done()
for {
select {
case msg := <-ch:
r.sendRedisData(dbName, msg)
case <-r.stopChan:
return
}
}
}(dbName, chann)
}
wg.Wait()
}
func (r *Redis) Stop() {
r.stopChan <- true
}
Loading

0 comments on commit 995cd77

Please sign in to comment.