[KWOK] retry when applying Stages fails #911
Conversation
Hi @caozhuozi. Thanks for your PR. I'm waiting for a kubernetes-sigs member to verify that this patch is reasonable to test. If it is, they should reply with `/ok-to-test`. Once the patch is verified, the new status will be reflected by the `ok-to-test` label. I understand the commands that are listed here. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.
Hi, @wzshiming. Currently, I only check for network errors and bypass all other error types. The logic is slightly different from the one I proposed before (#911 (comment)):

```go
var ComputePatchError = errors.New("compute patch error")
```

But it's hard for me to integrate the newly defined error into the existing code. For example:

```go
func (c *NodeController) computePatch(node *corev1.Node, tpl string) ([]byte, error) {
	patch, err := c.renderer.ToJSON(tpl, node)
	if err != nil {
		return nil, err
	}
	original, err := json.Marshal(node.Status)
	if err != nil {
		return nil, err
	}
	sum, err := strategicpatch.StrategicMergePatch(original, patch, node.Status)
	if err != nil {
		return nil, err
	}
	nodeStatus := corev1.NodeStatus{}
	err = json.Unmarshal(sum, &nodeStatus)
	if err != nil {
		return nil, err
	}
	dist, err := json.Marshal(nodeStatus)
	if err != nil {
		return nil, err
	}
	if bytes.Equal(original, dist) {
		return nil, nil
	}
	return json.Marshal(map[string]json.RawMessage{
		"status": patch,
	})
}
```

If I return the newly defined error inside `computePatch`, the original error will be lost:

```go
func (c *NodeController) computePatch(node *corev1.Node, tpl string) ([]byte, error) {
	patch, err := c.renderer.ToJSON(tpl, node)
	if err != nil {
		return nil, ComputePatchError // the original `err` will be lost
	}
	// omitted
}
```

An alternative way is to return the newly defined error where `computePatch` is called:

```go
patch, err = c.computePatch(node, next.StatusTemplate)
if err != nil {
	logger.Debug("failed to obtain the patch of node %s: %w", node.Name, err)
	return ComputePatchError
}
```

But I am not sure if it is Go idiomatic.
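One common Go idiom for this situation (my own sketch, not code from this PR) is to wrap the underlying error with `%w`, so the sentinel can be checked with `errors.Is` while the original cause is preserved; `ComputePatchError` here is the hypothetical sentinel named in the comment above:

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinel from the discussion above.
var ComputePatchError = errors.New("compute patch error")

// computePatch is a stand-in for the real method: it wraps the underlying
// error with %w, so both the sentinel and the original cause survive.
func computePatch() ([]byte, error) {
	err := errors.New("render template: unexpected EOF") // stand-in for c.renderer.ToJSON(...)
	if err != nil {
		return nil, fmt.Errorf("%w: %v", ComputePatchError, err)
	}
	return nil, nil
}

func main() {
	_, err := computePatch()
	fmt.Println(errors.Is(err, ComputePatchError)) // true
	fmt.Println(err)                               // compute patch error: render template: unexpected EOF
}
```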
I think like … We can use …
Hi, @wzshiming. Do you mean to join the specific error with the newly defined error?
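For reference (my addition, not from the thread itself): since Go 1.20, `errors.Join` is one way to combine a sentinel with the concrete error so that `errors.Is` matches either; a minimal sketch reusing the `ComputePatchError` name from the discussion:

```go
package main

import (
	"errors"
	"fmt"
)

var ComputePatchError = errors.New("compute patch error") // name taken from the discussion

func main() {
	cause := errors.New("strategic merge patch failed") // stand-in for the real failure

	// errors.Join (Go 1.20+) keeps both errors in the chain.
	err := errors.Join(ComputePatchError, cause)

	fmt.Println(errors.Is(err, ComputePatchError)) // true
	fmt.Println(errors.Is(err, cause))             // true
	fmt.Println(err)
}
```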
I think you can finish this and verify the various corner cases locally.
I made several changes in this commit:
Please feel free to give comments! 🙏
Since we only need to check for the network error, I just removed the defined `ComputePatchError`.
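As a side illustration (this is not the helper actually used in the PR), a network-error check in Go is commonly written with `errors.As` against the `net.Error` interface:

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"time"
)

// isNetworkError reports whether err (or anything it wraps) is a net.Error.
// Illustrative only; the PR may use a different check.
func isNetworkError(err error) bool {
	var netErr net.Error
	return errors.As(err, &netErr)
}

func main() {
	// Dialing a reserved, unroutable address should fail quickly with a net error.
	_, err := net.DialTimeout("tcp", "203.0.113.1:81", 10*time.Millisecond)
	fmt.Println(isNetworkError(err))                     // true
	fmt.Println(isNetworkError(errors.New("not a net"))) // false
}
```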
/ok-to-test
In this new commit,
If there are no problems with this version, I will start to change the other controllers.
Sorry for the delay; this PR may need to be rebased due to the conflict with #920. I will take time to test this PR this week.
Oh, @wzshiming! Understood! No worries! I will rebase it. 😊
Force-pushed from ff2c2dd to 2ecf597.
```go
needRetry, err := c.playStage(ctx, pod.Resource, pod.Stage)
if err != nil {
	logger.Error("failed to apply stage", err,
		"node", pod.Resource.Name, "stage", pod.Stage.Name())
```

Suggested change:

```diff
-		"node", pod.Resource.Name, "stage", pod.Stage.Name())
+		"pod", pod.Key,
+		"stage", pod.Stage.Name(),
+	)
```
```go
if needRetry {
	*resource.RetryCount++
	logger.Info("retrying for failed job",
		"resource", resource.Resource.GetName(), "stage", resource.Stage.Name(), "retry", *resource.RetryCount)
```

Suggested change:

```diff
-		"resource", resource.Resource.GetName(), "stage", resource.Stage.Name(), "retry", *resource.RetryCount)
+		"resource", resource.Key,
+		"stage", resource.Stage.Name(),
+		"retry", *resource.RetryCount,
+	)
```
logger.Debug("Skip modify status", | ||
"reason", "do not need to modify status", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logger.Debug("Skip modify status", | |
"reason", "do not need to modify status", | |
logger.Debug("Skip node", | |
"reason", "do not need to modify", |
pkg/kwok/controllers/utils.go (outdated)

```go
// backoffDelayByStep calculates the backoff delay period based on steps
func backoffDelayByStep(steps int, c wait.Backoff) time.Duration {
	if steps <= 0 {
```

Suggested change:

```diff
-	if steps <= 0 {
+	if steps < 0 {
```
The parameter for the first failure should be 0, which gives the expected 1s delay.
Since `uint64` has no value less than zero after the newest change, I just removed the `if` clause.
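For illustration only — the real helper in pkg/kwok/controllers/utils.go may be implemented differently — a step-based exponential backoff over `wait.Backoff` could look roughly like this, with step 0 yielding the base 1s delay:

```go
package main

import (
	"fmt"
	"math"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// backoffDelayByStep (sketch): delay = Duration * Factor^steps, capped at Cap.
// The actual kwok helper may differ.
func backoffDelayByStep(steps uint64, b wait.Backoff) time.Duration {
	delay := time.Duration(float64(b.Duration) * math.Pow(b.Factor, float64(steps)))
	if b.Cap > 0 && delay > b.Cap {
		return b.Cap
	}
	return delay
}

func main() {
	b := wait.Backoff{Duration: time.Second, Factor: 2, Cap: 30 * time.Second}
	for step := uint64(0); step < 7; step++ {
		fmt.Println(step, backoffDelayByStep(step, b)) // 0 -> 1s, 1 -> 2s, 2 -> 4s, ...
	}
}
```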
Suggested change:

```diff
-*pod.RetryCount++
-logger.Info("retrying for failed job",
-	"pod", pod.Key,
-	"stage", pod.Stage.Name(),
-	"retry", *pod.RetryCount,
-)
-// for failed jobs, we re-push them into the queue with a lower weight
-// and a backoff period to avoid blocking normal tasks
-retryDelay := backoffDelayByStep(*pod.RetryCount, c.backoff)
-c.addStageJob(pod, retryDelay, 1)
+retryCount := atomic.AddUint64(resource.RetryCount, 1) - 1
+logger.Info("retrying for failed job",
+	"pod", pod.Key,
+	"stage", pod.Stage.Name(),
+	"retry", retryCount,
+)
+// for failed jobs, we re-push them into the queue with a lower weight
+// and a backoff period to avoid blocking normal tasks
+retryDelay := backoffDelayByStep(retryCount, c.backoff)
+c.addStageJob(pod, retryDelay, 1)
```
In some corner cases there might be a data race; using an atomic add avoids a panic.
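A small standalone illustration (not kwok code) of the race: a plain `counter++` from many goroutines is a data race, while `atomic.AddUint64` is safe and returns the updated value:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var retryCount uint64
	var wg sync.WaitGroup

	// 100 goroutines bump the same counter. A plain retryCount++ here would
	// be a data race; atomic.AddUint64 is safe and returns the new value.
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = atomic.AddUint64(&retryCount, 1)
		}()
	}
	wg.Wait()
	fmt.Println(atomic.LoadUint64(&retryCount)) // always 100
}
```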
Thanks!!
```go
)
// for failed jobs, we re-push them into the queue with a lower weight
// and a backoff period to avoid blocking normal tasks
retryDelay := backoffDelayByStep(*node.RetryCount, c.backoff)
```
ditto
```go
)
// for failed jobs, we re-push them into the queue with a lower weight
// and a backoff period to avoid blocking normal tasks
retryDelay := backoffDelayByStep(*resource.RetryCount, c.backoff)
```
ditto
/approve
[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: caozhuozi, wzshiming

The full list of commands accepted by this bot can be found here. The pull request process is described here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing `/approve` in a comment.
I made many mistakes and misunderstood a lot of things along the way (and I also learned a lot). Thanks for the great patience and for taking the trouble to guide me through the PR.
What type of PR is this?
/kind feature
What this PR does / why we need it:
Which issue(s) this PR fixes:
Fixes #816
Special notes for your reviewer:
Since we have a new design based on the weighted delaying queue (#902), the old implementation (#904) was rejected. I will close the old one and start here.
Currently, I only modified the node controller for a pre-review. Please feel free to give comments!
Does this PR introduce a user-facing change?
Additional documentation e.g., KEPs (Kubernetes Enhancement Proposals), usage docs, etc.: