/*
 * VM-Operator
 * Copyright (C) 2024 Michael N. Lipp
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.jdrupes.vmoperator.common;

import io.kubernetes.client.Discovery.APIResource;
import io.kubernetes.client.common.KubernetesListObject;
import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.util.Watch.Response;
import io.kubernetes.client.util.generic.GenericKubernetesApi;
import io.kubernetes.client.util.generic.options.ListOptions;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.function.BiConsumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jgrapes.core.Components;

/**
 * An observer that watches namespaced resources in a given context and
 * invokes a handler on changes.
 *
 * The watch runs in a thread of its own that is created by the
 * constructor and started by {@link #start()}. If the watch ends
 * (with or without an exception), it is re-established until the
 * observer is stopped with {@link #stop()}.
 *
 * @param <O> the object type for the context
 * @param <L> the object list type for the context
 */
public class K8sObserver<O extends KubernetesObject,
        L extends KubernetesListObject> {

    /**
     * The type of change reported by {@link Response} as enum.
     */
    public enum ResponseType {
        ADDED, MODIFIED, DELETED
    }

    /**
     * Minimum time (in milliseconds) between the start of a failed
     * watch and the start of the next attempt.
     */
    private static final long MIN_RESTART_INTERVAL_MS = 5000;

    @SuppressWarnings("PMD.FieldNamingConventions")
    protected final Logger logger = Logger.getLogger(getClass().getName());

    protected final K8sClient client;
    protected final GenericKubernetesApi<O, L> api;
    protected final APIResource context;
    protected final String namespace;
    protected final ListOptions options;
    protected final Thread thread;
    protected BiConsumer<K8sClient, Response<O>> handler;
    protected BiConsumer<K8sObserver<O, L>, Throwable> onTerminated;

    /**
     * Create a new observer for objects in the given context
     * (using preferred version) and namespace with the given options.
     * The observer's thread is created but not started; invoke
     * {@link #start()} after setting the handler.
     *
     * @param objectClass the object class
     * @param objectListClass the object list class
     * @param client the client
     * @param context the context
     * @param namespace the namespace
     * @param options the options
     */
    @SuppressWarnings("PMD.UseObjectForClearerAPI")
    public K8sObserver(Class<O> objectClass, Class<L> objectListClass,
            K8sClient client, APIResource context, String namespace,
            ListOptions options) {
        this.client = client;
        this.context = context;
        this.namespace = namespace;
        this.options = options;

        api = new GenericKubernetesApi<>(objectClass, objectListClass,
            context.getGroup(), context.getPreferredVersion(),
            context.getResourcePlural(), client);
        thread = (Components.useVirtualThreads() ? Thread.ofVirtual()
            : Thread.ofPlatform()).unstarted(this::watchLoop);
    }

    /**
     * The body of the observer thread. Repeatedly establishes a watch
     * and forwards each response to the handler until the thread is
     * interrupted. Invokes the {@code onTerminated} callback (if set)
     * when the loop ends, passing the throwable that ended it or
     * {@code null} if it ended due to an interrupt.
     */
    @SuppressWarnings({ "PMD.AvoidBranchingStatementAsLastInLoop",
        "PMD.AvoidCatchingThrowable", "PMD.CognitiveComplexity",
        "PMD.AvoidCatchingGenericException" })
    private void watchLoop() {
        try {
            logger.config(() -> "Watching " + context.getResourcePlural()
                + " (" + context.getPreferredVersion() + ")"
                + " in " + namespace);

            // Watch sometimes terminates without apparent reason.
            while (!Thread.currentThread().isInterrupted()) {
                Instant startedAt = Instant.now();
                // Close the watch when done with it to release the
                // underlying connection before the next attempt.
                try (var watch = api.watch(namespace, options)) {
                    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
                    var changed = watch.iterator();
                    while (changed.hasNext()) {
                        handler.accept(client, changed.next());
                    }
                } catch (ApiException | IOException | RuntimeException e) {
                    logger.log(Level.FINE, e, () -> "Problem watching"
                        + " (will retry): " + e.getMessage());
                    delayRestart(startedAt);
                }
            }
            if (onTerminated != null) {
                onTerminated.accept(this, null);
            }
        } catch (Throwable e) {
            logger.log(Level.SEVERE, e, () -> "Problem watching: "
                + e.getMessage());
            if (onTerminated != null) {
                onTerminated.accept(this, e);
            }
        }
    }

    /**
     * Ensures that at least {@link #MIN_RESTART_INTERVAL_MS} milliseconds
     * pass between the start of a failed watch and the next attempt.
     * If the thread is interrupted while waiting, the interrupt status
     * is restored so that the watch loop terminates.
     *
     * @param started the instant at which the failed watch was started
     */
    private void delayRestart(Instant started) {
        var runningFor = Duration
            .between(started, Instant.now()).toMillis();
        if (runningFor >= MIN_RESTART_INTERVAL_MS) {
            return;
        }
        logger.log(Level.FINE, () -> "Waiting... ");
        try {
            Thread.sleep(MIN_RESTART_INTERVAL_MS - runningFor);
        } catch (InterruptedException e) {
            // Sleep clears the interrupt status; restore it, else the
            // loop condition never sees the request to stop.
            Thread.currentThread().interrupt();
            return;
        }
        logger.log(Level.FINE, () -> "Retrying");
    }

    /**
     * Sets the handler.
     *
     * @param handler the handler
     * @return the observer
     */
    public K8sObserver<O, L>
            handler(BiConsumer<K8sClient, Response<O>> handler) {
        this.handler = handler;
        return this;
    }

    /**
     * Sets a function to invoke if the observer terminates. First argument
     * is this observer, the second is the throwable that caused the
     * abnormal termination or {@code null} if the observer was terminated
     * by {@link #stop()}.
     *
     * @param onTerminated the function to invoke on termination
     * @return the observer
     */
    public K8sObserver<O, L> onTerminated(
            BiConsumer<K8sObserver<O, L>, Throwable> onTerminated) {
        this.onTerminated = onTerminated;
        return this;
    }

    /**
     * Start the observer.
     *
     * @return the observer
     * @throws IllegalStateException if no handler has been set
     */
    public K8sObserver<O, L> start() {
        if (handler == null) {
            throw new IllegalStateException("No handler defined");
        }
        thread.start();
        return this;
    }

    /**
     * Stops the observer by interrupting its thread.
     *
     * @return the observer
     */
    public K8sObserver<O, L> stop() {
        thread.interrupt();
        return this;
    }

    /**
     * Returns the client.
     *
     * @return the client
     */
    public K8sClient client() {
        return client;
    }

    /**
     * Returns the context.
     *
     * @return the context
     */
    public APIResource context() {
        return context;
    }

    /**
     * Returns the observed namespace.
     *
     * @return the namespace
     */
    public String getNamespace() {
        return namespace;
    }

    /**
     * Returns the options for object selection.
     *
     * @return the list options
     */
    public ListOptions options() {
        return options;
    }

    @Override
    @SuppressWarnings("PMD.UseLocaleWithCaseConversions")
    public String toString() {
        return "Observer for " + K8s.toString(context) + " " + namespace;
    }

}