Skip to content

Commit b9b87b7

Browse files
authored
Centralized Longterm workflow persistence storage (#1344)
* Centralized Longterm workflow persistence storage implementaion
1 parent cb09609 commit b9b87b7

File tree

14 files changed

+973
-139
lines changed

14 files changed

+973
-139
lines changed

Gopkg.lock

Lines changed: 75 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Gopkg.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,8 @@ required = [
6262
[[constraint]]
6363
name = "gopkg.in/jcmturner/gokrb5.v5"
6464
version = "5.3.0"
65+
66+
[[constraint]]
67+
name = "upper.io/db.v3"
68+
version ="3.5.7"
69+

util/util.go

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,17 @@
11
package util
22

3-
import "io/ioutil"
3+
import (
4+
"io/ioutil"
5+
6+
log "github.com/sirupsen/logrus"
7+
apiv1 "k8s.io/api/core/v1"
8+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9+
"k8s.io/apimachinery/pkg/util/wait"
10+
"k8s.io/client-go/kubernetes"
11+
12+
"github.com/argoproj/argo/errors"
13+
"github.com/argoproj/argo/util/retry"
14+
)
415

516
type Closer interface {
617
Close() error
@@ -12,6 +23,33 @@ func Close(c Closer) {
1223
_ = c.Close()
1324
}
1425

26+
// GetSecrets retrieves a secret value and memoizes the result
27+
func GetSecrets(clientSet kubernetes.Interface, namespace, name, key string) ([]byte, error) {
28+
29+
secretsIf := clientSet.CoreV1().Secrets(namespace)
30+
var secret *apiv1.Secret
31+
var err error
32+
_ = wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
33+
secret, err = secretsIf.Get(name, metav1.GetOptions{})
34+
if err != nil {
35+
log.Warnf("Failed to get secret '%s': %v", name, err)
36+
if !retry.IsRetryableKubeAPIError(err) {
37+
return false, err
38+
}
39+
return false, nil
40+
}
41+
return true, nil
42+
})
43+
if err != nil {
44+
return []byte{}, errors.InternalWrapError(err)
45+
}
46+
val, ok := secret.Data[key]
47+
if !ok {
48+
return []byte{}, errors.Errorf(errors.CodeBadRequest, "secret '%s' does not have the key '%s'", name, key)
49+
}
50+
return val, nil
51+
}
52+
1553
// Write the Terminate message in pod spec
1654
func WriteTeriminateMessage(message string) {
1755
err := ioutil.WriteFile("/dev/termination-log", []byte(message), 0644)
Lines changed: 34 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,9 @@
1-
package controller
1+
package config
22

33
import (
4-
"context"
5-
"fmt"
6-
7-
apiv1 "k8s.io/api/core/v1"
8-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9-
"k8s.io/apimachinery/pkg/fields"
10-
"k8s.io/apimachinery/pkg/runtime"
11-
"k8s.io/apimachinery/pkg/watch"
12-
"k8s.io/client-go/tools/cache"
13-
14-
"github.com/argoproj/argo/errors"
154
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
16-
"github.com/argoproj/argo/workflow/common"
175
"github.com/argoproj/argo/workflow/metrics"
18-
"github.com/ghodss/yaml"
19-
log "github.com/sirupsen/logrus"
6+
apiv1 "k8s.io/api/core/v1"
207
)
218

229
// WorkflowControllerConfig contain the configuration settings for the workflow controller
@@ -70,6 +57,9 @@ type WorkflowControllerConfig struct {
7057
// Parallelism limits the max total parallel workflows that can execute at the same time
7158
Parallelism int `json:"parallelism,omitempty"`
7259

60+
// Persistence contains the workflow persistence DB configuration
61+
Persistence *PersistConfig `json:"persistence,omitempty"`
62+
7363
// Config customized Docker Sock path
7464
DockerSockPath string `json:"dockerSockPath,omitempty"`
7565
}
@@ -101,6 +91,35 @@ type ArtifactRepository struct {
10191
HDFS *HDFSArtifactRepository `json:"hdfs,omitempty"`
10292
}
10393

94+
type PersistConfig struct {
95+
NodeStatusOffload bool `json:"nodeStatusOffLoad"`
96+
ConnectionPool *ConnectionPool `json:"connectionPool"`
97+
PostgreSQL *PostgreSQLConfig `json:"postgresql,omitempty"`
98+
MySQL *MySQLConfig `json:"mysql,omitempty"`
99+
}
100+
type ConnectionPool struct {
101+
MaxIdleConns int `json:"maxIdleConns"`
102+
MaxOpenConns int `json:"maxOpenConns"`
103+
}
104+
type PostgreSQLConfig struct {
105+
Host string `json:"host"`
106+
Port string `json:"port"`
107+
Database string `json:"database"`
108+
TableName string `json:"tableName"`
109+
UsernameSecret apiv1.SecretKeySelector `json:"userNameSecret"`
110+
PasswordSecret apiv1.SecretKeySelector `json:"passwordSecret"`
111+
}
112+
113+
type MySQLConfig struct {
114+
Host string `json:"host"`
115+
Port string `json:"port"`
116+
Database string `json:"database"`
117+
TableName string `json:"tableName"`
118+
Options map[string]string `json:"options"`
119+
UsernameSecret apiv1.SecretKeySelector `json:"userNameSecret"`
120+
PasswordSecret apiv1.SecretKeySelector `json:"passwordSecret"`
121+
}
122+
104123
// S3ArtifactRepository defines the controller configuration for an S3 artifact repository
105124
type S3ArtifactRepository struct {
106125
wfv1.S3Bucket `json:",inline"`
@@ -130,114 +149,3 @@ type HDFSArtifactRepository struct {
130149
// Force copies a file forcibly even if it exists (default: false)
131150
Force bool `json:"force,omitempty"`
132151
}
133-
134-
// ResyncConfig reloads the controller config from the configmap
135-
func (wfc *WorkflowController) ResyncConfig() error {
136-
cmClient := wfc.kubeclientset.CoreV1().ConfigMaps(wfc.namespace)
137-
cm, err := cmClient.Get(wfc.configMap, metav1.GetOptions{})
138-
if err != nil {
139-
return errors.InternalWrapError(err)
140-
}
141-
return wfc.updateConfig(cm)
142-
}
143-
144-
func (wfc *WorkflowController) updateConfig(cm *apiv1.ConfigMap) error {
145-
configStr, ok := cm.Data[common.WorkflowControllerConfigMapKey]
146-
if !ok {
147-
log.Warnf("ConfigMap '%s' does not have key '%s'", wfc.configMap, common.WorkflowControllerConfigMapKey)
148-
return nil
149-
}
150-
var config WorkflowControllerConfig
151-
err := yaml.Unmarshal([]byte(configStr), &config)
152-
if err != nil {
153-
return errors.InternalWrapError(err)
154-
}
155-
log.Printf("workflow controller configuration from %s:\n%s", wfc.configMap, configStr)
156-
if wfc.cliExecutorImage == "" && config.ExecutorImage == "" {
157-
return errors.Errorf(errors.CodeBadRequest, "ConfigMap '%s' does not have executorImage", wfc.configMap)
158-
}
159-
wfc.Config = config
160-
wfc.throttler.SetParallelism(config.Parallelism)
161-
return nil
162-
}
163-
164-
// executorImage returns the image to use for the workflow executor
165-
func (wfc *WorkflowController) executorImage() string {
166-
if wfc.cliExecutorImage != "" {
167-
return wfc.cliExecutorImage
168-
}
169-
return wfc.Config.ExecutorImage
170-
}
171-
172-
// executorImagePullPolicy returns the imagePullPolicy to use for the workflow executor
173-
func (wfc *WorkflowController) executorImagePullPolicy() apiv1.PullPolicy {
174-
if wfc.cliExecutorImagePullPolicy != "" {
175-
return apiv1.PullPolicy(wfc.cliExecutorImagePullPolicy)
176-
} else if wfc.Config.Executor != nil && wfc.Config.Executor.ImagePullPolicy != "" {
177-
return wfc.Config.Executor.ImagePullPolicy
178-
} else {
179-
return apiv1.PullPolicy(wfc.Config.ExecutorImagePullPolicy)
180-
}
181-
}
182-
183-
func (wfc *WorkflowController) watchControllerConfigMap(ctx context.Context) (cache.Controller, error) {
184-
source := wfc.newControllerConfigMapWatch()
185-
_, controller := cache.NewInformer(
186-
source,
187-
&apiv1.ConfigMap{},
188-
0,
189-
cache.ResourceEventHandlerFuncs{
190-
AddFunc: func(obj interface{}) {
191-
if cm, ok := obj.(*apiv1.ConfigMap); ok {
192-
log.Infof("Detected ConfigMap update. Updating the controller config.")
193-
err := wfc.updateConfig(cm)
194-
if err != nil {
195-
log.Errorf("Update of config failed due to: %v", err)
196-
}
197-
}
198-
},
199-
UpdateFunc: func(old, new interface{}) {
200-
oldCM := old.(*apiv1.ConfigMap)
201-
newCM := new.(*apiv1.ConfigMap)
202-
if oldCM.ResourceVersion == newCM.ResourceVersion {
203-
return
204-
}
205-
if newCm, ok := new.(*apiv1.ConfigMap); ok {
206-
log.Infof("Detected ConfigMap update. Updating the controller config.")
207-
err := wfc.updateConfig(newCm)
208-
if err != nil {
209-
log.Errorf("Update of config failed due to: %v", err)
210-
}
211-
}
212-
},
213-
})
214-
215-
go controller.Run(ctx.Done())
216-
return controller, nil
217-
}
218-
219-
func (wfc *WorkflowController) newControllerConfigMapWatch() *cache.ListWatch {
220-
c := wfc.kubeclientset.CoreV1().RESTClient()
221-
resource := "configmaps"
222-
name := wfc.configMap
223-
fieldSelector := fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", name))
224-
225-
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
226-
options.FieldSelector = fieldSelector.String()
227-
req := c.Get().
228-
Namespace(wfc.namespace).
229-
Resource(resource).
230-
VersionedParams(&options, metav1.ParameterCodec)
231-
return req.Do().Get()
232-
}
233-
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
234-
options.Watch = true
235-
options.FieldSelector = fieldSelector.String()
236-
req := c.Get().
237-
Namespace(wfc.namespace).
238-
Resource(resource).
239-
VersionedParams(&options, metav1.ParameterCodec)
240-
return req.Watch()
241-
}
242-
return &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
243-
}

0 commit comments

Comments
 (0)