Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rechecking pending Pods #195

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/reconciler/ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/k8snetworkplumbingwg/whereabouts/pkg/reconciler"
)

const defaultReconcilerTimeout = 30
const defaultReconcilerTimeout = 60

func main() {
kubeConfigFile := flag.String("kubeconfig", "", "the path to the Kubernetes configuration file")
Expand Down
30 changes: 0 additions & 30 deletions cmd/reconciler/ip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,36 +254,6 @@ var _ = Describe("Whereabouts IP reconciler", func() {
})
})

Context("a pod in pending state, without an IP in its network-status", func() {
const poolName = "pool1"

var pod *v1.Pod
var pool *v1alpha1.IPPool

BeforeEach(func() {
var err error
pod, err = k8sClientSet.CoreV1().Pods(namespace).Create(
context.TODO(),
generatePendingPod(namespace, podName),
metav1.CreateOptions{})
Expect(err).NotTo(HaveOccurred())

pool = generateIPPoolSpec(ipRange, namespace, poolName, pod.Name)
Expect(k8sClient.Create(context.Background(), pool)).NotTo(HaveOccurred())

reconcileLooper, err = reconciler.NewReconcileLooperWithKubeconfig(context.TODO(), kubeConfigPath, timeout)
Expect(err).NotTo(HaveOccurred())
})

AfterEach(func() {
Expect(k8sClient.Delete(context.Background(), pool)).NotTo(HaveOccurred())
Expect(k8sClientSet.CoreV1().Pods(namespace).Delete(context.TODO(), pod.GetName(), metav1.DeleteOptions{}))
})

It("cannot be reconciled", func() {
Expect(reconcileLooper.ReconcileIPPools(context.TODO())).To(BeEmpty())
})
})
})

func generateIPPoolSpec(ipRange string, namespace string, poolName string, podNames ...string) *v1alpha1.IPPool {
Expand Down
1 change: 1 addition & 0 deletions doc/crds/daemonset-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ rules:
- pods
verbs:
- list
- get
---
apiVersion: apps/v1
kind: DaemonSet
Expand Down
71 changes: 63 additions & 8 deletions pkg/reconciler/iploop.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"strings"
"time"

"github.com/k8snetworkplumbingwg/whereabouts/pkg/allocate"
Expand Down Expand Up @@ -102,19 +103,73 @@ func (rl *ReconcileLooper) findOrphanedIPsPerPool(ipPools []storage.IPPool) erro
func (rl ReconcileLooper) isPodAlive(podRef string, ip string) bool {
for livePodRef, livePod := range rl.liveWhereaboutsPods {
if podRef == livePodRef {
livePodIPs := livePod.ips
logging.Debugf(
"pod reference %s matches allocation; Allocation IP: %s; PodIPs: %s",
livePodRef,
ip,
livePodIPs)
_, isFound := livePodIPs[ip]
return isFound || livePod.phase == v1.PodPending
isFound := isIpOnPod(&livePod, podRef, ip)
if !isFound && (livePod.phase == v1.PodPending) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could make sense to export this into an helper func.

/* Sometimes pods are still coming up, and may not yet have Multus
* annotation added to it yet. We don't want to check the IPs yet
* so re-fetch the Pod 5x
*/
podToMatch := &livePod
retries := 0

logging.Debugf("Re-fetching Pending Pod: %s IP-to-match: %s", livePodRef, ip)

for retries < storage.PodRefreshRetries {
retries += 1
podToMatch = rl.refreshPod(livePodRef)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make sense to instead of retrying now just put the pods in pending state to a list, and after the main loop (that handles all pods that are not pending) iterate all the ones that were in pending state ?

I am unsure if I am overthinking this.

if podToMatch == nil {
logging.Debugf("Cleaning up...")
return false
} else if podToMatch.phase != v1.PodPending {
logging.Debugf("Pending Pod is now in phase: %s", podToMatch.phase)
break
} else {
isFound = isIpOnPod(podToMatch, podRef, ip)
// Short-circuit - Pending Pod may have IP now
if isFound {
logging.Debugf("Pod now has IP annotation while in Pending")
return true
}
time.Sleep(time.Duration(500) * time.Millisecond)
}
}
isFound = isIpOnPod(podToMatch, podRef, ip)
}

return isFound
}
}
return false
}

func (rl ReconcileLooper) refreshPod(podRef string) *podWrapper {
namespace, podName := splitPodRef(podRef)
if namespace == "" || podName == "" {
logging.Errorf("Invalid podRef format: %s", podRef)
return nil
}

pod, err := rl.k8sClient.GetPod(namespace, podName)
if err != nil {
logging.Errorf("Failed to refresh Pod %s: %s\n", podRef, err)
return nil
}

wrappedPod := wrapPod(*pod)
logging.Debugf("Got refreshed pod: %v", wrappedPod)
return wrappedPod
}

func splitPodRef(podRef string) (string, string) {
namespacedName := strings.Split(podRef, "/")
if len(namespacedName) != 2 {
logging.Errorf("Failed to split podRef %s", podRef)
return "", ""
}

return namespacedName[0], namespacedName[1]
}

func composePodRef(pod v1.Pod) string {
return fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName())
}
Expand Down
13 changes: 12 additions & 1 deletion pkg/reconciler/wrappedPod.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type void struct{}

func wrapPod(pod v1.Pod) *podWrapper {
return &podWrapper{
ips: getFlatIPSet(pod),
ips: getFlatIPSet(pod),
phase: pod.Status.Phase,
}
}
Expand Down Expand Up @@ -80,3 +80,14 @@ func getFlatIPSet(pod v1.Pod) map[string]void {
}
return ipSet
}

func isIpOnPod(livePod *podWrapper, podRef, ip string) bool {
livePodIPs := livePod.ips
logging.Debugf(
"pod reference %s matches allocation; Allocation IP: %s; PodIPs: %s",
podRef,
ip,
livePodIPs)
_, isFound := livePodIPs[ip]
return isFound
}
9 changes: 9 additions & 0 deletions pkg/storage/kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,15 @@ func (i *Client) ListPods(ctx context.Context) ([]v1.Pod, error) {
return podList.Items, nil
}

func (i *Client) GetPod(namespace, name string) (*v1.Pod, error) {
pod, err := i.clientSet.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return nil, err
}

return pod, nil
Comment on lines +121 to +126
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm, maybe I'm missing something, but why don't you just return i.clientSet.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{}) ?

}

func (i *Client) ListOverlappingIPs(ctx context.Context) ([]whereaboutsv1alpha1.OverlappingRangeIPReservation, error) {
overlappingIPsList := whereaboutsv1alpha1.OverlappingRangeIPReservationList{}
if err := i.client.List(ctx, &overlappingIPsList, &client.ListOptions{}); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ var (
RequestTimeout = 10 * time.Second

// DatastoreRetries defines how many retries are attempted when updating the Pool
DatastoreRetries = 100
DatastoreRetries = 100
PodRefreshRetries = 3
)

// IPPool is the interface that represents an manageable pool of allocated IPs
Expand Down