Skip to content

Commit

Permalink
fix: update cache
Browse files Browse the repository at this point in the history
  • Loading branch information
mohammadVatandoost committed Nov 4, 2023
1 parent 990245b commit bfe6621
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 9 deletions.
1 change: 1 addition & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,4 @@ func main() {
exitCode = -1
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ configMap:
LOGGER_LEVEL: "info"
GRPC_GO_LOG_VERBOSITY_LEVEL: 99
GRPC_GO_LOG_SEVERITY_LEVEL: "info"
Server1Address: "xds:///xds-grpc-server-example-headless:8888"
Server1Address: "xds:///xds-grpc-server-example-headless.control-plane-example:8888"
GRPC_XDS_BOOTSTRAP: "./xds_bootstrap.json"
# GRPC_XDS_BOOTSTRAP_CONFIG: "{\n \"xds_servers\": [\n {\n \"server_uri\": \"xds-control-plane-headless.xds-control-plane.svc.cluster.local:8888\",\n \"channel_creds\": [\n {\n \"type\": \"insecure\"\n }\n ],\n \"server_features\": [\"xds_v3\"] \n }\n ],\n \"node\": {\n \"id\": \"b7f9c818-fb46-43ca-8662-d3bdbcf7ec18~10.0.0.1\",\n \"metadata\": {\n \"R_GCP_PROJECT_NUMBER\": \"123456789012\"\n },\n \"locality\": {\n \"zone\": \"us-central1-a\"\n }\n }\n}\n"

2 changes: 1 addition & 1 deletion example/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var (
func main() {

// viper.SetDefault("Server1Address", "xds:///xds-grpc-server-example-headless:8888")
viper.SetDefault("Server1Address", "xds:///xds-grpc-server-example-headless")
viper.SetDefault("Server1Address", "xds:///xds-grpc-server-example-headless.control-plane-example")
// viper.SetDefault("Server1Address", "xds-grpc-server-example-headless:8888")
// Read Config from ENV
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
Expand Down
1 change: 0 additions & 1 deletion internal/app/control-plane/controller.go

This file was deleted.

7 changes: 5 additions & 2 deletions internal/app/control-plane/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ func (a *App) OnDeleteService(key string, serviceObj *v1.Service) {
}

func (a *App) OnUpdateService(newKey string, newServiceObj *v1.Service, oldKey string, oldServiceObj *v1.Service) {
slog.Info("OnUpdateService", "newKey", newKey, "newServiceName", newServiceObj.Name, "newServiceNamespace", newServiceObj.Namespace,
"oldKey", oldKey, "oleServiceName", oldServiceObj.Name, "oldServiceNamespace", oldServiceObj.Namespace)
// slog.Info("OnUpdateService", "newKey", newKey, "newServiceName", newServiceObj.Name, "newServiceNamespace", newServiceObj.Namespace,
// "oldKey", oldKey, "oleServiceName", oldServiceObj.Name, "oldServiceNamespace", oldServiceObj.Namespace)
a.muResource.Lock()
defer a.muResource.Unlock()
resourceInstance, ok := a.resources[oldKey]
Expand All @@ -51,3 +51,6 @@ func (a *App) OnUpdateService(newKey string, newServiceObj *v1.Service, oldKey s
resourceInstance.ServiceObj = newServiceObj
a.resources[newKey] = resourceInstance
}


// xds-grpc-server-example-headless.control-plane-example
18 changes: 15 additions & 3 deletions internal/app/control-plane/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,20 @@ import (

// *** callbacks
func (a *App) NewStreamRequest(id string, resourceNames []string, typeURL string) {
node := a.CreateNode(id)
isUpdateNeeded := false
node, err := a.GetNode(id)
if err != nil {
node = a.CreateNode(id)
isUpdateNeeded = true
}
slog.Info("app NewStreamRequest", "id", id, "resourceNames", resourceNames, "typeURL", typeURL)
for _, rn := range resourceNames {
node.AddWatching(rn)
a.AddResourceWatchToNode(id, rn, typeURL)
}
if isUpdateNeeded {
a.UpdateNodeCache(id)
}
}

func (a *App) StreamClosed(id string) {
Expand All @@ -34,19 +42,23 @@ func (a *App) UpdateNodeCache(nodeID string) {
}
resources := node.GetWatchings()
node.ClearResources()
slog.Info("UpdateCache", "nodeID", nodeID)
slog.Info("UpdateCache", "nodeID", nodeID, "version", node.GetVersion())
for _, rn := range resources {
resource, ok := a.resources[rn]
if !ok {
slog.Error("UpdateCache, resource doesn't exist", "resource", rn, "nodeID", nodeID)
continue
}
//ToDo: later fix loop through each port name
endPoint, cluster, listner, route, err := xds.MakeXDSResource(resource, a.conf.Region, a.conf.Zone, resource.ServiceObj.Spec.Ports[0].Name)
endPoint, cluster, route, listner, err := xds.MakeXDSResource(resource, a.conf.Region, a.conf.Zone, resource.ServiceObj.Spec.Ports[0].Name)
if err != nil {
slog.Error("UpdateCache, failed to Make XDS Resource", "error", err, "resource", rn, "nodeID", nodeID)
continue
}
slog.Info("UpdateCache, resource", "resource", rn, "nodeID", nodeID, "endPoint", endPoint)
slog.Info("UpdateCache, resource", "resource", rn, "nodeID", nodeID, "cluster", cluster)
slog.Info("UpdateCache, resource", "resource", rn, "nodeID", nodeID, "listner", listner)
slog.Info("UpdateCache, resource", "resource", rn, "nodeID", nodeID, "route",route)
node.AddCluster(cluster)
node.AddListener(listner)
node.AddEndpoint(endPoint)
Expand Down
2 changes: 1 addition & 1 deletion internal/informer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func NewServiceInformer(factory informers.SharedInformerFactory, handler Service
}

func getServiceKey(service *v1.Service) string {
return service.Namespace + "." + service.Name
return service.Name + "." + service.Namespace
}

func (si *ServiceInformer) Run(stopCh <-chan struct{}) {
Expand Down

0 comments on commit bfe6621

Please sign in to comment.