Skip to content

Commit

Permalink
feat(service): cancel tcp check on game errors
Browse files Browse the repository at this point in the history
Signed-off-by: Sebastian Becker <sebastian.becker@de.bosch.com>
  • Loading branch information
sbckr committed Nov 21, 2024
1 parent 57e6b35 commit 6767663
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 17 deletions.
3 changes: 2 additions & 1 deletion pkg/discovery/game.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2021-2023 - for information on the respective copyright owner
// Copyright (c) 2021-2024 - for information on the respective copyright owner
// see the NOTICE file and/or the repository https://github.com/carbynestack/ephemeral.
//
// SPDX-License-Identifier: Apache-2.0
Expand Down Expand Up @@ -79,6 +79,7 @@ func NewGame(ctx context.Context, id string, bus mb.MessageBus, stateTimeout tim
fsm.WhenIn(Playing).GotEvent(GameFinishedWithError).GoTo(GameError),
fsm.WhenIn(Playing).GotEvent(GameSuccess).GoTo(GameDone),
fsm.WhenIn(Playing).GotEvent(GameError).GoTo(GameError),
fsm.WhenInAnyState().GotEvent(GameFinishedWithError).GoTo(GameError),
fsm.WhenInAnyState().GotEvent(StateTimeoutError).GoTo(GameError),
fsm.WhenInAnyState().GotEvent(GameDone).GoTo(GameDone),
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/ephemeral/network/proxy.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2021-2023 - for information on the respective copyright owner
// Copyright (c) 2021-2024 - for information on the respective copyright owner
// see the NOTICE file and/or the repository https://github.com/carbynestack/ephemeral.
//
// SPDX-License-Identifier: Apache-2.0
Expand Down Expand Up @@ -110,7 +110,7 @@ func (p *Proxy) checkConnectionToPeers() error {
proxyEntry := proxyEntry
waitGroup.Add(1)
go func() {
err := p.checkTCPConnectionToPeer(proxyEntry)
err := p.checkTCPConnectionToPeer(p.ctx.Context, proxyEntry)
defer waitGroup.Done()
if err != nil {
errorsCheckingConnection = append(errorsCheckingConnection, err)
Expand Down Expand Up @@ -142,9 +142,9 @@ func (p *Proxy) addProxyEntry(config *ProxyConfig) *PingAwareTarget {
return pat
}

func (p *Proxy) checkTCPConnectionToPeer(config *ProxyConfig) error {
func (p *Proxy) checkTCPConnectionToPeer(ctx context.Context, config *ProxyConfig) error {
p.logger.Info(fmt.Sprintf("Checking if connection to peer works for config: %s", config))
err := p.tcpChecker.Verify(config.Host, config.Port)
err := p.tcpChecker.Verify(ctx, config.Host, config.Port)
if err != nil {
return fmt.Errorf("error checking connection to the peer '%s:%s': %s", config.Host, config.Port, err)
}
Expand Down
11 changes: 7 additions & 4 deletions pkg/ephemeral/network/tcpchecker.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
// Copyright (c) 2021 - for information on the respective copyright owner
// Copyright (c) 2021-2024 - for information on the respective copyright owner
// see the NOTICE file and/or the repository https://github.com/carbynestack/ephemeral.
//
// SPDX-License-Identifier: Apache-2.0
package network

import (
"context"
"fmt"
"io"
"net"
Expand All @@ -15,15 +16,15 @@ import (

// NetworkChecker verifies the network connectivity between the players before starting the computation.
type NetworkChecker interface {
Verify(string, string) error
Verify(context.Context, string, string) error
}

// NoopChecker verifies the network for all MPC players is in place.
type NoopChecker struct {
}

// Verify checks network connectivity between the players and communicates its results to discovery and players FSM.
func (t *NoopChecker) Verify(host, port string) error {
func (t *NoopChecker) Verify(context.Context, string, string) error {
return nil
}

Expand All @@ -48,10 +49,12 @@ type TCPChecker struct {
}

// Verify checks network connectivity between the players and communicates its results to discovery and players FSM.
func (t *TCPChecker) Verify(host, port string) error {
func (t *TCPChecker) Verify(ctx context.Context, host, port string) error {
done := time.After(t.conf.RetryTimeout)
for {
select {
case <-ctx.Done():
return fmt.Errorf("TCPCheck for '%s:%s' aborted after %d attempts", host, port, t.retries)
case <-done:
return fmt.Errorf("TCPCheck for '%s:%s' failed after %s and %d attempts", host, port, t.conf.RetryTimeout.String(), t.retries)
default:
Expand Down
25 changes: 20 additions & 5 deletions pkg/ephemeral/network/tcpchecker_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
// Copyright (c) 2021 - for information on the respective copyright owner
// Copyright (c) 2021-2024 - for information on the respective copyright owner
// see the NOTICE file and/or the repository https://github.com/carbynestack/ephemeral.
//
// SPDX-License-Identifier: Apache-2.0
package network

import (
"context"
"io"
"net"
"sync"
Expand Down Expand Up @@ -48,7 +49,7 @@ var _ = Describe("TcpChecker", func() {
Logger: zap.NewNop().Sugar(),
}
checker := NewTCPChecker(conf)
err := checker.Verify(host, port)
err := checker.Verify(context.TODO(), host, port)
Expect(err).NotTo(HaveOccurred())
wg.Wait()
})
Expand All @@ -59,7 +60,7 @@ var _ = Describe("TcpChecker", func() {
Logger: zap.NewNop().Sugar(),
}
checker := NewTCPChecker(conf)
err := checker.Verify(host, port)
err := checker.Verify(context.TODO(), host, port)
Expect(err).To(HaveOccurred())
})
It("returns an error if dialing succeeds but the connection is closed down shortly", func() {
Expand Down Expand Up @@ -87,7 +88,7 @@ var _ = Describe("TcpChecker", func() {
Logger: zap.NewNop().Sugar(),
}
checker := NewTCPChecker(conf)
err := checker.Verify(host, port)
err := checker.Verify(context.TODO(), host, port)
Expect(err).To(HaveOccurred())
Expect(checker.retries > 1).To(BeTrue())
wg.Wait()
Expand All @@ -100,8 +101,22 @@ var _ = Describe("TcpChecker", func() {
Logger: zap.NewNop().Sugar(),
}
checker := NewTCPChecker(conf)
err := checker.Verify(host, port)
err := checker.Verify(context.TODO(), host, port)
Expect(err).To(HaveOccurred())
Expect(checker.retries > 1).To(BeTrue())
})
It("aborts if context is closed", func() {
ctx, cancel := context.WithCancel(context.TODO())
cancel()
conf := &TCPCheckerConf{
DialTimeout: 50 * time.Millisecond,
RetryTimeout: 100 * time.Millisecond,
Logger: zap.NewNop().Sugar(),
}
checker := NewTCPChecker(conf)
err := checker.Verify(ctx, host, port)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("TCPCheck for 'localhost:9999' aborted after 0 attempts"))
Expect(checker.retries == 0).To(BeTrue())
})
})
5 changes: 2 additions & 3 deletions pkg/ephemeral/player.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2021-2023 - for information on the respective copyright owner
// Copyright (c) 2021-2024 - for information on the respective copyright owner
// see the NOTICE file and/or the repository https://github.com/carbynestack/ephemeral.
//
// SPDX-License-Identifier: Apache-2.0
Expand Down Expand Up @@ -49,8 +49,7 @@ func NewPlayer(ctx context.Context, bus mb.MessageBus, stateTimeout time.Duratio
fsm.WhenIn(Init).GotEvent(Register).GoTo(Registering),
fsm.WhenIn(Registering).GotEvent(PlayersReady).GoTo(Playing).WithTimeout(computationTimeout),
fsm.WhenIn(Playing).GotEvent(PlayerFinishedWithSuccess).GoTo(PlayerFinishedWithSuccess),
fsm.WhenIn(Playing).GotEvent(PlayingError).GoTo(PlayerFinishedWithError),
fsm.WhenInAnyState().GotEvent(GameError).GoTo(PlayerFinishedWithError),
fsm.WhenInAnyState().GotEvent(PlayingError).GoTo(PlayerFinishedWithError),
fsm.WhenInAnyState().GotEvent(PlayerDone).GoTo(PlayerDone),
fsm.WhenInAnyState().GotEvent(StateTimeoutError).GoTo(PlayerFinishedWithError),
}
Expand Down

0 comments on commit 6767663

Please sign in to comment.