From bb4f3cf89c31f44eb04837e2bcf0803f09284952 Mon Sep 17 00:00:00 2001 From: wlwilliamx <53336371+wlwilliamx@users.noreply.github.com> Date: Fri, 15 Nov 2024 20:08:08 +0800 Subject: [PATCH] This is an automated cherry-pick of #11733 Signed-off-by: ti-chi-bot --- cdc/puller/ddl_puller.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go index 2357e87ee2c..66147d0b1e9 100644 --- a/cdc/puller/ddl_puller.go +++ b/cdc/puller/ddl_puller.go @@ -80,7 +80,6 @@ type ddlJobPullerImpl struct { kvStorage tidbkv.Storage schemaStorage entry.SchemaStorage resolvedTs uint64 - schemaVersion int64 filter filter.Filter // ddlJobsTable is initialized when receive the first concurrent DDL job. // It holds the info of table `tidb_ddl_jobs` of upstream TiDB. @@ -375,6 +374,23 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) { return false, nil } +<<<<<<< HEAD +======= + if job.BinlogInfo.FinishedTS <= p.getResolvedTs() { + log.Info("ddl job finishedTs less than puller resolvedTs,"+ + "discard the ddl job", + zap.String("namespace", p.changefeedID.Namespace), + zap.String("changefeed", p.changefeedID.ID), + zap.String("schema", job.SchemaName), + zap.String("table", job.TableName), + zap.Uint64("startTs", job.StartTS), + zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS), + zap.String("query", job.Query), + zap.Uint64("pullerResolvedTs", p.getResolvedTs())) + return true, nil + } + +>>>>>>> b38183b086 (ddl_puller.go(ticdc): fix DDLs are ignored when schema versions are out of order (#11733)) defer func() { if skip && err == nil { log.Info("ddl job schema or table does not match, discard it", @@ -486,7 +502,6 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) { } p.setResolvedTs(job.BinlogInfo.FinishedTS) - p.schemaVersion = job.BinlogInfo.SchemaVersion return p.checkIneligibleTableDDL(snap, job) }