Skip to content

Commit

Permalink
Create app package (#12)
Browse files Browse the repository at this point in the history
* xds: create app interface

* Create k8s pkg (#11)

* xds: create app interface

* create k8s package

* Update README.md

* create the app in main

* to save

* merge with main

* update readme

* connect xds to app
  • Loading branch information
mohammadVatandoost authored Oct 23, 2023
1 parent 0eeb090 commit c93d2bf
Show file tree
Hide file tree
Showing 36 changed files with 250 additions and 456 deletions.
2 changes: 2 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
vendor
vendor/*

deployments
deployments/*
build
build/*
mvatandoost
Expand Down
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,21 @@ A proxy-less service mesh for grpc services in kubernetes.
Use argoCD yaml files or Helm charts to deploy on K8s

### Running Locally by Kind
Setup dev tools
```shell
make dev/tools
```

Setup local k8s
```shell
make kind/start
```

Deploy xds-control-plane with server and client example servoce to k8s
```shell
make kind/deploy/control-plane
```


### Issues
- [] for ADS, the request names must match the snapshot names, if they do not, then the watch is never responded, and it is expected that envoy makes another request. So we can only add service names to the snapshot that client exactly watch. this is wierld. It means if client watch xds-grpc-server-example-headless resource, you can only send listner for this resource (you couldn't resolve all the k8s services)
(WARN[0010] ADS mode: not responding to request: "kube-prometheus-prometheus:9090" not listed, ResourceNames: [xds-grpc-server-example-headless:8888] )
Expand Down
31 changes: 27 additions & 4 deletions cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,31 @@
package main

import (
"os"
"context"
"log/slog"
"os"

controlplane "github.com/mohammadVatandoost/xds-conrol-plane/internal/app/control-plane"
"github.com/mohammadVatandoost/xds-conrol-plane/internal/informer"
"github.com/mohammadVatandoost/xds-conrol-plane/internal/k8s"
"github.com/mohammadVatandoost/xds-conrol-plane/internal/xds"
"github.com/mohammadVatandoost/xds-conrol-plane/pkg/config"
controlplaneConfig "github.com/mohammadVatandoost/xds-conrol-plane/pkg/config/app/controlplane"
"github.com/mohammadVatandoost/xds-conrol-plane/pkg/utils"
"github.com/mohammadVatandoost/xds-conrol-plane/pkg/version"
)

const serviceName = "xds_control_plane"

func main() {
slog.Info("Initializing", "service", serviceName, "information", version.Build.FormatDetailedProductInfo())
exitCode := 0
defer func () {
os.Exit(exitCode)
}()
// conf := loadConfigOrPanic(cmd)
// configureLoggerOrPanic(conf.Logger)

serverContext, serverCancel := utils.WithSignalCancellation(context.Background())
defer serverCancel()

conf := controlplaneConfig.DefaultControlPlaneConfig()
err := config.Load("", conf)
Expand All @@ -27,11 +35,26 @@ func main() {
return
}

k8sClient, err := k8s.CreateClusterClient()
if err != nil {
slog.Error("couldn't create k8s client", "error", err)
exitCode = -1
return
}

app := controlplane.NewApp(conf)

runTimeInformer := informer.NewRunTime(k8sClient)
serviceInformer := informer.NewServiceInformer(runTimeInformer.GetInformerFactory(), app)
runTimeInformer.AddInformer(serviceInformer)
runTimeInformer.RunInformers(serverContext.Done())

slog.Info("XDS control plane config", "XDS.ADSEnabled", conf.XDSConfig.ADSEnabled, "ListenPort", conf.XDSConfig.Port)
xdsServer := xds.NewControlPlane(conf.XDSConfig)
xdsServer := xds.NewControlPlane(conf.XDSConfig, app)
err = xdsServer.Run()
if err != nil {
slog.Error("couldn't run xdsServer", "error", err)
exitCode = -1
}
}

1 change: 1 addition & 0 deletions internal/app/control-plane/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package controlplane
20 changes: 20 additions & 0 deletions internal/app/control-plane/informer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package controlplane

import (
"log/slog"

v1 "k8s.io/api/core/v1"
)

func (a *App) OnAddSerivce(key string, serviceObj *v1.Service) {
slog.Info("OnAddSerivce", "key", key, "name", serviceObj.Name, "Namespace", serviceObj.Namespace, "Labels", serviceObj.Labels)
}

func (a *App) OnDeleteService(key string, serviceObj *v1.Service) {
slog.Info("OnDeleteService", "key", key, "name", serviceObj.Name, "Namespace", serviceObj.Namespace, "Labels", serviceObj.Labels)
}

func (a *App) OnUpdateService(newKey string, newServiceObj *v1.Service, oldKey string, oldServiceObj *v1.Service) {
slog.Info("OnUpdateService", "newKey", newKey, "newServiceName", newServiceObj.Name, "newServiceNamespace", newServiceObj.Namespace,
"oldKey", oldKey, "oleServiceName", oldServiceObj.Name, "oldServiceNamespace", oldServiceObj.Namespace)
}
67 changes: 67 additions & 0 deletions internal/app/control-plane/resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package controlplane

import (
"fmt"

"github.com/mohammadVatandoost/xds-conrol-plane/internal/node"
)

func (a *App) GetServices() {
}

func (a *App) CreateNode(id string) *node.Node {
a.mu.Lock()
defer a.mu.Unlock()
n, ok := a.nodes[id]
if !ok {
n = node.NewNode()
}
a.nodes[id] = n
return n
}

func (a *App) GetNode(id string) (*node.Node, error) {
a.mu.RLock()
defer a.mu.RUnlock()
node, ok := a.nodes[id]
if !ok {
return nil, fmt.Errorf("node with id: %s is not exist", id)
}
return node, nil
}

func (a *App) DeleteNode(id string) error {
a.mu.Lock()
defer a.mu.Unlock()
_, ok := a.nodes[id]
if !ok {
return fmt.Errorf("node with id: %s is not exist", id)
}
delete(a.nodes, id)
return nil
}

func (a *App) AddResourceWatchToNode(id string, resource string) {
a.muResource.Lock()
defer a.muResource.Unlock()
nodes, ok := a.resources[resource]
if !ok {
nodes = make(map[string]struct{})
a.resources[resource] = nodes
}
nodes[id] = struct{}{}
}

func (a *App) GetNodesWatchTheResource(resource string) []string {
a.muResource.RLock()
defer a.muResource.RUnlock()
nodesArray := make([]string, 0)
nodes, ok := a.resources[resource]
if !ok {
return nodesArray
}
for n := range nodes {
nodesArray = append(nodesArray, n)
}
return nodesArray
}
19 changes: 19 additions & 0 deletions internal/app/control-plane/xds.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package controlplane

import "log/slog"


func (a *App) NewStreamRequest(id string, resourceNames []string) {
node := a.CreateNode(id)
for _, rn := range resourceNames {
node.AddWatcher(rn)
a.AddResourceWatchToNode(id, rn)
}
}

func (a *App) StreamClosed(id string) {
err := a.DeleteNode(id)
if err != nil {
slog.Error("app stream closed", "error", err, "id", id)
}
}
7 changes: 7 additions & 0 deletions internal/informer/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,11 @@ func (rt *RunTime) RunInformers(stopCh <- chan struct{}) {

func (rt *RunTime) GetInformerFactory() informers.SharedInformerFactory {
return informers.NewSharedInformerFactoryWithOptions(rt.client, time.Second*10, informers.WithNamespace(""))
}


func NewRunTime(client kubernetes.Interface) *RunTime {
return &RunTime{
client: client,
}
}
3 changes: 2 additions & 1 deletion internal/informer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ type ServiceEventHandler interface {
OnUpdateService(newKey string, newServiceObj *v1.Service, oldKey string, oldServiceObj *v1.Service)
}

func NewServiceInformer(factory informers.SharedInformerFactory) *ServiceInformer {
func NewServiceInformer(factory informers.SharedInformerFactory, handler ServiceEventHandler) *ServiceInformer {
sharedCache := factory.Core().V1().Services().Informer()

si := &ServiceInformer{
cache: sharedCache,
handler: handler,
}

sharedCache.AddEventHandler(si)
Expand Down
5 changes: 2 additions & 3 deletions internal/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"github.com/mohammadVatandoost/xds-conrol-plane/pkg/util"

"github.com/mohammadVatandoost/xds-conrol-plane/pkg/utils"
)

func CreateClusterClient() (kubernetes.Interface, error) {
Expand Down Expand Up @@ -37,6 +36,6 @@ func CreateClusterClient() (kubernetes.Interface, error) {
if err != nil {
return nil, err
}
// dynamic.NewForConfig()

return clientset, nil
}
2 changes: 1 addition & 1 deletion internal/xds/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ package xds


type App interface {
NewStreamRequest(id string)
NewStreamRequest(id string, resourceNames []string)
StreamClosed(id string)
}
Loading

0 comments on commit c93d2bf

Please sign in to comment.