Skip to content

Fix #1366 unpredictable global artifact behavior #1461

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions workflow/common/ancestry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package common

import (
"time"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
)

// Context gives the ancestry computation access to the runtime status of DAG
// tasks. Callers in this package treat a nil Context as "no runtime status
// available" (see getTimeFinished).
type Context interface {
// GetTaskNode returns the node status of a task.
GetTaskNode(taskName string) *wfv1.NodeStatus
}

// GetTaskAncestry returns the names of every task that taskName depends on,
// directly or transitively. taskName itself is never part of the result.
//
// The list is ordered by ascending time as reported by getTimeFinished. Tasks
// whose times compare equal keep the order in which the depth-first walk over
// Dependencies completed them (dependencies before their dependents), so the
// result is deterministic for a given input instead of depending on Go's
// randomized map iteration order.
//
// ctx is forwarded to getTimeFinished and may be nil. tasks is the full task
// list of the DAG; a dependency name that is missing from tasks is treated as
// a task without dependencies.
func GetTaskAncestry(ctx Context, taskName string, tasks []wfv1.DAGTask) []string {
	taskByName := make(map[string]wfv1.DAGTask, len(tasks))
	for _, task := range tasks {
		taskByName[task.Name] = task
	}

	// finishedAt holds the timestamp of every ancestor; order remembers the
	// sequence in which the walk completed them.
	finishedAt := make(map[string]time.Time)
	entered := make(map[string]bool)
	var order []string

	var getAncestry func(currTask string)
	getAncestry = func(currTask string) {
		// Entering each task only once keeps the walk linear when several
		// tasks share ancestors and guarantees termination even if the
		// dependencies contain a cycle.
		if entered[currTask] {
			return
		}
		entered[currTask] = true

		for _, depTask := range taskByName[currTask].Dependencies {
			getAncestry(depTask)
		}
		if currTask != taskName {
			finishedAt[currTask] = getTimeFinished(ctx, currTask)
			order = append(order, currTask)
		}
	}
	getAncestry(taskName)

	// Insert in walk order (not map order): insertTask places a task after
	// already inserted tasks with an equal time, which makes ties stable.
	ancestry := make([]string, len(order))
	for _, name := range order {
		insertTask(finishedAt, ancestry, name, finishedAt[name])
	}

	return ancestry
}

// getTimeFinished returns the finishedAt time of the node belonging to taskName.
// If the finished time is not set, the started time is returned instead.
// If ctx is nil (no runtime status available, e.g. during validation) the
// current time is returned.
// If ctx has no node for the task, the zero time is returned rather than
// dereferencing a nil node.
func getTimeFinished(ctx Context, taskName string) time.Time {
	if ctx == nil {
		return time.Now()
	}

	node := ctx.GetTaskNode(taskName)
	if node == nil {
		// GetTaskNode may legitimately report no node for a task; there is no
		// timestamp to read in that case.
		return time.Time{}
	}
	if !node.FinishedAt.IsZero() {
		return node.FinishedAt.Time
	}
	return node.StartedAt.Time
}

// insertTask places newTaskName into ancestry so that the occupied prefix of
// the slice stays ordered by the times stored in visited.
//
// ancestry is a fixed-length slice whose unused slots hold the empty string.
// The new task goes into the first slot that is either unused or occupied by a
// task whose time is strictly after finishedAt; in the latter case the
// occupants from that slot onwards are shifted one position to the right (the
// last element of the slice is overwritten by the shift). If no such slot
// exists the slice is left untouched.
func insertTask(visited map[string]time.Time, ancestry []string, newTaskName string, finishedAt time.Time) {
	// Find the target slot first, then mutate.
	pos := -1
	for idx := 0; idx < len(ancestry); idx++ {
		occupant := ancestry[idx]
		if occupant == "" || finishedAt.Before(visited[occupant]) {
			pos = idx
			break
		}
	}
	if pos < 0 {
		return
	}

	if ancestry[pos] != "" {
		// Make room: copy handles the overlapping ranges correctly.
		copy(ancestry[pos+1:], ancestry[pos:])
	}
	ancestry[pos] = newTaskName
}
154 changes: 154 additions & 0 deletions workflow/common/ancestry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package common

