package main import ( "context" "flag" "fmt" "os" "time" "github.com/seal-io/terraform-provider-kaniko/utils" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" "k8s.io/utils/pointer" ) func main() { kubeconfig := flag.String("kubeconfig", "/Users/yangzun/.kube/config", "absolute path to the kubeconfig file") config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig) if err != nil { panic(err) } clientset, err := kubernetes.NewForConfig(config) if err != nil { panic(err.Error()) } // 获取当前时间 now := time.Now() // jobName := "a" podName := fmt.Sprintf("main-%s", utils.String(8)) namespace := "default" // fmt.Printf("Watching pod %s in namespace %s\n", podName, namespace) // watchPod(clientset, jobName, namespace) createPod(clientset, podName, namespace) // podName := createJob(clientset, jobName, namespace) watchPod(clientset, podName, namespace, now) // cleanJob(clientset, jobName, namespace) defer func() { // 打印执行时间 fmt.Println("time=", time.Since(now)) cleanPod(clientset, podName, namespace) }() } // watch pod 状态,直到其 secceed 或者 failed func watchPod(clientset *kubernetes.Clientset, podName, namespace string, nowTime time.Time) { fmt.Printf("Watching pod %s\n", podName) watchCount := 0 // 循环从 watcher.ResultChan()中读取事件, 支持自动重连 OuterLoop: for { watch, err := clientset.CoreV1().Pods(namespace).Watch(context.TODO(), metav1.ListOptions{ FieldSelector: "metadata.name=" + podName, TimeoutSeconds: pointer.Int64Ptr(600), // Set the timeout to 1 hour }) if err != nil { panic(err) } ch := watch.ResultChan() for event := range ch { watchCount++ fmt.Println("watchCount=", watchCount) pod, ok := event.Object.(*v1.Pod) if !ok { fmt.Println("unexpected type") continue } // 如果 pod 的状态是 failed 或者 succeeded,则退出循环 if pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded { fmt.Printf("Pod %s finished with status %s\n", podName, pod.Status.Phase) break OuterLoop } else { fmt.Printf("Pod %s still running with status %s, time: %s\n", podName, pod.Status.Phase, time.Since(nowTime)) } } } // for event := range watch.ResultChan() { // watchCount++ // fmt.Println("watchCount=", watchCount) // pod, ok := event.Object.(*v1.Pod) // if !ok { // fmt.Println("unexpected type") // continue // } // // 如果 pod 的状态是 failed 或者 succeeded,则退出循环 // if pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded { // fmt.Printf("Pod %s finished with status %s\n", podName, pod.Status.Phase) // break // } else { // fmt.Printf("Pod %s still running with status %s\n", podName, pod.Status.Phase) // } // } } // 创建 job func createPod(clientset *kubernetes.Clientset, podName, namespace string) { fmt.Printf("Creating pod %s\n", podName) pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: podName, }, Spec: v1.PodSpec{ Containers: []v1.Container{ { Name: podName, Image: "perl", Command: []string{ "sleep", "180", }, }, }, RestartPolicy: v1.RestartPolicyNever, }, } _, err := clientset.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{ FieldManager: "test", }) if err != nil { panic(err) } } // 清理 job // func cleanJob(clientset *kubernetes.Clientset, jobName, namespace string) { // fmt.Print("Deleting job") // err := clientset.BatchV1().Jobs(namespace).Delete(context.TODO(), jobName, metav1.DeleteOptions{ // GracePeriodSeconds: pointer.Int64Ptr(0), // }) // if err != nil { // fmt.Println(err) // os.Exit(1) // } // } // 清理 pod func cleanPod(clientset *kubernetes.Clientset, podName, namespace string) { fmt.Print("Deleting pod") err := clientset.CoreV1().Pods(namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{ GracePeriodSeconds: pointer.Int64Ptr(0), }) if err != nil { fmt.Println(err) os.Exit(1) } }