Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,12 @@ func makeScheduled(tr v1.TaskRun) *v1.TaskRun {
return newTr
}

func makePipelineRunScheduled(pr v1.PipelineRun) *v1.PipelineRun {
newPr := newPipelineRun(pr)
newPr.Status = v1.PipelineRunStatus{ /* explicitly empty */ }
return newPr
}

func makeStarted(tr v1.TaskRun) *v1.TaskRun {
newTr := newTaskRun(tr)
newTr.Status.Conditions[0].Status = corev1.ConditionUnknown
Expand Down Expand Up @@ -812,6 +818,102 @@ var taskCancelledMatrix = PipelineRunState{{
},
}}

var noneStartedChildPipelineRunState = PipelineRunState{{
PipelineTask: &pts[21],
ChildPipelineRunNames: []string{"pipelinerun-mytask22"},
ChildPipelineRuns: nil,
ResolvedPipeline: ResolvedPipeline{
PipelineSpec: pts[21].PipelineSpec,
},
}, {
PipelineTask: &pts[22],
ChildPipelineRunNames: []string{"pipelinerun-mytask23"},
ChildPipelineRuns: nil,
ResolvedPipeline: ResolvedPipeline{
PipelineSpec: pts[22].PipelineSpec,
},
}}

var oneChildPipelineRunStartedState = PipelineRunState{{
PipelineTask: &pts[21],
ChildPipelineRunNames: []string{"pipelinerun-mytask22"},
ChildPipelineRuns: []*v1.PipelineRun{makePipelineRunStarted(prs[0])},
ResolvedPipeline: ResolvedPipeline{
PipelineSpec: pts[21].PipelineSpec,
},
}, {
PipelineTask: &pts[22],
ChildPipelineRunNames: []string{"pipelinerun-mytask23"},
ChildPipelineRuns: nil,
ResolvedPipeline: ResolvedPipeline{
PipelineSpec: pts[22].PipelineSpec,
},
}}

var oneChildPipelineRunFinishedState = PipelineRunState{{
PipelineTask: &pts[21],
ChildPipelineRunNames: []string{"pipelinerun-mytask22"},
ChildPipelineRuns: []*v1.PipelineRun{makePipelineRunSucceeded(prs[0])},
ResolvedPipeline: ResolvedPipeline{
PipelineSpec: pts[21].PipelineSpec,
},
}, {
PipelineTask: &pts[22],
ChildPipelineRunNames: []string{"pipelinerun-mytask23"},
ChildPipelineRuns: nil,
ResolvedPipeline: ResolvedPipeline{
PipelineSpec: pts[22].PipelineSpec,
},
}}

var oneChildPipelineRunFailedState = PipelineRunState{{
PipelineTask: &pts[21],
ChildPipelineRunNames: []string{"pipelinerun-mytask22"},
ChildPipelineRuns: []*v1.PipelineRun{makePipelineRunFailed(prs[0])},
ResolvedPipeline: ResolvedPipeline{
PipelineSpec: pts[21].PipelineSpec,
},
}, {
PipelineTask: &pts[22],
ChildPipelineRunNames: []string{"pipelinerun-mytask23"},
ChildPipelineRuns: nil,
ResolvedPipeline: ResolvedPipeline{
PipelineSpec: pts[22].PipelineSpec,
},
}}

var allChildPipelineRunsFinishedState = PipelineRunState{{
PipelineTask: &pts[21],
ChildPipelineRunNames: []string{"pipelinerun-mytask22"},
ChildPipelineRuns: []*v1.PipelineRun{makePipelineRunSucceeded(prs[0])},
ResolvedPipeline: ResolvedPipeline{
PipelineSpec: pts[21].PipelineSpec,
},
}, {
PipelineTask: &pts[22],
ChildPipelineRunNames: []string{"pipelinerun-mytask23"},
ChildPipelineRuns: []*v1.PipelineRun{makePipelineRunSucceeded(prs[1])},
ResolvedPipeline: ResolvedPipeline{
PipelineSpec: pts[22].PipelineSpec,
},
}}

