Skip to content

Commit

Permalink
chore(broadcast): made the linter happy, I hope ☺
Browse files Browse the repository at this point in the history
  • Loading branch information
aleksander-vedvik committed Jun 11, 2024
1 parent 478f1d1 commit c634b01
Show file tree
Hide file tree
Showing 24 changed files with 152 additions and 249 deletions.
22 changes: 17 additions & 5 deletions authentication/authentication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

func TestAuthentication(t *testing.T) {
ec := New(elliptic.P256())
ec.GenerateKeys()
_ = ec.GenerateKeys()
err := ec.test()
if err != nil {
t.Error(err)
Expand All @@ -18,10 +18,16 @@ func TestAuthentication(t *testing.T) {

func TestSignAndVerify(t *testing.T) {
ec1 := New(elliptic.P256())
ec1.GenerateKeys()
err := ec1.GenerateKeys()
if err != nil {
t.Fatal(err)
}

ec2 := New(elliptic.P256())
ec2.GenerateKeys()
err = ec2.GenerateKeys()
if err != nil {
t.Fatal(err)
}

message := "This is a message"

Expand Down Expand Up @@ -53,10 +59,16 @@ func TestSignAndVerify(t *testing.T) {

func TestVerifyWithWrongPubKey(t *testing.T) {
ec1 := New(elliptic.P256())
ec1.GenerateKeys()
err := ec1.GenerateKeys()
if err != nil {
t.Fatal(err)
}

ec2 := New(elliptic.P256())
ec2.GenerateKeys()
err = ec2.GenerateKeys()
if err != nil {
t.Fatal(err)
}

message := "This is a message"
encodedMsg1, err := ec1.EncodeMsg(message)
Expand Down
1 change: 0 additions & 1 deletion broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ func NewAuth(curve elliptic.Curve) *authentication.EllipticCurve {
}

type broadcastServer struct {
propertiesMutex sync.Mutex
viewMutex sync.RWMutex
id uint32
addr string
Expand Down
5 changes: 3 additions & 2 deletions broadcast/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,15 @@ func (mgr *manager) Done(broadcastID uint64, enqueueBroadcast func(*Msg) error)
BroadcastID: broadcastID,
}
if enqueueBroadcast != nil {
enqueueBroadcast(msg)
// no need to check error because the processor
// is stopped.
_ = enqueueBroadcast(msg)
return
}
_, shardID, _, _ := DecodeBroadcastID(broadcastID)
shardID = shardID % NumShards
shard := mgr.state.getShard(shardID)
shard.handleBMsg(msg)
return
}

func (mgr *manager) NewBroadcastID() uint64 {
Expand Down
17 changes: 7 additions & 10 deletions broadcast/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ func (p *BroadcastProcessor) handleCancellation(bMsg *Msg, metadata *metadata) b
if !metadata.SentCancellation {
p.log("broadcast: sent cancellation", nil, logging.MsgType(bMsg.MsgType.String()), logging.Stopping(false))
metadata.SentCancellation = true
go p.router.Send(p.broadcastID, "", "", bMsg.Cancellation)
go func(broadcastID uint64, cancellationMsg *cancellation) {
_ = p.router.Send(broadcastID, "", "", cancellationMsg)
}(p.broadcastID, bMsg.Cancellation)
}
return false
}
Expand All @@ -123,7 +125,10 @@ func (p *BroadcastProcessor) handleBroadcast(bMsg *Msg, methods []string, metada
func (p *BroadcastProcessor) handleReply(bMsg *Msg, metadata *metadata) bool {
// BroadcastCall if origin addr is non-empty.
if metadata.isBroadcastCall() {
go p.router.Send(p.broadcastID, metadata.OriginAddr, metadata.OriginMethod, bMsg.Reply)
go func(broadcastID uint64, originAddr, originMethod string, replyMsg *reply) {
err := p.router.Send(broadcastID, originAddr, originMethod, replyMsg)
p.log("broadcast: sent reply to client", err, logging.Method(originMethod), logging.MsgType(bMsg.MsgType.String()), logging.Stopping(true), logging.IsBroadcastCall(metadata.isBroadcastCall()))
}(p.broadcastID, metadata.OriginAddr, metadata.OriginMethod, bMsg.Reply)
// the request is done becuase we have sent a reply to the client
p.log("broadcast: sending reply to client", nil, logging.Method(metadata.OriginMethod), logging.MsgType(bMsg.MsgType.String()), logging.Stopping(true), logging.IsBroadcastCall(metadata.isBroadcastCall()))
return true
Expand Down Expand Up @@ -418,14 +423,6 @@ func alreadyBroadcasted(methods []string, method string) bool {
return false
}

func (c *Content) isBroadcastCall() bool {
return c.OriginAddr != ""
}

func (c *Content) hasReceivedClientRequest() bool {
return c.IsBroadcastClient && c.SendFn != nil
}

func (p *BroadcastProcessor) initialize(msg *Content, metadata *metadata) {
p.log("processor: started", nil, logging.Started(p.started))
p.broadcastID = msg.BroadcastID
Expand Down
30 changes: 12 additions & 18 deletions broadcast/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,17 @@ type Router interface {
}

type BroadcastRouter struct {
mut sync.RWMutex
id uint32
addr string
prevMethod uint16
methodsConversion map[string]uint16
serverHandlers map[string]ServerHandler // handlers on other servers
clientHandlers map[string]struct{} // specifies what handlers a client has implemented. Used only for BroadcastCalls.
createClient func(addr string, dialOpts []grpc.DialOption) (*Client, error)
canceler func(broadcastID uint64, srvAddrs []string)
dialOpts []grpc.DialOption
dialTimeout time.Duration
logger *slog.Logger
state *BroadcastState
mut sync.RWMutex
id uint32
addr string
serverHandlers map[string]ServerHandler // handlers on other servers
clientHandlers map[string]struct{} // specifies what handlers a client has implemented. Used only for BroadcastCalls.
createClient func(addr string, dialOpts []grpc.DialOption) (*Client, error)
canceler func(broadcastID uint64, srvAddrs []string)
dialOpts []grpc.DialOption
dialTimeout time.Duration
logger *slog.Logger
state *BroadcastState
}

func NewRouter(logger *slog.Logger, createClient func(addr string, dialOpts []grpc.DialOption) (*Client, error), canceler func(broadcastID uint64, srvAddrs []string), dialTimeout time.Duration, dialOpts ...grpc.DialOption) *BroadcastRouter {
Expand Down Expand Up @@ -82,7 +80,7 @@ func (r *BroadcastRouter) Send(broadcastID uint64, addr, method string, req msg)
}

func (r *BroadcastRouter) Connect(addr string) {
r.getClient(addr)
_, _ = r.getClient(addr)
}

func (r *BroadcastRouter) routeBroadcast(broadcastID uint64, addr, method string, msg *broadcastMsg) error {
Expand Down Expand Up @@ -214,10 +212,6 @@ func (r *reply) getResponse() protoreflect.ProtoMessage {
return r.Response
}

func (r *reply) getError() error {
return r.Err
}

type cancellation struct {
srvAddrs []string
end bool // end is used to stop the request.
Expand Down
2 changes: 1 addition & 1 deletion broadcast/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func Epoch() time.Time {
}

func NewSnowflake(id uint64) *Snowflake {
if id < 0 || id >= uint64(MaxMachineID) {
if id >= uint64(MaxMachineID) {
id = uint64(rand.Int31n(int32(MaxMachineID)))
}
return &Snowflake{
Expand Down
56 changes: 1 addition & 55 deletions broadcast/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,6 @@ import (
"google.golang.org/protobuf/reflect/protoreflect"
)

//type CacheOption int

/*
redis:
- noeviction: New values aren’t saved when memory limit is reached. When a database uses replication, this applies to the primary database
- allkeys-lru: Keeps most recently used keys; removes least recently used (LRU) keys
- allkeys-lfu: Keeps frequently used keys; removes least frequently used (LFU) keys
- volatile-lru: Removes least recently used keys with the expire field set to true.
- volatile-lfu: Removes least frequently used keys with the expire field set to true.
- allkeys-random: Randomly removes keys to make space for the new data added.
- volatile-random: Randomly removes keys with expire field set to true.
- volatile-ttl: Removes keys with expire field set to true and the shortest remaining time-to-live (TTL) value.
const (
noeviction CacheOption = iota
allkeysLRU
allkeysLFU
volatileLRU
volatileLFU
allkeysRANDOM
volatileRANDOM
volatileTTL
)
*/

type BroadcastState struct {
mut sync.Mutex
shardMut sync.RWMutex // RW because we often read and very seldom write to the state
Expand Down Expand Up @@ -75,7 +50,6 @@ func (s *BroadcastState) Close() error {
if s.logger != nil {
s.logger.Debug("broadcast: closing state")
}
//s.debug()
s.parentCtxCancelFunc()
var err error
for _, client := range s.clients {
Expand All @@ -87,32 +61,12 @@ func (s *BroadcastState) Close() error {
return err
}

/*func (s *BroadcastState) debug() {
time.Sleep(1 * time.Second)
for _, shard := range s.shards {
for _, req := range shard.reqs {
select {
case <-req.ctx.Done():
default:
slog.Info("req not done", "req", req)
}
}
}
}
/*func (s *BroadcastState) RunShards() {
return
//for _, shard := range s.shards {
//go shard.run(s.sendBuffer)
//}
}*/

func (s *BroadcastState) reset() {
s.parentCtxCancelFunc()
s.mut.Lock()
s.parentCtx, s.parentCtxCancelFunc = context.WithCancel(context.Background())
for _, client := range s.clients {
client.Close()
_ = client.Close()
}
s.clients = make(map[string]*Client)
shards := createShards(s.parentCtx, s.shardBuffer, s.sendBuffer, s.router, s.order, s.reqTTL, s.logger)
Expand Down Expand Up @@ -187,11 +141,3 @@ type Content struct {
CancelCtx context.CancelFunc
Run func(context.Context, func(*Msg) error)
}

func (c Content) send(resp protoreflect.ProtoMessage, err error) error {
if !c.hasReceivedClientRequest() {
return MissingClientReqErr{}
}
c.SendFn(resp, err)
return nil
}
2 changes: 1 addition & 1 deletion broadcastcall.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ func (c RawConfiguration) BroadcastCall(ctx context.Context, d BroadcastCallData
OriginMethod: d.OriginMethod,
}}
msg := &Message{Metadata: md, Message: d.Message}
c.sign(msg)
o := getCallOptions(E_Broadcast, opts)
c.sign(msg)

var replyChan chan response
if !o.noSendWaiting {
Expand Down
9 changes: 9 additions & 0 deletions callopts.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import "google.golang.org/protobuf/runtime/protoimpl"
type callOptions struct {
callType *protoimpl.ExtensionInfo
noSendWaiting bool
signOrigin bool
}

// CallOption is a function that sets a value in the given callOptions struct
Expand All @@ -25,3 +26,11 @@ func WithNoSendWaiting() CallOption {
o.noSendWaiting = true
}
}

// WithOriginAuthentication is a CallOption that makes BroadcastCall methods
// digitally sign messages.
func WithOriginAuthentication() CallOption {
return func(o *callOptions) {
o.signOrigin = true
}
}
8 changes: 4 additions & 4 deletions channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func dummySrv() *Server {
req := in.Message.(*mock.Request)
defer ctx.Release()
resp, err := mockSrv.Test(ctx, req)
SendMessage(ctx, finished, WrapMessage(in.Metadata, resp, err))
_ = SendMessage(ctx, finished, WrapMessage(in.Metadata, resp, err))
})
return srv
}
Expand All @@ -50,7 +50,7 @@ func TestChannelCreation(t *testing.T) {
mgr := dummyMgr()
defer mgr.Close()
// a proper connection should NOT be established here
node.connect(mgr)
_ = node.connect(mgr)

replyChan := make(chan response, 1)
go func() {
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestChannelReconnection(t *testing.T) {
mgr := dummyMgr()
defer mgr.Close()
// a proper connection should NOT be established here because server is not started
node.connect(mgr)
_ = node.connect(mgr)

// send first message when server is down
replyChan1 := make(chan response, 1)
Expand Down Expand Up @@ -195,7 +195,7 @@ func TestAuthentication(t *testing.T) {
t.Fatal(err)
}
auth := NewAuth(elliptic.P256())
auth.GenerateKeys()
_ = auth.GenerateKeys()
privKey, pubKey := auth.Keys()
auth.RegisterKeys(addr, privKey, pubKey)
mgr := NewRawManager(WithAuthentication(auth))
Expand Down
23 changes: 0 additions & 23 deletions clientserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,45 +15,22 @@ import (
"google.golang.org/protobuf/reflect/protoreflect"
)

//func init() {
//if encoding.GetCodec(ContentSubtype) == nil {
//encoding.RegisterCodec(NewCodec())
//}
//}

type ReplySpecHandler func(req protoreflect.ProtoMessage, replies []protoreflect.ProtoMessage) (protoreflect.ProtoMessage, bool)

type ClientResponse struct {
err error
msg protoreflect.ProtoMessage
}

type ClientRequest struct {
broadcastID string
doneChan chan protoreflect.ProtoMessage
handler ReplySpecHandler
}

type csr struct {
ctx context.Context
cancel context.CancelFunc
req protoreflect.ProtoMessage
resps []protoreflect.ProtoMessage
doneChan chan protoreflect.ProtoMessage
respChan chan protoreflect.ProtoMessage
handler ReplySpecHandler
}

type ClientServer struct {
id uint64 // should correpond to the MachineID given to the manager
addr string
mu sync.Mutex
csr map[uint64]*csr
reqChan chan *ClientRequest
lis net.Listener
ctx context.Context
cancelCtx context.CancelFunc
inProgress uint64
grpcServer *grpc.Server
handlers map[string]requestHandler
logger *slog.Logger
Expand Down
4 changes: 3 additions & 1 deletion cmd/protoc-gen-gorums/dev/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ func (mgr *Manager) AddClientServer(lis net.Listener, clientAddr net.Addr, opts
ClientServer: srv,
}
registerClientServerHandlers(srvImpl)
go srvImpl.Serve(lis)
go func() {
_ = srvImpl.Serve(lis)
}()
mgr.srv = srvImpl
return nil
}
Expand Down
Loading

0 comments on commit c634b01

Please sign in to comment.