Skip to content

Commit

Permalink
chore(pika_cdc): correct the code format
Browse files Browse the repository at this point in the history
Signed-off-by: LeeHao <1838249551@qq.com>
  • Loading branch information
ForestLH committed Oct 21, 2024
1 parent 8108709 commit d493b7f
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 19 deletions.
4 changes: 3 additions & 1 deletion tools/pika_cdc/conf/cdc.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# pika_server, this port is pika replication port.
pika_server : 127.0.0.1:11221
pika_repl_server : 127.0.0.1:11221
# pika_server, this port is pika redis client port.
pika_client_server : 127.0.0.1:9221
# For data from one DB of one pika, a separate MQ topic is created,
# and the name of the topic is the dbname of the pika
kafka_servers:
Expand Down
3 changes: 2 additions & 1 deletion tools/pika_cdc/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
)

type PikaCdcConfig struct {
PikaServer string `yaml:"pika_server"`
PikaReplServer string `yaml:"pika_repl_server"`
PikaClientServer string `yaml:"pika_client_server"`
KafkaServers []string `yaml:"kafka_servers"`
RedisServers []string `yaml:"redis_servers"`
Retries int `yaml:"retries"`
Expand Down
3 changes: 1 addition & 2 deletions tools/pika_cdc/consumer/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ func NewKafka(server string, retries int, msgChanns map[string]chan []byte) (*Ka
conn, err := kafka.DialLeader(context.Background(), "tcp", server, dbname, 0)
if err != nil {
return k, err
} else {
k.kafkaConns[dbname] = conn
}
k.kafkaConns[dbname] = conn
k.msgChanns[dbname] = chann
}
k.stopChan = make(chan bool)
Expand Down
2 changes: 1 addition & 1 deletion tools/pika_cdc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

func main() {
if pikaServer, err := pika.New(conf.ConfigInstance.PikaServer, conf.ConfigInstance.BufferMsgNumbers); err != nil {
if pikaServer, err := pika.New(conf.ConfigInstance.PikaReplServer, conf.ConfigInstance.BufferMsgNumbers); err != nil {
logrus.Fatal("failed to connect pika server, {}", err)
} else {
if consumers, err := consumer.GenerateConsumers(conf.ConfigInstance, pikaServer.MsgChanns); err != nil {
Expand Down
26 changes: 12 additions & 14 deletions tools/pika_cdc/pika/replprotocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ import (
"os"
"pika_cdc/conf"
"pika_cdc/pika/proto/inner"
"strconv"
"testing"
"time"
)

func TestConnect(t *testing.T) {
cxt := context.Background()
addr := "127.0.0.1:9221"

addr := conf.ConfigInstance.PikaClientServer
client := redis.NewClient(&redis.Options{
Addr: addr,
Password: "", // no password set
Expand All @@ -31,14 +31,13 @@ func TestConnect(t *testing.T) {
}

func TestSendMetaSync(t *testing.T) {
ip := string("127.0.0.1")
ip := "127.0.0.1"
listener, e := net.Listen("tcp", ":0")
if e != nil {
os.Exit(1)
}
selfPort := getPort(listener.Addr().String())
var masterPort int32 = getPort(conf.ConfigInstance.PikaServer) + 2000
addr := ip + ":" + strconv.Itoa(int(masterPort))
addr := conf.ConfigInstance.PikaReplServer
tt := inner.Type_kMetaSync
request := inner.InnerRequest{
Type: &tt,
Expand Down Expand Up @@ -116,9 +115,7 @@ func getResponse(conn net.Conn) *inner.InnerResponse {

func sendReplReq(conn net.Conn, request *inner.InnerRequest) (net.Conn, error) {
if conn == nil {
ip := string("127.0.0.1")
var masterReplPort int32 = getPort(conf.ConfigInstance.PikaServer) + 2000
addr := ip + ":" + strconv.Itoa(int(masterReplPort))
addr := conf.ConfigInstance.PikaReplServer
newConn, err := net.Dial("tcp", addr)
if err != nil {
return nil, err
Expand All @@ -141,9 +138,7 @@ func sendReplReq(conn net.Conn, request *inner.InnerRequest) (net.Conn, error) {

func sendMetaSyncRequest(conn net.Conn) (net.Conn, error) {
if conn == nil {
ip := string("127.0.0.1")
var masterReplPort int32 = getPort(conf.ConfigInstance.PikaServer) + 2000
addr := ip + ":" + strconv.Itoa(int(masterReplPort))
addr := conf.ConfigInstance.PikaReplServer
newConn, err := net.Dial("tcp", addr)
if err != nil {
return nil, err
Expand All @@ -166,7 +161,7 @@ func sendMetaSyncRequest(conn net.Conn) (net.Conn, error) {
}

func TestGetOffsetFromMaster(t *testing.T) {
ip := string("127.0.0.1")
ip := "127.0.0.1"
listener, e := net.Listen("tcp", ":0")
if e != nil {
os.Exit(1)
Expand Down Expand Up @@ -217,7 +212,7 @@ func TestGetOffsetFromMaster(t *testing.T) {
}

func TestSendDbSyncReqMsg(t *testing.T) {
ip := string("127.0.0.1")
ip := "127.0.0.1"
listener, e := net.Listen("tcp", ":0")
if e != nil {
os.Exit(1)
Expand Down Expand Up @@ -279,7 +274,7 @@ func BuildInternalTag(resp []byte) (tag string) {
return string(buf)
}

// CustomData 解析后的自定义数据结构
// Pika Binlog Item
type BinlogItem struct {
Type uint16
CreateTime uint32
Expand All @@ -291,6 +286,8 @@ type BinlogItem struct {
Content []byte
}

// Test incremental synchronization of data
// pika -> redis
func TestGetIncrementalSync(t *testing.T) {
conn, err := sendMetaSyncRequest(nil)
if err != nil {
Expand Down Expand Up @@ -391,6 +388,7 @@ func TestGetIncrementalSync(t *testing.T) {
}
}()
}
// never stop as a backend service
for {
}
}
Expand Down

0 comments on commit d493b7f

Please sign in to comment.