import (
"reflect"
"testing"
"time"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
)

// testContext is a test double for Context that serves node statuses from a
// fixed map keyed by task name.
type testContext struct {
// status maps a task name to the node status returned by GetTaskNode.
status map[string]*wfv1.NodeStatus
}

// GetTaskNode returns the canned node status stored for taskName, or nil when
// no status was registered under that name.
func (c *testContext) GetTaskNode(taskName string) *wfv1.NodeStatus {
	node, found := c.status[taskName]
	if !found {
		return nil
	}
	return node
}

// TestGetTaskAncestryForValidation exercises GetTaskAncestry without a Context
// (nil), which is how the validation code calls it. The fixture is a diamond:
// task1 <- {task2, task3} <- task4.
func TestGetTaskAncestryForValidation(t *testing.T) {
	type args struct {
		ctx      Context
		taskName string
		tasks    []wfv1.DAGTask
	}

	diamond := []wfv1.DAGTask{
		{Name: "task1", Dependencies: make([]string, 0)},
		{Name: "task2", Dependencies: []string{"task1"}},
		{Name: "task3", Dependencies: []string{"task1"}},
		{Name: "task4", Dependencies: []string{"task2", "task3"}},
	}

	cases := []struct {
		name string
		args args
		want []string
	}{
		{
			name: "one task",
			args: args{ctx: nil, taskName: "task2", tasks: diamond},
			want: []string{"task1"},
		},
		{
			name: "multiple tasks",
			args: args{ctx: nil, taskName: "task4", tasks: diamond},
			want: []string{"task1", "task2", "task3"},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := GetTaskAncestry(tc.args.ctx, tc.args.taskName, tc.args.tasks)
			if reflect.DeepEqual(got, tc.want) {
				return
			}
			t.Errorf("GetTaskAncestry() = %v, want %v", got, tc.want)
		})
	}
}

// TestGetTaskAncestryForGlobalArtifacts exercises GetTaskAncestry with a
// Context that reports finish times, and checks that ancestors are returned in
// ascending order of those times. The fixture is a diamond:
// task1 <- {task2, task3} <- task4, where task3 finishes before task2.
func TestGetTaskAncestryForGlobalArtifacts(t *testing.T) {
	type args struct {
		ctx      Context
		taskName string
		tasks    []wfv1.DAGTask
	}

	testTasks := []wfv1.DAGTask{
		{
			Name:         "task1",
			Dependencies: make([]string, 0),
		},
		{
			Name:         "task2",
			Dependencies: []string{"task1"},
		},
		{
			Name:         "task3",
			Dependencies: []string{"task1"},
		},
		{
			Name:         "task4",
			Dependencies: []string{"task2", "task3"},
		},
	}

	// All finish times are offsets from one base instant so the fixture does
	// not depend on how much wall-clock time passes between the literals.
	base := time.Now()
	finishedAfter := func(d time.Duration) *wfv1.NodeStatus {
		// Keyed field: v1.Time is an imported struct type and go vet rejects
		// unkeyed composite literals of such types.
		return &wfv1.NodeStatus{FinishedAt: v1.Time{Time: base.Add(d)}}
	}

	ctx := &testContext{
		status: map[string]*wfv1.NodeStatus{
			"task1": finishedAfter(1 * time.Minute),
			"task2": finishedAfter(3 * time.Minute),
			"task3": finishedAfter(2 * time.Minute),
			"task4": finishedAfter(4 * time.Minute),
		},
	}

	tests := []struct {
		name string
		args args
		want []string
	}{
		{
			name: "one task",
			args: args{
				ctx:      ctx,
				taskName: "task2",
				tasks:    testTasks,
			},
			want: []string{"task1"},
		},
		{
			name: "multiple tasks",
			args: args{
				ctx:      ctx,
				taskName: "task4",
				tasks:    testTasks,
			},
			want: []string{"task1", "task3", "task2"},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := GetTaskAncestry(tt.args.ctx, tt.args.taskName, tt.args.tasks); !reflect.DeepEqual(got, tt.want) {
				t.Errorf("GetTaskAncestry() = %v, want %v", got, tt.want)
			}
		})
	}
}
27 changes: 0 additions & 27 deletions workflow/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,33 +447,6 @@ func IsPodTemplate(tmpl *wfv1.Template) bool {
return false
}

// GetTaskAncestry returns a list of taskNames which are ancestors of this task,
// i.e. every task that taskName depends on directly or transitively. taskName
// itself is not included.
//
// The result is in depth-first post-order over Dependencies: a task always
// appears after the tasks it depends on, and the order is the same on every
// call for the same input (it does not depend on map iteration order).
// A dependency name that is missing from tasks is treated as a task without
// dependencies. The returned slice is never nil.
func GetTaskAncestry(taskName string, tasks []wfv1.DAGTask) []string {
	taskByName := make(map[string]wfv1.DAGTask, len(tasks))
	for _, task := range tasks {
		taskByName[task.Name] = task
	}

	visited := make(map[string]bool)
	ancestry := make([]string, 0)
	var getAncestry func(currTask string)
	getAncestry = func(currTask string) {
		// Enter each task once: avoids re-walking shared ancestors and
		// terminates even if the dependencies contain a cycle.
		if visited[currTask] {
			return
		}
		visited[currTask] = true

		for _, depTask := range taskByName[currTask].Dependencies {
			getAncestry(depTask)
		}
		if currTask != taskName {
			ancestry = append(ancestry, currTask)
		}
	}

	getAncestry(taskName)
	return ancestry
}

var yamlSeparator = regexp.MustCompile(`\n---`)

// SplitYAMLFile is a helper to split a body into multiple workflow objects
Expand Down
28 changes: 14 additions & 14 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ package controller
import (
"encoding/json"
"fmt"
"strings"

"github.com/argoproj/argo/errors"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/workflow/common"
"github.com/valyala/fasttemplate"
"strings"
)

// dagContext holds context information about this context's DAG
Expand Down Expand Up @@ -52,7 +51,8 @@ func (d *dagContext) taskNodeID(taskName string) string {
return d.wf.NodeID(nodeName)
}

