/*
 * VM-Operator
 * Copyright (C) 2024 Michael N. Lipp
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.jdrupes.vmoperator.common;

import io.kubernetes.client.Discovery.APIResource;
import io.kubernetes.client.common.KubernetesListObject;
import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.util.Watch.Response;
import io.kubernetes.client.util.generic.GenericKubernetesApi;
import io.kubernetes.client.util.generic.options.ListOptions;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.function.BiConsumer;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * An observer that watches namespaced resources in a given context and
 * invokes a handler on changes.
 *
 * The watch runs on a virtual thread of its own. It is created by the
 * constructor but only started by {@link #start()}; {@link #stop()}
 * interrupts it.
 *
 * @param <O> the object type for the context
 * @param <L> the object list type for the context
 */
public class K8sObserver<O extends KubernetesObject,
        L extends KubernetesListObject> {

    /**
     * The type of change reported by {@link Response} as enum.
     */
    public enum ResponseType {
        ADDED, MODIFIED, DELETED
    }

    /**
     * Minimum time (in milliseconds) between the start of a failed watch
     * and the start of the next attempt.
     */
    private static final long MIN_RESTART_INTERVAL_MILLIS = 5000;

    @SuppressWarnings("PMD.FieldNamingConventions")
    protected final Logger logger = Logger.getLogger(getClass().getName());

    protected final K8sClient client;
    protected final GenericKubernetesApi<O, L> api;
    protected final APIResource context;
    protected final String namespace;
    protected final ListOptions options;
    protected final Thread thread;
    protected BiConsumer<K8sClient, Response<O>> handler;
    protected BiConsumer<K8sObserver<O, L>, Throwable> onTerminated;

    /**
     * Create a new observer for objects in the given context
     * (using preferred version) and namespace with the given options.
     * The observer does not watch anything until {@link #start()}
     * has been invoked.
     *
     * @param objectClass the object class
     * @param objectListClass the object list class
     * @param client the client
     * @param context the context
     * @param namespace the namespace
     * @param options the options
     */
    @SuppressWarnings("PMD.UseObjectForClearerAPI")
    public K8sObserver(Class<O> objectClass, Class<L> objectListClass,
            K8sClient client, APIResource context, String namespace,
            ListOptions options) {
        this.client = client;
        this.context = context;
        this.namespace = namespace;
        this.options = options;

        api = new GenericKubernetesApi<>(objectClass, objectListClass,
            context.getGroup(), context.getPreferredVersion(),
            context.getResourcePlural(), client);
        thread = Thread.ofVirtual().unstarted(this::observe);
    }

    /**
     * Body of the observer thread. Keeps (re-)establishing the watch
     * until the thread is interrupted and reports the termination to
     * the {@code onTerminated} callback, if one has been set.
     */
    @SuppressWarnings("PMD.AvoidCatchingThrowable")
    private void observe() {
        try {
            logger.config(() -> "Watching " + context.getResourcePlural()
                + " (" + context.getPreferredVersion() + ")"
                + " in " + namespace);

            // Watch sometimes terminates without apparent reason.
            while (!Thread.currentThread().isInterrupted()) {
                watchOnce();
            }
            if (onTerminated != null) {
                onTerminated.accept(this, null);
            }
        } catch (Throwable e) {
            logger.log(Level.SEVERE, e, () -> "Problem watching: "
                + e.getMessage());
            if (onTerminated != null) {
                onTerminated.accept(this, e);
            }
        }
    }

    /**
     * Establishes a single watch and forwards all responses to the
     * handler until the watch ends. Problems are logged and followed
     * by a delay so that the caller can simply retry.
     */
    private void watchOnce() {
        Instant startedAt = Instant.now();
        // The watch is closed when iteration ends, normally or not,
        // so that it is not left open on restart.
        try (var changes = api.watch(namespace, options)) {
            while (changes.hasNext()) {
                handler.accept(client, changes.next());
            }
        } catch (ApiException | IOException | RuntimeException e) {
            logger.log(Level.FINE, e, () -> "Problem watching"
                + " (will retry): " + e.getMessage());
            delayRestart(startedAt);
        }
    }

    /**
     * Sleeps until at least {@link #MIN_RESTART_INTERVAL_MILLIS} have
     * passed since the given instant. Returns immediately if that time
     * has already elapsed.
     *
     * @param started the instant at which the failed watch was started
     */
    private void delayRestart(Instant started) {
        var runningFor = Duration
            .between(started, Instant.now()).toMillis();
        if (runningFor >= MIN_RESTART_INTERVAL_MILLIS) {
            return;
        }
        logger.log(Level.FINE, () -> "Waiting... ");
        try {
            Thread.sleep(MIN_RESTART_INTERVAL_MILLIS - runningFor);
        } catch (InterruptedException e) {
            // sleep() clears the interrupt status when it throws.
            // Restore it, else the watch loop would not notice the
            // stop request and continue watching.
            Thread.currentThread().interrupt();
            return;
        }
        logger.log(Level.FINE, () -> "Retrying");
    }

    /**
     * Sets the handler that is invoked for every response delivered
     * by the watch. Must be set before invoking {@link #start()}.
     *
     * @param handler the handler
     * @return the observer
     */
    public K8sObserver<O, L>
            handler(BiConsumer<K8sClient, Response<O>> handler) {
        this.handler = handler;
        return this;
    }

    /**
     * Sets a function to invoke if the observer terminates. First argument
     * is this observer, the second is the throwable that caused the
     * abnormal termination or `null` if the observer was terminated
     * by {@link #stop()}.
     *
     * @param onTerminated the function to invoke on termination
     * @return the observer
     */
    public K8sObserver<O, L> onTerminated(
            BiConsumer<K8sObserver<O, L>, Throwable> onTerminated) {
        this.onTerminated = onTerminated;
        return this;
    }

    /**
     * Start the observer.
     *
     * @return the observer
     * @throws IllegalStateException if no handler has been set
     */
    public K8sObserver<O, L> start() {
        if (handler == null) {
            throw new IllegalStateException("No handler defined");
        }
        thread.start();
        return this;
    }

    /**
     * Stops the observer by interrupting its thread. The method does
     * not wait for the thread to terminate.
     *
     * @return the observer
     */
    public K8sObserver<O, L> stop() {
        thread.interrupt();
        return this;
    }

    /**
     * Returns the client.
     *
     * @return the client
     */
    public K8sClient client() {
        return client;
    }

    /**
     * Returns the context.
     *
     * @return the context
     */
    public APIResource context() {
        return context;
    }

    /**
     * Returns the observed namespace.
     *
     * @return the namespace
     */
    public String getNamespace() {
        return namespace;
    }

    /**
     * Returns the options for object selection.
     *
     * @return the list options
     */
    public ListOptions options() {
        return options;
    }

    @Override
    @SuppressWarnings("PMD.UseLocaleWithCaseConversions")
    public String toString() {
        return "Observer for " + K8s.toString(context) + " " + namespace;
    }

}