Rewrite controller (#60)

Co-authored-by: Tejas Dinkar <tejas@gja.in>
Marc-Antoine authored on 2021-04-25 11:43:40 +02:00; committed by GitHub
parent 66c52c682f
commit 50f6941db3
40 changed files with 2090 additions and 2091 deletions


@@ -1,20 +0,0 @@
name: Docker Image CI
on:
push:
branches: [master]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Build and push Docker images
uses: docker/build-push-action@v1.1.0
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
repository: caddy/ingress
tag_with_ref: true
tag_with_sha: true


@@ -1,54 +0,0 @@
name: Lint and Test Charts
on: pull_request
jobs:
lint-test:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Fetch history
run: git fetch --prune --unshallow
- name: Run chart-testing (lint)
id: lint
uses: helm/chart-testing-action@v1.0.0-rc.1
with:
image: quay.io/helmpack/chart-testing:v3.0.0-rc.1
command: lint
- name: Create kind cluster
uses: helm/kind-action@v1.0.0-rc.1
with:
version: "v0.8.1"
# Only build a kind cluster if there are chart changes to test.
if: steps.lint.outputs.changed == 'true'
- name: Install MetalLB to allow LoadBalancer services
run: |
kubectl create ns metallb-system
kubectl apply -f https://raw.githubusercontent.com/google/metallb/v0.9.3/manifests/metallb.yaml
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: ConfigMap
metadata:
name: config
namespace: metallb-system
data:
config: |
address-pools:
- name: default
protocol: layer2
addresses:
- 172.17.255.1-172.17.255.200
EOF
if: steps.lint.outputs.changed == 'true'
- name: Run chart-testing (install)
uses: helm/chart-testing-action@v1.0.0-rc.1
with:
image: quay.io/helmpack/chart-testing:v3.0.0-rc.1
command: install

.github/workflows/main.yml (new file)

@@ -0,0 +1,115 @@
name: Lint and Test Charts
on:
push:
branches: [master]
tags:
- 'v*'
pull_request:
jobs:
docker-build:
runs-on: ubuntu-latest
name: Building Docker Image
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Docker meta
id: docker_meta
uses: crazy-max/ghaction-docker-meta@v1
with:
images: caddy/ingress
tag-sha: true
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
- name: Cache Docker layers
uses: actions/cache@v2
with:
path: /tmp/.buildx-cache
key: ${{ runner.os }}-buildx-${{ github.sha }}
restore-keys: |
${{ runner.os }}-buildx-
- name: Login to DockerHub
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Build and push
id: docker_build
uses: docker/build-push-action@v2
with:
push: true
tags: ${{ steps.docker_meta.outputs.tags }}
labels: ${{ steps.docker_meta.outputs.labels }}
cache-from: type=local,src=/tmp/.buildx-cache
cache-to: type=local,dest=/tmp/.buildx-cache
lint-test:
name: Test and lint charts
needs:
- docker-build
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
with:
fetch-depth: 0
- name: Set up chart-testing
uses: helm/chart-testing-action@v2.0.1
- name: Run chart-testing (list-changed)
id: list-changed
run: |
changed=$(ct list-changed)
if [[ -n "$changed" ]]; then
echo "::set-output name=changed::true"
fi
- name: Run chart-testing (lint)
run: ct lint
- name: Create kind cluster
uses: helm/kind-action@v1.1.0
with:
version: "v0.9.0"
# Only build a kind cluster if there are chart changes to test.
if: steps.list-changed.outputs.changed == 'true'
- name: Install MetalLB to allow LoadBalancer services
if: steps.list-changed.outputs.changed == 'true'
run: |
kubectl create ns metallb-system
kubectl apply -f https://raw.githubusercontent.com/google/metallb/v0.9.3/manifests/metallb.yaml
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: ConfigMap
metadata:
name: config
namespace: metallb-system
data:
config: |
address-pools:
- name: default
protocol: layer2
addresses:
- 172.17.255.1-172.17.255.200
EOF
- name: Get image tag
id: image-tag
run: echo "::set-output name=data::$(echo ${GITHUB_SHA} | cut -c1-7)"
- name: Set current image tag
working-directory: charts/caddy-ingress-controller
run: envsubst < ci/test-values.yaml.tpl > ci/test-values.yaml
env:
TAG: ${{ steps.image-tag.outputs.data }}
- name: Run chart-testing (install)
run: ct install --config ct.yaml


@@ -4,8 +4,7 @@ home: https://github.com/caddyserver/ingress
description: A helm chart for the Caddy Kubernetes ingress controller
icon: https://caddyserver.com/resources/images/caddy-circle-lock.svg
type: application
version: 0.0.1-rc2
appVersion: v0.1.0
version: 0.0.1-rc3
keywords:
- ingress-controller
- caddyserver


@@ -0,0 +1,81 @@
# caddy-ingress-controller
A helm chart for the Caddy Kubernetes ingress controller
## TL;DR:
```bash
helm install my-release caddy-ingress-controller \
--repo https://caddyserver.github.io/ingress/ \
--namespace=caddy-system
```
## Introduction
This chart bootstraps a caddy-ingress-controller deployment on a [Kubernetes](http://kubernetes.io) cluster using the [Helm](https://helm.sh) package manager.
## Prerequisites
- Helm 3+
- Kubernetes 1.14+
## Installing the Chart
```bash
helm repo add caddyserver https://caddyserver.github.io/ingress/
helm install my-release caddyserver/caddy-ingress-controller --namespace=caddy-system
```
## Uninstalling the Chart
To uninstall `my-release`:
```console
$ helm uninstall my-release
```
The command removes all the Kubernetes components associated with the chart and deletes the release.
> **Tip**: List all releases using `helm list` or start clean with `helm uninstall my-release`
## Additional Configuration
## Troubleshooting
## Values
| Key | Type | Default | Description |
|-----|------|---------|-------------|
| affinity | object | `{}` | |
| fullnameOverride | string | `""` | |
| image.pullPolicy | string | `"IfNotPresent"` | |
| image.repository | string | `"caddy/ingress"` | |
| image.tag | string | `"latest"` | |
| imagePullSecrets | list | `[]` | |
| ingressController.config.acmeCA | string | `""` | |
| ingressController.config.debug | bool | `false` | |
| ingressController.config.email | string | `""` | |
| ingressController.config.metrics | bool | `true` | |
| ingressController.config.onDemandTLS | bool | `false` | |
| ingressController.config.proxyProtocol | bool | `false` | |
| ingressController.rbac.create | bool | `true` | |
| ingressController.verbose | bool | `false` | |
| ingressController.watchNamespace | string | `""` | |
| minikube | bool | `false` | |
| nameOverride | string | `""` | |
| nodeSelector | object | `{}` | |
| podAnnotations | object | `{}` | |
| podDisruptionBudget.maxUnavailable | string | `nil` | |
| podDisruptionBudget.minAvailable | int | `1` | |
| podSecurityContext | object | `{}` | |
| replicaCount | int | `2` | |
| resources | object | `{}` | |
| securityContext.allowPrivilegeEscalation | bool | `true` | |
| securityContext.capabilities.add[0] | string | `"NET_BIND_SERVICE"` | |
| securityContext.capabilities.drop[0] | string | `"ALL"` | |
| securityContext.runAsGroup | int | `0` | |
| securityContext.runAsUser | int | `0` | |
| serviceAccount.annotations | object | `{}` | |
| serviceAccount.create | bool | `true` | |
| serviceAccount.name | string | `"caddy-ingress-controller"` | |
| tolerations | list | `[]` | |
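As a hedged illustration, an override file touching a few of these keys could look like the following (the file name and every value here are examples, not recommendations):

```yaml
# my-values.yaml (hypothetical)
replicaCount: 3
image:
  tag: v0.1.0 # pin a release instead of the default "latest"
ingressController:
  config:
    email: admin@example.com # ACME account email
    onDemandTLS: true
```

It would then be applied with `helm install my-release caddyserver/caddy-ingress-controller -f my-values.yaml --namespace=caddy-system`.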


@@ -0,0 +1,48 @@
{{ template "chart.header" . }}
{{ template "chart.description" . }}
## TL;DR:
```bash
helm install my-release caddy-ingress-controller \
--repo https://caddyserver.github.io/ingress/ \
--namespace=caddy-system
```
## Introduction
This chart bootstraps a caddy-ingress-controller deployment on a [Kubernetes](http://kubernetes.io) cluster using the [Helm](https://helm.sh) package manager.
## Prerequisites
- Helm 3+
- Kubernetes 1.14+
## Installing the Chart
```bash
helm repo add caddyserver https://caddyserver.github.io/ingress/
helm install my-release caddyserver/caddy-ingress-controller --namespace=caddy-system
```
## Uninstalling the Chart
To uninstall `my-release`:
```console
$ helm uninstall my-release
```
The command removes all the Kubernetes components associated with the chart and deletes the release.
> **Tip**: List all releases using `helm list` or start clean with `helm uninstall my-release`
## Additional Configuration
## Troubleshooting
{{ template "chart.valuesSection" . }}


@@ -0,0 +1,2 @@
image:
tag: sha-${TAG}
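For reference, the CI workflow above renders this template with `envsubst`, substituting the short commit SHA; for this commit (`50f6941`) the generated `ci/test-values.yaml` would be:

```yaml
image:
  tag: sha-50f6941
```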


@@ -8,10 +8,12 @@ rules:
- apiGroups:
- ""
- "networking.k8s.io"
- "coordination.k8s.io"
resources:
- ingresses
- ingresses/status
- secrets
- leases
verbs: ["*"]
- apiGroups:
- ""
@@ -21,6 +23,7 @@ rules:
- nodes
- routes
- extensions
- configmaps
verbs:
- list
- get


@@ -45,6 +45,12 @@ spec:
{{- if .Values.minikube }}
hostPort: 443 # optional, required if running in minikube
{{- end }}
- name: metrics
containerPort: 9765
protocol: TCP
{{- if .Values.minikube }}
hostPort: 9765 # optional, required if running in minikube
{{- end }}
resources:
{{- toYaml .Values.resources | nindent 12 }}
env:
@@ -64,6 +70,15 @@ spec:
{{- if .Values.ingressController.watchNamespace }}
- -namespace={{ .Values.ingressController.watchNamespace }}
{{- end }}
{{- if .Values.ingressController.verbose }}
- -v
{{- end }}
readinessProbe:
initialDelaySeconds: 3
periodSeconds: 10
httpGet:
port: 9765
path: /metrics
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
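With the metrics port and readiness probe in place, the endpoint can be smoke-tested by hand; a sketch, assuming the release is installed as `my-release` in `caddy-system` (both names are placeholders):

```bash
kubectl -n caddy-system port-forward deployment/my-release-caddy-ingress-controller 9765:9765 &
curl -s http://localhost:9765/metrics | head
```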


@@ -0,0 +1,19 @@
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
name: {{ include "caddy-ingress-controller.fullname" . }}
namespace: {{ .Release.Namespace }}
labels:
{{- include "caddy-ingress-controller.labels" . | nindent 4 }}
spec:
{{- with .Values.podDisruptionBudget }}
{{- if .minAvailable }}
minAvailable: {{ .minAvailable }}
{{- end }}
{{- if .maxUnavailable }}
maxUnavailable: {{ .maxUnavailable }}
{{- end }}
{{- end }}
selector:
matchLabels:
{{- include "caddy-ingress-controller.selectorLabels" . | nindent 6 }}


@@ -1,179 +1,212 @@
{
  "definitions": {},
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "required": [
    "replicaCount",
    "minikube",
    "image",
    "imagePullSecrets",
    "nameOverride",
    "fullnameOverride",
    "ingressController",
    "serviceAccount",
    "podAnnotations",
    "podSecurityContext",
    "securityContext",
    "resources",
    "nodeSelector",
    "tolerations",
    "affinity"
  ],
  "properties": {
    "replicaCount": {
      "$id": "#/properties/replicaCount",
      "type": "number"
    },
    "minikube": {
      "$id": "#/properties/minikube",
      "type": "boolean"
    },
    "image": {
      "$id": "#/properties/image",
      "type": "object",
      "required": [
        "repository",
        "tag",
        "pullPolicy"
      ],
      "properties": {
        "repository": {
          "$id": "#/properties/image/properties/repository",
          "type": "string"
        },
        "tag": {
          "$id": "#/properties/image/properties/tag",
          "type": "string"
        },
        "pullPolicy": {
          "$id": "#/properties/image/properties/pullPolicy",
          "type": "string",
          "enum": [
            "Always",
            "IfNotPresent",
            "Never"
          ]
        }
      }
    },
    "imagePullSecrets": {
      "$id": "#/properties/imagePullSecrets",
      "type": "array"
    },
    "nameOverride": {
      "$id": "#/properties/nameOverride",
      "type": "string"
    },
    "fullnameOverride": {
      "$id": "#/properties/fullnameOverride",
      "type": "string"
    },
    "ingressController": {
      "$id": "#/properties/ingressController",
      "type": "object",
      "required": [
        "rbac",
        "config",
        "watchNamespace"
      ],
      "properties": {
        "rbac": {
          "$id": "#/properties/ingressController/properties/rbac",
          "type": "object",
          "required": [
            "create"
          ],
          "properties": {
            "create": {
              "$id": "#/properties/ingressController/properties/rbac/properties/create",
              "type": "boolean"
            }
          }
        },
        "config": {
          "$id": "#/properties/ingressController/properties/config",
          "type": "object",
          "properties": {
            "acmeCA": {
              "$id": "#/properties/ingressController/properties/config/properties/acmeCA",
              "type": "string",
              "oneOf": [
                {
                  "format": "uri"
                },
                {
                  "maxLength": 0
                }
              ]
            },
            "debug": {
              "$id": "#/properties/ingressController/properties/config/properties/debug",
              "type": "boolean"
            },
            "email": {
              "$id": "#/properties/ingressController/properties/config/properties/email",
              "type": "string",
              "oneOf": [
                {
                  "format": "email"
                },
                {
                  "maxLength": 0
                }
              ]
            },
            "metrics": {
              "$id": "#/properties/ingressController/properties/config/properties/metrics",
              "type": "boolean"
            },
            "proxyProtocol": {
              "$id": "#/properties/ingressController/properties/config/properties/proxyProtocol",
              "type": "boolean"
            },
            "onDemandTLS": {
              "$id": "#/properties/ingressController/properties/config/properties/onDemandTLS",
              "type": "boolean"
            },
            "onDemandRateLimitInterval": {
              "$id": "#/properties/ingressController/properties/config/properties/onDemandRateLimitInterval",
              "type": "string"
            },
            "onDemandRateLimitBurst": {
              "$id": "#/properties/ingressController/properties/config/properties/onDemandRateLimitBurst",
              "type": "number"
            },
            "onDemandAsk": {
              "$id": "#/properties/ingressController/properties/config/properties/onDemandAsk",
              "type": "string"
            }
          }
        },
        "verbose": {
          "$id": "#/properties/ingressController/properties/verbose",
          "type": "boolean"
        },
        "watchNamespace": {
          "$id": "#/properties/ingressController/properties/watchNamespace",
          "type": "string"
        }
      }
    },
    "serviceAccount": {
      "$id": "#/properties/serviceAccount",
      "type": "object",
      "required": [
        "create",
        "name"
      ],
      "properties": {
        "create": {
          "$id": "#/properties/serviceAccount/properties/create",
          "type": "boolean"
        },
        "name": {
          "$id": "#/properties/serviceAccount/properties/name",
          "type": "string"
        },
        "annotations": {
          "$id": "#/properties/serviceAccount/properties/annotations",
          "type": "object"
        }
      }
    },
    "podAnnotations": {
      "$id": "#/properties/podAnnotations",
      "type": "object"
    },
    "podSecurityContext": {
      "$id": "#/properties/podSecurityContext",
      "type": "object"
    },
    "securityContext": {
      "$id": "#/properties/securityContext",
      "type": "object"
    },
    "resources": {
      "$id": "#/properties/resources",
      "type": "object"
    },
    "nodeSelector": {
      "$id": "#/properties/nodeSelector",
      "type": "object"
    },
    "tolerations": {
      "$id": "#/properties/tolerations",
      "type": "array"
    },
    "affinity": {
      "$id": "#/properties/affinity",
      "type": "object"
    }
  }
}


@@ -1,7 +1,7 @@
# Default values for caddy-ingress-controller.
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.
replicaCount: 1
replicaCount: 2
# Use to test in minikube context
minikube: false
@@ -17,13 +17,22 @@ fullnameOverride: ""
# Default values for the caddy ingress controller.
ingressController:
watchNamespace: ""
verbose: false
rbac:
create: true
config:
# -- Acme Server URL
acmeCA: ""
email: ""
debug: false
email: ""
metrics: true
proxyProtocol: false
onDemandTLS: false
# onDemandRateLimitInterval:
# onDemandRateLimitBurst:
# onDemandAsk:
serviceAccount:
# Specifies whether a service account should be created
@@ -39,6 +48,10 @@ podAnnotations: {}
podSecurityContext: {}
# fsGroup: 2000
podDisruptionBudget:
minAvailable: 1
maxUnavailable:
securityContext:
allowPrivilegeEscalation: true
capabilities:


@@ -2,21 +2,28 @@ package main
import (
"flag"
"github.com/caddyserver/ingress/internal/caddy"
"github.com/caddyserver/ingress/internal/controller"
)
func parseFlags() caddy.ControllerConfig {
func parseFlags() controller.Options {
var namespace string
flag.StringVar(&namespace, "namespace", "", "the namespace that you would like to observe kubernetes ingress resources in.")
var configMapName string
flag.StringVar(&configMapName, "config-map", "", "defines the config map name from where to load global options")
var leaseId string
flag.StringVar(&leaseId, "lease-id", "", "defines the id of this instance for certmagic lock")
var verbose bool
flag.BoolVar(&verbose, "v", false, "set the log level to debug")
flag.Parse()
return caddy.ControllerConfig{
return controller.Options{
WatchNamespace: namespace,
ConfigMapName: configMapName,
Verbose: verbose,
LeaseId: leaseId,
}
}
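Put together, the flags above could be passed to the controller binary like so (binary name and values are illustrative; `-lease-id` would typically carry a per-pod identifier):

```bash
./caddy-ingress-controller \
  -namespace caddy-system \
  -config-map caddy-global-options \
  -lease-id "$POD_NAME" \
  -v
```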


@@ -1,20 +1,15 @@
package main
import (
"fmt"
"net/http"
"os"
"time"
"github.com/caddyserver/ingress/internal/caddy"
"github.com/caddyserver/ingress/internal/controller"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"time"
)
const (
@@ -25,77 +20,56 @@ const (
defaultBurst = 1e6
)
func createLogger(verbose bool) *zap.SugaredLogger {
prodCfg := zap.NewProductionConfig()
if verbose {
prodCfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
}
logger, _ := prodCfg.Build()
return logger.Sugar()
}
func main() {
// parse any flags required to configure the caddy ingress controller
cfg := parseFlags()
logger := createLogger(cfg.Verbose)
if cfg.WatchNamespace == "" {
cfg.WatchNamespace = v1.NamespaceAll
logrus.Warning("-namespace flag is unset, caddy ingress controller will monitor ingress resources in all namespaces.")
logger.Warn("-namespace flag is unset, caddy ingress controller will monitor ingress resources in all namespaces.")
}
// get client to access the kubernetes service api
kubeClient, err := createApiserverClient()
kubeClient, err := createApiserverClient(logger)
if err != nil {
msg := "Could not establish a connection to the Kubernetes API Server."
logrus.Fatalf(msg, err)
logger.Fatalf("Could not establish a connection to the Kubernetes API Server. %v", err)
}
c := controller.NewCaddyController(kubeClient, cfg)
reg := prometheus.NewRegistry()
reg.MustRegister(prometheus.NewGoCollector())
reg.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{
PidFn: func() (int, error) { return os.Getpid(), nil },
ReportErrors: true,
}))
// create http server to expose controller health metrics
go startMetricsServer(reg)
// start the ingress controller
stopCh := make(chan struct{}, 1)
defer close(stopCh)
logrus.Info("Starting the caddy ingress controller")
go c.Run(stopCh)
c := controller.NewCaddyController(logger, kubeClient, cfg, caddy.Converter{}, stopCh)
// start the ingress controller
logger.Info("Starting the caddy ingress controller")
go c.Run()
// TODO :- listen to sigterm
select {}
}
func startMetricsServer(reg *prometheus.Registry) {
mux := http.NewServeMux()
mux.Handle(
"/metrics",
promhttp.InstrumentMetricHandler(
reg,
promhttp.HandlerFor(reg, promhttp.HandlerOpts{}),
),
)
logrus.Info("Exporting metrics on :9090")
server := &http.Server{
Addr: fmt.Sprintf(":%v", 9090),
Handler: mux,
ReadTimeout: 10 * time.Second,
ReadHeaderTimeout: 10 * time.Second,
WriteTimeout: 300 * time.Second,
IdleTimeout: 120 * time.Second,
}
logrus.Fatal(server.ListenAndServe())
}
// createApiserverClient creates a new Kubernetes REST client. We assume the
// controller runs inside Kubernetes and use the in-cluster config.
func createApiserverClient() (*kubernetes.Clientset, error) {
func createApiserverClient(logger *zap.SugaredLogger) (*kubernetes.Clientset, error) {
cfg, err := clientcmd.BuildConfigFromFlags("", "")
if err != nil {
return nil, err
}
logrus.Infof("Creating API client for %s", cfg.Host)
logger.Infof("Creating API client for %s", cfg.Host)
cfg.QPS = defaultQPS
cfg.Burst = defaultBurst
@@ -124,7 +98,7 @@ func createApiserverClient() (*kubernetes.Clientset, error) {
}
lastErr = err
logrus.Infof("Unexpected error discovering Kubernetes version (attempt %v): %v", retries, err)
logger.Infof("Unexpected error discovering Kubernetes version (attempt %v): %v", retries, err)
retries++
return false, nil
})
@@ -135,7 +109,7 @@ func createApiserverClient() (*kubernetes.Clientset, error) {
}
if retries > 0 {
logrus.Warningf("Initial connection to the Kubernetes API server was retried %d times.", retries)
logger.Warnf("Initial connection to the Kubernetes API server was retried %d times.", retries)
}
return client, nil

go.mod

@@ -3,40 +3,23 @@ module github.com/caddyserver/ingress
go 1.14
require (
github.com/caddyserver/caddy/v2 v2.0.0
github.com/caddyserver/certmagic v0.10.12
github.com/caddyserver/caddy/v2 v2.3.0-rc.1
github.com/caddyserver/certmagic v0.12.1-0.20201209195841-b726d1ed13c3
github.com/google/uuid v1.1.1
github.com/mitchellh/mapstructure v1.1.2
github.com/pires/go-proxyproto v0.3.1
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.6.0
github.com/sirupsen/logrus v1.6.0
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.16.0
gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
gopkg.in/go-playground/pool.v3 v3.1.1
k8s.io/api v0.17.0
k8s.io/apimachinery v0.17.0
k8s.io/client-go v0.17.0
k8s.io/kubernetes v1.17.0
k8s.io/api v0.19.4
k8s.io/apimachinery v0.19.4
k8s.io/client-go v0.19.4
)
replace (
k8s.io/api => k8s.io/api v0.17.0
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.17.0
k8s.io/apimachinery => k8s.io/apimachinery v0.17.0
k8s.io/apiserver => k8s.io/apiserver v0.17.0
k8s.io/cli-runtime => k8s.io/cli-runtime v0.17.0
k8s.io/client-go => k8s.io/client-go v0.17.0
k8s.io/cloud-provider => k8s.io/cloud-provider v0.17.0
k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.17.0
k8s.io/code-generator => k8s.io/code-generator v0.17.0
k8s.io/component-base => k8s.io/component-base v0.17.0
k8s.io/cri-api => k8s.io/cri-api v0.17.0
k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.17.0
k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.17.0
k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.17.0
k8s.io/kube-proxy => k8s.io/kube-proxy v0.17.0
k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.17.0
k8s.io/kubectl => k8s.io/kubectl v0.17.0
k8s.io/kubelet => k8s.io/kubelet v0.17.0
k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.17.0
k8s.io/metrics => k8s.io/metrics v0.17.0
k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.17.0
k8s.io/api => k8s.io/api v0.19.4
k8s.io/apimachinery => k8s.io/apimachinery v0.19.4
k8s.io/client-go => k8s.io/client-go v0.19.4
)

go.sum (diff suppressed because it is too large)


@@ -1,67 +0,0 @@
package caddy
import (
"github.com/caddyserver/caddy/v2"
"github.com/caddyserver/caddy/v2/modules/caddyhttp"
"github.com/caddyserver/caddy/v2/modules/caddytls"
)
// StorageValues represents the config for certmagic storage providers.
type StorageValues struct {
Namespace string `json:"namespace"`
}
// Storage represents the certmagic storage configuration.
type Storage struct {
System string `json:"module"`
StorageValues
}
// Config represents a caddy2 config file.
type Config struct {
Storage Storage `json:"storage"`
Apps map[string]interface{} `json:"apps"`
Logging caddy.Logging `json:"logging"`
}
// ControllerConfig represents ingress controller config received through cli arguments.
type ControllerConfig struct {
WatchNamespace string
ConfigMapName string
}
// NewConfig returns a base plain slate caddy2 config file.
func NewConfig(namespace string, cfgMapConfig *Config) *Config {
var cfg *Config
if cfgMapConfig != nil {
cfg = cfgMapConfig
} else {
cfg = &Config{
Logging: caddy.Logging{},
Apps: map[string]interface{}{
"tls": &caddytls.TLS{
CertificatesRaw: caddy.ModuleMap{},
},
"http": &caddyhttp.App{
Servers: map[string]*caddyhttp.Server{
"ingress_server": {
AutoHTTPS: &caddyhttp.AutoHTTPSConfig{},
Listen: []string{":443"},
},
},
},
},
}
}
// set cert-magic storage provider
cfg.Storage = Storage{
System: "secret_store",
StorageValues: StorageValues{
Namespace: namespace,
},
}
return cfg
}


@@ -2,63 +2,106 @@ package caddy
import (
"encoding/json"
"fmt"
"github.com/caddyserver/caddy/v2"
"github.com/caddyserver/caddy/v2/caddyconfig"
"github.com/caddyserver/caddy/v2/modules/caddyhttp"
"k8s.io/api/networking/v1beta1"
"github.com/caddyserver/caddy/v2/modules/caddytls"
"github.com/caddyserver/ingress/internal/controller"
)
// ConvertToCaddyConfig returns a new caddy routelist based off of ingresses managed by this controller.
// This is not used when this ingress controller is configured with a config map, so that we don't
// override user defined routes.
func ConvertToCaddyConfig(ings []*v1beta1.Ingress) (caddyhttp.RouteList, error) {
// TODO :-
// when setting the upstream url we should bypass kube-dns and get the ip address of
// the pod for the deployment we are proxying to so that we can proxy to that ip address port.
// this is good for session affinity and increases performance.
// create a server route for each ingress route
var routes caddyhttp.RouteList
for _, ing := range ings {
for _, rule := range ing.Spec.Rules {
for _, path := range rule.HTTP.Paths {
clusterHostName := fmt.Sprintf("%v.%v.svc.cluster.local:%d", path.Backend.ServiceName, ing.Namespace, path.Backend.ServicePort.IntVal)
r := baseRoute(clusterHostName)
match := caddy.ModuleMap{}
if rule.Host != "" {
match["host"] = caddyconfig.JSON(caddyhttp.MatchHost{rule.Host}, nil)
}
if path.Path != "" {
match["path"] = caddyconfig.JSON(caddyhttp.MatchPath{path.Path}, nil)
}
r.MatcherSetsRaw = []caddy.ModuleMap{match}
routes = append(routes, r)
}
}
}
return routes, nil
// StorageValues represents the config for certmagic storage providers.
type StorageValues struct {
Namespace string `json:"namespace"`
LeaseId string `json:"leaseId"`
}
// TODO :- configure log middleware for all routes
func baseRoute(upstream string) caddyhttp.Route {
return caddyhttp.Route{
HandlersRaw: []json.RawMessage{
json.RawMessage(`
{
"handler": "reverse_proxy",
"upstreams": [
{
"dial": "` + fmt.Sprintf("%s", upstream) + `"
}
]
}
`),
// Storage represents the certmagic storage configuration.
type Storage struct {
System string `json:"module"`
StorageValues
}
// Config represents a caddy2 config file.
type Config struct {
Admin caddy.AdminConfig `json:"admin,omitempty"`
Storage Storage `json:"storage"`
Apps map[string]interface{} `json:"apps"`
Logging caddy.Logging `json:"logging"`
}
type Converter struct{}
const (
HttpServer = "ingress_server"
MetricsServer = "metrics_server"
)
func metricsServer(enabled bool) *caddyhttp.Server {
handler := json.RawMessage(`{ "handler": "static_response" }`)
if enabled {
handler = json.RawMessage(`{ "handler": "metrics" }`)
}
return &caddyhttp.Server{
Listen: []string{":9765"},
AutoHTTPS: &caddyhttp.AutoHTTPSConfig{Disabled: true},
Routes: []caddyhttp.Route{{
HandlersRaw: []json.RawMessage{handler},
MatcherSetsRaw: []caddy.ModuleMap{{
"path": caddyconfig.JSON(caddyhttp.MatchPath{"/metrics"}, nil),
}},
}},
}
}
func newConfig(namespace string, store *controller.Store) (*Config, error) {
cfg := &Config{
Logging: caddy.Logging{},
Apps: map[string]interface{}{
"tls": &caddytls.TLS{
CertificatesRaw: caddy.ModuleMap{},
},
"http": &caddyhttp.App{
Servers: map[string]*caddyhttp.Server{
MetricsServer: metricsServer(store.ConfigMap.Metrics),
HttpServer: {
AutoHTTPS: &caddyhttp.AutoHTTPSConfig{},
// Listen on both :80 and :443 so that both ports share the
// same listener wrappers (the PROXY protocol wrapper relies on this)
Listen: []string{":80", ":443"},
},
},
},
},
Storage: Storage{
System: "secret_store",
StorageValues: StorageValues{
Namespace: namespace,
LeaseId: store.Options.LeaseId,
},
},
}
return cfg, nil
}
func (c Converter) ConvertToCaddyConfig(namespace string, store *controller.Store) (interface{}, error) {
cfg, err := newConfig(namespace, store)
if err != nil {
return cfg, err
}
err = LoadIngressConfig(cfg, store)
if err != nil {
return cfg, err
}
err = LoadConfigMapOptions(cfg, store)
if err != nil {
return cfg, err
}
err = LoadTLSConfig(cfg, store)
if err != nil {
return cfg, err
}
return cfg, err
}
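For orientation, the base config built by `newConfig` serializes to JSON roughly as follows before `LoadIngressConfig`, `LoadConfigMapOptions`, and `LoadTLSConfig` fill in routes, automation, and certificates (a trimmed sketch with placeholder namespace and lease id, not verbatim output):

```json
{
  "storage": {
    "module": "secret_store",
    "namespace": "caddy-system",
    "leaseId": "example-lease-id"
  },
  "apps": {
    "tls": { "certificates": {} },
    "http": {
      "servers": {
        "metrics_server": { "listen": [":9765"] },
        "ingress_server": { "listen": [":80", ":443"] }
      }
    }
  },
  "logging": {}
}
```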


@@ -0,0 +1,65 @@
package caddy
import (
"encoding/json"
caddy2 "github.com/caddyserver/caddy/v2"
"github.com/caddyserver/caddy/v2/caddyconfig"
"github.com/caddyserver/caddy/v2/modules/caddyhttp"
"github.com/caddyserver/caddy/v2/modules/caddytls"
"github.com/caddyserver/ingress/internal/controller"
)
// LoadConfigMapOptions loads options from the ConfigMap
func LoadConfigMapOptions(config *Config, store *controller.Store) error {
cfgMap := store.ConfigMap
tlsApp := config.Apps["tls"].(*caddytls.TLS)
httpServer := config.Apps["http"].(*caddyhttp.App).Servers[HttpServer]
if cfgMap.Debug {
config.Logging.Logs = map[string]*caddy2.CustomLog{"default": {Level: "DEBUG"}}
}
if cfgMap.AcmeCA != "" || cfgMap.Email != "" {
acmeIssuer := caddytls.ACMEIssuer{}
if cfgMap.AcmeCA != "" {
acmeIssuer.CA = cfgMap.AcmeCA
}
if cfgMap.Email != "" {
acmeIssuer.Email = cfgMap.Email
}
var onDemandConfig *caddytls.OnDemandConfig
if cfgMap.OnDemandTLS {
onDemandConfig = &caddytls.OnDemandConfig{
RateLimit: &caddytls.RateLimit{
Interval: cfgMap.OnDemandRateLimitInterval,
Burst: cfgMap.OnDemandRateLimitBurst,
},
Ask: cfgMap.OnDemandAsk,
}
}
tlsApp.Automation = &caddytls.AutomationConfig{
OnDemand: onDemandConfig,
Policies: []*caddytls.AutomationPolicy{
{
IssuersRaw: []json.RawMessage{
caddyconfig.JSONModuleObject(acmeIssuer, "module", "acme", nil),
},
OnDemand: cfgMap.OnDemandTLS,
},
},
}
}
if cfgMap.ProxyProtocol {
httpServer.ListenerWrappersRaw = []json.RawMessage{
json.RawMessage(`{"wrapper":"proxy_protocol"}`),
json.RawMessage(`{"wrapper":"tls"}`),
}
}
return nil
}
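These options come from the ConfigMap selected by the `-config-map` flag and parsed by `k8s.ParseConfigMap` (not shown in this diff). A sketch of such a ConfigMap, with every name and value hypothetical:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: caddy-global-options # must match the -config-map flag
  namespace: caddy-system    # the controller pod's own namespace
data:
  debug: "false"
  acmeCA: "https://acme-staging-v02.api.letsencrypt.org/directory"
  email: "admin@example.com"
  proxyProtocol: "true"
  onDemandTLS: "true"
  onDemandAsk: "http://localhost:9090/ask"
```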

internal/caddy/ingress.go (new file)

@@ -0,0 +1,76 @@
package caddy
import (
"encoding/json"
"fmt"
"github.com/caddyserver/caddy/v2"
"github.com/caddyserver/caddy/v2/caddyconfig"
"github.com/caddyserver/caddy/v2/modules/caddyhttp"
"github.com/caddyserver/ingress/internal/controller"
"k8s.io/api/networking/v1beta1"
)
// TODO :- configure log middleware for all routes
func baseRoute(upstream string) caddyhttp.Route {
return caddyhttp.Route{
HandlersRaw: []json.RawMessage{
json.RawMessage(`
{
"handler": "reverse_proxy",
"upstreams": [
{
"dial": "` + fmt.Sprintf("%s", upstream) + `"
}
]
}
`),
},
}
}
// LoadIngressConfig creates a routelist based off of ingresses managed by this controller.
func LoadIngressConfig(config *Config, store *controller.Store) error {
// TODO :-
// when setting the upstream url we should bypass kube-dns and get the ip address of
// the pod for the deployment we are proxying to so that we can proxy to that ip address port.
// this is good for session affinity and increases performance.
// create a server route for each ingress route
var routes caddyhttp.RouteList
for _, ing := range store.Ingresses {
for _, rule := range ing.Spec.Rules {
for _, path := range rule.HTTP.Paths {
clusterHostName := fmt.Sprintf("%v.%v.svc.cluster.local:%d", path.Backend.ServiceName, ing.Namespace, path.Backend.ServicePort.IntVal)
r := baseRoute(clusterHostName)
match := caddy.ModuleMap{
// match only on https protocol to allow HTTPS redirects
// TODO Let user disable this to serve HTTP requests
"protocol": caddyconfig.JSON(caddyhttp.MatchProtocol("https"), nil),
}
if rule.Host != "" {
match["host"] = caddyconfig.JSON(caddyhttp.MatchHost{rule.Host}, nil)
}
if path.Path != "" {
p := path.Path
if *path.PathType == v1beta1.PathTypePrefix {
p += "*"
}
match["path"] = caddyconfig.JSON(caddyhttp.MatchPath{p}, nil)
}
r.MatcherSetsRaw = []caddy.ModuleMap{match}
routes = append(routes, r)
}
}
}
httpApp := config.Apps["http"].(*caddyhttp.App)
httpApp.Servers[HttpServer].Routes = routes
return nil
}
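To make the mapping concrete: an Ingress like the one below (service, host, and namespace are hypothetical) would produce a route matching `protocol=https`, `host=example.com`, and `path=/api*` (the `*` appended because the path type is `Prefix`), proxying to `my-svc.default.svc.cluster.local:8080`:

```yaml
apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
  name: example
  namespace: default
spec:
  rules:
    - host: example.com
      http:
        paths:
          - path: /api
            pathType: Prefix
            backend:
              serviceName: my-svc
              servicePort: 8080
```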

internal/caddy/tls.go (new file)

@@ -0,0 +1,33 @@
package caddy
import (
"encoding/json"
"github.com/caddyserver/caddy/v2/modules/caddyhttp"
"github.com/caddyserver/caddy/v2/modules/caddytls"
"github.com/caddyserver/ingress/internal/controller"
)
// LoadTLSConfig configures Caddy when some ingresses have TLS certs
func LoadTLSConfig(config *Config, store *controller.Store) error {
tlsApp := config.Apps["tls"].(*caddytls.TLS)
httpApp := config.Apps["http"].(*caddyhttp.App)
var hosts []string
// Get all Hosts subject to custom TLS certs
for _, ing := range store.Ingresses {
for _, tlsRule := range ing.Spec.TLS {
for _, h := range tlsRule.Hosts {
hosts = append(hosts, h)
}
}
}
if len(hosts) > 0 {
tlsApp.CertificatesRaw["load_folders"] = json.RawMessage(`["` + controller.CertFolder + `"]`)
// do not manage certificates for those hosts
httpApp.Servers[HttpServer].AutoHTTPS.SkipCerts = hosts
}
return nil
}
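The hosts gathered here come from the ordinary Ingress TLS section; for instance (hypothetical names):

```yaml
spec:
  tls:
    - hosts:
        - example.com
      secretName: example-com-tls
```

With this present, `example.com` is excluded from automatic HTTPS via `SkipCerts`, and its certificate is expected under the `load_folders` directory (`/etc/caddy/certs`), which the secret handlers keep in sync.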


@@ -1,218 +0,0 @@
package controller
import (
"encoding/json"
"fmt"
"github.com/caddyserver/caddy/v2/modules/caddyhttp"
"github.com/caddyserver/caddy/v2/modules/caddytls"
"github.com/caddyserver/ingress/internal/caddy"
config "github.com/caddyserver/ingress/internal/caddy"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"k8s.io/api/networking/v1beta1"
)
// loadConfigMap runs when a config map with caddy config is loaded on app start.
func (c *CaddyController) onLoadConfig(obj interface{}) {
c.syncQueue.Add(LoadConfigAction{
config: obj,
})
}
// onResourceAdded runs when an ingress resource is added to the cluster.
func (c *CaddyController) onResourceAdded(obj interface{}) {
c.syncQueue.Add(ResourceAddedAction{
resource: obj,
})
}
// onResourceUpdated is run when an ingress resource is updated in the cluster.
func (c *CaddyController) onResourceUpdated(old interface{}, new interface{}) {
c.syncQueue.Add(ResourceUpdatedAction{
resource: new,
oldResource: old,
})
}
// onResourceDeleted is run when an ingress resource is deleted from the cluster.
func (c *CaddyController) onResourceDeleted(obj interface{}) {
c.syncQueue.Add(ResourceDeletedAction{
resource: obj,
})
}
// onSyncStatus is run every sync interval to update the source address on ingresses.
func (c *CaddyController) onSyncStatus(obj interface{}) {
c.syncQueue.Add(SyncStatusAction{})
}
// Action is an interface for ingress actions.
type Action interface {
handle(c *CaddyController) error
}
// LoadConfigAction provides an implementation of the action interface.
type LoadConfigAction struct {
config interface{}
}
// ResourceAddedAction provides an implementation of the action interface.
type ResourceAddedAction struct {
resource interface{}
}
// ResourceUpdatedAction provides an implementation of the action interface.
type ResourceUpdatedAction struct {
resource interface{}
oldResource interface{}
}
// ResourceDeletedAction provides an implementation of the action interface.
type ResourceDeletedAction struct {
resource interface{}
}
func (r LoadConfigAction) handle(c *CaddyController) error {
logrus.Info("Config file detected, updating Caddy config...")
c.resourceStore.CaddyConfig = r.config.(*config.Config)
err := regenerateConfig(c)
if err != nil {
return err
}
return nil
}
func (r ResourceAddedAction) handle(c *CaddyController) error {
logrus.Info("New ingress resource detected, updating Caddy config...")
// configure caddy to handle this resource
ing, ok := r.resource.(*v1beta1.Ingress)
if !ok {
return fmt.Errorf("ResourceAddedAction: incoming resource is not of type ingress")
}
// add this ingress to the internal store
c.resourceStore.AddIngress(ing)
err := regenerateConfig(c)
if err != nil {
return err
}
// ensure that ingress source is updated to point to this ingress controller's ip
err = c.syncStatus([]*v1beta1.Ingress{ing})
if err != nil {
return errors.Wrapf(err, "syncing ingress source address name: %v", ing.GetName())
}
logrus.Info("Caddy reloaded successfully.")
return nil
}
func (r ResourceUpdatedAction) handle(c *CaddyController) error {
logrus.Info("Ingress resource update detected, updating Caddy config...")
// update caddy config regarding this ingress
ing, ok := r.resource.(*v1beta1.Ingress)
if !ok {
return fmt.Errorf("ResourceAddedAction: incoming resource is not of type ingress")
}
// add or update this ingress in the internal store
c.resourceStore.AddIngress(ing)
err := regenerateConfig(c)
if err != nil {
return err
}
logrus.Info("Caddy reloaded successfully.")
return nil
}
func (r ResourceDeletedAction) handle(c *CaddyController) error {
logrus.Info("Ingress resource deletion detected, updating Caddy config...")
// delete all resources from caddy config that are associated with this resource
// reload caddy config
ing, ok := r.resource.(*v1beta1.Ingress)
if !ok {
return fmt.Errorf("ResourceAddedAction: incoming resource is not of type ingress")
}
// add this ingress to the internal store
c.resourceStore.PluckIngress(ing)
err := regenerateConfig(c)
if err != nil {
return err
}
logrus.Info("Caddy reloaded successfully.")
return nil
}
// regenerateConfig regenerate caddy config with updated resources.
func regenerateConfig(c *CaddyController) error {
logrus.Info("Updating caddy config")
var cfg *config.Config
var cfgFile *config.Config = nil
var err error
if c.usingConfigMap {
cfgFile, err = loadCaddyConfigFile("/etc/caddy/config.json")
if err != nil {
logrus.Warn("Unable to load config file: %v", err)
}
}
cfg = config.NewConfig(c.podInfo.Namespace, cfgFile)
tlsApp := cfg.Apps["tls"].(*caddytls.TLS)
httpApp := cfg.Apps["http"].(*caddyhttp.App)
if c.resourceStore.ConfigMap != nil {
err := setConfigMapOptions(c, cfg)
if err != nil {
return errors.Wrap(err, "caddy config reload")
}
}
// if certs are defined on an ingress resource we need to handle them.
tlsCfg, err := c.HandleOwnCertManagement(c.resourceStore.Ingresses)
if err != nil {
return errors.Wrap(err, "caddy config reload")
}
// after TLS secrets are synched we should load them in the cert pool
// and skip auto https for hosts with certs provided
if tlsCfg != nil {
tlsApp.CertificatesRaw["load_folders"] = tlsCfg["load_folders"].(json.RawMessage)
if hosts, ok := tlsCfg["hosts"].([]string); ok {
httpApp.Servers["ingress_server"].AutoHTTPS.Skip = hosts
}
}
if !c.usingConfigMap {
serverRoutes, err := caddy.ConvertToCaddyConfig(c.resourceStore.Ingresses)
if err != nil {
return errors.Wrap(err, "converting ingress resources to caddy config")
}
// set the http server routes
httpApp.Servers["ingress_server"].Routes = serverRoutes
}
// reload caddy with new config
err = c.reloadCaddy(cfg)
if err != nil {
return errors.Wrap(err, "caddy config reload")
}
return nil
}


@@ -0,0 +1,71 @@
package controller
import (
"github.com/caddyserver/ingress/internal/k8s"
v1 "k8s.io/api/core/v1"
)
// ConfigMapAddedAction provides an implementation of the action interface.
type ConfigMapAddedAction struct {
resource *v1.ConfigMap
}
// ConfigMapUpdatedAction provides an implementation of the action interface.
type ConfigMapUpdatedAction struct {
resource *v1.ConfigMap
oldResource *v1.ConfigMap
}
// ConfigMapDeletedAction provides an implementation of the action interface.
type ConfigMapDeletedAction struct {
resource *v1.ConfigMap
}
// onConfigMapAdded runs when a configmap is added to the namespace.
func (c *CaddyController) onConfigMapAdded(obj *v1.ConfigMap) {
c.syncQueue.Add(ConfigMapAddedAction{
resource: obj,
})
}
// onConfigMapUpdated is run when a configmap is updated in the namespace.
func (c *CaddyController) onConfigMapUpdated(old *v1.ConfigMap, new *v1.ConfigMap) {
c.syncQueue.Add(ConfigMapUpdatedAction{
resource: new,
oldResource: old,
})
}
// onConfigMapDeleted is run when a configmap is deleted from the namespace.
func (c *CaddyController) onConfigMapDeleted(obj *v1.ConfigMap) {
c.syncQueue.Add(ConfigMapDeletedAction{
resource: obj,
})
}
func (r ConfigMapAddedAction) handle(c *CaddyController) error {
c.logger.Infof("ConfigMap created (%s/%s)", r.resource.Namespace, r.resource.Name)
cfg, err := k8s.ParseConfigMap(r.resource)
if err == nil {
c.resourceStore.ConfigMap = cfg
}
return err
}
func (r ConfigMapUpdatedAction) handle(c *CaddyController) error {
c.logger.Infof("ConfigMap updated (%s/%s)", r.resource.Namespace, r.resource.Name)
cfg, err := k8s.ParseConfigMap(r.resource)
if err == nil {
c.resourceStore.ConfigMap = cfg
}
return err
}
func (r ConfigMapDeletedAction) handle(c *CaddyController) error {
c.logger.Infof("ConfigMap deleted (%s/%s)", r.resource.Namespace, r.resource.Name)
c.resourceStore.ConfigMap = nil
return nil
}


@@ -0,0 +1,70 @@
package controller
import (
"k8s.io/api/networking/v1beta1"
)
// IngressAddedAction provides an implementation of the action interface.
type IngressAddedAction struct {
resource *v1beta1.Ingress
}
// IngressUpdatedAction provides an implementation of the action interface.
type IngressUpdatedAction struct {
resource *v1beta1.Ingress
oldResource *v1beta1.Ingress
}
// IngressDeletedAction provides an implementation of the action interface.
type IngressDeletedAction struct {
resource *v1beta1.Ingress
}
// onIngressAdded runs when an ingress resource is added to the cluster.
func (c *CaddyController) onIngressAdded(obj *v1beta1.Ingress) {
c.syncQueue.Add(IngressAddedAction{
resource: obj,
})
}
// onIngressUpdated is run when an ingress resource is updated in the cluster.
func (c *CaddyController) onIngressUpdated(old *v1beta1.Ingress, new *v1beta1.Ingress) {
c.syncQueue.Add(IngressUpdatedAction{
resource: new,
oldResource: old,
})
}
// onIngressDeleted is run when an ingress resource is deleted from the cluster.
func (c *CaddyController) onIngressDeleted(obj *v1beta1.Ingress) {
c.syncQueue.Add(IngressDeletedAction{
resource: obj,
})
}
func (r IngressAddedAction) handle(c *CaddyController) error {
c.logger.Infof("Ingress created (%s/%s)", r.resource.Namespace, r.resource.Name)
// add this ingress to the internal store
c.resourceStore.AddIngress(r.resource)
// Ingress may now have a TLS config
return c.watchTLSSecrets()
}
func (r IngressUpdatedAction) handle(c *CaddyController) error {
c.logger.Infof("Ingress updated (%s/%s)", r.resource.Namespace, r.resource.Name)
// add or update this ingress in the internal store
c.resourceStore.AddIngress(r.resource)
// Ingress may now have a TLS config
return c.watchTLSSecrets()
}
func (r IngressDeletedAction) handle(c *CaddyController) error {
c.logger.Infof("Ingress deleted (%s/%s)", r.resource.Namespace, r.resource.Name)
// delete all resources from caddy config that are associated with this resource
c.resourceStore.PluckIngress(r.resource)
return nil
}


@@ -0,0 +1,139 @@
package controller
import (
"github.com/caddyserver/ingress/internal/k8s"
"go.uber.org/zap"
"gopkg.in/go-playground/pool.v3"
apiv1 "k8s.io/api/core/v1"
"k8s.io/api/networking/v1beta1"
"k8s.io/client-go/kubernetes"
"net"
"sort"
"strings"
)
// dispatchSync is run every syncInterval duration to sync ingress source address fields.
func (c *CaddyController) dispatchSync() {
c.syncQueue.Add(SyncStatusAction{})
}
// SyncStatusAction provides an implementation of the action interface.
type SyncStatusAction struct {
}
// handle is run when a syncStatusAction appears in the queue.
func (r SyncStatusAction) handle(c *CaddyController) error {
return c.syncStatus(c.resourceStore.Ingresses)
}
// syncStatus ensures that the ingress source address points to this ingress controller's IP address.
func (c *CaddyController) syncStatus(ings []*v1beta1.Ingress) error {
addrs, err := k8s.GetAddresses(c.podInfo, c.kubeClient)
if err != nil {
return err
}
c.logger.Debugf("Syncing %d Ingress resources source addresses", len(ings))
c.updateIngStatuses(sliceToLoadBalancerIngress(addrs), ings)
return nil
}
// updateIngStatuses starts a queue and adds all monitored ingresses to update their status source address to the one
// that the ingress controller is running on. This is called by the syncStatus queue.
func (c *CaddyController) updateIngStatuses(controllerAddresses []apiv1.LoadBalancerIngress, ings []*v1beta1.Ingress) {
p := pool.NewLimited(10)
defer p.Close()
batch := p.Batch()
sort.SliceStable(controllerAddresses, lessLoadBalancerIngress(controllerAddresses))
for _, ing := range ings {
curIPs := ing.Status.LoadBalancer.Ingress
sort.SliceStable(curIPs, lessLoadBalancerIngress(curIPs))
// check to see if ingresses source address does not match the ingress controller's.
if ingressSliceEqual(curIPs, controllerAddresses) {
c.logger.Debugf("skipping update of Ingress %v/%v (no change)", ing.Namespace, ing.Name)
continue
}
batch.Queue(runUpdate(c.logger, ing, controllerAddresses, c.kubeClient))
}
batch.QueueComplete()
batch.WaitAll()
}
// runUpdate updates the ingress status field.
func runUpdate(logger *zap.SugaredLogger, ing *v1beta1.Ingress, status []apiv1.LoadBalancerIngress, client *kubernetes.Clientset) pool.WorkFunc {
return func(wu pool.WorkUnit) (interface{}, error) {
if wu.IsCancelled() {
return nil, nil
}
updated, err := k8s.UpdateIngressStatus(client, ing, status)
if err != nil {
logger.Warnf("error updating ingress rule: %v", err)
} else {
logger.Debugf(
"updating Ingress %v/%v status from %v to %v",
ing.Namespace,
ing.Name,
ing.Status.LoadBalancer.Ingress,
updated.Status.LoadBalancer.Ingress,
)
}
return true, nil
}
}
// ingressSliceEqual determines if the ingress source matches the ingress controller's.
func ingressSliceEqual(lhs, rhs []apiv1.LoadBalancerIngress) bool {
if len(lhs) != len(rhs) {
return false
}
for i := range lhs {
if lhs[i].IP != rhs[i].IP {
return false
}
if lhs[i].Hostname != rhs[i].Hostname {
return false
}
}
return true
}
// lessLoadBalancerIngress is a sorting function for ingress hostnames.
func lessLoadBalancerIngress(addrs []apiv1.LoadBalancerIngress) func(int, int) bool {
return func(a, b int) bool {
switch strings.Compare(addrs[a].Hostname, addrs[b].Hostname) {
case -1:
return true
case 1:
return false
}
return addrs[a].IP < addrs[b].IP
}
}
// sliceToLoadBalancerIngress converts a slice of IP and/or hostnames to LoadBalancerIngress
func sliceToLoadBalancerIngress(endpoints []string) []apiv1.LoadBalancerIngress {
lbi := []apiv1.LoadBalancerIngress{}
for _, ep := range endpoints {
if net.ParseIP(ep) == nil {
lbi = append(lbi, apiv1.LoadBalancerIngress{Hostname: ep})
} else {
lbi = append(lbi, apiv1.LoadBalancerIngress{IP: ep})
}
}
sort.SliceStable(lbi, func(a, b int) bool {
return lbi[a].IP < lbi[b].IP
})
return lbi
}
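A standalone sketch of the endpoint normalization above, mirroring `sliceToLoadBalancerIngress` with a local stand-in for `apiv1.LoadBalancerIngress` so it runs on its own:

```go
package main

import (
	"fmt"
	"net"
	"sort"
)

// lbIngress stands in for apiv1.LoadBalancerIngress (IP/Hostname pair).
type lbIngress struct{ IP, Hostname string }

// toLoadBalancerIngress mirrors sliceToLoadBalancerIngress: endpoints that
// do not parse as IPs become hostnames, and the result is sorted by IP.
func toLoadBalancerIngress(endpoints []string) []lbIngress {
	lbi := []lbIngress{}
	for _, ep := range endpoints {
		if net.ParseIP(ep) == nil {
			lbi = append(lbi, lbIngress{Hostname: ep})
		} else {
			lbi = append(lbi, lbIngress{IP: ep})
		}
	}
	sort.SliceStable(lbi, func(a, b int) bool {
		return lbi[a].IP < lbi[b].IP
	})
	return lbi
}

func main() {
	// Hostname entries sort first (empty IP), then IPs in order.
	fmt.Println(toLoadBalancerIngress([]string{"lb.example.com", "10.0.0.2", "10.0.0.1"}))
	// Output: [{ lb.example.com} {10.0.0.1 } {10.0.0.2 }]
}
```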


@@ -0,0 +1,131 @@
package controller
import (
"github.com/caddyserver/ingress/internal/k8s"
"io/ioutil"
apiv1 "k8s.io/api/core/v1"
"os"
"path/filepath"
)
var CertFolder = filepath.FromSlash("/etc/caddy/certs")
// SecretAddedAction provides an implementation of the action interface.
type SecretAddedAction struct {
resource *apiv1.Secret
}
// SecretUpdatedAction provides an implementation of the action interface.
type SecretUpdatedAction struct {
resource *apiv1.Secret
oldResource *apiv1.Secret
}
// SecretDeletedAction provides an implementation of the action interface.
type SecretDeletedAction struct {
resource *apiv1.Secret
}
// onSecretAdded runs when a TLS secret resource is added to the cluster.
func (c *CaddyController) onSecretAdded(obj *apiv1.Secret) {
if k8s.IsManagedTLSSecret(obj, c.resourceStore.Ingresses) {
c.syncQueue.Add(SecretAddedAction{
resource: obj,
})
}
}
// onSecretUpdated is run when a TLS secret resource is updated in the cluster.
func (c *CaddyController) onSecretUpdated(old *apiv1.Secret, new *apiv1.Secret) {
if k8s.IsManagedTLSSecret(new, c.resourceStore.Ingresses) {
c.syncQueue.Add(SecretUpdatedAction{
resource: new,
oldResource: old,
})
}
}
// onSecretDeleted is run when a TLS secret resource is deleted from the cluster.
func (c *CaddyController) onSecretDeleted(obj *apiv1.Secret) {
c.syncQueue.Add(SecretDeletedAction{
resource: obj,
})
}
// writeFile writes a secret to a .pem file on disk.
func writeFile(s *apiv1.Secret) error {
content := make([]byte, 0)
for _, cert := range s.Data {
content = append(content, cert...)
}
err := ioutil.WriteFile(filepath.Join(CertFolder, s.Name+".pem"), content, 0644)
if err != nil {
return err
}
return nil
}
func (r SecretAddedAction) handle(c *CaddyController) error {
c.logger.Infof("TLS secret created (%s/%s)", r.resource.Namespace, r.resource.Name)
return writeFile(r.resource)
}
func (r SecretUpdatedAction) handle(c *CaddyController) error {
c.logger.Infof("TLS secret updated (%s/%s)", r.resource.Namespace, r.resource.Name)
return writeFile(r.resource)
}
func (r SecretDeletedAction) handle(c *CaddyController) error {
c.logger.Infof("TLS secret deleted (%s/%s)", r.resource.Namespace, r.resource.Name)
return os.Remove(filepath.Join(CertFolder, r.resource.Name+".pem"))
}
// watchTLSSecrets starts listening to TLS secrets if at least one ingress needs it.
// It syncs the CertFolder with TLS secrets.
func (c *CaddyController) watchTLSSecrets() error {
if c.informers.TLSSecret == nil && c.resourceStore.HasManagedTLS() {
// Init informers
params := k8s.TLSSecretParams{
InformerFactory: c.factories.WatchedNamespace,
}
c.informers.TLSSecret = k8s.WatchTLSSecrets(params, k8s.TLSSecretHandlers{
AddFunc: c.onSecretAdded,
UpdateFunc: c.onSecretUpdated,
DeleteFunc: c.onSecretDeleted,
})
// Run it
go c.informers.TLSSecret.Run(c.stopChan)
// Sync secrets
secrets, err := k8s.ListTLSSecrets(params, c.resourceStore.Ingresses)
if err != nil {
return err
}
if _, err := os.Stat(CertFolder); os.IsNotExist(err) {
err = os.MkdirAll(CertFolder, 0755)
if err != nil {
return err
}
}
for _, secret := range secrets {
content := make([]byte, 0)
for _, cert := range secret.Data {
content = append(content, cert...)
}
err := ioutil.WriteFile(filepath.Join(CertFolder, secret.Name+".pem"), content, 0644)
if err != nil {
return err
}
}
}
return nil
}
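The secrets mirrored to disk are plain `kubernetes.io/tls` secrets referenced by an Ingress; `writeFile` concatenates every entry of `Data` into `<secret-name>.pem` under `/etc/caddy/certs`. An example secret (all values hypothetical):

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: example-com-tls
  namespace: caddy-system
type: kubernetes.io/tls
data:
  tls.crt: LS0tLS1CRUdJTi... # base64-encoded PEM certificate
  tls.key: LS0tLS1CRUdJTi... # base64-encoded PEM private key
```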


@@ -1,185 +0,0 @@
package controller
import (
"fmt"
"github.com/caddyserver/ingress/internal/caddy"
caddy2 "github.com/caddyserver/caddy/v2"
"github.com/caddyserver/caddy/v2/caddyconfig"
"github.com/caddyserver/caddy/v2/modules/caddytls"
"github.com/mitchellh/mapstructure"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
)
type ConfigMapOptions struct {
Debug bool `json:"debug"`
AcmeCA string `json:"acmeCA"`
Email string `json:"email"`
}
// onConfigMapAdded is run when a config map is added to the cluster.
func (c *CaddyController) onConfigMapAdded(obj interface{}) {
c.syncQueue.Add(ConfigMapAddedAction{
resource: obj,
})
}
// onConfigMapUpdated is run when an ingress resource is updated in the cluster.
func (c *CaddyController) onConfigMapUpdated(old interface{}, new interface{}) {
c.syncQueue.Add(ConfigMapUpdatedAction{
resource: new,
oldResource: old,
})
}
// onConfigMapDeleted is run when an ingress resource is deleted from the cluster.
func (c *CaddyController) onConfigMapDeleted(obj interface{}) {
c.syncQueue.Add(ConfigMapDeletedAction{
resource: obj,
})
}
// ConfigMapAddedAction provides an implementation of the action interface.
type ConfigMapAddedAction struct {
resource interface{}
}
// ConfigMapUpdatedAction provides an implementation of the action interface.
type ConfigMapUpdatedAction struct {
resource interface{}
oldResource interface{}
}
// ConfigMapDeletedAction provides an implementation of the action interface.
type ConfigMapDeletedAction struct {
resource interface{}
}
func (r ConfigMapAddedAction) handle(c *CaddyController) error {
cfgMap, ok := r.resource.(*v1.ConfigMap)
if !ok {
return fmt.Errorf("ConfigMapAddedAction: incoming resource is not of type configmap")
}
// only care about the caddy config map
if !changeTriggerUpdate(c, cfgMap) {
return nil
}
logrus.Info("New configmap detected, updating Caddy config...")
// save to the store the current config map to use
c.resourceStore.ConfigMap = cfgMap
err := regenerateConfig(c)
if err != nil {
return err
}
logrus.Info("Caddy reloaded successfully.")
return nil
}
func (r ConfigMapUpdatedAction) handle(c *CaddyController) error {
cfgMap, ok := r.resource.(*v1.ConfigMap)
if !ok {
return fmt.Errorf("ConfigMapUpdatedAction: incoming resource is not of type configmap")
}
// only care about the caddy config map
if !changeTriggerUpdate(c, cfgMap) {
return nil
}
logrus.Info("ConfigMap resource updated, updating Caddy config...")
// save to the store the current config map to use
c.resourceStore.ConfigMap = cfgMap
err := regenerateConfig(c)
if err != nil {
return err
}
logrus.Info("Caddy reloaded successfully.")
return nil
}
func (r ConfigMapDeletedAction) handle(c *CaddyController) error {
cfgMap, ok := r.resource.(*v1.ConfigMap)
if !ok {
return fmt.Errorf("ConfigMapDeletedAction: incoming resource is not of type configmap")
}
// only care about the caddy config map
if !changeTriggerUpdate(c, cfgMap) {
return nil
}
logrus.Info("ConfigMap resource deleted, updating Caddy config...")
// delete config map from internal store
c.resourceStore.ConfigMap = nil
err := regenerateConfig(c)
if err != nil {
return err
}
logrus.Info("Caddy reloaded successfully.")
return nil
}
func setConfigMapOptions(c *CaddyController, cfg *caddy.Config) error {
// parse configmap
cfgMap := ConfigMapOptions{}
config := &mapstructure.DecoderConfig{
Metadata: nil,
WeaklyTypedInput: true,
Result: &cfgMap,
TagName: "json",
}
decoder, err := mapstructure.NewDecoder(config)
if err != nil {
logrus.Warningf("unexpected error creating decoder: %v", err)
}
err = decoder.Decode(c.resourceStore.ConfigMap.Data)
if err != nil {
logrus.Warningf("unexpected error parsing configmap: %v", err)
}
logrus.Infof("using config map options: %+v to %+v", c.resourceStore.ConfigMap.Data, cfgMap)
// merge configmap options to CaddyConfig
tlsApp := cfg.Apps["tls"].(*caddytls.TLS)
//httpApp := cfg.Apps["http"].(*caddyhttp.App)
if cfgMap.Debug {
cfg.Logging.Logs = map[string]*caddy2.CustomLog{"default": {Level: "DEBUG"}}
}
if cfgMap.AcmeCA != "" || cfgMap.Email != "" {
acmeIssuer := caddytls.ACMEIssuer{}
if cfgMap.AcmeCA != "" {
acmeIssuer.CA = cfgMap.AcmeCA
}
if cfgMap.Email != "" {
acmeIssuer.Email = cfgMap.Email
}
tlsApp.Automation = &caddytls.AutomationConfig{
Policies: []*caddytls.AutomationPolicy{
{IssuerRaw: caddyconfig.JSONModuleObject(acmeIssuer, "module", "acme", nil)},
},
}
}
return nil
}
func changeTriggerUpdate(c *CaddyController, cfgMap *v1.ConfigMap) bool {
return cfgMap.Namespace == c.podInfo.Namespace && cfgMap.Name == c.config.ConfigMapName
}


@@ -1,167 +1,196 @@
package controller
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"time"
"github.com/caddyserver/caddy/v2"
c "github.com/caddyserver/ingress/internal/caddy"
"github.com/caddyserver/ingress/internal/pod"
"github.com/caddyserver/ingress/internal/store"
"github.com/caddyserver/ingress/internal/k8s"
"github.com/caddyserver/ingress/pkg/storage"
"github.com/sirupsen/logrus"
"go.uber.org/zap"
apiv1 "k8s.io/api/core/v1"
"k8s.io/api/networking/v1beta1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"os"
"time"
// load required caddy plugins
_ "github.com/caddyserver/caddy/v2/modules/caddyhttp/reverseproxy"
_ "github.com/caddyserver/caddy/v2/modules/caddytls"
_ "github.com/caddyserver/caddy/v2/modules/caddytls/standardstek"
_ "github.com/caddyserver/caddy/v2/modules/metrics"
_ "github.com/caddyserver/ingress/pkg/proxy"
)
const (
// how often we attempt to keep each ingress resource's source address in sync
syncInterval = time.Second * 30
// we can sync secrets every hour since we still receive events for updates, deletes, etc.
secretSyncInterval = time.Hour * 1
// how often we resync informers resources (besides receiving updates)
resourcesSyncInterval = time.Hour * 1
)
// Action is an interface for ingress actions.
type Action interface {
handle(c *CaddyController) error
}
// Options represents ingress controller config received through cli arguments.
type Options struct {
WatchNamespace string
ConfigMapName string
Verbose bool
LeaseId string
}
// Store contains resources used to generate Caddy config
type Store struct {
Options *Options
ConfigMap *k8s.ConfigMapOptions
Ingresses []*v1beta1.Ingress
}
// Informer defines the required SharedIndexInformers that interact with the API server.
type Informer struct {
Ingress cache.SharedIndexInformer
ConfigMap cache.SharedIndexInformer
TLSSecret cache.SharedIndexInformer
}
// Listers contains the object listers (stores).
type Listers struct {
Ingress cache.Store
ConfigMap cache.Store
// InformerFactory contains the shared informer factories.
// We need two types of factory:
// - one to watch resources in the Pod's namespace (caddy config, secrets...)
// - another for Ingress resources in the watched namespace
type InformerFactory struct {
PodNamespace informers.SharedInformerFactory
WatchedNamespace informers.SharedInformerFactory
}
type Converter interface {
ConvertToCaddyConfig(namespace string, store *Store) (interface{}, error)
}
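A minimal Converter might look like the following sketch (hypothetical type, not part of this commit); anything that can turn the store into a JSON-marshalable Caddy config satisfies the interface:

// nopConverter is a hypothetical Converter that produces an empty Caddy config.
type nopConverter struct{}

func (nopConverter) ConvertToCaddyConfig(namespace string, store *Store) (interface{}, error) {
	// A real converter walks store.Ingresses and store.ConfigMap here and
	// returns a value that marshals to valid Caddy JSON.
	return map[string]interface{}{"apps": map[string]interface{}{}}, nil
}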
// CaddyController represents a caddy ingress controller.
type CaddyController struct {
resourceStore *store.Store
resourceStore *Store
kubeClient *kubernetes.Clientset
logger *zap.SugaredLogger
// main queue syncing ingresses, configmaps, ... with caddy
syncQueue workqueue.RateLimitingInterface
// informer factories
factories *InformerFactory
// informer contains the cache Informers
informers *Informer
// listers contains the cache.Store interfaces used in the ingress controller
listers *Listers
// cert manager manage user provided certs
certManager *CertManager
// ingress controller pod infos
podInfo *pod.Info
podInfo *k8s.Info
// config of the controller (flags)
config c.ControllerConfig
// save last applied caddy config
lastAppliedConfig []byte
// if a /etc/caddy/config.json is detected, it will be used instead of ingresses
usingConfigMap bool
converter Converter
stopChan chan struct{}
}
// NewCaddyController returns an instance of the caddy ingress controller.
func NewCaddyController(kubeClient *kubernetes.Clientset, cfg c.ControllerConfig) *CaddyController {
func NewCaddyController(
logger *zap.SugaredLogger,
kubeClient *kubernetes.Clientset,
opts Options,
converter Converter,
stopChan chan struct{},
) *CaddyController {
controller := &CaddyController{
logger: logger,
kubeClient: kubeClient,
converter: converter,
stopChan: stopChan,
syncQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
config: cfg,
informers: &Informer{},
listers: &Listers{},
factories: &InformerFactory{},
}
podInfo, err := pod.GetPodDetails(kubeClient)
podInfo, err := k8s.GetPodDetails(kubeClient)
if err != nil {
logrus.Fatalf("Unexpected error obtaining pod information: %v", err)
logger.Fatalf("Unexpected error obtaining pod information: %v", err)
}
controller.podInfo = podInfo
// load caddy config from file if mounted with config map
caddyCfgMap, err := loadCaddyConfigFile("/etc/caddy/config.json")
if err != nil {
logrus.Fatalf("Unexpected error reading config.json: %v", err)
// Create informer factories
controller.factories.PodNamespace = informers.NewSharedInformerFactoryWithOptions(
kubeClient,
resourcesSyncInterval,
informers.WithNamespace(controller.podInfo.Namespace),
)
controller.factories.WatchedNamespace = informers.NewSharedInformerFactoryWithOptions(
kubeClient,
resourcesSyncInterval,
informers.WithNamespace(opts.WatchNamespace),
)
// Watch ingress resources in selected namespaces
ingressParams := k8s.IngressParams{
InformerFactory: controller.factories.WatchedNamespace,
// TODO Add configuration for that
ClassName: "caddy",
ClassNameRequired: false,
}
if caddyCfgMap != nil {
controller.usingConfigMap = true
}
// create 2 types of informers: one for the caddy NS and another one for ingress resources
ingressInformerFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, syncInterval, informers.WithNamespace(cfg.WatchNamespace))
caddyInformerFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, syncInterval, informers.WithNamespace(podInfo.Namespace))
controller.informers.Ingress = ingressInformerFactory.Networking().V1beta1().Ingresses().Informer()
controller.listers.Ingress = controller.informers.Ingress.GetStore()
controller.informers.ConfigMap = caddyInformerFactory.Core().V1().ConfigMaps().Informer()
controller.listers.ConfigMap = controller.informers.ConfigMap.GetStore()
// add event handlers
controller.informers.Ingress.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.onResourceAdded,
UpdateFunc: controller.onResourceUpdated,
DeleteFunc: controller.onResourceDeleted,
controller.informers.Ingress = k8s.WatchIngresses(ingressParams, k8s.IngressHandlers{
AddFunc: controller.onIngressAdded,
UpdateFunc: controller.onIngressUpdated,
DeleteFunc: controller.onIngressDeleted,
})
controller.informers.ConfigMap.AddEventHandler(cache.ResourceEventHandlerFuncs{
// Watch Configmap in the pod's namespace for global options
cmOptionsParams := k8s.ConfigMapParams{
Namespace: podInfo.Namespace,
InformerFactory: controller.factories.PodNamespace,
ConfigMapName: opts.ConfigMapName,
}
controller.informers.ConfigMap = k8s.WatchConfigMaps(cmOptionsParams, k8s.ConfigMapHandlers{
AddFunc: controller.onConfigMapAdded,
UpdateFunc: controller.onConfigMapUpdated,
DeleteFunc: controller.onConfigMapDeleted,
})
// setup store to keep track of resources
controller.resourceStore = store.NewStore(kubeClient, podInfo.Namespace, cfg, caddyCfgMap)
// attempt to do initial sync of status addresses with ingresses
controller.dispatchSync()
// register kubernetes specific cert-magic storage module
// register kubernetes specific cert-magic storage module and proxy module
caddy.RegisterModule(storage.SecretStorage{})
// Create resource store
controller.resourceStore = NewStore(opts)
return controller
}
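Roughly how a main package is expected to wire this together, as a sketch assuming in-cluster credentials and some Converter implementation conv (all names below are illustrative):

logger := zap.NewExample().Sugar() // any configured *zap.SugaredLogger works
restCfg, err := rest.InClusterConfig() // k8s.io/client-go/rest
if err != nil {
	logger.Fatalf("no cluster config: %v", err)
}
kubeClient := kubernetes.NewForConfigOrDie(restCfg)
stopChan := make(chan struct{})
ctrl := NewCaddyController(logger, kubeClient, Options{
	WatchNamespace: "",                     // empty string watches all namespaces
	ConfigMapName:  "caddy-global-options", // hypothetical name
}, conv, stopChan)
ctrl.Run() // blocks until stopChan is closed, then calls Shutdown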
// Shutdown stops the caddy controller.
func (c *CaddyController) Shutdown() error {
// remove this ingress controller's ip from ingress resources.
c.updateIngStatuses([]apiv1.LoadBalancerIngress{apiv1.LoadBalancerIngress{}}, c.resourceStore.Ingresses)
c.updateIngStatuses([]apiv1.LoadBalancerIngress{{}}, c.resourceStore.Ingresses)
return nil
}
// Run method starts the ingress controller.
func (c *CaddyController) Run(stopCh chan struct{}) {
err := regenerateConfig(c)
if err != nil {
logrus.Errorf("initial caddy config load failed, %v", err.Error())
}
func (c *CaddyController) Run() {
defer runtime.HandleCrash()
defer c.syncQueue.ShutDown()
// start informers where we listen to new / updated resources
go c.informers.ConfigMap.Run(stopCh)
go c.informers.Ingress.Run(stopCh)
go c.informers.ConfigMap.Run(c.stopChan)
go c.informers.Ingress.Run(c.stopChan)
// wait for all involved caches to be synced before processing items
// from the queue
if !cache.WaitForCacheSync(stopCh,
if !cache.WaitForCacheSync(c.stopChan,
c.informers.ConfigMap.HasSynced,
c.informers.Ingress.HasSynced,
) {
@@ -169,19 +198,19 @@ func (c *CaddyController) Run(stopCh chan struct{}) {
}
// start processing events for syncing ingress resources
go wait.Until(c.runWorker, time.Second, stopCh)
go wait.Until(c.runWorker, time.Second, c.stopChan)
// start ingress status syncher and run every syncInterval
go wait.Until(c.dispatchSync, syncInterval, stopCh)
go wait.Until(c.dispatchSync, syncInterval, c.stopChan)
// wait for SIGTERM
<-stopCh
logrus.Info("stopping ingress controller")
<-c.stopChan
c.logger.Info("stopping ingress controller")
var exitCode int
err = c.Shutdown()
err := c.Shutdown()
if err != nil {
logrus.Errorf("could not shutdown ingress controller properly, %v", err.Error())
c.logger.Error("could not shutdown ingress controller properly, " + err.Error())
exitCode = 1
}
@@ -211,6 +240,13 @@ func (c *CaddyController) processNextItem() bool {
err := action.(Action).handle(c)
if err != nil {
c.handleErr(err, action)
return true
}
err = c.reloadCaddy()
if err != nil {
c.logger.Error("could not reload caddy: " + err.Error())
return true
}
return true
@@ -218,41 +254,31 @@
// handleErr reports errors received from queue actions.
func (c *CaddyController) handleErr(err error, action interface{}) {
logrus.Error(err)
c.logger.Error(err.Error())
}
func loadCaddyConfigFile(cfgPath string) (*c.Config, error) {
var caddyCfgMap *c.Config
if _, err := os.Stat(cfgPath); !os.IsNotExist(err) {
file, err := os.Open(cfgPath)
if err != nil {
return nil, err
}
defer file.Close()
b, err := ioutil.ReadAll(file)
if err != nil {
return nil, err
}
json.Unmarshal(b, &caddyCfgMap)
} else {
return nil, nil
// reloadCaddy generates a caddy config from the controller's store and reloads caddy when it changes
func (c *CaddyController) reloadCaddy() error {
config, err := c.converter.ConvertToCaddyConfig(c.podInfo.Namespace, c.resourceStore)
if err != nil {
return err
}
return caddyCfgMap, nil
}
// reloadCaddy reloads the internal caddy instance with config from the internal store.
func (c *CaddyController) reloadCaddy(config *c.Config) error {
j, err := json.Marshal(config)
if err != nil {
return err
}
err = caddy.Load(j, true)
if bytes.Equal(c.lastAppliedConfig, j) {
c.logger.Debug("caddy config did not change, skipping reload")
return nil
}
c.logger.Debug("reloading caddy with config %v" + string(j))
err = caddy.Load(j, false)
if err != nil {
return fmt.Errorf("could not reload caddy config %v", err.Error())
}
c.lastAppliedConfig = j
return nil
}

@@ -1,98 +0,0 @@
package controller
import (
"fmt"
"sort"
"strings"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
pool "gopkg.in/go-playground/pool.v3"
apiv1 "k8s.io/api/core/v1"
"k8s.io/api/networking/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
// updateIngStatuses starts a queue and adds all monitored ingresses to update their status source address to the one
// that the ingress controller is running on. This is called by the syncStatus queue.
func (c *CaddyController) updateIngStatuses(controllerAddresses []apiv1.LoadBalancerIngress, ings []*v1beta1.Ingress) {
p := pool.NewLimited(10)
defer p.Close()
batch := p.Batch()
sort.SliceStable(controllerAddresses, lessLoadBalancerIngress(controllerAddresses))
for _, ing := range ings {
curIPs := ing.Status.LoadBalancer.Ingress
sort.SliceStable(curIPs, lessLoadBalancerIngress(curIPs))
// check to see if ingresses source address does not match the ingress controller's.
if ingressSliceEqual(curIPs, controllerAddresses) {
logrus.Infof("skipping update of Ingress %v/%v (no change)", ing.Namespace, ing.Name)
continue
}
batch.Queue(runUpdate(ing, controllerAddresses, c.kubeClient))
}
batch.QueueComplete()
batch.WaitAll()
}
// runUpdate updates the ingress status field.
func runUpdate(ing *v1beta1.Ingress, status []apiv1.LoadBalancerIngress, client *kubernetes.Clientset) pool.WorkFunc {
return func(wu pool.WorkUnit) (interface{}, error) {
if wu.IsCancelled() {
return nil, nil
}
ingClient := client.NetworkingV1beta1().Ingresses(ing.Namespace)
currIng, err := ingClient.Get(ing.Name, metav1.GetOptions{})
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("unexpected error searching Ingress %v/%v", ing.Namespace, ing.Name))
}
logrus.Infof("updating Ingress %v/%v status from %v to %v", currIng.Namespace, currIng.Name, currIng.Status.LoadBalancer.Ingress, status)
currIng.Status.LoadBalancer.Ingress = status
_, err = ingClient.UpdateStatus(currIng)
if err != nil {
logrus.Warningf("error updating ingress rule: %v", err)
}
return true, nil
}
}
// ingressSliceEqual determines if the ingress source matches the ingress controller's.
func ingressSliceEqual(lhs, rhs []apiv1.LoadBalancerIngress) bool {
if len(lhs) != len(rhs) {
return false
}
for i := range lhs {
if lhs[i].IP != rhs[i].IP {
return false
}
if lhs[i].Hostname != rhs[i].Hostname {
return false
}
}
return true
}
// lessLoadBalancerIngress is a sorting function for ingress hostnames.
func lessLoadBalancerIngress(addrs []apiv1.LoadBalancerIngress) func(int, int) bool {
return func(a, b int) bool {
switch strings.Compare(addrs[a].Hostname, addrs[b].Hostname) {
case -1:
return true
case 1:
return false
}
return addrs[a].IP < addrs[b].IP
}
}

@@ -1,56 +0,0 @@
package controller
import (
"net"
"sort"
"github.com/caddyserver/ingress/internal/pod"
"github.com/sirupsen/logrus"
apiv1 "k8s.io/api/core/v1"
"k8s.io/api/networking/v1beta1"
)
// dispatchSync is run every syncInterval duration to sync ingress source address fields.
func (c *CaddyController) dispatchSync() {
c.syncQueue.Add(SyncStatusAction{})
}
// SyncStatusAction provides an implementation of the action interface.
type SyncStatusAction struct {
}
// handle is run when a syncStatusAction appears in the queue.
func (r SyncStatusAction) handle(c *CaddyController) error {
return c.syncStatus(c.resourceStore.Ingresses)
}
// syncStatus ensures that the ingress source address points to this ingress controller's IP address.
func (c *CaddyController) syncStatus(ings []*v1beta1.Ingress) error {
addrs, err := pod.GetAddresses(c.podInfo, c.kubeClient)
if err != nil {
return err
}
logrus.Info("Synching Ingress resource source addresses")
c.updateIngStatuses(sliceToLoadBalancerIngress(addrs), ings)
return nil
}
// sliceToLoadBalancerIngress converts a slice of IPs and/or hostnames to LoadBalancerIngress entries
func sliceToLoadBalancerIngress(endpoints []string) []apiv1.LoadBalancerIngress {
lbi := []apiv1.LoadBalancerIngress{}
for _, ep := range endpoints {
if net.ParseIP(ep) == nil {
lbi = append(lbi, apiv1.LoadBalancerIngress{Hostname: ep})
} else {
lbi = append(lbi, apiv1.LoadBalancerIngress{IP: ep})
}
}
sort.SliceStable(lbi, func(a, b int) bool {
return lbi[a].IP < lbi[b].IP
})
return lbi
}

@@ -0,0 +1,64 @@
package controller
import (
"github.com/caddyserver/ingress/internal/k8s"
"k8s.io/api/networking/v1beta1"
)
// NewStore returns a new store that keeps track of K8S resources needed by the controller.
func NewStore(opts Options) *Store {
s := &Store{
Options: &opts,
Ingresses: []*v1beta1.Ingress{},
ConfigMap: &k8s.ConfigMapOptions{},
}
return s
}
// AddIngress adds an ingress to the store, replacing any existing entry that has the same UID.
func (s *Store) AddIngress(ing *v1beta1.Ingress) {
isUniq := true
for i := range s.Ingresses {
in := s.Ingresses[i]
if in.GetUID() == ing.GetUID() {
isUniq = false
s.Ingresses[i] = ing
}
}
if isUniq {
s.Ingresses = append(s.Ingresses, ing)
}
}
// PluckIngress removes the given ingress from the store's list of ingresses.
func (s *Store) PluckIngress(ing *v1beta1.Ingress) {
id := ing.GetUID()
var index int
var hasMatch bool
for i := range s.Ingresses {
if s.Ingresses[i].GetUID() == id {
index = i
hasMatch = true
break
}
}
// since order is not important we can swap the element to delete with the one at the end of the slice
// and then set ingresses to the n-1 first elements
if hasMatch {
s.Ingresses[len(s.Ingresses)-1], s.Ingresses[index] = s.Ingresses[index], s.Ingresses[len(s.Ingresses)-1]
s.Ingresses = s.Ingresses[:len(s.Ingresses)-1]
}
}
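// HasManagedTLS reports whether any stored ingress defines TLS rules and therefore needs certificate management.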
func (s *Store) HasManagedTLS() bool {
for _, ing := range s.Ingresses {
if len(ing.Spec.TLS) > 0 {
return true
}
}
return false
}
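The store's add/update/remove semantics in one usage sketch (hypothetical UID):

s := NewStore(Options{})
ing := &v1beta1.Ingress{}
ing.SetUID("abc")

s.AddIngress(ing)   // unknown UID: appended, len(s.Ingresses) == 1
s.AddIngress(ing)   // same UID: replaced in place, still len(s.Ingresses) == 1
s.PluckIngress(ing) // removed via swap-with-last, len(s.Ingresses) == 0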

@@ -1,232 +0,0 @@
package controller
import (
"encoding/json"
"io/ioutil"
"os"
"path/filepath"
"github.com/sirupsen/logrus"
apiv1 "k8s.io/api/core/v1"
"k8s.io/api/networking/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
sv1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
var certDir = filepath.FromSlash("/etc/caddy/certs")
// CertManager manages user-defined certs on ingress resources for caddy.
type CertManager struct {
certInformer cache.Controller
certs []certificate
syncQueue workqueue.RateLimitingInterface
synced bool
}
type certificate struct {
name string
namespace string
}
// HandleOwnCertManagement decides whether we need to watch for user-defined
// certs and update caddy.
func (c *CaddyController) HandleOwnCertManagement(ings []*v1beta1.Ingress) (map[string]interface{}, error) {
var certs []certificate
var hosts []string
// do we have any ingresses with TLS certificates and secrets defined on them?
for _, ing := range ings {
for _, tlsRule := range ing.Spec.TLS {
for _, h := range tlsRule.Hosts {
hosts = append(hosts, h)
}
c := certificate{name: tlsRule.SecretName, namespace: ing.Namespace}
certs = append(certs, c)
}
}
// run the caddy cert sync now (ONE TIME) but only run it in the future
// when a cert has been updated (or a new cert has been added)
if len(certs) > 0 && c.certManager == nil {
err := syncCertificates(certs, c.kubeClient)
if err != nil {
return nil, err
}
informer, err := newSecretInformer(c)
if err != nil {
return nil, err
}
c.certManager = &CertManager{
certs: certs,
certInformer: informer,
syncQueue: c.syncQueue,
}
// start the informer to listen to secrets
go informer.Run(c.stopChan)
}
if len(certs) > 0 {
return getTLSConfig(hosts), nil
}
return nil, nil
}
// newSecretInformer creates an informer to listen to updates to secrets.
func newSecretInformer(c *CaddyController) (cache.Controller, error) {
secretInformer := sv1.NewSecretInformer(c.kubeClient, c.config.WatchNamespace, secretSyncInterval, cache.Indexers{})
secretInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.onSecretResourceAdded,
UpdateFunc: c.onSecretResourceUpdated,
DeleteFunc: c.onSecretResourceDeleted,
})
return secretInformer, nil
}
// getTLSConfig returns the caddy config for certificate management to load all certs from certDir.
func getTLSConfig(hosts []string) map[string]interface{} {
return map[string]interface{}{
"load_folders": json.RawMessage(`["` + certDir + `"]`),
"hosts": hosts,
}
}
// syncCertificates downloads the certificate files defined on an ingress resource and
// stores them locally in this pod for use by caddy.
func syncCertificates(certs []certificate, kubeClient *kubernetes.Clientset) error {
logrus.Info("Found TLS certificates on ingress resource. Syncing...")
certData := make(map[string]map[string][]byte, len(certs))
for _, cert := range certs {
s, err := kubeClient.CoreV1().Secrets(cert.namespace).Get(cert.name, metav1.GetOptions{})
if err != nil {
return err
}
certData[cert.name] = s.Data
}
if _, err := os.Stat(certDir); os.IsNotExist(err) {
err = os.MkdirAll(certDir, 0755)
if err != nil {
return err
}
}
// concatenate crt and key into a single .pem file in the cert directory
for secret, data := range certData {
content := make([]byte, 0)
for _, cert := range data {
content = append(content, cert...)
}
err := ioutil.WriteFile(filepath.Join(certDir, secret+".pem"), content, 0644)
if err != nil {
return err
}
}
return nil
}
// SecretResourceAddedAction provides an implementation of the action interface.
type SecretResourceAddedAction struct {
resource *apiv1.Secret
}
// SecretResourceUpdatedAction provides an implementation of the action interface.
type SecretResourceUpdatedAction struct {
resource *apiv1.Secret
oldResource *apiv1.Secret
}
// SecretResourceDeletedAction provides an implementation of the action interface.
type SecretResourceDeletedAction struct {
resource *apiv1.Secret
}
// onSecretResourceAdded runs when a secret resource is added to the cluster.
func (c *CaddyController) onSecretResourceAdded(obj interface{}) {
s, ok := obj.(*apiv1.Secret)
if ok {
for _, secret := range c.certManager.certs {
if s.Name == secret.name {
c.syncQueue.Add(SecretResourceAddedAction{
resource: s,
})
}
}
}
}
// writeFile writes a secret to a .pem file on disk.
func writeFile(s *apiv1.Secret) error {
content := make([]byte, 0)
for _, cert := range s.Data {
content = append(content, cert...)
}
err := ioutil.WriteFile(filepath.Join(certDir, s.Name+".pem"), content, 0644)
if err != nil {
return err
}
return nil
}
// onSecretResourceUpdated is run when a secret resource is updated in the cluster.
func (c *CaddyController) onSecretResourceUpdated(old interface{}, new interface{}) {
s, ok := old.(*apiv1.Secret)
if !ok {
return
}
snew, ok := new.(*apiv1.Secret)
for _, secret := range c.certManager.certs {
if s.Name == secret.name {
c.syncQueue.Add(SecretResourceUpdatedAction{
resource: snew,
oldResource: s,
})
}
}
}
// onSecretResourceDeleted is run when a secret resource is deleted from the cluster.
func (c *CaddyController) onSecretResourceDeleted(obj interface{}) {
s, ok := obj.(*apiv1.Secret)
if ok {
for _, secret := range c.certManager.certs {
if s.Name == secret.name {
c.syncQueue.Add(SecretResourceDeletedAction{
resource: s,
})
}
}
}
}
// handle is run when a SecretResourceDeletedAction appears in the queue.
func (r SecretResourceDeletedAction) handle(c *CaddyController) error {
return os.Remove(filepath.Join(certDir, r.resource.Name+".pem"))
}
// handle is run when a SecretResourceUpdatedAction appears in the queue.
func (r SecretResourceUpdatedAction) handle(c *CaddyController) error {
return writeFile(r.resource)
}
// handle is run when a SecretResourceAddedAction appears in the queue.
func (r SecretResourceAddedAction) handle(c *CaddyController) error {
return writeFile(r.resource)
}

internal/k8s/configmap.go Normal file

@@ -0,0 +1,92 @@
package k8s
import (
"github.com/caddyserver/caddy/v2"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
v12 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)
// ConfigMapOptions represents global options set through a configmap
type ConfigMapOptions struct {
Debug bool `json:"debug,omitempty"`
AcmeCA string `json:"acmeCA,omitempty"`
Email string `json:"email,omitempty"`
ProxyProtocol bool `json:"proxyProtocol,omitempty"`
Metrics bool `json:"metrics,omitempty"`
OnDemandTLS bool `json:"onDemandTLS,omitempty"`
OnDemandRateLimitInterval caddy.Duration `json:"onDemandTLSRateLimitInterval,omitempty"`
OnDemandRateLimitBurst int `json:"onDemandTLSRateLimitBurst,omitempty"`
OnDemandAsk string `json:"onDemandTLSAsk,omitempty"`
}
type ConfigMapHandlers struct {
AddFunc func(obj *v12.ConfigMap)
UpdateFunc func(oldObj, newObj *v12.ConfigMap)
DeleteFunc func(obj *v12.ConfigMap)
}
type ConfigMapParams struct {
Namespace string
InformerFactory informers.SharedInformerFactory
ConfigMapName string
}
func isControllerConfigMap(cm *v12.ConfigMap, name string) bool {
return cm.GetName() == name
}
func WatchConfigMaps(options ConfigMapParams, funcs ConfigMapHandlers) cache.SharedIndexInformer {
informer := options.InformerFactory.Core().V1().ConfigMaps().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cm, ok := obj.(*v12.ConfigMap)
if ok && isControllerConfigMap(cm, options.ConfigMapName) {
funcs.AddFunc(cm)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldCM, ok1 := oldObj.(*v12.ConfigMap)
newCM, ok2 := newObj.(*v12.ConfigMap)
if ok1 && ok2 && isControllerConfigMap(newCM, options.ConfigMapName) {
funcs.UpdateFunc(oldCM, newCM)
}
},
DeleteFunc: func(obj interface{}) {
cm, ok := obj.(*v12.ConfigMap)
if ok && isControllerConfigMap(cm, options.ConfigMapName) {
funcs.DeleteFunc(cm)
}
},
})
return informer
}
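Callers receive name-filtered events and never have to re-check the ConfigMap name themselves; a usage sketch with hypothetical namespace, name, and stop channel:

factory := informers.NewSharedInformerFactoryWithOptions(
	kubeClient, time.Hour, informers.WithNamespace("caddy-system"))
informer := WatchConfigMaps(ConfigMapParams{
	Namespace:       "caddy-system",
	InformerFactory: factory,
	ConfigMapName:   "caddy-global-options",
}, ConfigMapHandlers{
	AddFunc:    func(cm *v12.ConfigMap) { /* regenerate caddy config */ },
	UpdateFunc: func(oldCM, newCM *v12.ConfigMap) { /* regenerate caddy config */ },
	DeleteFunc: func(cm *v12.ConfigMap) { /* fall back to defaults */ },
})
go informer.Run(stopCh)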
func ParseConfigMap(cm *v12.ConfigMap) (*ConfigMapOptions, error) {
// parse configmap
cfgMap := ConfigMapOptions{}
config := &mapstructure.DecoderConfig{
Metadata: nil,
WeaklyTypedInput: true,
Result: &cfgMap,
TagName: "json",
}
decoder, err := mapstructure.NewDecoder(config)
if err != nil {
return nil, errors.Wrap(err, "unexpected error creating decoder")
}
err = decoder.Decode(cm.Data)
if err != nil {
return nil, errors.Wrap(err, "unexpected error parsing configmap")
}
return &cfgMap, nil
}
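Since ConfigMap data is always map[string]string, WeaklyTypedInput performs the string-to-type coercion; a sketch with illustrative values:

cm := &v12.ConfigMap{Data: map[string]string{
	"debug":       "true", // coerced to bool
	"onDemandTLS": "1",    // "1", "t", "true" all coerce to true
	"email":       "admin@example.com",
}}
opts, err := ParseConfigMap(cm)
// err == nil; opts.Debug and opts.OnDemandTLS are true, opts.Email is set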

internal/k8s/ingress.go Normal file

@@ -0,0 +1,81 @@
package k8s
import (
"context"
"fmt"
"github.com/pkg/errors"
apiv1 "k8s.io/api/core/v1"
"k8s.io/api/networking/v1beta1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
type IngressHandlers struct {
AddFunc func(obj *v1beta1.Ingress)
UpdateFunc func(oldObj, newObj *v1beta1.Ingress)
DeleteFunc func(obj *v1beta1.Ingress)
}
type IngressParams struct {
InformerFactory informers.SharedInformerFactory
ClassName string
ClassNameRequired bool
}
func WatchIngresses(options IngressParams, funcs IngressHandlers) cache.SharedIndexInformer {
// TODO Handle new API
informer := options.InformerFactory.Networking().V1beta1().Ingresses().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ingress, ok := obj.(*v1beta1.Ingress)
if ok && IsControllerIngress(options, ingress) {
funcs.AddFunc(ingress)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldIng, ok1 := oldObj.(*v1beta1.Ingress)
newIng, ok2 := newObj.(*v1beta1.Ingress)
if ok1 && ok2 && IsControllerIngress(options, newIng) {
funcs.UpdateFunc(oldIng, newIng)
}
},
DeleteFunc: func(obj interface{}) {
ingress, ok := obj.(*v1beta1.Ingress)
if ok && IsControllerIngress(options, ingress) {
funcs.DeleteFunc(ingress)
}
},
})
return informer
}
// IsControllerIngress checks whether the ingress object can be controlled by us
// TODO Handle `ingressClassName`
func IsControllerIngress(options IngressParams, ingress *v1beta1.Ingress) bool {
ingressClass := ingress.Annotations["kubernetes.io/ingress.class"]
if !options.ClassNameRequired && ingressClass == "" {
return true
}
return ingressClass == options.ClassName
}
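With the defaults used above (ClassName "caddy", ClassNameRequired false), matching behaves like this sketch:

opts := IngressParams{ClassName: "caddy", ClassNameRequired: false}

bare := &v1beta1.Ingress{} // no class annotation
_ = IsControllerIngress(opts, bare) // true: unannotated ingresses are claimed

other := &v1beta1.Ingress{}
other.SetAnnotations(map[string]string{"kubernetes.io/ingress.class": "nginx"})
_ = IsControllerIngress(opts, other) // false: owned by another controller
// With ClassNameRequired set to true, only an explicit "caddy" annotation matches.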
func UpdateIngressStatus(kubeClient *kubernetes.Clientset, ing *v1beta1.Ingress, status []apiv1.LoadBalancerIngress) (*v1beta1.Ingress, error) {
ingClient := kubeClient.NetworkingV1beta1().Ingresses(ing.Namespace)
currIng, err := ingClient.Get(context.TODO(), ing.Name, v1.GetOptions{})
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("unexpected error searching Ingress %v/%v", ing.Namespace, ing.Name))
}
currIng.Status.LoadBalancer.Ingress = status
return ingClient.UpdateStatus(context.TODO(), currIng, v1.UpdateOptions{})
}

@@ -1,15 +1,14 @@
package pod
package k8s
import (
"context"
"fmt"
"os"
"github.com/sirupsen/logrus"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
)
// Info contains runtime information about the pod running the Ingress controller
@@ -24,25 +23,20 @@ type Info struct {
// GetAddresses gets the ip address or name of the node in the cluster that the
// ingress controller is running on.
func GetAddresses(p *Info, kubeClient *kubernetes.Clientset) ([]string, error) {
addrs := []string{}
var addrs []string
// get information about all the pods running the ingress controller
pods, err := kubeClient.CoreV1().Pods(p.Namespace).List(metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(p.Labels).String(),
})
// Get services that may select this pod
svcs, err := kubeClient.CoreV1().Services(p.Namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, err
}
for _, pod := range pods.Items {
// only Running pods are valid
if pod.Status.Phase != apiv1.PodRunning {
continue
}
name := GetNodeIPOrName(kubeClient, pod.Spec.NodeName, true)
if !sliceutils.StringInSlice(name, addrs) {
addrs = append(addrs, name)
for _, svc := range svcs.Items {
if labels.AreLabelsInWhiteList(svc.Spec.Selector, p.Labels) {
addr := GetAddressFromService(&svc)
if addr != "" {
addrs = append(addrs, addr)
}
}
}
@@ -50,31 +44,24 @@ func GetAddresses(p *Info, kubeClient *kubernetes.Clientset) ([]string, error) {
}
// GetNodeIPOrName returns the IP address or the name of a node in the cluster
func GetNodeIPOrName(kubeClient *kubernetes.Clientset, name string, useInternalIP bool) string {
node, err := kubeClient.CoreV1().Nodes().Get(name, metav1.GetOptions{})
if err != nil {
logrus.Errorf("Error getting node %v: %v", name, err)
return ""
}
if useInternalIP {
for _, address := range node.Status.Addresses {
if address.Type == apiv1.NodeInternalIP {
if address.Address != "" {
return address.Address
func GetAddressFromService(service *apiv1.Service) string {
switch service.Spec.Type {
case apiv1.ServiceTypeNodePort:
case apiv1.ServiceTypeClusterIP:
return service.Spec.ClusterIP
case apiv1.ServiceTypeExternalName:
return service.Spec.ExternalName
case apiv1.ServiceTypeLoadBalancer:
{
if len(service.Status.LoadBalancer.Ingress) > 0 {
ingress := service.Status.LoadBalancer.Ingress[0]
if ingress.Hostname != "" {
return ingress.Hostname
}
return ingress.IP
}
}
}
for _, address := range node.Status.Addresses {
if address.Type == apiv1.NodeExternalIP {
if address.Address != "" {
return address.Address
}
}
}
return ""
}
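What each service type resolves to, as a sketch; note that as written a NodePort service yields an empty string because its case body is empty and Go switch cases do not fall through:

svc := &apiv1.Service{}

svc.Spec.Type = apiv1.ServiceTypeClusterIP
svc.Spec.ClusterIP = "10.0.0.10"
_ = GetAddressFromService(svc) // "10.0.0.10"

svc.Spec.Type = apiv1.ServiceTypeLoadBalancer
svc.Status.LoadBalancer.Ingress = []apiv1.LoadBalancerIngress{{IP: "203.0.113.7"}}
_ = GetAddressFromService(svc) // "203.0.113.7"; a hostname would take precedence

svc.Spec.Type = apiv1.ServiceTypeNodePort
_ = GetAddressFromService(svc) // ""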
@@ -88,7 +75,7 @@ func GetPodDetails(kubeClient *kubernetes.Clientset) (*Info, error) {
return nil, fmt.Errorf("unable to get POD information (missing POD_NAME or POD_NAMESPACE environment variable")
}
pod, _ := kubeClient.CoreV1().Pods(podNs).Get(podName, metav1.GetOptions{})
pod, _ := kubeClient.CoreV1().Pods(podNs).Get(context.TODO(), podName, metav1.GetOptions{})
if pod == nil {
return nil, fmt.Errorf("unable to get POD information")
}

@@ -0,0 +1,76 @@
package k8s
import (
v12 "k8s.io/api/core/v1"
"k8s.io/api/networking/v1beta1"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)
type TLSSecretHandlers struct {
AddFunc func(obj *v12.Secret)
UpdateFunc func(oldObj, newObj *v12.Secret)
DeleteFunc func(obj *v12.Secret)
}
type TLSSecretParams struct {
InformerFactory informers.SharedInformerFactory
}
func WatchTLSSecrets(options TLSSecretParams, funcs TLSSecretHandlers) cache.SharedIndexInformer {
informer := options.InformerFactory.Core().V1().Secrets().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
secret, ok := obj.(*v12.Secret)
if ok && secret.Type == v12.SecretTypeTLS {
funcs.AddFunc(secret)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldSecret, ok1 := oldObj.(*v12.Secret)
newSecret, ok2 := newObj.(*v12.Secret)
if ok1 && ok2 && newSecret.Type == v12.SecretTypeTLS {
funcs.UpdateFunc(oldSecret, newSecret)
}
},
DeleteFunc: func(obj interface{}) {
secret, ok := obj.(*v12.Secret)
if ok && secret.Type == v12.SecretTypeTLS {
funcs.DeleteFunc(secret)
}
},
})
return informer
}
func ListTLSSecrets(options TLSSecretParams, ings []*v1beta1.Ingress) ([]*v12.Secret, error) {
lister := options.InformerFactory.Core().V1().Secrets().Lister()
tlsSecrets := []*v12.Secret{}
for _, ing := range ings {
for _, tlsRule := range ing.Spec.TLS {
secret, err := lister.Secrets(ing.Namespace).Get(tlsRule.SecretName)
// TODO Handle errors
if err == nil {
tlsSecrets = append(tlsSecrets, secret)
}
}
}
return tlsSecrets, nil
}
func IsManagedTLSSecret(secret *v12.Secret, ings []*v1beta1.Ingress) bool {
for _, ing := range ings {
for _, tlsRule := range ing.Spec.TLS {
if tlsRule.SecretName == secret.Name && ing.Namespace == secret.Namespace {
return true
}
}
}
return false
}

@@ -1,84 +0,0 @@
package store
import (
c "github.com/caddyserver/ingress/internal/caddy"
"github.com/sirupsen/logrus"
k "k8s.io/api/core/v1"
"k8s.io/api/networking/v1beta1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
// Store represents a collection of ingresses and secrets that we are monitoring.
type Store struct {
Ingresses []*v1beta1.Ingress
Secrets []interface{} // TODO :- should we store the secrets in the ingress object?
ConfigMap *k.ConfigMap
CaddyConfig *c.Config
}
// NewStore returns a new store that keeps track of ingresses and secrets. It will attempt to get
// all current ingresses before returning.
func NewStore(kubeClient *kubernetes.Clientset, namespace string, cfg c.ControllerConfig, cfgMapConfig *c.Config) *Store {
s := &Store{
Ingresses: []*v1beta1.Ingress{},
}
ingresses, err := kubeClient.NetworkingV1beta1().Ingresses(cfg.WatchNamespace).List(v1.ListOptions{})
if err != nil {
logrus.Errorf("could not get existing ingresses in cluster", err)
} else {
for _, i := range ingresses.Items {
s.Ingresses = append(s.Ingresses, &i)
}
}
cfgMap, err := kubeClient.CoreV1().ConfigMaps(namespace).Get(cfg.ConfigMapName, v1.GetOptions{})
if err != nil {
logrus.Warn("could not get option configmap", err)
} else {
s.ConfigMap = cfgMap
}
s.CaddyConfig = cfgMapConfig
return s
}
// AddIngress adds an ingress to the store. It updates the element at the given index if it is unique.
func (s *Store) AddIngress(ing *v1beta1.Ingress) {
isUniq := true
for i := range s.Ingresses {
in := s.Ingresses[i]
if in.GetUID() == ing.GetUID() {
isUniq = false
s.Ingresses[i] = ing
}
}
if isUniq {
s.Ingresses = append(s.Ingresses, ing)
}
}
// PluckIngress removes the ingress passed in as an argument from the stores list of ingresses.
func (s *Store) PluckIngress(ing *v1beta1.Ingress) {
id := ing.GetUID()
var index int
var hasMatch bool
for i := range s.Ingresses {
if s.Ingresses[i].GetUID() == id {
index = i
hasMatch = true
break
}
}
// since order is not important we can swap the element to delete with the one at the end of the slice
// and then set ingresses to the n-1 first elements
if hasMatch {
s.Ingresses[len(s.Ingresses)-1], s.Ingresses[index] = s.Ingresses[index], s.Ingresses[len(s.Ingresses)-1]
s.Ingresses = s.Ingresses[:len(s.Ingresses)-1]
}
}

pkg/proxy/proxy.go Normal file

@@ -0,0 +1,20 @@
package proxy
import "github.com/caddyserver/caddy/v2"
var (
_ = caddy.Provisioner(&Wrapper{})
_ = caddy.Module(&Wrapper{})
_ = caddy.ListenerWrapper(&Wrapper{})
)
func init() {
caddy.RegisterModule(Wrapper{})
}
func (Wrapper) CaddyModule() caddy.ModuleInfo {
return caddy.ModuleInfo{
ID: "caddy.listeners.proxy_protocol",
New: func() caddy.Module { return new(Wrapper) },
}
}

pkg/proxy/wrapper.go Normal file

@@ -0,0 +1,30 @@
package proxy
import (
"github.com/caddyserver/caddy/v2"
"net"
"github.com/pires/go-proxyproto"
)
// Wrapper provides PROXY protocol support to Caddy by implementing the caddy.ListenerWrapper interface. It must be loaded before the `tls` listener.
type Wrapper struct {
// Allow is an optional list of CIDR ranges to allow/require PROXY headers from.
Allow []string `json:"allow,omitempty"`
policy proxyproto.PolicyFunc
}
func (pp *Wrapper) Provision(ctx caddy.Context) error {
pp.policy = func(upstream net.Addr) (proxyproto.Policy, error) {
return proxyproto.REQUIRE, nil
}
return nil
}
func (pp *Wrapper) WrapListener(l net.Listener) net.Listener {
pL := &proxyproto.Listener{Listener: l, Policy: pp.policy}
return pL
}
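A sketch of where the wrapper is meant to be referenced in Caddy's JSON config: a listener_wrappers entry on an HTTP server, placed before the tls wrapper as the type comment requires (the fragment below is illustrative, not emitted by this commit):

// Hypothetical fragment of an apps.http.servers entry.
const proxyProtocolServerJSON = `{
	"listen": [":443"],
	"listener_wrappers": [
		{"wrapper": "proxy_protocol"},
		{"wrapper": "tls"}
	]
}`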

@@ -1,17 +1,31 @@
package storage
import (
"context"
"fmt"
"regexp"
"strings"
"github.com/caddyserver/caddy/v2"
"github.com/caddyserver/certmagic"
"github.com/sirupsen/logrus"
"github.com/google/uuid"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"regexp"
"strings"
"time"
)
const (
leaseDuration = 5 * time.Second
leaseRenewInterval = 2 * time.Second
leasePollInterval = 5 * time.Second
leasePrefix = "caddy-lock-"
keyPrefix = "caddy.ingress--"
)
// matchLabels are attached to each resource so that they can be found in the future.
@@ -19,21 +33,21 @@ var matchLabels = map[string]string{
"manager": "caddy",
}
// labelSelector is the search string that will return all secrets managed by the caddy ingress controller.
var labelSelector = "manager=caddy"
// specialChars is a regex that matches all special characters except '.' and '-'.
var specialChars = regexp.MustCompile("[^0-9a-zA-Z.-]+")
// specialChars is a regex that matches all special characters except '-'.
var specialChars = regexp.MustCompile("[^0-9a-zA-Z-]+")
// cleanKey strips all special characters that are not supported by kubernetes names and converts them to a '.'.
func cleanKey(key string) string {
return "caddy.ingress--" + specialChars.ReplaceAllString(key, "")
// sequences like '.*.' are also converted to a single '.'.
func cleanKey(key string, prefix string) string {
return prefix + specialChars.ReplaceAllString(key, ".")
}
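Worked examples with hypothetical certmagic keys: every run of disallowed characters, including '/', collapses to a single '.', producing a valid Kubernetes object name:

_ = cleanKey("certificates/acme/example.com/example.com.crt", keyPrefix)
// -> "caddy.ingress--certificates.acme.example.com.example.com.crt"

_ = cleanKey("issue_cert_example.com", leasePrefix)
// -> "caddy-lock-issue.cert.example.com"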
// SecretStorage facilitates storing certificates retrieved by certmagic in kubernetes secrets.
type SecretStorage struct {
Namespace string
KubeClient *kubernetes.Clientset
LeaseId string
logger *zap.Logger
}
func (SecretStorage) CaddyModule() caddy.ModuleInfo {
@@ -49,7 +63,11 @@ func (s *SecretStorage) Provision(ctx caddy.Context) error {
// creates the clientset
clientset, _ := kubernetes.NewForConfig(config)
s.logger = ctx.Logger(s)
s.KubeClient = clientset
if s.LeaseId == "" {
s.LeaseId = uuid.New().String()
}
return nil
}
@@ -60,18 +78,18 @@ func (s *SecretStorage) CertMagicStorage() (certmagic.Storage, error) {
// Exists returns true if key exists in fs.
func (s *SecretStorage) Exists(key string) bool {
secrets, err := s.KubeClient.CoreV1().Secrets(s.Namespace).List(metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%v", cleanKey(key)),
s.logger.Debug("finding secret", zap.String("name", key))
secrets, err := s.KubeClient.CoreV1().Secrets(s.Namespace).List(context.TODO(), metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%v", cleanKey(key, keyPrefix)),
})
if err != nil {
logrus.Error(err)
return false
}
var found bool
for _, i := range secrets.Items {
if i.ObjectMeta.Name == cleanKey(key) {
if i.ObjectMeta.Name == cleanKey(key, keyPrefix) {
found = true
break
}
@@ -84,7 +102,7 @@ func (s *SecretStorage) Exists(key string) bool {
func (s *SecretStorage) Store(key string, value []byte) error {
se := corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: cleanKey(key),
Name: cleanKey(key, keyPrefix),
Labels: matchLabels,
},
Data: map[string][]byte{
@@ -93,10 +111,12 @@ }
}
var err error
if s.Exists(cleanKey(key)) {
_, err = s.KubeClient.CoreV1().Secrets(s.Namespace).Update(&se)
if s.Exists(key) {
s.logger.Debug("creating secret", zap.String("name", key))
_, err = s.KubeClient.CoreV1().Secrets(s.Namespace).Update(context.TODO(), &se, metav1.UpdateOptions{})
} else {
_, err = s.KubeClient.CoreV1().Secrets(s.Namespace).Create(&se)
s.logger.Debug("updating secret", zap.String("name", key))
_, err = s.KubeClient.CoreV1().Secrets(s.Namespace).Create(context.TODO(), &se, metav1.CreateOptions{})
}
if err != nil {
@@ -108,21 +128,23 @@ func (s *SecretStorage) Store(key string, value []byte) error {
// Load retrieves the value at the given key.
func (s *SecretStorage) Load(key string) ([]byte, error) {
secret, err := s.KubeClient.CoreV1().Secrets(s.Namespace).Get(cleanKey(key), metav1.GetOptions{})
secret, err := s.KubeClient.CoreV1().Secrets(s.Namespace).Get(context.TODO(), cleanKey(key, keyPrefix), metav1.GetOptions{})
if err != nil {
return nil, err
}
s.logger.Debug("loading secret", zap.String("name", key))
return secret.Data["value"], nil
}
// Delete deletes the value at the given key.
func (s *SecretStorage) Delete(key string) error {
err := s.KubeClient.CoreV1().Secrets(s.Namespace).Delete(cleanKey(key), &metav1.DeleteOptions{})
err := s.KubeClient.CoreV1().Secrets(s.Namespace).Delete(context.TODO(), cleanKey(key, keyPrefix), metav1.DeleteOptions{})
if err != nil {
return err
}
s.logger.Debug("deleting secret", zap.String("name", key))
return nil
}
@@ -130,7 +152,10 @@ func (s *SecretStorage) Delete(key string) error {
func (s *SecretStorage) List(prefix string, recursive bool) ([]string, error) {
var keys []string
secrets, err := s.KubeClient.CoreV1().Secrets(s.Namespace).List(metav1.ListOptions{LabelSelector: labelSelector})
s.logger.Debug("listing secrets", zap.String("name", prefix))
secrets, err := s.KubeClient.CoreV1().Secrets(s.Namespace).List(context.TODO(), metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(matchLabels).String(),
})
if err != nil {
return keys, err
}
@@ -138,8 +163,8 @@ func (s *SecretStorage) List(prefix string, recursive bool) ([]string, error) {
// TODO :- do we need to handle the recursive flag?
for _, secret := range secrets.Items {
key := secret.ObjectMeta.Name
if strings.HasPrefix(key, cleanKey(prefix)) {
keys = append(keys, key)
if strings.HasPrefix(key, cleanKey(prefix, keyPrefix)) {
keys = append(keys, strings.TrimPrefix(key, keyPrefix))
}
}
@@ -148,11 +173,13 @@ func (s *SecretStorage) List(prefix string, recursive bool) ([]string, error) {
// Stat returns information about key.
func (s *SecretStorage) Stat(key string) (certmagic.KeyInfo, error) {
secret, err := s.KubeClient.CoreV1().Secrets(s.Namespace).Get(cleanKey(key), metav1.GetOptions{})
secret, err := s.KubeClient.CoreV1().Secrets(s.Namespace).Get(context.TODO(), cleanKey(key, keyPrefix), metav1.GetOptions{})
if err != nil {
return certmagic.KeyInfo{}, err
}
s.logger.Debug("stats secret", zap.String("name", key))
return certmagic.KeyInfo{
Key: key,
Modified: secret.GetCreationTimestamp().UTC(),
@@ -161,12 +188,93 @@ }, nil
}, nil
}
func (s *SecretStorage) Lock(key string) error {
// TODO: implement
return nil
func (s *SecretStorage) Lock(ctx context.Context, key string) error {
for {
_, err := s.tryAcquireOrRenew(ctx, cleanKey(key, leasePrefix), false)
if err == nil {
go s.keepLockUpdated(ctx, cleanKey(key, leasePrefix))
return nil
}
select {
case <-time.After(leasePollInterval):
case <-ctx.Done():
return ctx.Err()
}
}
}
func (s *SecretStorage) keepLockUpdated(ctx context.Context, key string) {
for {
time.Sleep(leaseRenewInterval)
done, err := s.tryAcquireOrRenew(ctx, key, true)
if err != nil {
return
}
if done {
return
}
}
}
func (s *SecretStorage) tryAcquireOrRenew(ctx context.Context, key string, shouldExist bool) (bool, error) {
now := metav1.Now()
lock := resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: key,
Namespace: s.Namespace,
},
Client: s.KubeClient.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: s.LeaseId,
},
}
ler := resourcelock.LeaderElectionRecord{
HolderIdentity: lock.Identity(),
LeaseDurationSeconds: 5,
AcquireTime: now,
RenewTime: now,
}
currLer, _, err := lock.Get(ctx)
// 1. obtain or create the ElectionRecord
if err != nil {
if !errors.IsNotFound(err) {
return true, err
}
if shouldExist {
return true, nil // Lock has been released
}
if err = lock.Create(ctx, ler); err != nil {
return true, err
}
return false, nil
}
// 2. Record obtained, check the Identity & Time
if currLer.HolderIdentity != "" &&
currLer.RenewTime.Add(leaseDuration).After(now.Time) &&
currLer.HolderIdentity != lock.Identity() {
return true, fmt.Errorf("lock is held by %v and has not yet expired", currLer.HolderIdentity)
}
// 3. We're going to try to update the existing one
if currLer.HolderIdentity == lock.Identity() {
ler.AcquireTime = currLer.AcquireTime
ler.LeaderTransitions = currLer.LeaderTransitions
} else {
ler.LeaderTransitions = currLer.LeaderTransitions + 1
}
if err = lock.Update(ctx, ler); err != nil {
return true, fmt.Errorf("failed to update lock: %v", err)
}
return false, nil
}
func (s *SecretStorage) Unlock(key string) error {
// TODO: implement
return nil
err := s.KubeClient.CoordinationV1().Leases(s.Namespace).Delete(context.TODO(), cleanKey(key, leasePrefix), metav1.DeleteOptions{})
return err
}
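Lifecycle sketch, assuming a provisioned SecretStorage s and a caller that returns an error: certmagic calls Lock before work that must be exclusive across replicas and Unlock afterwards; the lease is renewed every 2 seconds, expires after 5, and contenders poll every 5:

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

if err := s.Lock(ctx, "issue_cert_example.com"); err != nil {
	return err // ctx expired while another replica held the lease
}
defer func() { _ = s.Unlock("issue_cert_example.com") }()
// ... do the exclusive work; keepLockUpdated renews the lease in the background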