func (d *dagContext) getTaskNode(taskName string) *wfv1.NodeStatus {
// GetTaskNode returns the node status of a task.
func (d *dagContext) GetTaskNode(taskName string) *wfv1.NodeStatus {
nodeID := d.taskNodeID(taskName)
node, ok := d.wf.Status.Nodes[nodeID]
if !ok {
Expand Down Expand Up @@ -95,7 +95,7 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes map[string]wfv1.
// If all the nodes have finished, we should mark the failed node to finish overall workflow
// So we should check all the targetTasks have finished
for _, tmpDepName := range targetTasks {
tmpDepNode := d.getTaskNode(tmpDepName)
tmpDepNode := d.GetTaskNode(tmpDepName)
if tmpDepNode == nil {
tmpOverAllFinished = false
break
Expand All @@ -117,7 +117,7 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes map[string]wfv1.
}
// There are no currently running tasks. Now check if our dependencies were met
for _, depName := range targetTasks {
depNode := d.getTaskNode(depName)
depNode := d.GetTaskNode(depName)
if depNode == nil {
return wfv1.NodeRunning
}
Expand Down Expand Up @@ -213,7 +213,7 @@ func (woc *wfOperationCtx) executeDAG(nodeName string, tmpl *wfv1.Template, boun
scope: make(map[string]interface{}),
}
for _, task := range tmpl.DAG.Tasks {
taskNode := dagCtx.getTaskNode(task.Name)
taskNode := dagCtx.GetTaskNode(task.Name)
if taskNode == nil {
// Can happen when dag.target was specified
continue
Expand All @@ -234,7 +234,7 @@ func (woc *wfOperationCtx) executeDAG(nodeName string, tmpl *wfv1.Template, boun
node = woc.getNodeByName(nodeName)
outbound := make([]string, 0)
for _, depName := range targetTasks {
depNode := dagCtx.getTaskNode(depName)
depNode := dagCtx.GetTaskNode(depName)
if depNode == nil {
woc.log.Println(depName)
}
Expand All @@ -255,7 +255,7 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) {
}
dagCtx.visited[taskName] = true

node := dagCtx.getTaskNode(taskName)
node := dagCtx.GetTaskNode(taskName)
if node != nil && node.Completed() {
return
}
Expand All @@ -265,7 +265,7 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) {
dependenciesSuccessful := true
nodeName := dagCtx.taskNodeName(taskName)
for _, depName := range task.Dependencies {
depNode := dagCtx.getTaskNode(depName)
depNode := dagCtx.GetTaskNode(depName)
if depNode != nil {
if depNode.Completed() {
if !depNode.Successful() && !dagCtx.getTask(depName).ContinuesOn(depNode.Phase) {
Expand Down Expand Up @@ -310,7 +310,7 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) {
} else {
// Otherwise, add all outbound nodes of our dependencies as parents to this node
for _, depName := range task.Dependencies {
depNode := dagCtx.getTaskNode(depName)
depNode := dagCtx.GetTaskNode(depName)
outboundNodeIDs := woc.getOutboundNodes(depNode.ID)
woc.log.Infof("DAG outbound nodes of %s are %s", depNode, outboundNodeIDs)
for _, outNodeID := range outboundNodeIDs {
Expand Down Expand Up @@ -348,7 +348,7 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) {
}

for _, t := range expandedTasks {
node = dagCtx.getTaskNode(t.Name)
node = dagCtx.GetTaskNode(t.Name)
taskNodeName := dagCtx.taskNodeName(t.Name)
if node == nil {
woc.log.Infof("All of node %s dependencies %s completed", taskNodeName, task.Dependencies)
Expand All @@ -375,7 +375,7 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) {
groupPhase := wfv1.NodeSucceeded
for _, t := range expandedTasks {
// Add the child relationship from our dependency's outbound nodes to this node.
node := dagCtx.getTaskNode(t.Name)
node := dagCtx.GetTaskNode(t.Name)
if node == nil || !node.Completed() {
return
}
Expand All @@ -397,9 +397,9 @@ func (woc *wfOperationCtx) resolveDependencyReferences(dagCtx *dagContext, task
}
woc.addOutputsToScope("workflow", woc.wf.Status.Outputs, &scope)

ancestors := common.GetTaskAncestry(task.Name, dagCtx.tasks)
ancestors := common.GetTaskAncestry(dagCtx, task.Name, dagCtx.tasks)
for _, ancestor := range ancestors {
ancestorNode := dagCtx.getTaskNode(ancestor)
ancestorNode := dagCtx.GetTaskNode(ancestor)
prefix := fmt.Sprintf("tasks.%s", ancestor)
if ancestorNode.Type == wfv1.NodeTypeTaskGroup {
var ancestorNodes []wfv1.NodeStatus
Expand Down
2 changes: 1 addition & 1 deletion workflow/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,7 @@ func (ctx *wfValidationCtx) validateDAG(scope map[string]interface{}, tmpl *wfv1
for k, v := range scope {
taskScope[k] = v
}
ancestry := common.GetTaskAncestry(task.Name, tmpl.DAG.Tasks)
ancestry := common.GetTaskAncestry(nil, task.Name, tmpl.DAG.Tasks)
for _, ancestor := range ancestry {
ancestorTask := nameToTask[ancestor]
ancestorPrefix := fmt.Sprintf("tasks.%s", ancestor)
Expand Down