var finalChildPipelineRunsScheduledState = PipelineRunState{{
PipelineTask: &pts[21],
ChildPipelineRunNames: []string{"pipelinerun-mytask22"},
ChildPipelineRuns: []*v1.PipelineRun{makePipelineRunSucceeded(prs[0])},
ResolvedPipeline: ResolvedPipeline{
PipelineSpec: pts[21].PipelineSpec,
},
}, {
PipelineTask: &pts[22],
ChildPipelineRunNames: []string{"pipelinerun-mytask23"},
ChildPipelineRuns: []*v1.PipelineRun{makePipelineRunScheduled(prs[1])},
ResolvedPipeline: ResolvedPipeline{
PipelineSpec: pts[22].PipelineSpec,
},
}}

func dagFromState(state PipelineRunState) (*dag.Graph, error) {
pts := []v1.PipelineTask{}
for _, rpt := range state {
Expand Down
72 changes: 57 additions & 15 deletions pkg/reconciler/pipelinerun/resources/pipelinerunstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ func (state PipelineRunState) ToMap() map[string]*ResolvedPipelineTask {
return m
}

// IsBeforeFirstTaskRun returns true if the PipelineRun has not yet started its first TaskRun
// IsBeforeFirstTaskRun returns true if the PipelineRun has not yet started its first child PipelineRun/TaskRun/CustomRun
func (state PipelineRunState) IsBeforeFirstTaskRun() bool {
for _, t := range state {
if len(t.CustomRuns) > 0 || len(t.TaskRuns) > 0 {
if len(t.ChildPipelineRuns) > 0 || len(t.CustomRuns) > 0 || len(t.TaskRuns) > 0 {
return false
}
}
Expand All @@ -144,6 +144,12 @@ func (state PipelineRunState) IsBeforeFirstTaskRun() bool {
func (state PipelineRunState) AdjustStartTime(unadjustedStartTime *metav1.Time) *metav1.Time {
adjustedStartTime := unadjustedStartTime
for _, rpt := range state {
for _, childPipelineRun := range rpt.ChildPipelineRuns {
if childPipelineRun.CreationTimestamp.Time.Before(adjustedStartTime.Time) {
adjustedStartTime = &childPipelineRun.CreationTimestamp
}
}

for _, customRun := range rpt.CustomRuns {
creationTime := customRun.GetObjectMeta().GetCreationTimestamp()
if creationTime.Time.Before(adjustedStartTime.Time) {
Expand All @@ -166,6 +172,9 @@ func (state PipelineRunState) AdjustStartTime(unadjustedStartTime *metav1.Time)
func (state PipelineRunState) GetTaskRunsResults() map[string][]v1.TaskRunResult {
results := make(map[string][]v1.TaskRunResult)
for _, rpt := range state {
if rpt.IsChildPipeline() {
continue
}
if rpt.IsCustomTask() {
continue
}
Expand All @@ -189,6 +198,9 @@ func (state PipelineRunState) GetTaskRunsResults() map[string][]v1.TaskRunResult
func (state PipelineRunState) GetTaskRunsArtifacts() map[string]*v1.Artifacts {
results := make(map[string]*v1.Artifacts)
for _, rpt := range state {
if rpt.IsChildPipeline() {
continue
}
if rpt.IsCustomTask() {
continue
}
Expand Down Expand Up @@ -248,7 +260,7 @@ func (state PipelineRunState) GetRunsResults() map[string][]v1beta1.CustomRunRes
}

// GetChildReferences returns a slice of references, including version, kind, name, and pipeline task name, for all
// TaskRuns and Runs in the state.
// child (PinP) PipelineRuns, TaskRuns and Runs in the state.
func (facts *PipelineRunFacts) GetChildReferences() []v1.ChildStatusReference {
var childRefs []v1.ChildStatusReference

Expand All @@ -262,6 +274,12 @@ func (facts *PipelineRunFacts) GetChildReferences() []v1.ChildStatusReference {
}

switch {
case len(rpt.ChildPipelineRuns) != 0:
for _, childPipelineRun := range rpt.ChildPipelineRuns {
if childPipelineRun != nil {
childRefs = append(childRefs, rpt.getChildRefForChildPipelineRun(childPipelineRun))
}
}
case len(rpt.TaskRuns) != 0:
for _, taskRun := range rpt.TaskRuns {
if taskRun != nil {
Expand All @@ -277,8 +295,16 @@ func (facts *PipelineRunFacts) GetChildReferences() []v1.ChildStatusReference {
return childRefs
}

func (t *ResolvedPipelineTask) getDisplayName(customRun *v1beta1.CustomRun, taskRun *v1.TaskRun, c v1.ChildStatusReference) v1.ChildStatusReference {
func (t *ResolvedPipelineTask) getDisplayName(pipelineRun *v1.PipelineRun, customRun *v1beta1.CustomRun, taskRun *v1.TaskRun, c v1.ChildStatusReference) v1.ChildStatusReference {
replacements := make(map[string]string)
if pipelineRun != nil {
for _, p := range pipelineRun.Spec.Params {
if p.Value.Type == v1.ParamTypeString {
replacements[fmt.Sprintf("%s.%s", v1.ParamsPrefix, p.Name)] = p.Value.StringVal
}
}
}

if taskRun != nil {
for _, p := range taskRun.Spec.Params {
if p.Value.Type == v1.ParamTypeString {
Expand Down Expand Up @@ -323,6 +349,19 @@ func (t *ResolvedPipelineTask) getDisplayName(customRun *v1beta1.CustomRun, task
return c
}

func (t *ResolvedPipelineTask) getChildRefForChildPipelineRun(pipelineRun *v1.PipelineRun) v1.ChildStatusReference {
c := v1.ChildStatusReference{
TypeMeta: runtime.TypeMeta{
APIVersion: v1.SchemeGroupVersion.String(),
Kind: pipeline.PipelineRunControllerName,
},
Name: pipelineRun.Name,
PipelineTaskName: t.PipelineTask.Name,
WhenExpressions: t.PipelineTask.When,
}
return t.getDisplayName(pipelineRun, nil, nil, c)
}

func (t *ResolvedPipelineTask) getChildRefForRun(customRun *v1beta1.CustomRun) v1.ChildStatusReference {
c := v1.ChildStatusReference{
TypeMeta: runtime.TypeMeta{
Expand All @@ -333,7 +372,7 @@ func (t *ResolvedPipelineTask) getChildRefForRun(customRun *v1beta1.CustomRun) v
PipelineTaskName: t.PipelineTask.Name,
WhenExpressions: t.PipelineTask.When,
}
return t.getDisplayName(customRun, nil, c)
return t.getDisplayName(nil, customRun, nil, c)
}

func (t *ResolvedPipelineTask) getChildRefForTaskRun(taskRun *v1.TaskRun) v1.ChildStatusReference {
Expand All @@ -346,17 +385,17 @@ func (t *ResolvedPipelineTask) getChildRefForTaskRun(taskRun *v1.TaskRun) v1.Chi
PipelineTaskName: t.PipelineTask.Name,
WhenExpressions: t.PipelineTask.When,
}
return t.getDisplayName(nil, taskRun, c)
return t.getDisplayName(nil, nil, taskRun, c)
}

// getNextTasks returns a list of tasks which should be executed next i.e.
// getNextTasks returns a list of pipeline tasks which should be executed next i.e.
// a list of tasks from candidateTasks which aren't yet indicated in state to be running and
// a list of cancelled/failed tasks from candidateTasks which haven't exhausted their retries
func (state PipelineRunState) getNextTasks(candidateTasks sets.String) []*ResolvedPipelineTask {
tasks := []*ResolvedPipelineTask{}
for _, t := range state {
if _, ok := candidateTasks[t.PipelineTask.Name]; ok {
if len(t.TaskRuns) == 0 && len(t.CustomRuns) == 0 {
if len(t.TaskRuns) == 0 && len(t.CustomRuns) == 0 && len(t.ChildPipelineRuns) == 0 {
tasks = append(tasks, t)
}
}
Expand Down Expand Up @@ -484,7 +523,7 @@ func (facts *PipelineRunFacts) IsFinalTaskStarted() bool {
}

// GetPipelineConditionStatus will return the Condition that the PipelineRun prName should be
// updated with, based on the status of the TaskRuns in state.
// updated with, based on the status of the child (PinP) PipelineRuns/TaskRuns/CustomRuns in state.
func (facts *PipelineRunFacts) GetPipelineConditionStatus(ctx context.Context, pr *v1.PipelineRun, logger *zap.SugaredLogger, c clock.PassiveClock) *apis.Condition {
// We have 4 different states here:
// 1. Timed out -> Failed
Expand Down Expand Up @@ -560,7 +599,7 @@ func (facts *PipelineRunFacts) GetPipelineConditionStatus(ctx context.Context, p
reason = v1.PipelineRunReasonCancelled.String()
status = corev1.ConditionFalse
}
logger.Infof("All TaskRuns have finished for PipelineRun %s so it has finished", pr.Name)
logger.Infof("All child (PinP) PipelineRuns/TaskRuns/CustomRuns have finished for PipelineRun %s so it has finished", pr.Name)
return &apis.Condition{
Type: apis.ConditionSucceeded,
Status: status,
Expand Down Expand Up @@ -622,8 +661,9 @@ func (facts *PipelineRunFacts) GetSkippedTasks() []v1.SkippedTask {
return skipped
}

// GetPipelineTaskStatus returns the status of a PipelineTask depending on its taskRun
// the checks are implemented such that the finally tasks are requesting status of the dag tasks
// GetPipelineTaskStatus returns the status of a PipelineTask depending on its child (PinP)
// PipelineRun/TaskRun/CustomRun. The checks are implemented such that the finally tasks
// are requesting status of the dag tasks.
func (facts *PipelineRunFacts) GetPipelineTaskStatus() map[string]string {
// construct a map of tasks.<pipelineTask>.status and its state
tStatus := make(map[string]string)
Expand All @@ -649,13 +689,15 @@ func (facts *PipelineRunFacts) GetPipelineTaskStatus() map[string]string {
// initialize aggregate status of all dag tasks to None
aggregateStatus := PipelineTaskStateNone
if facts.checkDAGTasksDone() {
// all dag tasks are done, change the aggregate status to succeeded
// all dag pipeline tasks are done, change the aggregate status to succeeded
// will reset it to failed/skipped if needed
aggregateStatus = v1.PipelineRunReasonSuccessful.String()
for _, t := range facts.State {
if facts.isDAGTask(t.PipelineTask.Name) {
// if any of the dag task failed, change the aggregate status to failed and return
if !t.IsCustomTask() && t.haveAnyTaskRunsFailed() || t.IsCustomTask() && t.haveAnyCustomRunsFailed() {
// if any of the dag pipeline tasks failed, change the aggregate status to failed and return
if !t.IsCustomTask() && t.haveAnyTaskRunsFailed() ||
t.IsCustomTask() && t.haveAnyCustomRunsFailed() ||
t.IsChildPipeline() && t.haveAnyChildPipelineRunsFailed() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is not binary anymore, I'm not sure that !t.IsCustomTask() is the best check anymore.
Should we use something like:

				if t.IsTask() && t.haveAnyTaskRunsFailed() ||
					t.IsCustomTask() && t.haveAnyCustomRunsFailed() ||
					t.IsChildPipeline() && t.haveAnyChildPipelineRunsFailed() {

I guess IsTask may not exist...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's painfull to look at it 🤕 🤣 . This is a point where that polymorphism refactoring would do its magic 😸.

I will add a IsTask a method and update it in a few places. That would be cleaner anyway.

aggregateStatus = v1.PipelineRunReasonFailed.String()
break
}
Expand Down
Loading
Loading