
Does canal-adapter have to be configured with the source database info when consuming data from Kafka? #5314

Open
hcb1987 opened this issue Oct 31, 2024 · 4 comments

hcb1987 commented Oct 31, 2024

Question

I need to sync data between two MySQL instances on different networks. Setting up master-slave replication would mean exposing the MySQL port to the external network, which I want to avoid. Instead I want to sync like this: source MySQL → canal-deployer → Kafka → canal-adapter → target MySQL. The canal-adapter is deployed on the target MySQL's network, which has no connectivity to the source MySQL's network. Does canal-adapter have to be configured with the source database's connection info?
My configuration is below. Can it not sync this way?
```yaml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: kafka # tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: -1
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 192.168.126.129:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 192.168.126.129:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: rdb
        key: mysql1
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://127.0.0.1:3306/hucb?useUnicode=true
          jdbc.username: hucb
          jdbc.password: 123456
          druid.stat.enable: false
          druid.stat.slowSqlMillis: 1000
```

After startup there is a recurring warning:

```
2024-10-30 07:37:15.011 [Thread-3] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=c9f4a0, groupId=g1] Connection to node 0 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2024-10-30 07:37:15.985 [Thread-3] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=c9f4a0, groupId=g1] Connection to node 0 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2024-10-30 07:37:16.933 [Thread-3] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=c9f4a0, groupId=g1] Connection to node 0 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
```

I configured the remote Kafka address, so why is it still trying to read a local Kafka? How should this be configured?
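The warning shows the consumer being redirected to localhost/127.0.0.1:9092, which usually means the broker is advertising its default address rather than one reachable from the adapter's host. A minimal sketch of the broker-side fix, assuming the broker host 192.168.126.129 from the config above (server.properties on the Kafka node):

```properties
# Broker-side settings (config/server.properties). Without an explicit
# advertised address, Kafka can hand clients back localhost:9092 in its
# metadata response, which matches the warning above. The host below is
# an assumption taken from kafka.bootstrap.servers in the adapter config.
listeners=PLAINTEXT://192.168.126.129:9092
advertised.listeners=PLAINTEXT://192.168.126.129:9092
```

This matches the fix the author describes in a follow-up comment below.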

callmedba commented Oct 31, 2024

Try deploying an otter instance on the node where canal-deployer or the Kafka resources live; wouldn't that handle this?

https://github.com/alibaba/otter

hcb1987 (Author) commented Oct 31, 2024

The earlier Kafka error is solved: the Kafka listener address had not been configured, and after setting it to the IP address the connection works. But now there's a new error when canal-adapter syncs data:
```
2024-10-30 12:13:04.238 [Thread-3] INFO o.a.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=6a4270, groupId=g1] Successfully joined group with generation 4
2024-10-30 12:13:04.242 [Thread-3] INFO o.a.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=6a4270, groupId=g1] Adding newly assigned partitions: example-0
2024-10-30 12:13:04.264 [Thread-3] INFO o.a.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=6a4270, groupId=g1] Setting offset for partition example-0 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=192.168.126.129:9092 (id: 0 rack: null), epoch=-1}}
2024-10-30 12:13:05.057 [Thread-3] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=6a4270, groupId=g1] Seeking to offset 1 for partition example-0
2024-10-30 12:13:05.077 [Thread-3] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - illegal fieldName input, offset 1, character , line 1, column 2, fastjson-version 2.0.31 *6
binlog.000005�� *UTF-80�ٽ�28BJPK �j
binlog.000005�� *UTF-80�ٽ�28BJt1PGX3Z/insert into t1(id) values (1),(2),(3),(34)�
binlog.000005�� *UTF-80�ٽ�28BhucbJt1P7Xb
rowsCount4�Pid (0B1Rintid (0B2Rintid (0B3Rintid (0B34Rint9
binlog.000005�� *UTF-80�ٽ�28BJP4410 Error sync and rollback, execute times: 1
```

The adapter then seeks back to offset 1 and the same `illegal fieldName input` error repeats with the identical garbled payload, ending in `Error sync and rollback, execute times: 2`, `3`, and `4`.
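The payload dumped in that error (raw `binlog.000005` bytes with binary framing rather than JSON) suggests the messages on the topic are not flat JSON, while the adapter is running with `flatMessage: true`. A hedged guess, not a diagnosis confirmed in this thread: the deployer side would also need to serialize flat messages when writing to Kafka. A sketch of the relevant line, assuming the stock `conf/canal.properties` layout of canal-deployer:

```properties
# Deployer-side setting (conf/canal.properties). If this is false, canal
# writes protobuf-serialized messages to Kafka, and an adapter configured
# with flatMessage: true fails to parse them as JSON -- which would be
# consistent with the fastjson "illegal fieldName input" error above.
canal.mq.flatMessage = true
```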

hcb1987 (Author) commented Oct 31, 2024

@callmedba otter can't solve this, can it? Only Kafka is reachable between the two networks, so data can only be transferred through Kafka, and otter can't read data from Kafka, right?

hcb1987 (Author) commented Oct 31, 2024

The rdb config file is as follows:

```yaml
# Mirror schema synchronize config
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  mirrorDb: true
  database: hucb
```
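For context, `dataSourceKey: defaultDS` conventionally refers to an entry under `canal.conf.srcDataSources` in `application.yml`, and that block is exactly the source-database info this issue asks about. A sketch of what it would look like if it turned out to be required; the host and credentials are placeholders, not values from this thread:

```yaml
# Hypothetical srcDataSources block in application.yml. Whether it can be
# omitted when only consuming flat messages from Kafka is the open question
# of this issue; <source-host>, <user>, and <password> are placeholders.
canal.conf:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://<source-host>:3306/hucb?useUnicode=true
      username: <user>
      password: <password>
```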
