From 8d17b199240c7eb2f8b7e6395459afd2108ca33f Mon Sep 17 00:00:00 2001 From: Julia Ogris Date: Mon, 6 Sep 2021 20:02:06 +1000 Subject: [PATCH 1/8] Reformat markdown files with prettier Reformat markdown files with prettier 2.3.2. Format all the things because automating what can be automated is grand. Sadly all go alternatives investigated for markdown formatting fall short of prettier. --- README.md | 145 +++++++++++++++++++++-------------------- docs/inner-workings.md | 65 +++++++++--------- docs/local-cluster.md | 84 ++++++++++++------------ 3 files changed, 151 insertions(+), 143 deletions(-) diff --git a/README.md b/README.md index d63bd4e..54f2b1c 100644 --- a/README.md +++ b/README.md @@ -1,62 +1,64 @@ # Transflect [![CI/CD](https://github.com/cashapp/transflect/workflows/ci/cd/badge.svg?branch=master)](https://github.com/cashapp/transflect/actions?query=workflow%3ACI%2FCD+branch%3Amaster) [![Slack chat](https://img.shields.io/badge/slack-gophers-795679?logo=slack)](https://gophers.slack.com/messages/cashapp) -Transflect is a **Kubernetes operator** that uses **Istio** to set up -Envoy's [gRPC-JSON transcoding](https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/grpc_json_transcoder_filter), +Transflect is a **Kubernetes operator** that uses **Istio** to set up Envoy's +[gRPC-JSON transcoding](https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/grpc_json_transcoder_filter), mapping HTTP/JSON APIs to gRPC services. The transcoding is applied to -Kubernetes deployments with transflect-specific annotations. [gRPC server -reflection](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) is required. +Kubernetes deployments with transflect-specific annotations. +[gRPC server reflection](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) +is required. 
-Transflect creates a **HTTP/JSON** or **REST API** that is fully defined -in the `.proto` files of the gRPC service, similar to how -[Google Cloud Endpoints](https://cloud.google.com/endpoints/docs/grpc/transcoding) define their -HTTP/JSON APIs. +Transflect creates a **HTTP/JSON** or **REST API** that is fully defined in the +`.proto` files of the gRPC service, similar to how +[Google Cloud Endpoints](https://cloud.google.com/endpoints/docs/grpc/transcoding) +define their HTTP/JSON APIs. ## What problem does transflect solve? -Transflect moves HTTP/JSON to gRPC mapping into the service mesh, just -as other common (micro-)service requirements like mTLS or observability -have been moved out of the application logic into the cluster -infrastructure. Benefits include lower developer effort, better -decoupling, consistent results across application frameworks and -languages. +Transflect moves HTTP/JSON to gRPC mapping into the service mesh, just as other +common (micro-)service requirements like mTLS or observability have been moved +out of the application logic into the cluster infrastructure. Benefits include +lower developer effort, better decoupling, consistent results across application +frameworks and languages. ## Transcoding in Action -You can try out transflect locally without Kubernetes or Istio. -Transflect can generate a gRPC-JSON transcoding Envoy configuration for -an Envoy proxy run in front of your local gRPC service. +You can try out transflect locally without Kubernetes or Istio. Transflect can +generate a gRPC-JSON transcoding Envoy configuration for an Envoy proxy run in +front of your local gRPC service. -_Pre-requisites:_ git, curl, docker (tested with version 20.10). Other - tools, including Envoy, are bootstrapped with [hermit](https://cashapp.github.io/hermit/). +_Pre-requisites:_ git, curl, docker (tested with version 20.10). Other tools, +including Envoy, are bootstrapped with +[hermit](https://cashapp.github.io/hermit/). 
Clone this repo and build transflect - . ./bin/activate-hermit - make install + . ./bin/activate-hermit + make install -Start a gRPC server with [gRPC server reflection](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) +Start a gRPC server with +[gRPC server reflection](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) - make run-testserver # starts a gRPC server on :9090, use your command instead + make run-testserver # starts a gRPC server on :9090, use your command instead In a second terminal generate the Envoy config and start the Envoy proxy - transflect --plaintext --format envoy --http-port 9999 localhost:9090 envoy.yaml - envoy -c envoy.yaml + transflect --plaintext --format envoy --http-port 9999 localhost:9090 envoy.yaml + envoy -c envoy.yaml In a third terminal test the HTTP/JSON API - curl localhost:9999/api/echo/hello -d '{"message": "👋"}' + curl localhost:9999/api/echo/hello -d '{"message": "👋"}' with expected output - { "response": "And to you: 👋" } + { "response": "And to you: 👋" } ## Usage ### Annotate your `.proto` files -Use the `google.api.http` option to annotate gRPC service methods with -their HTTP/JSON API mappings +Use the `google.api.http` option to annotate gRPC service methods with their +HTTP/JSON API mappings ```proto rpc GetShelf(GetShelfRequest) returns (Shelf) { @@ -69,8 +71,9 @@ message GetShelfRequest { ``` If there is no annotation specified transflect auto-generates -`{post: "/api/./"}`. -See [google/api/http.proto](https://github.com/googleapis/googleapis/blob/master/google/api/http.proto) for more details on http annotations. +`{post: "/api/./"}`. See +[google/api/http.proto](https://github.com/googleapis/googleapis/blob/master/google/api/http.proto) +for more details on http annotations. ### Annotate your deployment @@ -83,76 +86,76 @@ kind: Deployment metadata: name: your-deployment annotations: - transflect.cash.squareup.com/port: "9090" + transflect.cash.squareup.com/port: '9090' # ... 
``` -You will need transflect running in your cluster for the annotation to -have an effect. See [deployment/transflect.yaml](deployment/transflect.yaml) -for a sample transflect Deployment used in integration tests. +You will need transflect running in your cluster for the annotation to have an +effect. See [deployment/transflect.yaml](deployment/transflect.yaml) for a +sample transflect Deployment used in integration tests. -You can pull the pre-built [transflect docker image](https://hub.docker.com/r/cashapp/transflect) -from Dockerhub with `docker pull cashapp/transflect`. +You can pull the pre-built +[transflect docker image](https://hub.docker.com/r/cashapp/transflect) from +Dockerhub with `docker pull cashapp/transflect`. ## Local development -_Pre-requisites:_ git, curl, docker (tested with version 20.10). Other - tools are bootstrapped with [hermit](https://cashapp.github.io/hermit/). +_Pre-requisites:_ git, curl, docker (tested with version 20.10). Other tools are +bootstrapped with [hermit](https://cashapp.github.io/hermit/). Activate your hermit environment with - . ./bin/activate-hermit + . ./bin/activate-hermit For a first time setup of the local `k3d` cluster, your root password is -required. The setup updates `/etc/hosts` with an entry for the local -Docker registry used by the cluster. Run +required. The setup updates `/etc/hosts` with an entry for the local Docker +registry used by the cluster. Run - make setup + make setup -You can build and test locally, run CI integration tests and see make targets with +You can build and test locally, run CI integration tests and see make targets +with - make - make ci - make help + make + make ci + make help -For a quick-feedback development cycle you can run the transflect -operator on your host machine (e.g. Mac laptop), outside the cluster +For a quick-feedback development cycle you can run the transflect operator on +your host machine (e.g. 
Mac laptop), outside the cluster - make cluster-create # Create lightweight local cluster - kubectl apply -f deployment/guppyecho.yaml # Add sample gRPC deployment - make run-local-operator # Start transflect on host machine + make cluster-create # Create lightweight local cluster + kubectl apply -f deployment/guppyecho.yaml # Add sample gRPC deployment + make run-local-operator # Start transflect on host machine -Use Ctrl + \\ to instantly shutdown the local -operator with `SIGQUIT`. `SIGINT` will first clean up resources. +Use Ctrl + \\ to instantly shutdown the local operator +with `SIGQUIT`. `SIGINT` will first clean up resources. Modify the operator sources and re-run `make run-local-operator`. -To clean-up, you can remove the entire local cluster with `make -cluster-delete` or reduce it to the initial bare-bones installation -with `make cluster-clean`. +To clean-up, you can remove the entire local cluster with `make cluster-delete` +or reduce it to the initial bare-bones installation with `make cluster-clean`. ## Under the hood -Follow the [local cluster guide](docs/local-cluster.md) to set up a -lightweight test cluster with `k3d` and see transflect in action -on Kubernetes. The same setup is used in CI and integration tests. +Follow the [local cluster guide](docs/local-cluster.md) to set up a lightweight +test cluster with `k3d` and see transflect in action on Kubernetes. The same +setup is used in CI and integration tests. -Read up on the [inner workings of transflect](docs/inner-workings.md) to -better understand which events transflect observes in the Kubernetes -cluster and what resources it creates. +Read up on the [inner workings of transflect](docs/inner-workings.md) to better +understand which events transflect observes in the Kubernetes cluster and what +resources it creates. --- Copyright 2021 Square, Inc. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at +Licensed under the Apache License, Version 2.0 (the "License"); you may not use +this file except in compliance with the License. You may obtain a copy of the +License at http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. +Unless required by applicable law or agreed to in writing, software distributed +under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. diff --git a/docs/inner-workings.md b/docs/inner-workings.md index e69bc51..4ff60eb 100644 --- a/docs/inner-workings.md +++ b/docs/inner-workings.md @@ -1,49 +1,52 @@ # Inner workings of transflect -Transflect is a **Kubernetes operator** that uses **Istio** to set up -Envoy's [gRPC-JSON transcoding](https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/grpc_json_transcoder_filter) via the Istio **EnvoyFilter** resources. - -An [`EnvoyFilter`](https://istio.io/latest/docs/reference/config/networking/envoy-filter/) -is a Kubernetes custom resource specific to Istio. [Istiod](https://istio.io/latest/docs/ops/deployment/architecture) -watches EnvoyFilters. It updates the Envoy sidecar configurations of all -Pods with a matching `workloadSelector`. 
- -The EnvoyFilter created by transflect sets up a [gRPC-JSON transcoder -filter](https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/grpc_json_transcoder_filter) +Transflect is a **Kubernetes operator** that uses **Istio** to set up Envoy's +[gRPC-JSON transcoding](https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/grpc_json_transcoder_filter) +via the Istio **EnvoyFilter** resources. + +An +[`EnvoyFilter`](https://istio.io/latest/docs/reference/config/networking/envoy-filter/) +is a Kubernetes custom resource specific to Istio. +[Istiod](https://istio.io/latest/docs/ops/deployment/architecture) watches +EnvoyFilters. It updates the Envoy sidecar configurations of all Pods with a +matching `workloadSelector`. + +The EnvoyFilter created by transflect sets up a +[gRPC-JSON transcoder filter](https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/grpc_json_transcoder_filter) for all pods of a deployment with transflect annotation. -The transflect operator runs a control loop keeping resources in sync and ensuring -that there is always a transflect EnvoyFilter for any Kubernetes -Deployment that should have one and none extra. It also ensures that -the EnvoyFilter is upgraded with new gRPC API rollouts. +The transflect operator runs a control loop keeping resources in sync and +ensuring that there is always a transflect EnvoyFilter for any Kubernetes +Deployment that should have one and none extra. It also ensures that the +EnvoyFilter is upgraded with new gRPC API rollouts. 
-A transflect EnvoyFilter resource contains dynamic information -retrieved from the Deployment's gRPC reflection API: +A transflect EnvoyFilter resource contains dynamic information retrieved from +the Deployment's gRPC reflection API: - List of gRPC services, see `services` -- Base64 encoded protoset file, ie compiled proto files, see`proto_descriptor_bin` +- Base64 encoded protoset file, ie compiled proto files, + see`proto_descriptor_bin` Sample abridged EnvoyFilter resource for deployment `appy` ```yaml - workloadSelector: +workloadSelector: labels: app: appy - configPatches: - - patch: - value: - name: envoy.filters.http.grpc_json_transcoder - typed_config: - services: [appy.AppyService, grpc.reflection.v1alpha.ServerReflection] - proto_descriptor_bin: CrE7CiBnb29nbGUvcHJ...= +configPatches: + - patch: + value: + name: envoy.filters.http.grpc_json_transcoder + typed_config: + services: [appy.AppyService, grpc.reflection.v1alpha.ServerReflection] + proto_descriptor_bin: CrE7CiBnb29nbGUvcHJ...= ``` -For service continuity it is assumed that only backwards compatible API -changes are rolled out. +For service continuity it is assumed that only backwards compatible API changes +are rolled out. -In order to guarantee that only Pods with new versions of an API are -being queried during a rollout a temporary Kubernetes Service is set up -connecting only to the new pods. +In order to guarantee that only Pods with new versions of an API are being +queried during a rollout a temporary Kubernetes Service is set up connecting +only to the new pods. ![Transflect k8s diagram](transflect.svg) - diff --git a/docs/local-cluster.md b/docs/local-cluster.md index 79013cd..43aa342 100644 --- a/docs/local-cluster.md +++ b/docs/local-cluster.md @@ -1,36 +1,36 @@ # Set up `transflect` in local cluster -Pre-requisites: git, curl, docker (tested with version 20.10) -Other tools are self-bootstrapped with [hermit](https://cashapp.github.io/hermit/). 
-Activate hermit with +Pre-requisites: git, curl, docker (tested with version 20.10) Other tools are +self-bootstrapped with [hermit](https://cashapp.github.io/hermit/). Activate +hermit with - . ./bin/activate-hermit + . ./bin/activate-hermit ## First time setup Clone this repo, activate hermit and run first time setup - . ./bin/activate-hermit - make setup + . ./bin/activate-hermit + make setup -`make setup` requires your root password to update `/etc/hosts` with an -entry for the local Docker registry used by the k3d cluster. +`make setup` requires your root password to update `/etc/hosts` with an entry +for the local Docker registry used by the k3d cluster. ## Create a local cluster Setup lightweight, local [`k3d`](https://k3d.io/) Kubernetes cluster with - make cluster-create + make cluster-create you can inspect your bare-bones local transflect cluster with - $ k3d node list - NAME ROLE CLUSTER STATUS - k3d-registry.transflect.local registry running - k3d-transflect-server-0 server transflect running - k3d-transflect-serverlb loadbalancer transflect running + $ k3d node list + NAME ROLE CLUSTER STATUS + k3d-registry.transflect.local registry running + k3d-transflect-server-0 server transflect running + k3d-transflect-serverlb loadbalancer transflect running - $ kubectl get deployments --all-namespaces + $ kubectl get deployments --all-namespaces NAMESPACE NAME READY UP-TO-DATE AVAILABLE AGE kube-system local-path-provisioner 1/1 1 1 24m kube-system metrics-server 1/1 1 1 24m @@ -42,11 +42,11 @@ you can inspect your bare-bones local transflect cluster with Add the prepared gRPC echo service deployment with - kubectl apply -f deployment/guppyecho.yaml + kubectl apply -f deployment/guppyecho.yaml you can inspect pods with - $ kubectl get pods -n guppyecho + $ kubectl get pods -n guppyecho NAME READY STATUS RESTARTS AGE guppyecho-54db9d96c8-89qr9 2/2 Running 0 91s guppyecho-54db9d96c8-cw9cl 2/2 Running 0 90s @@ -54,55 +54,57 @@ you can inspect pods with and gRPC 
service with - $ grpcurl --authority "guppyecho.local" -plaintext 127.0.0.1:80 list - echo.Echo - grpc.reflection.v1alpha.ServerReflection + $ grpcurl --authority "guppyecho.local" -plaintext 127.0.0.1:80 list + echo.Echo + grpc.reflection.v1alpha.ServerReflection - $ grpcurl --authority "guppyecho.local" -plaintext 127.0.0.1:80 describe - ... + $ grpcurl --authority "guppyecho.local" -plaintext 127.0.0.1:80 describe + ... -Notice that at this stage no HTTP/JSON endpoints or `EnvoyFilter` -resources are _not yet_ available +Notice that at this stage no HTTP/JSON endpoints or `EnvoyFilter` resources are +_not yet_ available - $ curl http://127.0.0.1/api/echo/hello -H "Host: guppyecho.local" -d '{"message": "👋"}' - /api/echo/hello: not Implemented + $ curl http://127.0.0.1/api/echo/hello -H "Host: guppyecho.local" -d '{"message": "👋"}' + /api/echo/hello: not Implemented - $ kubectl get envoyfilter -n guppyecho - No resources found in guppyecho namespace. + $ kubectl get envoyfilter -n guppyecho + No resources found in guppyecho namespace. -Aside: the HTTP/JSON path `/api/echo/hello` is defined in [echo.proto](https://github.com/juliaogris/guppy/blob/v0.0.6/protos/echo/echo.proto#L11). +Aside: the HTTP/JSON path `/api/echo/hello` is defined in +[echo.proto](https://github.com/juliaogris/guppy/blob/v0.0.6/protos/echo/echo.proto#L11). 
## Add Kubernetes deployment for `transflect` Apply transflect deployment - kubectl apply -f deployment/transflect.yaml + kubectl apply -f deployment/transflect.yaml and watch the logs with - kubectl logs -l app=transflect -n transflect + kubectl logs -l app=transflect -n transflect ## Test HTTP/JSON API You can now successfully call the above `curl` command and find the transflect-created `EnvoyFilter` resource - $ curl http://127.0.0.1/api/echo/hello -H "Host: guppyecho.local" -d '{"message": "👋"}' - {"response":"And to you: 👋"} +$ curl http://127.0.0.1/api/echo/hello -H "Host: guppyecho.local" -d +'{"message": "👋"}' {"response":"And to you: 👋"} - $ kubectl get envoyfilter -n guppyecho - NAME AGE - guppyecho-transflect 17m + $ kubectl get envoyfilter -n guppyecho + NAME AGE + guppyecho-transflect 17m ## Cleanup -Remove the entire local cluster with `make cluster-delete` or reduce it -to the initial bare-bones installation with `make cluster-clean`. +Remove the entire local cluster with `make cluster-delete` or reduce it to the +initial bare-bones installation with `make cluster-clean`. ## Further reading - `Local cluster` section in the [Makefile](Makefile) -- Kubernetes deployments in [deployment/](deployment/) directory (apps and transflect) +- Kubernetes deployments in [deployment/](deployment/) directory (apps and + transflect) - Integration tests in [cluster-test.sh](cluster-test.sh) -- Sample Istio EnvoyFilter resource [istio/envoy-filter.yaml](istio/envoy-filter.yaml) - +- Sample Istio EnvoyFilter resource + [istio/envoy-filter.yaml](istio/envoy-filter.yaml) From 1891d37211e20768c7789374692d717fa43af139 Mon Sep 17 00:00:00 2001 From: Julia Ogris Date: Mon, 6 Sep 2021 19:14:41 +1000 Subject: [PATCH 2/8] Reformat transflect deployment yaml file Reformat transflect deployment yaml file for the ClusterRole to read more compact. In a follow-up commit we will add a further ClusterRole rule for `leases.coordination.k8s.io`.
It is easier to take these in a more compact format methinks. --- deployment/transflect.yaml | 56 ++++++++------------------------------ 1 file changed, 12 insertions(+), 44 deletions(-) diff --git a/deployment/transflect.yaml b/deployment/transflect.yaml index 8acb279..3cf8f98 100644 --- a/deployment/transflect.yaml +++ b/deployment/transflect.yaml @@ -42,50 +42,18 @@ kind: ClusterRole metadata: name: transflect rules: - - apiGroups: - - apps - resources: - - replicasets - verbs: - - get - - list - - watch - - apiGroups: - - apps - resources: - - deployment - verbs: - - get - - list - - watch - - apiGroups: - - '' - resources: - - services - verbs: - - create - - delete - - get - - update - - apiGroups: - - networking.k8s.io - resources: - - ingresses - verbs: - - create - - delete - - get - - update - - apiGroups: - - networking.istio.io - resources: - - envoyfilters - verbs: - - create - - delete - - get - - list - - update + - apiGroups: [ apps ] + resources: [ replicasets, deployments ] + verbs: [ get, list, watch ] + - apiGroups: [ '' ] + resources: [ services ] + verbs: [ create, delete, get, update ] + - apiGroups: [ networking.k8s.io ] + resources: [ ingresses ] + verbs: [ create, delete, get, update ] + - apiGroups: [ networking.istio.io ] + resources: [ envoyfilters ] + verbs: [ create, delete, get, list, update ] --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding From 2109a176b7fba431359db39bac92a621f26f8908 Mon Sep 17 00:00:00 2001 From: Julia Ogris Date: Mon, 6 Sep 2021 11:12:04 +1000 Subject: [PATCH 3/8] Refactor operator main Refactor operator main and move probes server into separate file to keep the main file lean. This is in preparation for adding leader election code which will require a bit more rework.
--- cmd/transflect-operator/main.go | 40 +++-------------------------- cmd/transflect-operator/probes.go | 42 +++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 36 deletions(-) create mode 100644 cmd/transflect-operator/probes.go diff --git a/cmd/transflect-operator/main.go b/cmd/transflect-operator/main.go index 0d47e41..b76a792 100644 --- a/cmd/transflect-operator/main.go +++ b/cmd/transflect-operator/main.go @@ -3,7 +3,6 @@ package main import ( "context" "fmt" - "net/http" "os" "os/signal" "syscall" @@ -17,7 +16,10 @@ import ( "golang.org/x/sync/errgroup" ) -var version = "v0.0.0" +var ( + version = "v0.0.0" + errSignal = fmt.Errorf("received shutdown signal") +) type config struct { UseIngress bool `short:"i" help:"Create and use temporary ingress to access temporary service, e.g. from outside cluster"` @@ -37,40 +39,6 @@ func main() { } } -type probesServer struct { - http.Server -} - -func newProbesServer() *probesServer { - ok := func(w http.ResponseWriter, r *http.Request) { - fmt.Fprintf(w, "OK") - } - mux := http.NewServeMux() - mux.HandleFunc("/_liveness", ok) - mux.HandleFunc("/_readiness", ok) - return &probesServer{ - Server: http.Server{ - Addr: ":8080", - Handler: mux, - }, - } -} - -func (s *probesServer) start() error { - if err := s.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) { - return errors.Wrap(err, "cannot start probes server") - } - return nil -} - -func (s *probesServer) stop() { - if err := s.Shutdown(context.Background()); err != nil { - log.Logger.Error().Err(err).Msg("cannot stop HTTP server") - } -} - -var errSignal = fmt.Errorf("received shutdown signal") - func run(cfg *config) error { setupLogging(cfg) diff --git a/cmd/transflect-operator/probes.go b/cmd/transflect-operator/probes.go new file mode 100644 index 0000000..f467af5 --- /dev/null +++ b/cmd/transflect-operator/probes.go @@ -0,0 +1,42 @@ +package main + +import ( + "context" + "fmt" + "net/http" + + "github.com/pkg/errors" + 
"github.com/rs/zerolog/log" +) + +type probesServer struct { + http.Server +} + +func newProbesServer() *probesServer { + ok := func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "OK") + } + mux := http.NewServeMux() + mux.HandleFunc("/_liveness", ok) + mux.HandleFunc("/_readiness", ok) + return &probesServer{ + Server: http.Server{ + Addr: ":8080", + Handler: mux, + }, + } +} + +func (s *probesServer) start() error { + if err := s.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) { + return errors.Wrap(err, "cannot start probes server") + } + return nil +} + +func (s *probesServer) stop() { + if err := s.Shutdown(context.Background()); err != nil { + log.Logger.Error().Err(err).Msg("cannot stop HTTP server") + } +} From 01f980b3c0a9b14efa279a2e9b5625337042c4c9 Mon Sep 17 00:00:00 2001 From: Julia Ogris Date: Mon, 6 Sep 2021 19:30:06 +1000 Subject: [PATCH 4/8] Update inline code comments comment Update inline code comments comment for readability. --- cmd/transflect-operator/operator.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/transflect-operator/operator.go b/cmd/transflect-operator/operator.go index e6c8be0..3fa3b30 100644 --- a/cmd/transflect-operator/operator.go +++ b/cmd/transflect-operator/operator.go @@ -43,8 +43,8 @@ type operator struct { // activeState[deploymentKey] = { revision, grpcPort } activeState sync.Map - // serialise all operations on a deployment, - // so that no concurrent operations on a single deployment are possible + // deploymentLocker synchronises operations on a deployment + // so that no two updates for a single deployment can run concurrently. 
deploymentLocker transflect.MutexMap useIngress bool @@ -221,7 +221,7 @@ func (o *operator) runWorkers(cnt int) { } func (o *operator) runWorker() { - for o.next() { + for o.next() { // process next enqueued Replicaset } o.wg.Done() } @@ -290,7 +290,7 @@ func (o *operator) shouldProcess(rs *appsv1.ReplicaSet) bool { deployKey, _ := getDeploymentKey(rs) port := grpcPort(rs) v, existing := o.activeState.Load(deployKey) - if existing { + if existing { // EnvoyFilter for given deployment exists active, _ := v.(activeEntry) // Ignore candidate if we have an existing filter but the candidate is // old or has not changed the port. From 30a6964fa02247fec3f33ec70a0612b0b0fe3748 Mon Sep 17 00:00:00 2001 From: Julia Ogris Date: Mon, 6 Sep 2021 19:31:43 +1000 Subject: [PATCH 5/8] Rework operator initialisation Rework operator initialisation so that on `start` call non-constant operator fields and structures, such as informers, stopper channel get re-initialised. This is a preparatory step for adding leader-election, where these structures need to be re-initialised when leadership / execution is re-gained. 
--- cmd/transflect-operator/operator.go | 39 +++++++++++++++-------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/cmd/transflect-operator/operator.go b/cmd/transflect-operator/operator.go index 3fa3b30..51219f1 100644 --- a/cmd/transflect-operator/operator.go +++ b/cmd/transflect-operator/operator.go @@ -64,24 +64,9 @@ func newOperator(cfg *config) (*operator, error) { return nil, err } - informerFactory := informers.NewSharedInformerFactory(k8s, time.Second*30) - - rsInformer := informerFactory.Apps().V1().ReplicaSets() - if err := rsInformer.Informer().SetWatchErrorHandler(watchErrorHandler); err != nil { - return nil, errors.Wrap(err, "cannot add custom error handler to replicaset informer") - } - deployInformer := informerFactory.Apps().V1().Deployments() - if err := deployInformer.Informer().SetWatchErrorHandler(watchErrorHandler); err != nil { - return nil, errors.Wrap(err, "cannot add custom error handler to deployment informer") - } - op := &operator{ - k8s: k8s, - istio: istio, - rsInformer: rsInformer, - deployInformer: deployInformer, - queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), - stopper: make(chan struct{}), + k8s: k8s, + istio: istio, useIngress: cfg.UseIngress, plaintext: cfg.Plaintext, @@ -96,20 +81,36 @@ func watchErrorHandler(_ *cache.Reflector, err error) { } func (o *operator) start() error { + // Initialise activeState for existing transflect EnvoyFilters + // to determine if EnvoyFilter upsert should be processed. 
if err := o.syncActive(); err != nil { return fmt.Errorf("cannot start operator: %w", err) } + // Initialise work-queue and stopper channel + o.queue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + o.stopper = make(chan struct{}) + + // Initialise informers + informerFactory := informers.NewSharedInformerFactory(o.k8s, time.Second*30) + o.rsInformer = informerFactory.Apps().V1().ReplicaSets() + if err := o.rsInformer.Informer().SetWatchErrorHandler(watchErrorHandler); err != nil { + return errors.Wrap(err, "cannot add custom error handler to replicaset informer") + } o.rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: o.enqueue, UpdateFunc: o.enqueueWithOld, }) - + o.deployInformer = informerFactory.Apps().V1().Deployments() + if err := o.deployInformer.Informer().SetWatchErrorHandler(watchErrorHandler); err != nil { + return errors.Wrap(err, "cannot add custom error handler to deployment informer") + } o.deployInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ DeleteFunc: o.cleanupDeployment, }) + log.Debug().Msg("Start leading: informer event handlers registered") - log.Debug().Msg("Starting: informer event handlers registered") + // Run workers and informers o.runWorkers(42) o.rsInformer.Informer().Run(o.stopper) o.deployInformer.Informer().Run(o.stopper) From eb4bf03b0617e0ab8d716589987fefd409bceecb Mon Sep 17 00:00:00 2001 From: Julia Ogris Date: Mon, 6 Sep 2021 19:42:39 +1000 Subject: [PATCH 6/8] Change function order Change function order for readability. This is a pure code reorganisation within the operator.go file on a function level without any further additions. This is another step towards adding leader election. Readability has been optimised for the final code listing with leader-election, however to keep the leader-election change small, pure code movements have been moved out to this commit.
Signed-off-by: Julia Ogris --- cmd/transflect-operator/operator.go | 196 ++++++++++++++-------------- 1 file changed, 98 insertions(+), 98 deletions(-) diff --git a/cmd/transflect-operator/operator.go b/cmd/transflect-operator/operator.go index 51219f1..22466e4 100644 --- a/cmd/transflect-operator/operator.go +++ b/cmd/transflect-operator/operator.go @@ -76,10 +76,6 @@ func newOperator(cfg *config) (*operator, error) { return op, nil } -func watchErrorHandler(_ *cache.Reflector, err error) { - log.Debug().Err(err).Msg("ListAndWatch dropped the connection with an error, back-off and retry") -} - func (o *operator) start() error { // Initialise activeState for existing transflect EnvoyFilters // to determine if EnvoyFilter upsert should be processed. @@ -121,56 +117,6 @@ func (o *operator) start() error { return nil } -func (o *operator) syncActive() error { - opts := metav1.ListOptions{ - LabelSelector: "app=transflect", - Limit: 42, - } - b := backoff.Backoff{Min: 2 * time.Second} - ctx := context.Background() - for { - list, err := o.istio.EnvoyFilters("").List(ctx, opts) - if err != nil { - if int(b.Attempt()) > 10 { - return fmt.Errorf("cannot list existing EnvoyFilters to sync active state, ran out of attempts") - } - time.Sleep(b.Duration()) - continue - } - b.Reset() - for _, filter := range list.Items { - key, entry, err := getActiveEntry(filter) - if err != nil { - log.Error().Err(err).Str("envoyfilter", filter.Name).Msg("Cannot be synced") - continue - } - o.activeState.Store(key, entry) - log.Debug().Str("deploymentKey", key).Int("revision", entry.revision).Uint32("port", entry.grpcPort).Msg("synced active state") - } - if opts.Continue == "" { - return nil - } - } -} - -func getActiveEntry(filter istionet.EnvoyFilter) (string, activeEntry, error) { - a := filter.Annotations - key := a["transflect.cash.squareup.com/deployment"] - if key == "" { - return "", activeEntry{}, fmt.Errorf("cannot retrieve deployment key from existing EnvoyFilter") - } - port := 
grpcPortStr(filter.Annotations["transflect.cash.squareup.com/port"]) - if port == 0 { - return "", activeEntry{}, fmt.Errorf("cannot retrieve grpc port from existing EnvoyFilter") - } - revision := deployRevisionStr(filter.Annotations["deployment.kubernetes.io/revision"]) - if revision == 0 { - return "", activeEntry{}, fmt.Errorf("cannot retrieve deployment revision from existing EnvoyFilter") - } - entry := activeEntry{grpcPort: port, revision: revision} - return key, entry, nil -} - func (o *operator) enqueue(v interface{}) { rs, ok := v.(*appsv1.ReplicaSet) if !ok { @@ -199,21 +145,6 @@ func shouldEnqueue(rs *appsv1.ReplicaSet) bool { return true } -func (o *operator) cleanupDeployment(v interface{}) { - d, ok := v.(*appsv1.Deployment) - if !ok { - log.Error().Interface("object", v).Msg("Cannot convert object to Deployment for cleanup") - return - } - key, err := cache.MetaNamespaceKeyFunc(d) - if err != nil { - log.Error().Err(err).Str("deployment", d.Name).Msg("Cannot create deployment Key, skip EnvoyFilter cleanup") - return - } - o.activeState.Delete(key) - o.deploymentLocker.Remove(key) -} - func (o *operator) runWorkers(cnt int) { for i := 0; i < cnt; i++ { o.wg.Add(1) @@ -317,35 +248,6 @@ func (o *operator) shouldProcess(rs *appsv1.ReplicaSet) bool { return true } -func (o *operator) getReplicaset(key string) (*appsv1.ReplicaSet, error) { - namespace, name, err := cache.SplitMetaNamespaceKey(key) - if err != nil { - return nil, errors.Wrapf(err, "invalid key format") - } - rs, err := o.rsInformer.Lister().ReplicaSets(namespace).Get(name) - if err != nil { - return nil, errors.Wrapf(err, "cannot GET replicaset %s", name) - } - return rs, nil -} - -func getClientSets() (*kubernetes.Clientset, *istio.NetworkingV1alpha3Client, error) { - cfg, err := getConfig() - if err != nil { - return nil, nil, errors.Wrap(err, "cannot get Kubernetes config") - } - k8s, err := kubernetes.NewForConfig(cfg) - if err != nil { - return nil, nil, errors.Wrap(err, "cannot get 
Kubernetes ClientSet") - } - - istioClient, err := istio.NewForConfig(cfg) - if err != nil { - return nil, nil, errors.Wrap(err, "cannot get Istio ClientSet") - } - return k8s, istioClient, nil -} - func grpcPort(rs *appsv1.ReplicaSet) uint32 { return grpcPortStr(rs.Annotations["transflect.cash.squareup.com/port"]) } @@ -390,6 +292,35 @@ func getDeployment(rs *appsv1.ReplicaSet) (metav1.OwnerReference, bool) { return metav1.OwnerReference{}, false } +func (o *operator) getReplicaset(key string) (*appsv1.ReplicaSet, error) { + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return nil, errors.Wrapf(err, "invalid key format") + } + rs, err := o.rsInformer.Lister().ReplicaSets(namespace).Get(name) + if err != nil { + return nil, errors.Wrapf(err, "cannot GET replicaset %s", name) + } + return rs, nil +} + +func getClientSets() (*kubernetes.Clientset, *istio.NetworkingV1alpha3Client, error) { + cfg, err := getConfig() + if err != nil { + return nil, nil, errors.Wrap(err, "cannot get Kubernetes config") + } + k8s, err := kubernetes.NewForConfig(cfg) + if err != nil { + return nil, nil, errors.Wrap(err, "cannot get Kubernetes ClientSet") + } + + istioClient, err := istio.NewForConfig(cfg) + if err != nil { + return nil, nil, errors.Wrap(err, "cannot get Istio ClientSet") + } + return k8s, istioClient, nil +} + func getConfig() (*rest.Config, error) { cfg, err := rest.InClusterConfig() if err == nil { @@ -413,3 +344,72 @@ func getConfig() (*rest.Config, error) { } return kubeCfg, nil } + +func watchErrorHandler(_ *cache.Reflector, err error) { + log.Debug().Err(err).Msg("ListAndWatch dropped the connection with an error, back-off and retry") +} + +func (o *operator) syncActive() error { + opts := metav1.ListOptions{ + LabelSelector: "app=transflect", + Limit: 42, + } + b := backoff.Backoff{Min: 2 * time.Second} + ctx := context.Background() + for { + list, err := o.istio.EnvoyFilters("").List(ctx, opts) + if err != nil { + if int(b.Attempt()) 
> 10 { + return fmt.Errorf("cannot list existing EnvoyFilters to sync active state, ran out of attempts") + } + time.Sleep(b.Duration()) + continue + } + b.Reset() + for _, filter := range list.Items { + key, entry, err := getActiveEntry(filter) + if err != nil { + log.Error().Err(err).Str("envoyfilter", filter.Name).Msg("Cannot be synced") + continue + } + o.activeState.Store(key, entry) + log.Debug().Str("deploymentKey", key).Int("revision", entry.revision).Uint32("port", entry.grpcPort).Msg("synced active state") + } + if opts.Continue == "" { + return nil + } + } +} + +func getActiveEntry(filter istionet.EnvoyFilter) (string, activeEntry, error) { + a := filter.Annotations + key := a["transflect.cash.squareup.com/deployment"] + if key == "" { + return "", activeEntry{}, fmt.Errorf("cannot retrieve deployment key from existing EnvoyFilter") + } + port := grpcPortStr(filter.Annotations["transflect.cash.squareup.com/port"]) + if port == 0 { + return "", activeEntry{}, fmt.Errorf("cannot retrieve grpc port from existing EnvoyFilter") + } + revision := deployRevisionStr(filter.Annotations["deployment.kubernetes.io/revision"]) + if revision == 0 { + return "", activeEntry{}, fmt.Errorf("cannot retrieve deployment revision from existing EnvoyFilter") + } + entry := activeEntry{grpcPort: port, revision: revision} + return key, entry, nil +} + +func (o *operator) cleanupDeployment(v interface{}) { + d, ok := v.(*appsv1.Deployment) + if !ok { + log.Error().Interface("object", v).Msg("Cannot convert object to Deployment for cleanup") + return + } + key, err := cache.MetaNamespaceKeyFunc(d) + if err != nil { + log.Error().Err(err).Str("deployment", d.Name).Msg("Cannot create deployment Key, skip EnvoyFilter cleanup") + return + } + o.activeState.Delete(key) + o.deploymentLocker.Remove(key) +} From e52f532edbfb9551cefdc3df046f9574a3db720b Mon Sep 17 00:00:00 2001 From: Julia Ogris Date: Mon, 6 Sep 2021 19:51:36 +1000 Subject: [PATCH 7/8] Reorganise Replicaset/Filter 
processing Reorganise Replicaset/Filter processing in operator.go, for new workloads retrieved from the workqueue. This is again purely a refactor without any functional additions. --- cmd/transflect-operator/operator.go | 58 ++++++++++++++--------------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/cmd/transflect-operator/operator.go b/cmd/transflect-operator/operator.go index 22466e4..50fd1b8 100644 --- a/cmd/transflect-operator/operator.go +++ b/cmd/transflect-operator/operator.go @@ -180,46 +180,23 @@ func (o *operator) next() bool { log.Error().Err(err).Msg("Cannot get next queued Replicaset") return true } - if err := o.processFilter(rs); err != nil { - log.Error().Err(err).Str("replica", rs.Name).Msg("Cannot process EnvoyFilter for Replicaset") - } - return true -} -func (o *operator) processFilter(rs *appsv1.ReplicaSet) error { deployKey, _ := getDeploymentKey(rs) o.deploymentLocker.Lock(deployKey) defer o.deploymentLocker.Unlock(deployKey) - if !o.shouldProcess(rs) { - return nil - } - port := grpcPort(rs) - if port == 0 { - if err := o.deleteFilter(context.Background(), rs); err != nil { - if !k8errors.IsNotFound(err) { - return err - } - log.Warn().Err(err).Str("replica", rs.Name).Msg("Cannot delete EnvoyFilter because it cannot be found") - } - o.activeState.Delete(deployKey) - o.deploymentLocker.Remove(deployKey) - return nil - } - if err := o.upsertFilter(context.Background(), rs); err != nil { - return err + if o.shouldProcessReplicaset(rs, deployKey) { + if err := o.processReplicaset(rs, deployKey); err != nil { + log.Error().Err(err).Str("replica", rs.Name).Msg("Cannot process EnvoyFilter for Replicaset") + } } - revision := deployRevision(rs) - active := activeEntry{grpcPort: port, revision: revision} - o.activeState.Store(deployKey, active) - return nil + return true } -func (o *operator) shouldProcess(rs *appsv1.ReplicaSet) bool { +func (o *operator) shouldProcessReplicaset(rs *appsv1.ReplicaSet, deployKey string) bool { if
rs.Status.ReadyReplicas == 0 { return false } - deployKey, _ := getDeploymentKey(rs) port := grpcPort(rs) v, existing := o.activeState.Load(deployKey) if existing { // EnvoyFilter for given deployment exists @@ -248,6 +225,29 @@ func (o *operator) shouldProcess(rs *appsv1.ReplicaSet) bool { return true } +func (o *operator) processReplicaset(rs *appsv1.ReplicaSet, deployKey string) error { + port := grpcPort(rs) + if port == 0 { + if err := o.deleteFilter(context.Background(), rs); err != nil { + if !k8errors.IsNotFound(err) { + return err + } + log.Warn().Err(err).Str("replica", rs.Name).Msg("Cannot delete EnvoyFilter because it cannot be found") + } + o.activeState.Delete(deployKey) + o.deploymentLocker.Remove(deployKey) + return nil + } + + if err := o.upsertFilter(context.Background(), rs); err != nil { + return err + } + revision := deployRevision(rs) + active := activeEntry{grpcPort: port, revision: revision} + o.activeState.Store(deployKey, active) + return nil +} + func grpcPort(rs *appsv1.ReplicaSet) uint32 { return grpcPortStr(rs.Annotations["transflect.cash.squareup.com/port"]) } From 0319b44d15e4757472f14e6c49825a2977cb3753 Mon Sep 17 00:00:00 2001 From: Julia Ogris Date: Tue, 7 Sep 2021 10:12:13 +1000 Subject: [PATCH 8/8] Fix informer run call Fix informer call to start in a separate goroutine so that the deployment informer is actually executed concurrently with the replicaset informer. oops.
--- cmd/transflect-operator/operator.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cmd/transflect-operator/operator.go b/cmd/transflect-operator/operator.go index 50fd1b8..464bb25 100644 --- a/cmd/transflect-operator/operator.go +++ b/cmd/transflect-operator/operator.go @@ -108,8 +108,9 @@ func (o *operator) start() error { // Run workers and informers o.runWorkers(42) - o.rsInformer.Informer().Run(o.stopper) - o.deployInformer.Informer().Run(o.stopper) + go o.rsInformer.Informer().Run(o.stopper) + go o.deployInformer.Informer().Run(o.stopper) + <-o.stopper // When run finishes e.g. because of signal calling o.stop(): log.Debug().Msg("Shutting down queue")