From 4c92d5d87a9f18685c65797236d6790a107b4f61 Mon Sep 17 00:00:00 2001 From: "Pedro D. Bello-Maldonado" Date: Fri, 20 Oct 2023 16:11:41 -0400 Subject: [PATCH 1/5] Added support to allow for forceful deletion of remaining pods that are not cleared when MCAD deletes the generic items after preemption --- .../workload.codeflare.dev_appwrappers.yaml | 7 ++ ...orkload.codeflare.dev_schedulingspecs.yaml | 7 ++ .../workload.codeflare.dev_appwrappers.yaml | 7 ++ ...orkload.codeflare.dev_schedulingspecs.yaml | 7 ++ pkg/apis/controller/v1beta1/schedulingspec.go | 4 + .../queuejob/queuejob_controller_ex.go | 94 ++++++++++++++++--- 6 files changed, 114 insertions(+), 12 deletions(-) diff --git a/config/crd/bases/workload.codeflare.dev_appwrappers.yaml b/config/crd/bases/workload.codeflare.dev_appwrappers.yaml index f5c640e8d..95be02a96 100644 --- a/config/crd/bases/workload.codeflare.dev_appwrappers.yaml +++ b/config/crd/bases/workload.codeflare.dev_appwrappers.yaml @@ -161,6 +161,13 @@ spec: check should happen, and if requeuing has reached its maximum number of times. properties: + forcefulDeletionAfterSeconds: + default: 0 + description: Enable forceful deletion of generic items and + pods with the AppWrapper label after specified seconds. + This may be necessary to prevent redeployment of generic + items that create pods that were not correctly deleted. + type: integer growthType: default: exponential description: Growth strategy to increase the waiting time diff --git a/config/crd/bases/workload.codeflare.dev_schedulingspecs.yaml b/config/crd/bases/workload.codeflare.dev_schedulingspecs.yaml index 6330ddb0f..9d68b1682 100644 --- a/config/crd/bases/workload.codeflare.dev_schedulingspecs.yaml +++ b/config/crd/bases/workload.codeflare.dev_schedulingspecs.yaml @@ -58,6 +58,13 @@ spec: time. Values in this field control how often the pod check should happen, and if requeuing has reached its maximum number of times. properties: + forcefulDeletionAfterSeconds: + default: 0 + description: Enable forceful deletion of generic items and pods + with the AppWrapper label after specified seconds. This may + be necessary to prevent redeployment of generic items that create + pods that were not correctly deleted. + type: integer growthType: default: exponential description: Growth strategy to increase the waiting time between diff --git a/deployment/mcad-controller/crds/workload.codeflare.dev_appwrappers.yaml b/deployment/mcad-controller/crds/workload.codeflare.dev_appwrappers.yaml index f5c640e8d..95be02a96 100644 --- a/deployment/mcad-controller/crds/workload.codeflare.dev_appwrappers.yaml +++ b/deployment/mcad-controller/crds/workload.codeflare.dev_appwrappers.yaml @@ -161,6 +161,13 @@ spec: check should happen, and if requeuing has reached its maximum number of times. properties: + forcefulDeletionAfterSeconds: + default: 0 + description: Enable forceful deletion of generic items and + pods with the AppWrapper label after specified seconds. + This may be necessary to prevent redeployment of generic + items that create pods that were not correctly deleted. 
+ type: integer growthType: default: exponential description: Growth strategy to increase the waiting time diff --git a/deployment/mcad-controller/crds/workload.codeflare.dev_schedulingspecs.yaml b/deployment/mcad-controller/crds/workload.codeflare.dev_schedulingspecs.yaml index 6330ddb0f..9d68b1682 100644 --- a/deployment/mcad-controller/crds/workload.codeflare.dev_schedulingspecs.yaml +++ b/deployment/mcad-controller/crds/workload.codeflare.dev_schedulingspecs.yaml @@ -58,6 +58,13 @@ spec: time. Values in this field control how often the pod check should happen, and if requeuing has reached its maximum number of times. properties: + forcefulDeletionAfterSeconds: + default: 0 + description: Enable forceful deletion of generic items and pods + with the AppWrapper label after specified seconds. This may + be necessary to prevent redeployment of generic items that create + pods that were not correctly deleted. + type: integer growthType: default: exponential description: Growth strategy to increase the waiting time between diff --git a/pkg/apis/controller/v1beta1/schedulingspec.go b/pkg/apis/controller/v1beta1/schedulingspec.go index 90f5d65fb..a5de822ae 100644 --- a/pkg/apis/controller/v1beta1/schedulingspec.go +++ b/pkg/apis/controller/v1beta1/schedulingspec.go @@ -72,6 +72,10 @@ type RequeuingTemplate struct { // items are stopped and removed from the cluster (AppWrapper remains deployed). // +kubebuilder:default=0 MaxNumRequeuings int `json:"maxNumRequeuings,omitempty" protobuf:"bytes,6,rep,name=maxNumRequeuings"` + // Enable forceful deletion of generic items and pods with the AppWrapper label after specified seconds. + // This may be necessary to prevent redeployment of generic items that create pods that were not correctly deleted. + // +kubebuilder:default=0 + ForcefulDeletionAfterSeconds int `json:"forcefulDeletionAfterSeconds,omitempty" protobuf:"bytes,7,rep,name=forcefulDeletionAfterSeconds"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 58038dc78..c71adca2a 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -323,6 +323,18 @@ func NewJobController(restConfig *rest.Config, mcadConfig *config.MCADConfigurat // we still need a thread for dispatch duration but minScheduling spec can definetly be moved to an informer func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) { ctx := context.Background() + // We need to update AW before analyzing it as a candidate for preemption + updateErr := qjm.UpdateQueueJobStatus(inspectAw) + if updateErr != nil { + klog.Warningf("[PreemptQueueJobs] update of pod count to AW %s/%s failed hence skipping preemption", inspectAw.Namespace, inspectAw.Name) + return + } + // Check if AppWrapper is waiting for deletion to be completed + if inspectAw.Status.QueueJobState == arbv1.AppWrapperCondDeleted { + qjm.enqueue(inspectAw) + return + } + // Check if AppWrapper should be preempted aw := qjm.GetQueueJobEligibleForPreemption(inspectAw) if aw != nil { if aw.Status.State == arbv1.AppWrapperStateCompleted || aw.Status.State == arbv1.AppWrapperStateDeleted || aw.Status.State == arbv1.AppWrapperStateFailed { @@ -336,12 +348,6 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) { klog.Warningf("[PreemptQueueJobs] failed in retrieving a fresh copy of the app wrapper '%s/%s', err=%v. 
Will try to preempt on the next run.", aw.Namespace, aw.Name, err) return } - // we need to update AW before analyzing it as a candidate for preemption - updateErr := qjm.UpdateQueueJobStatus(newjob) - if updateErr != nil { - klog.Warningf("[PreemptQueueJobs] update of pod count to AW %s/%s failed hence skipping preemption", newjob.Namespace, newjob.Name) - return - } newjob.Status.CanRun = false newjob.Status.FilterIgnore = true // update QueueJobState only cleanAppWrapper := false @@ -372,7 +378,6 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) { // cannot use cleanup AW, since it puts AW back in running state qjm.qjqueue.AddUnschedulableIfNotPresent(updateNewJob) generatedCondition = true - } } @@ -415,6 +420,7 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) { updateNewJob = newjob.DeepCopy() generatedCondition = true + } else if newjob.Status.Running == 0 && newjob.Status.Succeeded == 0 && newjob.Status.State == arbv1.AppWrapperStateActive { // If pods failed scheduling generate new preempt condition message = fmt.Sprintf("Pods failed scheduling failed=%v, running=%v.", len(newjob.Status.PendingPodConditions), newjob.Status.Running) @@ -1635,9 +1641,7 @@ func (cc *XController) addQueueJob(obj interface{}) { cc.UpdateQueueJobs(latestAw) klog.V(2).Infof("[Informer-addQJ] requeing AW %s/%s to determine completion status for AW", qj.Namespace, qj.Name) } - } - } }() } @@ -1876,9 +1880,9 @@ func (cc *XController) worker() { // if there are running resources for this job then delete them because the job was put in // pending state... - // If this the first time seeing this AW, no need to delete. + // If this is the first time seeing this AW, no need to delete. stateLen := len(queuejob.Status.State) - if stateLen > 0 { + if stateLen > 0 && queuejob.Status.QueueJobState != arbv1.AppWrapperCondDeleted { klog.V(2).Infof("[worker] Deleting resources for AppWrapper Job '%s/%s' because it was preempted, status.CanRun=%t, status.State=%s", queuejob.Namespace, queuejob.Name, queuejob.Status.CanRun, queuejob.Status.State) err00 := cc.Cleanup(ctx, queuejob) if err00 != nil { @@ -1886,13 +1890,57 @@ func (cc *XController) worker() { return err00 } klog.V(2).Infof("[worker] Delete resources for AppWrapper Job '%s/%s' due to preemption was sucessfull, status.CanRun=%t, status.State=%s", queuejob.Namespace, queuejob.Name, queuejob.Status.CanRun, queuejob.Status.State) + + // Waiting for deletion of the AppWrapper to be complete before forcing the deletion of pods + var err error + newjob, err := cc.getAppWrapper(queuejob.Namespace, queuejob.Name, "[worker] get fresh AppWrapper") + if err != nil { + klog.Errorf("[worker] Failed getting a new AppWrapper: '%s/%s',Status=%+v, err=%+v.", queuejob.Namespace, queuejob.Name, queuejob.Status, err) + return err + } + newjob.Status.QueueJobState = arbv1.AppWrapperCondDeleted + cc.addOrUpdateCondition(newjob, arbv1.AppWrapperCondDeleted, v1.ConditionTrue, "AwaitingDeletion", "") + index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondDeleted, "AwaitingDeletion") // TODO: addOrUpdateCondition is NOT changing the transition time properly so need to do it here + newjob.Status.Conditions[index].LastTransitionMicroTime = metav1.NowMicro() + newjob.Status.Conditions[index].LastUpdateMicroTime = metav1.NowMicro() + newjob.Status.FilterIgnore = true + err = cc.updateStatusInEtcdWithRetry(ctx, newjob, "AwaitingDeletion") + if err != nil { + klog.Errorf("[worker] Error updating status 'Deleted' for AppWrapper: '%s/%s', 
status=%+v, err=%+v.", newjob.Namespace, newjob.Name, newjob.Status, err) + return err + } + return nil + + } else if queuejob.Status.QueueJobState == arbv1.AppWrapperCondDeleted { + // The AppWrapper was preempted and its objects were deleted. In case the deletion was not successful for all the items + // MCAD will force delete any pods that remain in the system + if queuejob.Spec.SchedSpec.Requeuing.ForcefulDeletionAfterSeconds > 0 { + index := getIndexOfMatchedCondition(queuejob, arbv1.AppWrapperCondDeleted, "AwaitingDeletion") + if index < 0 { + klog.V(4).Infof("WARNING: [worker] Forced deletion condition was not added after 'Cleanup'. Silently ignoring forced cleanup.") + } else { + deletionTime := queuejob.Status.Conditions[index].LastTransitionMicroTime.Add(time.Duration(queuejob.Spec.SchedSpec.Requeuing.ForcefulDeletionAfterSeconds) * time.Second) + currentTime := time.Now() + + if currentTime.After(deletionTime) { + if err := cc.ForcefulCleanup(ctx, queuejob); err != nil { + klog.V(5).Infof("[worker] Forced deletion of remaining live pods didn't work (Ending %s/%s). Retrying in the next cycle.", queuejob.Namespace, queuejob.Name) + return nil + } + } else { + klog.V(8).Infof("[worker] Waiting for 'forcefulDeletionAfterSeconds' seconds before requeueing job '%s/%s'.", queuejob.Namespace, queuejob.Name) + return nil + } + } + } } + // Preparing job for redispatching queuejob.Status.State = arbv1.AppWrapperStateEnqueued + queuejob.Status.QueueJobState = arbv1.AppWrapperCondQueueing klog.V(10).Infof("[worker] before add to activeQ %s/%s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", queuejob.Namespace, queuejob.Name, cc.qjqueue.IfExistActiveQ(queuejob), cc.qjqueue.IfExistUnschedulableQ(queuejob), queuejob, queuejob.ResourceVersion, queuejob.Status) index := getIndexOfMatchedCondition(queuejob, arbv1.AppWrapperCondQueueing, "AwaitingHeadOfLine") if index < 0 { - queuejob.Status.QueueJobState = arbv1.AppWrapperCondQueueing cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondQueueing, v1.ConditionTrue, "AwaitingHeadOfLine", "") queuejob.Status.Conditions = append(queuejob.Status.Conditions, cond) } else { @@ -2194,6 +2242,28 @@ func (cc *XController) manageQueueJob(ctx context.Context, qj *arbv1.AppWrapper, } // Cleanup function +func (qjm *XController) ForcefulCleanup(ctx context.Context, appwrapper *arbv1.AppWrapper) error { + labelSelector := fmt.Sprintf("%s=%s", "appwrapper.mcad.ibm.com", appwrapper.Name) + pods, getPodsErr := qjm.clients.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector}) + + if getPodsErr != nil { + klog.V(5).Infof("[ForcefulCleanup] Listing pods with label '%s' was not successful", labelSelector) + return getPodsErr + } + + for _, pod := range pods.Items { + klog.V(3).Infof("[ForcefulCleanup] Forcibly deleting long-terminating pod='%s/%s'", pod.Namespace, pod.Name) + zero := int64(0) + delPodErr := qjm.clients.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{GracePeriodSeconds: &zero}) + if delPodErr != nil { + klog.V(3).Infof("[ForcefulCleanup] Couldn't forcibly delete long-terminating pod='%s/%s'", pod.Namespace, pod.Name) + return delPodErr + } + } + + return nil +} + func (cc *XController) Cleanup(ctx context.Context, appwrapper *arbv1.AppWrapper) error { klog.V(3).Infof("[Cleanup] begin AppWrapper '%s/%s' Version=%s", appwrapper.Namespace, appwrapper.Name, appwrapper.ResourceVersion) var err *multierror.Error From d3a929e138c97222a1e5d9786950ddbf46b7cdef Mon Sep 17 00:00:00 2001 From: 
"Pedro D. Bello-Maldonado" Date: Fri, 20 Oct 2023 16:33:12 -0400 Subject: [PATCH 2/5] Only change the QueueJobState when forceful deletion is enabled --- .../queuejob/queuejob_controller_ex.go | 39 ++++++++++--------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index db739a1c4..a82fe8a6d 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -1898,26 +1898,27 @@ func (cc *XController) worker() { } klog.V(2).Infof("[worker] Delete resources for AppWrapper Job '%s/%s' due to preemption was sucessfull, status.CanRun=%t, status.State=%s", queuejob.Namespace, queuejob.Name, queuejob.Status.CanRun, queuejob.Status.State) - // Waiting for deletion of the AppWrapper to be complete before forcing the deletion of pods - var err error - newjob, err := cc.getAppWrapper(queuejob.Namespace, queuejob.Name, "[worker] get fresh AppWrapper") - if err != nil { - klog.Errorf("[worker] Failed getting a new AppWrapper: '%s/%s',Status=%+v, err=%+v.", queuejob.Namespace, queuejob.Name, queuejob.Status, err) - return err - } - newjob.Status.QueueJobState = arbv1.AppWrapperCondDeleted - cc.addOrUpdateCondition(newjob, arbv1.AppWrapperCondDeleted, v1.ConditionTrue, "AwaitingDeletion", "") - index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondDeleted, "AwaitingDeletion") // TODO: addOrUpdateCondition is NOT changing the transition time properly so need to do it here - newjob.Status.Conditions[index].LastTransitionMicroTime = metav1.NowMicro() - newjob.Status.Conditions[index].LastUpdateMicroTime = metav1.NowMicro() - newjob.Status.FilterIgnore = true - err = cc.updateStatusInEtcdWithRetry(ctx, newjob, "AwaitingDeletion") - if err != nil { - klog.Errorf("[worker] Error updating status 'Deleted' for AppWrapper: '%s/%s', status=%+v, err=%+v.", newjob.Namespace, newjob.Name, newjob.Status, err) - return err + if queuejob.Spec.SchedSpec.Requeuing.ForcefulDeletionAfterSeconds > 0 { + // Waiting for deletion of the AppWrapper to be complete before forcing the deletion of pods + var err error + newjob, err := cc.getAppWrapper(queuejob.Namespace, queuejob.Name, "[worker] get fresh AppWrapper") + if err != nil { + klog.Errorf("[worker] Failed getting a new AppWrapper: '%s/%s',Status=%+v, err=%+v.", queuejob.Namespace, queuejob.Name, queuejob.Status, err) + return err + } + newjob.Status.QueueJobState = arbv1.AppWrapperCondDeleted + cc.addOrUpdateCondition(newjob, arbv1.AppWrapperCondDeleted, v1.ConditionTrue, "AwaitingDeletion", "") + index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondDeleted, "AwaitingDeletion") // TODO: addOrUpdateCondition is NOT changing the transition time properly so need to do it here + newjob.Status.Conditions[index].LastTransitionMicroTime = metav1.NowMicro() + newjob.Status.Conditions[index].LastUpdateMicroTime = metav1.NowMicro() + newjob.Status.FilterIgnore = true + err = cc.updateStatusInEtcdWithRetry(ctx, newjob, "AwaitingDeletion") + if err != nil { + klog.Errorf("[worker] Error updating status 'Deleted' for AppWrapper: '%s/%s', status=%+v, err=%+v.", newjob.Namespace, newjob.Name, newjob.Status, err) + return err + } + return nil } - return nil - } else if queuejob.Status.QueueJobState == arbv1.AppWrapperCondDeleted { // The AppWrapper was preempted and its objects were deleted. 
In case the deletion was not successful for all the items // MCAD will force delete any pods that remain in the system From 277e9c065a815003ab6548a61090b8ec8bc4f799 Mon Sep 17 00:00:00 2001 From: "Pedro D. Bello-Maldonado" Date: Fri, 20 Oct 2023 17:59:54 -0400 Subject: [PATCH 3/5] Changed spec from 'ForcefulDeletionAfterSeconds' to 'ForceDeletionTimeInSeconds' --- config/crd/bases/workload.codeflare.dev_appwrappers.yaml | 2 +- .../crd/bases/workload.codeflare.dev_schedulingspecs.yaml | 2 +- .../crds/workload.codeflare.dev_appwrappers.yaml | 2 +- .../crds/workload.codeflare.dev_schedulingspecs.yaml | 2 +- pkg/apis/controller/v1beta1/schedulingspec.go | 2 +- pkg/controller/queuejob/queuejob_controller_ex.go | 8 ++++---- 6 files changed, 9 insertions(+), 9 deletions(-) diff --git a/config/crd/bases/workload.codeflare.dev_appwrappers.yaml b/config/crd/bases/workload.codeflare.dev_appwrappers.yaml index 95be02a96..196d5e497 100644 --- a/config/crd/bases/workload.codeflare.dev_appwrappers.yaml +++ b/config/crd/bases/workload.codeflare.dev_appwrappers.yaml @@ -161,7 +161,7 @@ spec: check should happen, and if requeuing has reached its maximum number of times. properties: - forcefulDeletionAfterSeconds: + forceDeletionTimeInSeconds: default: 0 description: Enable forceful deletion of generic items and pods with the AppWrapper label after specified seconds. diff --git a/config/crd/bases/workload.codeflare.dev_schedulingspecs.yaml b/config/crd/bases/workload.codeflare.dev_schedulingspecs.yaml index 9d68b1682..07940b088 100644 --- a/config/crd/bases/workload.codeflare.dev_schedulingspecs.yaml +++ b/config/crd/bases/workload.codeflare.dev_schedulingspecs.yaml @@ -58,7 +58,7 @@ spec: time. Values in this field control how often the pod check should happen, and if requeuing has reached its maximum number of times. properties: - forcefulDeletionAfterSeconds: + forceDeletionTimeInSeconds: default: 0 description: Enable forceful deletion of generic items and pods with the AppWrapper label after specified seconds. This may diff --git a/deployment/mcad-controller/crds/workload.codeflare.dev_appwrappers.yaml b/deployment/mcad-controller/crds/workload.codeflare.dev_appwrappers.yaml index 95be02a96..196d5e497 100644 --- a/deployment/mcad-controller/crds/workload.codeflare.dev_appwrappers.yaml +++ b/deployment/mcad-controller/crds/workload.codeflare.dev_appwrappers.yaml @@ -161,7 +161,7 @@ spec: check should happen, and if requeuing has reached its maximum number of times. properties: - forcefulDeletionAfterSeconds: + forceDeletionTimeInSeconds: default: 0 description: Enable forceful deletion of generic items and pods with the AppWrapper label after specified seconds. diff --git a/deployment/mcad-controller/crds/workload.codeflare.dev_schedulingspecs.yaml b/deployment/mcad-controller/crds/workload.codeflare.dev_schedulingspecs.yaml index 9d68b1682..07940b088 100644 --- a/deployment/mcad-controller/crds/workload.codeflare.dev_schedulingspecs.yaml +++ b/deployment/mcad-controller/crds/workload.codeflare.dev_schedulingspecs.yaml @@ -58,7 +58,7 @@ spec: time. Values in this field control how often the pod check should happen, and if requeuing has reached its maximum number of times. properties: - forcefulDeletionAfterSeconds: + forceDeletionTimeInSeconds: default: 0 description: Enable forceful deletion of generic items and pods with the AppWrapper label after specified seconds. 
This may diff --git a/pkg/apis/controller/v1beta1/schedulingspec.go b/pkg/apis/controller/v1beta1/schedulingspec.go index a5de822ae..f546ce769 100644 --- a/pkg/apis/controller/v1beta1/schedulingspec.go +++ b/pkg/apis/controller/v1beta1/schedulingspec.go @@ -75,7 +75,7 @@ type RequeuingTemplate struct { // Enable forceful deletion of generic items and pods with the AppWrapper label after specified seconds. // This may be necesary to prevent redeployment of generic items that create pods that were not correctly deleted. // +kubebuilder:default=0 - ForcefulDeletionAfterSeconds int `json:"forcefulDeletionAfterSeconds,omitempty" protobuf:"bytes,7,rep,name=forcefulDeletionAfterSeconds"` + ForceDeletionTimeInSeconds int `json:"forceDeletionTimeInSeconds,omitempty" protobuf:"bytes,7,rep,name=forceDeletionTimeInSeconds"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index a82fe8a6d..9497dbc39 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -1898,7 +1898,7 @@ func (cc *XController) worker() { } klog.V(2).Infof("[worker] Delete resources for AppWrapper Job '%s/%s' due to preemption was sucessfull, status.CanRun=%t, status.State=%s", queuejob.Namespace, queuejob.Name, queuejob.Status.CanRun, queuejob.Status.State) - if queuejob.Spec.SchedSpec.Requeuing.ForcefulDeletionAfterSeconds > 0 { + if queuejob.Spec.SchedSpec.Requeuing.ForceDeletionTimeInSeconds > 0 { // Waiting for deletion of the AppWrapper to be complete before forcing the deletion of pods var err error newjob, err := cc.getAppWrapper(queuejob.Namespace, queuejob.Name, "[worker] get fresh AppWrapper") @@ -1922,12 +1922,12 @@ func (cc *XController) worker() { } else if queuejob.Status.QueueJobState == arbv1.AppWrapperCondDeleted { // The AppWrapper was preempted and its objects were deleted. In case the deletion was not successful for all the items // MCAD will force delete any pods that remain in the system - if queuejob.Spec.SchedSpec.Requeuing.ForcefulDeletionAfterSeconds > 0 { + if queuejob.Spec.SchedSpec.Requeuing.ForceDeletionTimeInSeconds > 0 { index := getIndexOfMatchedCondition(queuejob, arbv1.AppWrapperCondDeleted, "AwaitingDeletion") if index < 0 { klog.V(4).Infof("WARNING: [worker] Forced deletion condition was not added after 'Cleanup'. Silently ignoring forced cleanup.") } else { - deletionTime := queuejob.Status.Conditions[index].LastTransitionMicroTime.Add(time.Duration(queuejob.Spec.SchedSpec.Requeuing.ForcefulDeletionAfterSeconds) * time.Second) + deletionTime := queuejob.Status.Conditions[index].LastTransitionMicroTime.Add(time.Duration(queuejob.Spec.SchedSpec.Requeuing.ForceDeletionTimeInSeconds) * time.Second) currentTime := time.Now() if currentTime.After(deletionTime) { @@ -1936,7 +1936,7 @@ func (cc *XController) worker() { return nil } } else { - klog.V(8).Infof("[worker] Waiting for 'forcefulDeletionAfterSeconds' seconds before requeueing job '%s/%s'.", queuejob.Namespace, queuejob.Name) + klog.V(8).Infof("[worker] Waiting for 'ForceDeletionTimeInSeconds' seconds before requeueing job '%s/%s'.", queuejob.Namespace, queuejob.Name) return nil } } From 347b9c5c711011df0e0876f2a654e8e2a651715d Mon Sep 17 00:00:00 2001 From: "Pedro D. 
Bello-Maldonado" Date: Tue, 24 Oct 2023 14:15:26 -0400 Subject: [PATCH 4/5] Update to the CRD to support 'pauseTimeInSeconds' --- config/crd/bases/workload.codeflare.dev_appwrappers.yaml | 6 ++++++ .../crd/bases/workload.codeflare.dev_schedulingspecs.yaml | 6 ++++++ .../crds/workload.codeflare.dev_appwrappers.yaml | 6 ++++++ .../crds/workload.codeflare.dev_schedulingspecs.yaml | 6 ++++++ pkg/apis/controller/v1beta1/schedulingspec.go | 4 ++++ 5 files changed, 28 insertions(+) diff --git a/config/crd/bases/workload.codeflare.dev_appwrappers.yaml b/config/crd/bases/workload.codeflare.dev_appwrappers.yaml index 196d5e497..f1da0572c 100644 --- a/config/crd/bases/workload.codeflare.dev_appwrappers.yaml +++ b/config/crd/bases/workload.codeflare.dev_appwrappers.yaml @@ -199,6 +199,12 @@ spec: description: Field to keep track of how many times a requeuing event has been triggered. type: integer + pauseTimeInSeconds: + default: 0 + description: When a job is ready to be redispatched after + it has been requeued due to preemption, MCAD will wait 'pauseTimeInSeconds' + before redispatching + type: integer timeInSeconds: default: 300 description: Initial waiting time before requeuing conditions diff --git a/config/crd/bases/workload.codeflare.dev_schedulingspecs.yaml b/config/crd/bases/workload.codeflare.dev_schedulingspecs.yaml index 07940b088..56e882090 100644 --- a/config/crd/bases/workload.codeflare.dev_schedulingspecs.yaml +++ b/config/crd/bases/workload.codeflare.dev_schedulingspecs.yaml @@ -95,6 +95,12 @@ spec: description: Field to keep track of how many times a requeuing event has been triggered. type: integer + pauseTimeInSeconds: + default: 0 + description: When a job is ready to be redispatched after it has + been requeued due to preemption, MCAD will wait 'pauseTimeInSeconds' + before redispatching + type: integer timeInSeconds: default: 300 description: Initial waiting time before requeuing conditions diff --git a/deployment/mcad-controller/crds/workload.codeflare.dev_appwrappers.yaml b/deployment/mcad-controller/crds/workload.codeflare.dev_appwrappers.yaml index 196d5e497..f1da0572c 100644 --- a/deployment/mcad-controller/crds/workload.codeflare.dev_appwrappers.yaml +++ b/deployment/mcad-controller/crds/workload.codeflare.dev_appwrappers.yaml @@ -199,6 +199,12 @@ spec: description: Field to keep track of how many times a requeuing event has been triggered. type: integer + pauseTimeInSeconds: + default: 0 + description: When a job is ready to be redispatched after + it has been requeued due to preemption, MCAD will wait 'pauseTimeInSeconds' + before redispatching + type: integer timeInSeconds: default: 300 description: Initial waiting time before requeuing conditions diff --git a/deployment/mcad-controller/crds/workload.codeflare.dev_schedulingspecs.yaml b/deployment/mcad-controller/crds/workload.codeflare.dev_schedulingspecs.yaml index 07940b088..56e882090 100644 --- a/deployment/mcad-controller/crds/workload.codeflare.dev_schedulingspecs.yaml +++ b/deployment/mcad-controller/crds/workload.codeflare.dev_schedulingspecs.yaml @@ -95,6 +95,12 @@ spec: description: Field to keep track of how many times a requeuing event has been triggered. 
type: integer + pauseTimeInSeconds: + default: 0 + description: When a job is ready to be redispatched after it has + been requeued due to preemption, MCAD will wait 'pauseTimeInSeconds' + before redispatching + type: integer timeInSeconds: default: 300 description: Initial waiting time before requeuing conditions diff --git a/pkg/apis/controller/v1beta1/schedulingspec.go b/pkg/apis/controller/v1beta1/schedulingspec.go index f546ce769..111459e01 100644 --- a/pkg/apis/controller/v1beta1/schedulingspec.go +++ b/pkg/apis/controller/v1beta1/schedulingspec.go @@ -76,6 +76,10 @@ type RequeuingTemplate struct { // This may be necesary to prevent redeployment of generic items that create pods that were not correctly deleted. // +kubebuilder:default=0 ForceDeletionTimeInSeconds int `json:"forceDeletionTimeInSeconds,omitempty" protobuf:"bytes,7,rep,name=forceDeletionTimeInSeconds"` + // When a job is ready to be redispatched after it has been requeued due to preemption, MCAD will + // wait 'pauseTimeInSeconds' before redispatching + // +kubebuilder:default=0 + PauseTimeInSeconds int `json:"pauseTimeInSeconds,omitempty" protobuf:"bytes,8,rep,name=pauseTimeInSeconds"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object From 55fbef971a01eba895424cf441bb1b6b00344f6f Mon Sep 17 00:00:00 2001 From: "Pedro D. Bello-Maldonado" Date: Tue, 24 Oct 2023 14:53:42 -0400 Subject: [PATCH 5/5] Support for 'PauseTimeInSeconds' so MCAD delays redispatching after a job has been requeued --- .../queuejob/queuejob_controller_ex.go | 40 +++++++++++++------ 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 9497dbc39..09a12a33d 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -1898,8 +1898,9 @@ func (cc *XController) worker() { } klog.V(2).Infof("[worker] Delete resources for AppWrapper Job '%s/%s' due to preemption was sucessfull, status.CanRun=%t, status.State=%s", queuejob.Namespace, queuejob.Name, queuejob.Status.CanRun, queuejob.Status.State) - if queuejob.Spec.SchedSpec.Requeuing.ForceDeletionTimeInSeconds > 0 { - // Waiting for deletion of the AppWrapper to be complete before forcing the deletion of pods + if queuejob.Spec.SchedSpec.Requeuing.ForceDeletionTimeInSeconds > 0 || queuejob.Spec.SchedSpec.Requeuing.PauseTimeInSeconds > 0 { + // 1) Waiting for deletion of the AppWrapper to be complete before forcing the deletion of pods + // 2) Delaying redispatching with user specified wait time var err error newjob, err := cc.getAppWrapper(queuejob.Namespace, queuejob.Name, "[worker] get fresh AppWrapper") if err != nil { @@ -1920,17 +1921,20 @@ func (cc *XController) worker() { return nil } } else if queuejob.Status.QueueJobState == arbv1.AppWrapperCondDeleted { - // The AppWrapper was preempted and its objects were deleted. In case the deletion was not successful for all the items - // MCAD will force delete any pods that remain in the system - if queuejob.Spec.SchedSpec.Requeuing.ForceDeletionTimeInSeconds > 0 { - index := getIndexOfMatchedCondition(queuejob, arbv1.AppWrapperCondDeleted, "AwaitingDeletion") - if index < 0 { - klog.V(4).Infof("WARNING: [worker] Forced deletion condition was not added after 'Cleanup'. 
Silently ignoring forced cleanup.") - } else { - deletionTime := queuejob.Status.Conditions[index].LastTransitionMicroTime.Add(time.Duration(queuejob.Spec.SchedSpec.Requeuing.ForceDeletionTimeInSeconds) * time.Second) - currentTime := time.Now() + // Checking of 'AwaitingDeletion' condition exists + index := getIndexOfMatchedCondition(queuejob, arbv1.AppWrapperCondDeleted, "AwaitingDeletion") + if index < 0 { + klog.V(4).Infof("WARNING: [worker] Forced deletion condition was not added after 'Cleanup'. Silently ignoring forced cleanup.") + } else { + // Get current time to compare to + currentTime := time.Now() - if currentTime.After(deletionTime) { + // The AppWrapper was preempted and its objects were deleted. In case the deletion was not successful for all the items + // MCAD will force delete any pods that remain in the system + if queuejob.Spec.SchedSpec.Requeuing.ForceDeletionTimeInSeconds > 0 { + forceDeletionTime := queuejob.Status.Conditions[index].LastTransitionMicroTime.Add(time.Duration(queuejob.Spec.SchedSpec.Requeuing.ForceDeletionTimeInSeconds) * time.Second) + + if currentTime.After(forceDeletionTime) { if err := cc.ForcefulCleanup(ctx, queuejob); err != nil { klog.V(5).Infof("[worker] Forced deletion of remaining live pods didn't work (Ending %s/%s). Retrying in the next cycle.", queuejob.Namespace, queuejob.Name) return nil @@ -1940,6 +1944,18 @@ func (cc *XController) worker() { return nil } } + + // When a job is ready to be redispatched after it has been requeued due to preemption, MCAD will wait 'pauseTimeInSeconds' before redispatching + if queuejob.Spec.SchedSpec.Requeuing.PauseTimeInSeconds > 0 { + redispatchingTime := queuejob.Status.Conditions[index].LastTransitionMicroTime.Add(time.Duration(queuejob.Spec.SchedSpec.Requeuing.PauseTimeInSeconds) * time.Second) + + if currentTime.After(redispatchingTime) { + klog.V(5).Infof("[worker] Ready to redispatch the AppWrapper (Ending %s/%s).", queuejob.Namespace, queuejob.Name) + } else { + klog.V(8).Infof("[worker] Waiting for 'PauseTimeInSeconds' seconds before redispatching job '%s/%s'.", queuejob.Namespace, queuejob.Name) + return nil + } + } } }
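For reference, a minimal standalone sketch of the two timing checks this series adds to worker(): once the 'AwaitingDeletion' condition is recorded, forced pod cleanup waits for forceDeletionTimeInSeconds and redispatch waits for pauseTimeInSeconds, both measured from the condition's LastTransitionMicroTime. The struct and helper names below are illustrative stand-ins rather than MCAD code; only the arithmetic mirrors the patches.

package main

import (
	"fmt"
	"time"
)

// requeuingKnobs stands in for the two fields added to RequeuingTemplate in
// this series (forceDeletionTimeInSeconds, pauseTimeInSeconds).
type requeuingKnobs struct {
	ForceDeletionTimeInSeconds int
	PauseTimeInSeconds         int
}

// shouldForceDelete mirrors the worker() check from PATCH 5/5: leftover pods
// are force deleted only after the configured window has elapsed since the
// AwaitingDeletion condition was set.
func shouldForceDelete(k requeuingKnobs, deletedAt, now time.Time) bool {
	if k.ForceDeletionTimeInSeconds <= 0 {
		return false
	}
	deadline := deletedAt.Add(time.Duration(k.ForceDeletionTimeInSeconds) * time.Second)
	return now.After(deadline)
}

// readyToRedispatch mirrors the pauseTimeInSeconds check: the AppWrapper is
// requeued for dispatch only after the pause window has passed.
func readyToRedispatch(k requeuingKnobs, deletedAt, now time.Time) bool {
	if k.PauseTimeInSeconds <= 0 {
		return true
	}
	return now.After(deletedAt.Add(time.Duration(k.PauseTimeInSeconds) * time.Second))
}

func main() {
	knobs := requeuingKnobs{ForceDeletionTimeInSeconds: 60, PauseTimeInSeconds: 120}
	deletedAt := time.Now().Add(-90 * time.Second) // condition set 90s ago

	fmt.Println("force delete leftover pods:", shouldForceDelete(knobs, deletedAt, time.Now())) // true: 90s > 60s
	fmt.Println("ready to redispatch:", readyToRedispatch(knobs, deletedAt, time.Now()))        // false: 90s < 120s
}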
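If the series lands as-is, enabling both behaviours from client code should reduce to setting the two new RequeuingTemplate fields. A sketch, assuming the v1beta1 types shown above are reachable as value fields (as the worker() accesses suggest) and that the module path below matches the repository's actual Go module path; the numeric values are arbitrary examples.

package main

import (
	"fmt"

	arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
)

func main() {
	aw := arbv1.AppWrapper{}
	// The two knobs introduced by this series; values here are examples only.
	aw.Spec.SchedSpec.Requeuing.ForceDeletionTimeInSeconds = 60 // force delete leftover pods 60s after cleanup
	aw.Spec.SchedSpec.Requeuing.PauseTimeInSeconds = 120        // hold redispatch for 2 minutes after the AwaitingDeletion condition
	fmt.Printf("requeuing template: %+v\n", aw.Spec.SchedSpec.Requeuing)
}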
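ForcefulCleanup in PATCH 1/5 amounts to listing pods that carry the appwrapper.mcad.ibm.com label and deleting them with a zero-second grace period. A self-contained sketch of that pattern against client-go's fake clientset, useful as a starting point for a unit test; the pod name, namespace, and AppWrapper name are made up, and label filtering is done in the loop so the example does not depend on fake-client selector behaviour.

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	ctx := context.Background()

	// A leftover pod still carrying the AppWrapper label after Cleanup ran.
	leftover := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "stuck-worker-0",
			Namespace: "default",
			Labels:    map[string]string{"appwrapper.mcad.ibm.com": "example-appwrapper"},
		},
	}
	clientset := fake.NewSimpleClientset(leftover)

	// List pods and force delete the ones labeled for this AppWrapper with a
	// zero grace period, the same DeleteOptions ForcefulCleanup passes.
	pods, err := clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	zero := int64(0)
	for _, pod := range pods.Items {
		if pod.Labels["appwrapper.mcad.ibm.com"] != "example-appwrapper" {
			continue
		}
		if err := clientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{GracePeriodSeconds: &zero}); err != nil {
			panic(err)
		}
		fmt.Println("force deleted", pod.Namespace+"/"+pod.Name)
	}
}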