Skip to content

Commit

Permalink
VReplication: recover from closed connection (#17249)
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
  • Loading branch information
shlomi-noach authored Nov 20, 2024
1 parent 626b16f commit 2c6e053
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 3 deletions.
5 changes: 5 additions & 0 deletions go/vt/binlog/binlogplayer/dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type DBClient interface {
Commit() error
Rollback() error
Close()
IsClosed() bool
ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error)
ExecuteFetchMulti(query string, maxrows int) (qrs []*sqltypes.Result, err error)
SupportsCapability(capability capabilities.FlavorCapability) (bool, error)
Expand Down Expand Up @@ -125,6 +126,10 @@ func (dc *dbClientImpl) Close() {
dc.dbConn.Close()
}

func (dc *dbClientImpl) IsClosed() bool {
return dc.dbConn.IsClosed()
}

func (dc *dbClientImpl) SupportsCapability(capability capabilities.FlavorCapability) (bool, error) {
return dc.dbConn.SupportsCapability(capability)
}
Expand Down
4 changes: 4 additions & 0 deletions go/vt/binlog/binlogplayer/fake_dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func (dc *fakeDBClient) Rollback() error {
func (dc *fakeDBClient) Close() {
}

func (dc *fakeDBClient) IsClosed() bool {
return false
}

func (dc *fakeDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error) {
query = strings.ToLower(query)
switch {
Expand Down
4 changes: 4 additions & 0 deletions go/vt/binlog/binlogplayer/mock_dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ func (dc *MockDBClient) Rollback() error {
func (dc *MockDBClient) Close() {
}

func (dc *MockDBClient) IsClosed() bool {
return false
}

// ExecuteFetch is part of the DBClient interface
func (dc *MockDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error) {
// Serialize ExecuteFetch to enforce a strict order on shared dbClients.
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/vdiff/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,10 @@ func (dbc *realDBClient) Close() {
dbc.conn.Close()
}

func (dbc *realDBClient) IsClosed() bool {
return dbc.conn.IsClosed()
}

func (dbc *realDBClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Result, error) {
// Use Clone() because the contents of memory region referenced by
// string can change when clients (e.g. vcopier) use unsafe string methods.
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,10 @@ func (dbc *realDBClient) Close() {
dbc.conn.Close()
}

func (dbc *realDBClient) IsClosed() bool {
return dbc.conn.IsClosed()
}

func (dbc *realDBClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Result, error) {
// Use Clone() because the contents of memory region referenced by
// string can change when clients (e.g. vcopier) use unsafe string methods.
Expand Down
13 changes: 10 additions & 3 deletions go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,18 @@ func newVReplicator(id int32, source *binlogdatapb.BinlogSource, sourceVStreamer
// code.
func (vr *vreplicator) Replicate(ctx context.Context) error {
err := vr.replicate(ctx)
if err != nil {
if err := vr.setMessage(err.Error()); err != nil {
binlogplayer.LogError("Failed to set error state", err)
if err == nil {
return nil
}
if vr.dbClient.IsClosed() {
// Connection was possible terminated by the server. We should renew it.
if cerr := vr.dbClient.Connect(); cerr != nil {
return vterrors.Wrapf(err, "failed to reconnect to the database: %v", cerr)
}
}
if err := vr.setMessage(err.Error()); err != nil {
binlogplayer.LogError("Failed to set error state", err)
}
return err
}

Expand Down
4 changes: 4 additions & 0 deletions go/vt/wrangler/fake_dbclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ func (dc *fakeDBClient) Rollback() error {
func (dc *fakeDBClient) Close() {
}

func (dc *fakeDBClient) IsClosed() bool {
return false
}

// ExecuteFetch is part of the DBClient interface
func (dc *fakeDBClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Result, error) {
dc.mu.Lock()
Expand Down

0 comments on commit 2c6e053

Please sign in to comment.