Skip to content

Commit

Permalink
refactor(broadcast): moved interface to broadcast folder
Browse files Browse the repository at this point in the history
  • Loading branch information
aleksander-vedvik committed Apr 29, 2024
1 parent 35d5589 commit 42c554e
Show file tree
Hide file tree
Showing 17 changed files with 326 additions and 119 deletions.
20 changes: 3 additions & 17 deletions broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"sync"

"github.com/relab/gorums/broadcast"
"google.golang.org/protobuf/reflect/protoreflect"
)

type broadcastServer struct {
Expand All @@ -19,11 +18,9 @@ type broadcastServer struct {
view RawConfiguration
createBroadcaster func(m BroadcastMetadata, o *BroadcastOrchestrator) Broadcaster
orchestrator *BroadcastOrchestrator
//state BroadcastState
manager BroadcastManger
//router BroadcastRouter
logger *slog.Logger
metrics *broadcast.Metric
manager broadcast.BroadcastManager
logger *slog.Logger
metrics *broadcast.Metric
}

func (srv *Server) PrintStats() {
Expand Down Expand Up @@ -82,17 +79,6 @@ func (srv *broadcastServer) stop() {
//AddClientHandler(method string, handler broadcast.ClientHandler)
//}

type BroadcastManger interface {
Process(broadcast.Content) error
ProcessBroadcast(uint64, protoreflect.ProtoMessage, string)
ProcessSendToClient(uint64, protoreflect.ProtoMessage, error)
NewBroadcastID() uint64
AddAddr(id uint32, addr string)
AddServerHandler(method string, handler broadcast.ServerHandler)
AddClientHandler(method string)
Close() error
}

type Snowflake interface {
NewBroadcastID() uint64
}
Expand Down
39 changes: 27 additions & 12 deletions broadcast/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,39 @@ import (
"google.golang.org/protobuf/reflect/protoreflect"
)

type BroadcastManager struct {
type BroadcastManager interface {
Process(Content) error
ProcessBroadcast(uint64, protoreflect.ProtoMessage, string, ...BroadcastOptions)
ProcessSendToClient(uint64, protoreflect.ProtoMessage, error)
NewBroadcastID() uint64
AddAddr(id uint32, addr string)
AddServerHandler(method string, handler ServerHandler)
AddClientHandler(method string)
Close() error
}

type broadcastManager struct {
state *BroadcastState
router *BroadcastRouter
metrics *Metric
logger *slog.Logger
}

func NewBroadcastManager(logger *slog.Logger, m *Metric, createClient func(addr string, dialOpts []grpc.DialOption) (*Client, error)) *BroadcastManager {
func NewBroadcastManager(logger *slog.Logger, m *Metric, createClient func(addr string, dialOpts []grpc.DialOption) (*Client, error)) BroadcastManager {
router := NewRouter(logger, m, createClient)
state := NewState(logger, m)
for _, shard := range state.shards {
go shard.run(router, state.reqTTL, state.sendBuffer, state.shardBuffer, m)
}
return &BroadcastManager{
return &broadcastManager{
state: state,
router: router,
logger: logger,
metrics: m,
}
}

func (mgr *BroadcastManager) Process(msg Content) error {
func (mgr *broadcastManager) Process(msg Content) error {
_, shardID, _, _ := DecodeBroadcastID(msg.BroadcastID)
shardID = shardID % NumShards
shard := mgr.state.shards[shardID]
Expand All @@ -49,22 +60,26 @@ func (mgr *BroadcastManager) Process(msg Content) error {
}
}

func (mgr *BroadcastManager) ProcessBroadcast(broadcastID uint64, req protoreflect.ProtoMessage, method string) {
func (mgr *broadcastManager) ProcessBroadcast(broadcastID uint64, req protoreflect.ProtoMessage, method string, opts ...BroadcastOptions) {
var options BroadcastOptions
if len(opts) > 0 {
options = opts[0]
}
_, shardID, _, _ := DecodeBroadcastID(broadcastID)
shardID = shardID % NumShards
shard := mgr.state.shards[shardID]
select {
case shard.broadcastChan <- Msg{
Broadcast: true,
Msg: NewMsg(broadcastID, req, method),
Msg: NewMsg(broadcastID, req, method, options),
Method: method,
BroadcastID: broadcastID,
}:
case <-shard.ctx.Done():
}
}

func (mgr *BroadcastManager) ProcessSendToClient(broadcastID uint64, resp protoreflect.ProtoMessage, err error) {
func (mgr *broadcastManager) ProcessSendToClient(broadcastID uint64, resp protoreflect.ProtoMessage, err error) {
_, shardID, _, _ := DecodeBroadcastID(broadcastID)
shardID = shardID % NumShards
shard := mgr.state.shards[shardID]
Expand All @@ -80,24 +95,24 @@ func (mgr *BroadcastManager) ProcessSendToClient(broadcastID uint64, resp protor
}
}

func (mgr *BroadcastManager) NewBroadcastID() uint64 {
func (mgr *broadcastManager) NewBroadcastID() uint64 {
return mgr.state.snowflake.NewBroadcastID()
}

func (mgr *BroadcastManager) AddAddr(id uint32, addr string) {
func (mgr *broadcastManager) AddAddr(id uint32, addr string) {
mgr.router.id = id
mgr.router.addr = addr
mgr.state.snowflake = NewSnowflake(addr)
}

func (mgr *BroadcastManager) AddServerHandler(method string, handler ServerHandler) {
func (mgr *broadcastManager) AddServerHandler(method string, handler ServerHandler) {
mgr.router.serverHandlers[method] = handler
}

func (mgr *BroadcastManager) AddClientHandler(method string) {
func (mgr *broadcastManager) AddClientHandler(method string) {
mgr.router.clientHandlers[method] = struct{}{}
}

func (mgr *BroadcastManager) Close() error {
func (mgr *broadcastManager) Close() error {
return mgr.state.Close()
}
13 changes: 2 additions & 11 deletions broadcast/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,22 +126,13 @@ type broadcastMsg struct {
ctx context.Context
}

func newMsg(broadcastID uint64, req protoreflect.ProtoMessage, method string, options BroadcastOptions) *broadcastMsg {
func NewMsg(broadcastID uint64, req protoreflect.ProtoMessage, method string, options BroadcastOptions) *broadcastMsg {
return &broadcastMsg{
request: req,
method: method,
broadcastID: broadcastID,
options: options,
ctx: context.WithValue(context.Background(), BroadcastID, broadcastID),
}
}

func NewMsg(broadcastID uint64, req protoreflect.ProtoMessage, method string) *broadcastMsg {
return &broadcastMsg{
request: req,
method: method,
broadcastID: broadcastID,
ctx: context.WithValue(context.Background(), BroadcastID, broadcastID),
ctx: context.Background(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion broadcastCall.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type broadcastCallData struct {
// checks whether the given address is contained in the given subset
// of server addresses. Will return true if a subset is not given.
func (bcd *broadcastCallData) inSubset(addr string) bool {
if len(bcd.ServerAddresses) <= 0 {
if bcd.ServerAddresses == nil || len(bcd.ServerAddresses) <= 0 {
return true
}
for _, srvAddr := range bcd.ServerAddresses {
Expand Down
4 changes: 3 additions & 1 deletion broadcastTypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ func WithRelationToRequest(broadcastID uint64) BroadcastOption {
}

func NewBroadcastOptions() broadcast.BroadcastOptions {
return broadcast.BroadcastOptions{}
return broadcast.BroadcastOptions{
ServerAddresses: make([]string, 0), // to prevent nil errors
}
}

type Broadcaster interface{}
Expand Down
10 changes: 10 additions & 0 deletions cmd/protoc-gen-gorums/dev/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func newBroadcaster(m gorums.BroadcastMetadata, o *gorums.BroadcastOrchestrator)
return &Broadcast{
orchestrator: o,
metadata: m,
srvAddrs: make([]string, 0),
}
}

Expand All @@ -41,6 +42,7 @@ func (srv *Server) SetView(config *Configuration) {
type Broadcast struct {
orchestrator *gorums.BroadcastOrchestrator
metadata gorums.BroadcastMetadata
srvAddrs []string
}

// Returns a readonly struct of the metadata used in the broadcast.
Expand All @@ -63,6 +65,14 @@ func (c *clientServerImpl) stop() {
}
}

func (b *Broadcast) To(addrs ...string) *Broadcast {
if len(addrs) <= 0 {
return b
}
b.srvAddrs = append(b.srvAddrs, addrs...)
return b
}

func (b *Broadcast) Forward(req protoreflect.ProtoMessage, addr string) error {
if addr == "" {
return fmt.Errorf("cannot forward to empty addr, got: %s", addr)
Expand Down
4 changes: 4 additions & 0 deletions cmd/protoc-gen-gorums/dev/zorums_broadcast_gorums.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cmd/protoc-gen-gorums/dev/zorums_qspec_gorums.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cmd/protoc-gen-gorums/gengorums/template_broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ var broadcastBody = `
for _, opt := range opts {
opt(&options)
}
options.ServerAddresses = append(options.ServerAddresses, b.srvAddrs...)
b.orchestrator.BroadcastHandler("{{.Method.Desc.FullName}}", req, b.metadata.BroadcastID, options)
}
`
Expand Down
10 changes: 10 additions & 0 deletions cmd/protoc-gen-gorums/gengorums/template_static.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (srv *Server) RegisterBroadcaster(b func(m BroadcastMetadata, o *BroadcastO

func (srv *broadcastServer) broadcastHandler(method string, req protoreflect.ProtoMessage, broadcastID uint64, opts ...broadcast.BroadcastOptions) {
//srv.state.ProcessBroadcast(broadcastID, req, method)
srv.manager.ProcessBroadcast(broadcastID, req, method)
srv.manager.ProcessBroadcast(broadcastID, req, method, opts...)
}

func (srv *broadcastServer) sendToClientHandler(broadcastID uint64, resp protoreflect.ProtoMessage, err error) {
Expand Down
Loading

0 comments on commit 42c554e

Please sign in to comment.