wire in errorCode tracking based on conditions
mjudeikis committed Oct 24, 2024
1 parent b36adce commit 8c40ccb
Showing 7 changed files with 103 additions and 52 deletions.
92 changes: 59 additions & 33 deletions pkg/index/index.go
@@ -39,6 +39,10 @@ type Result struct {
Shard string
// Cluster canonical path
Cluster logicalcluster.Name

// ErrorCode is the HTTP error code to return for the request.
// If this is set, the URL and Shard fields are ignored.
ErrorCode int
}

// PathRewriter can rewrite a logical cluster path before the actual mapping through
@@ -55,9 +59,13 @@ func New(rewriters []PathRewriter) *State {
shardClusterParentCluster: map[string]map[logicalcluster.Name]logicalcluster.Name{},
shardBaseURLs: map[string]string{},
// Experimental feature: allow mounts to be used with Workspaces
// structure: (clusterName, workspace name) -> string serialized mount objects
// structure: (shard, logical cluster, workspace name) -> string serialized mount objects
// This should be simplified once we promote this to workspace structure.
clusterWorkspaceMountAnnotation: map[logicalcluster.Name]map[string]string{},
shardClusterWorkspaceMountAnnotation: map[string]map[logicalcluster.Name]map[string]string{},

// shardClusterWorkspaceNameErrorCode maps (shard, logical cluster, workspace name) to the error code to return
// instead of a URL.
shardClusterWorkspaceNameErrorCode: map[string]map[logicalcluster.Name]map[string]int{},
}
}

@@ -74,37 +82,39 @@ type State struct {
shardClusterParentCluster map[string]map[logicalcluster.Name]logicalcluster.Name // (shard name, logical cluster) -> parent logical cluster
shardBaseURLs map[string]string // shard name -> base URL
// Experimental feature: allow mounts to be used with Workspaces
clusterWorkspaceMountAnnotation map[logicalcluster.Name]map[string]string // (clusterName, workspace name) -> mount object string
shardClusterWorkspaceMountAnnotation map[string]map[logicalcluster.Name]map[string]string // (shard name, logical cluster, workspace name) -> mount object string

shardClusterWorkspaceNameErrorCode map[string]map[logicalcluster.Name]map[string]int // (shard name, logical cluster, workspace name) -> error code
}

func (c *State) UpsertWorkspace(shard string, ws *tenancyv1alpha1.Workspace) {
if ws.Status.Phase == corev1alpha1.LogicalClusterPhaseScheduling {
return
}
// If the workspace is unavailable, we should delete it from the index,
// as it is not usable.
if ws.Status.Phase == corev1alpha1.LogicalClusterPhaseUnavailable {
c.DeleteWorkspace(shard, ws)
return
}
clusterName := logicalcluster.From(ws)

c.lock.RLock()
cluster := c.shardWorkspaceNameCluster[shard][clusterName][ws.Name]
mountObjString := c.clusterWorkspaceMountAnnotation[clusterName][ws.Name] // experimental feature
c.lock.RUnlock()

// TODO(mjudeikis): we are allowing upsert in 2 cases:
// 1. cluster name is different
// 2. mount object string is different (updated, added, or removed)
// When we promote this to workspace structure, we should make this check smarter and better tested.
if (cluster.String() == ws.Spec.Cluster) && (ws.Annotations[tenancyv1alpha1.ExperimentalWorkspaceMountAnnotationKey] != "" && mountObjString == ws.Annotations[tenancyv1alpha1.ExperimentalWorkspaceMountAnnotationKey]) {
return
}
clusterName := logicalcluster.From(ws)

c.lock.Lock()
defer c.lock.Unlock()

// If the workspace is unavailable, we set a custom error code for it and still add it to the index as normal.
// TODO(mjudeikis): Once we have one more case - move to a separate function.
if ws.Status.Phase == corev1alpha1.LogicalClusterPhaseUnavailable {
if c.shardClusterWorkspaceNameErrorCode[shard] == nil {
c.shardClusterWorkspaceNameErrorCode[shard] = map[logicalcluster.Name]map[string]int{}
}
if c.shardClusterWorkspaceNameErrorCode[shard][logicalcluster.Name(ws.Spec.Cluster)] == nil {
c.shardClusterWorkspaceNameErrorCode[shard][logicalcluster.Name(ws.Spec.Cluster)] = map[string]int{}
}
// Unavailable workspaces should return 503
c.shardClusterWorkspaceNameErrorCode[shard][logicalcluster.Name(ws.Spec.Cluster)][ws.Name] = 503
} else {
delete(c.shardClusterWorkspaceNameErrorCode[shard][logicalcluster.Name(ws.Spec.Cluster)], ws.Name)
if len(c.shardClusterWorkspaceNameErrorCode[shard][logicalcluster.Name(ws.Spec.Cluster)]) == 0 {
delete(c.shardClusterWorkspaceNameErrorCode[shard], logicalcluster.Name(ws.Spec.Cluster))
}
}

if cluster := c.shardWorkspaceNameCluster[shard][clusterName][ws.Name]; cluster.String() != ws.Spec.Cluster {
if c.shardWorkspaceNameCluster[shard] == nil {
c.shardWorkspaceNameCluster[shard] = map[logicalcluster.Name]map[string]logicalcluster.Name{}
@@ -119,11 +129,14 @@ func (c *State) UpsertWorkspace(shard string, ws *tenancyv1alpha1.Workspace) {
c.shardClusterParentCluster[shard][logicalcluster.Name(ws.Spec.Cluster)] = clusterName
}

if mountObjString := c.clusterWorkspaceMountAnnotation[clusterName][ws.Name]; mountObjString != ws.Annotations[tenancyv1alpha1.ExperimentalWorkspaceMountAnnotationKey] {
if c.clusterWorkspaceMountAnnotation[clusterName] == nil {
c.clusterWorkspaceMountAnnotation[clusterName] = map[string]string{}
if mountObjString := c.shardClusterWorkspaceMountAnnotation[shard][clusterName][ws.Name]; mountObjString != ws.Annotations[tenancyv1alpha1.ExperimentalWorkspaceMountAnnotationKey] {
if c.shardClusterWorkspaceMountAnnotation[shard] == nil {
c.shardClusterWorkspaceMountAnnotation[shard] = map[logicalcluster.Name]map[string]string{}
}
if c.shardClusterWorkspaceMountAnnotation[shard][clusterName] == nil {
c.shardClusterWorkspaceMountAnnotation[shard][clusterName] = map[string]string{}
}
c.clusterWorkspaceMountAnnotation[clusterName][ws.Name] = ws.Annotations[tenancyv1alpha1.ExperimentalWorkspaceMountAnnotationKey]
c.shardClusterWorkspaceMountAnnotation[shard][clusterName][ws.Name] = ws.Annotations[tenancyv1alpha1.ExperimentalWorkspaceMountAnnotationKey]
}
}

@@ -132,7 +145,7 @@ func (c *State) DeleteWorkspace(shard string, ws *tenancyv1alpha1.Workspace) {

c.lock.RLock()
_, foundCluster := c.shardWorkspaceNameCluster[shard][clusterName][ws.Name]
_, foundMount := c.clusterWorkspaceMountAnnotation[clusterName][ws.Name]
_, foundMount := c.shardClusterWorkspaceMountAnnotation[shard][clusterName][ws.Name]
c.lock.RUnlock()

if !foundCluster && !foundMount {
@@ -162,12 +175,16 @@ func (c *State) DeleteWorkspace(shard string, ws *tenancyv1alpha1.Workspace) {
}
}

if _, foundMount = c.clusterWorkspaceMountAnnotation[clusterName][ws.Name]; foundMount {
delete(c.clusterWorkspaceMountAnnotation[clusterName], ws.Name)
if len(c.clusterWorkspaceMountAnnotation[clusterName]) == 0 {
delete(c.clusterWorkspaceMountAnnotation, clusterName)
if _, foundMount = c.shardClusterWorkspaceMountAnnotation[shard][clusterName][ws.Name]; foundMount {
delete(c.shardClusterWorkspaceMountAnnotation[shard][clusterName], ws.Name)
if len(c.shardClusterWorkspaceMountAnnotation[shard][clusterName]) == 0 {
delete(c.shardClusterWorkspaceMountAnnotation[shard], clusterName)
}
}
delete(c.shardClusterWorkspaceNameErrorCode[shard][clusterName], ws.Name)
if len(c.shardClusterWorkspaceNameErrorCode[shard][clusterName]) == 0 {
delete(c.shardClusterWorkspaceNameErrorCode[shard], clusterName)
}
}

func (c *State) UpsertLogicalCluster(shard string, logicalCluster *corev1alpha1.LogicalCluster) {
@@ -225,6 +242,7 @@ func (c *State) DeleteShard(shardName string) {
delete(c.shardBaseURLs, shardName)
delete(c.shardWorkspaceName, shardName)
delete(c.shardClusterParentCluster, shardName)
delete(c.shardClusterWorkspaceNameErrorCode, shardName)
}

func (c *State) Lookup(path logicalcluster.Path) (Result, bool) {
@@ -239,6 +257,7 @@ func (c *State) Lookup(path logicalcluster.Path) (Result, bool) {

var shard string
var cluster logicalcluster.Name
var errorCode int

// walk through index graph to find the final logical cluster and shard
for i, s := range segments {
@@ -253,7 +272,7 @@ func (c *State) Lookup(path logicalcluster.Path) (Result, bool) {
}

// check mounts, if found return url and true
val, foundMount := c.clusterWorkspaceMountAnnotation[cluster][s] // experimental feature
val, foundMount := c.shardClusterWorkspaceMountAnnotation[shard][cluster][s] // experimental feature
if foundMount {
mount, err := tenancyv1alpha1.ParseTenancyMountAnnotation(val)
if !(err != nil || mount == nil || mount.MountStatus.URL == "") {
@@ -275,15 +294,22 @@ func (c *State) Lookup(path logicalcluster.Path) (Result, bool) {
if !found {
return Result{}, false
}
ec, found := c.shardClusterWorkspaceNameErrorCode[shard][cluster][s]
if found {
errorCode = ec
}
}
return Result{Shard: shard, Cluster: cluster}, true
return Result{Shard: shard, Cluster: cluster, ErrorCode: errorCode}, true
}

func (c *State) LookupURL(path logicalcluster.Path) (Result, bool) {
result, found := c.Lookup(path)
if !found {
return Result{}, false
}
if result.ErrorCode != 0 {
return result, true
}

if result.URL != "" && result.Shard == "" && result.Cluster == "" {
return result, true
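Taken together, the index changes mean a lookup can now yield an HTTP status instead of a routable target. The sketch below is not part of this commit: the handleLookup helper is hypothetical and the import paths are assumed from the repository layout; it only illustrates how a caller might branch on the new Result.ErrorCode field.

```go
package proxysketch

import (
	"net/http"

	"github.com/kcp-dev/logicalcluster/v3"

	"github.com/kcp-dev/kcp/pkg/index"
)

// handleLookup is a hypothetical helper showing how a front-proxy caller
// might consume the new Result.ErrorCode field.
func handleLookup(w http.ResponseWriter, state *index.State, path logicalcluster.Path) {
	result, found := state.Lookup(path)
	if !found {
		http.Error(w, http.StatusText(http.StatusForbidden), http.StatusForbidden)
		return
	}
	if result.ErrorCode != 0 {
		// Unavailable workspaces are indexed with a 503; URL and Shard are
		// ignored whenever ErrorCode is set.
		http.Error(w, http.StatusText(result.ErrorCode), result.ErrorCode)
		return
	}
	// Otherwise route the request to result.Shard / result.Cluster as before.
	_ = result
}
```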
6 changes: 5 additions & 1 deletion pkg/proxy/handler.go
@@ -56,7 +56,11 @@ func shardHandler(index index.Index, proxy http.Handler) http.HandlerFunc {
return
}

shardURLString, found := index.LookupURL(clusterPath)
shardURLString, found, errCode := index.LookupURL(clusterPath)
if errCode != 0 {
http.Error(w, "Not available.", errCode)
return
}
if !found {
logger.WithValues("clusterPath", clusterPath).V(4).Info("Unknown cluster path")
responsewriters.Forbidden(req.Context(), attributes, w, req, kcpauthorization.WorkspaceAccessNotPermittedReason, kubernetesscheme.Codecs)
9 changes: 6 additions & 3 deletions pkg/proxy/index/index_controller.go
@@ -49,7 +49,7 @@ const (
)

type Index interface {
LookupURL(path logicalcluster.Path) (url string, found bool)
LookupURL(path logicalcluster.Path) (url string, found bool, errorCode int)
}

type ClusterClientGetter func(shard *corev1alpha1.Shard) (kcpclientset.ClusterInterface, error)
@@ -291,7 +291,10 @@ func (c *Controller) stopShard(shardName string) {
delete(c.shardLogicalClusterInformers, shardName)
}

func (c *Controller) LookupURL(path logicalcluster.Path) (url string, found bool) {
func (c *Controller) LookupURL(path logicalcluster.Path) (url string, found bool, errorCode int) {
r, found := c.state.LookupURL(path)
return r.URL, found
if found && r.ErrorCode != 0 {
return r.URL, found, r.ErrorCode
}
return r.URL, found, 0
}
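Because the Index interface consumed by the proxy now surfaces an error code as a third return value, anything implementing it (including test doubles) has to be widened as well. Below is a minimal illustrative stub, assuming only the interface shape shown above; the stubIndex type and the sample paths are invented for the sketch.

```go
package main

import (
	"fmt"

	"github.com/kcp-dev/logicalcluster/v3"
)

// Index mirrors the widened interface from index_controller.go.
type Index interface {
	LookupURL(path logicalcluster.Path) (url string, found bool, errorCode int)
}

// stubIndex is an illustrative test double, not part of this commit.
type stubIndex struct {
	urls   map[string]string // path -> shard base URL
	errors map[string]int    // path -> HTTP error code
}

func (s stubIndex) LookupURL(path logicalcluster.Path) (string, bool, int) {
	if code, ok := s.errors[path.String()]; ok {
		return "", true, code
	}
	u, ok := s.urls[path.String()]
	return u, ok, 0
}

func main() {
	var idx Index = stubIndex{
		urls:   map[string]string{"root:available": "https://shard-1.example.dev"},
		errors: map[string]int{"root:unavailable": 503},
	}
	fmt.Println(idx.LookupURL(logicalcluster.NewPath("root:unavailable"))) // "" true 503
	fmt.Println(idx.LookupURL(logicalcluster.NewPath("root:available")))   // https://shard-1.example.dev true 0
}
```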
23 changes: 16 additions & 7 deletions pkg/proxy/mapping.go
@@ -78,7 +78,12 @@ func (h *HttpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Mappings are done from most specific to least specific:
// Example: /clusters/cluster1/ will be matched before /clusters/
for _, m := range h.mapping {
if strings.HasPrefix(h.resolveURL(r), m.path) {
url, errorCode := h.resolveURL(r)
if errorCode != 0 {
http.Error(w, http.StatusText(errorCode), errorCode)
return
}
if strings.HasPrefix(url, m.path) {
m.handler.ServeHTTP(w, r)
return
}
@@ -87,28 +92,32 @@ func (h *HttpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.defaultHandler.ServeHTTP(w, r)
}

func (h *HttpHandler) resolveURL(r *http.Request) string {
func (h *HttpHandler) resolveURL(r *http.Request) (string, int) {
// if we don't match any of the paths, use the default behavior - request
var cs = strings.SplitN(strings.TrimLeft(r.URL.Path, "/"), "/", 3)
if len(cs) < 2 || cs[0] != "clusters" {
return r.URL.Path
return r.URL.Path, 0
}

clusterPath := logicalcluster.NewPath(cs[1])
if !clusterPath.IsValid() {
return r.URL.Path
return r.URL.Path, 0
}

u, found := h.index.LookupURL(clusterPath)
u, found, errCode := h.index.LookupURL(clusterPath)
if errCode != 0 {
return "", errCode
}
if found {
u, err := url.Parse(u)
if err == nil && u != nil {
u.Path = strings.TrimSuffix(u.Path, "/")
r.URL.Path = path.Join(u.Path, strings.Join(cs[2:], "/")) // override request prefix and keep kube api contextual suffix
return u.Path
return u.Path, 0
}
}

return r.URL.Path
return r.URL.Path, 0
}

func NewHandler(ctx context.Context, o *proxyoptions.Options, index index.Index) (http.Handler, error) {
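The mapping walk above depends on h.mapping being ordered from most to least specific, so that /clusters/cluster1/ wins over /clusters/. The standalone sketch below (paths and names are illustrative, not taken from the proxy configuration) shows that longest-prefix-first ordering in isolation.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// A minimal illustration of longest-prefix-first matching: sort the mapping
// paths so the most specific prefix is tried before the more general one.
func main() {
	paths := []string{"/clusters/", "/clusters/cluster1/", "/services/"}
	sort.Slice(paths, func(i, j int) bool { return len(paths[i]) > len(paths[j]) })

	resolved := "/clusters/cluster1/api/v1/namespaces"
	for _, p := range paths {
		if strings.HasPrefix(resolved, p) {
			fmt.Println("matched mapping:", p) // matched mapping: /clusters/cluster1/
			return
		}
	}
	fmt.Println("falling through to the default handler")
}
```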
13 changes: 7 additions & 6 deletions pkg/reconciler/tenancy/workspace/workspace_reconcile_phase.go
@@ -77,7 +77,7 @@ func (r *phaseReconciler) reconcile(ctx context.Context, workspace *tenancyv1alp
conditions.MarkTrue(workspace, tenancyv1alpha1.WorkspaceInitialized)

case corev1alpha1.LogicalClusterPhaseUnavailable:
if checkConditionAndSetPhase(workspace) {
if updateTerminalConditionsAndPhase(workspace) {
return reconcileStatusStopAndRequeue, nil
}
return reconcileStatusContinue, nil
@@ -116,19 +116,20 @@ func (r *phaseReconciler) reconcile(ctx context.Context, workspace *tenancyv1alp
logger.Info("workspace content is deleted")
return reconcileStatusContinue, nil
}
logger.Info("workspace is ready, checking conditions", "conditions", workspace.Status.Conditions)

// If the workspace is marked ready, check its conditions to confirm it is supposed to be ready.
if checkConditionAndSetPhase(workspace) {
if updateTerminalConditionsAndPhase(workspace) {
logger.Info("workspace phase changed", "status", workspace.Status)
return reconcileStatusStopAndRequeue, nil
}
}

return reconcileStatusContinue, nil
}

// checkConditionAndSetPhase checks if the workspace is ready by checking conditions and sets the phase accordingly.
// updateTerminalConditionsAndPhase inspects the workspace's conditions and sets the phase accordingly.
// It returns true if the phase was changed, false otherwise.
func checkConditionAndSetPhase(workspace *tenancyv1alpha1.Workspace) bool {
func updateTerminalConditionsAndPhase(workspace *tenancyv1alpha1.Workspace) bool {
var notReady bool
for _, c := range workspace.Status.Conditions {
if c.Status == v1.ConditionFalse && strings.HasPrefix(string(c.Type), "Workspace") {
@@ -140,7 +141,7 @@ func checkConditionAndSetPhase(workspace *tenancyv1alpha1.Workspace) bool {
workspace.Status.Phase = corev1alpha1.LogicalClusterPhaseUnavailable
return true
}
if !notReady && workspace.Status.Phase != corev1alpha1.LogicalClusterPhaseReady {
if !notReady && workspace.Status.Phase == corev1alpha1.LogicalClusterPhaseUnavailable {
workspace.Status.Phase = corev1alpha1.LogicalClusterPhaseReady
return true
}
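The renamed updateTerminalConditionsAndPhase only demotes a workspace to Unavailable when some Workspace* condition is false, and only promotes it back to Ready when it is currently Unavailable. A small sketch that mirrors this decision logic outside the reconciler follows; the Phase type and nextPhase function are illustrative stand-ins, not kcp API types.

```go
package main

import "fmt"

// Phase stands in for corev1alpha1.LogicalClusterPhaseType in this sketch.
type Phase string

const (
	PhaseReady       Phase = "Ready"
	PhaseUnavailable Phase = "Unavailable"
)

// nextPhase mirrors the decision in updateTerminalConditionsAndPhase:
// any false Workspace* condition demotes the workspace to Unavailable, and
// once those conditions clear, an Unavailable workspace is promoted back to
// Ready. The boolean result reports whether the phase changed.
func nextPhase(current Phase, anyWorkspaceConditionFalse bool) (Phase, bool) {
	if anyWorkspaceConditionFalse && current != PhaseUnavailable {
		return PhaseUnavailable, true
	}
	if !anyWorkspaceConditionFalse && current == PhaseUnavailable {
		return PhaseReady, true
	}
	return current, false
}

func main() {
	fmt.Println(nextPhase(PhaseReady, true))        // Unavailable true
	fmt.Println(nextPhase(PhaseUnavailable, false)) // Ready true
	fmt.Println(nextPhase(PhaseReady, false))       // Ready false
}
```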
6 changes: 6 additions & 0 deletions pkg/server/localproxy.go
@@ -137,6 +137,12 @@ func WithLocalProxy(

// lookup in our local, potentially partial index
r, found := indexState.Lookup(path)
if found && r.ErrorCode != 0 {
// Return the error code if one is set.
// TODO(mjudeikis): Would be nice to have a way to return a custom error message.
http.Error(w, "Not available.", r.ErrorCode)
return
}
if found && r.Shard != shardName && r.URL == "" {
logger.WithValues("cluster", cluster.Name, "requestedShard", r.Shard, "actualShard", shardName).Info("cluster is not on this shard, but on another")

6 changes: 4 additions & 2 deletions test/e2e/mounts/mounts_machinery_test.go
@@ -204,6 +204,8 @@ func TestMountsMachinery(t *testing.T) {
require.NoError(t, err)

// Workspace access should now be denied.
_, err = kcpClusterClient.Cluster(mountPath).ApisV1alpha1().APIExports().List(ctx, metav1.ListOptions{})
require.Error(t, err)
err = wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {
_, err = kcpClusterClient.Cluster(mountPath).ApisV1alpha1().APIExports().List(ctx, metav1.ListOptions{})
return err != nil, nil
})
}
