/*
 * VM-Operator
 * Copyright (C) 2024 Michael N. Lipp
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.jdrupes.vmoperator.manager;

import io.kubernetes.client.Discovery.APIResource;
import io.kubernetes.client.common.KubernetesListObject;
import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.util.Watch.Response;
import io.kubernetes.client.util.generic.options.ListOptions;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import org.jdrupes.vmoperator.common.K8s;
import org.jdrupes.vmoperator.common.K8sClient;
import org.jdrupes.vmoperator.common.K8sObserver;
import org.jdrupes.vmoperator.manager.events.Exit;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Component;
import org.jgrapes.core.Components;
import org.jgrapes.core.annotation.Handler;
import org.jgrapes.core.events.Start;
import org.jgrapes.core.events.Stop;
import org.jgrapes.util.events.ConfigurationUpdate;

/**
 * A base class for monitoring VM related resources. When started,
 * it creates observers for all versions of the {@link APIResource}
 * configured by {@link #context(APIResource)}. The APIResource is not
 * passed to the constructor because in some cases it has to be
 * evaluated lazily.
 *
 * @param <O> the object type for the context
 * @param <L> the object list type for the context
 * @param <C> a channel type; it is not referenced within this
 * base class itself
 */
@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis" })
public abstract class AbstractMonitor<O extends KubernetesObject,
        L extends KubernetesListObject, C extends Channel> extends Component {

    /** The class of the observed objects, passed on to the observers. */
    private final Class<O> objectClass;
    /** The class of the observed object lists, passed on to the observers. */
    private final Class<L> objectListClass;
    private K8sClient client;
    private APIResource context;
    private String namespace;
    private ListOptions options = new ListOptions();
    /** Number of observers that have been created and not yet terminated. */
    private final AtomicInteger observerCounter = new AtomicInteger(0);

    /**
     * Initializes the instance.
     *
     * @param componentChannel the component channel
     * @param objectClass the class of the Kubernetes object to watch
     * @param objectListClass the class of the list of Kubernetes objects
     * to watch
     */
    protected AbstractMonitor(Channel componentChannel,
            Class<O> objectClass, Class<L> objectListClass) {
        super(componentChannel);
        this.objectClass = objectClass;
        this.objectListClass = objectListClass;
    }

    /**
     * Return the client.
     *
     * @return the client
     */
    public K8sClient client() {
        return client;
    }

    /**
     * Sets the client to be used.
     *
     * @param client the client
     * @return the abstract monitor
     */
    public AbstractMonitor<O, L, C> client(K8sClient client) {
        this.client = client;
        return this;
    }

    /**
     * Return the observed namespace.
     *
     * @return the namespace
     */
    public String namespace() {
        return namespace;
    }

    /**
     * Sets the namespace to be observed.
     *
     * @param namespace the namespace to observe
     * @return the abstract monitor
     */
    public AbstractMonitor<O, L, C> namespace(String namespace) {
        this.namespace = namespace;
        return this;
    }

    /**
     * Returns the options for selecting the objects to observe.
     *
     * @return the options
     */
    public ListOptions options() {
        return options;
    }

    /**
     * Sets the options for selecting the objects to observe.
     *
     * @param options the options to set
     * @return the abstract monitor
     */
    public AbstractMonitor<O, L, C> options(ListOptions options) {
        this.options = options;
        return this;
    }

    /**
     * Returns the observed context.
     *
     * @return the context
     */
    public APIResource context() {
        return context;
    }

    /**
     * Sets the context to observe.
     *
     * @param context the context
     * @return the abstract monitor
     */
    public AbstractMonitor<O, L, C> context(APIResource context) {
        this.context = context;
        return this;
    }

    /**
     * Looks for a key "namespace" in the configuration and, if found,
     * sets the namespace to its value. The configuration is looked up
     * using the component path of this component's parent.
     *
     * @param event the event
     */
    @Handler
    public void onConfigurationUpdate(ConfigurationUpdate event) {
        event.structured(Components.manager(parent()).componentPath())
            .ifPresent(c -> {
                if (c.containsKey("namespace")) {
                    namespace = (String) c.get("namespace");
                }
            });
    }

    /**
     * Handle the start event. Configures the namespace, invokes
     * {@link #prepareMonitoring()} and starts the observers.
     *
     * If no namespace has been set, the first line of the service
     * account's namespace file is used, provided that the file is
     * readable. If an {@link IOException} or {@link ApiException}
     * occurs, the problem is logged, the event is cancelled and
     * an {@link Exit} event is fired.
     *
     * @param event the event
     */
    @Handler(priority = 10)
    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
    public void onStart(Start event) {
        try {
            // Get namespace
            if (namespace == null) {
                var path = Path
                    .of("/var/run/secrets/kubernetes.io/serviceaccount/namespace");
                if (Files.isReadable(path)) {
                    // The stream returned by Files.lines holds the file
                    // open until it is closed, so close it explicitly.
                    try (var lines = Files.lines(path)) {
                        namespace = lines.findFirst().orElse(null);
                    }
                }
            }

            // Additional preparations by derived class
            prepareMonitoring();
            assert client != null;
            assert context != null;
            assert namespace != null;
            logger.fine(() -> "Observing " + K8s.toString(context)
                + " objects in " + namespace);

            // Monitor all versions
            for (var version : context.getVersions()) {
                createObserver(version);
            }
            registerAsGenerator();
        } catch (IOException | ApiException e) {
            logger.log(Level.SEVERE, e,
                () -> "Cannot watch VMs, terminating.");
            event.cancel(true);
            fire(new Exit(1));
        }
    }

    /**
     * Creates and starts an observer for the given version of the
     * context. The observer forwards changes to
     * {@link #handleChange(K8sClient, Response)}. When the last
     * running observer terminates, this component unregisters as
     * generator. If an observer's termination callback is invoked
     * with a non-null throwable, a {@link Stop} event is fired.
     *
     * @param version the version of the context to observe
     */
    private void createObserver(String version) {
        // Count the observer before starting it, the termination
        // callback decrements the counter again.
        observerCounter.incrementAndGet();
        new K8sObserver<>(objectClass, objectListClass, client,
            K8s.preferred(context, version), namespace, options)
                .handler((c, r) -> {
                    handleChange(c, r);
                }).onTerminated((o, t) -> {
                    if (observerCounter.decrementAndGet() == 0) {
                        unregisterAsGenerator();
                    }
                    // Exception has been logged already
                    if (t != null) {
                        fire(new Stop());
                    }
                }).start();
    }

    /**
     * Invoked by {@link #onStart(Start)} after the namespace has
     * been configured and before starting the observer. This is
     * the last opportunity to invoke {@link #context(APIResource)}.
     *
     * @throws IOException Signals that an I/O exception has occurred.
     * @throws ApiException the api exception
     */
    @SuppressWarnings("PMD.EmptyMethodInAbstractClassShouldBeAbstract")
    protected void prepareMonitoring() throws IOException, ApiException {
        // To be overridden by derived class.
    }

    /**
     * Handle an observed change.
     *
     * @param client the client
     * @param change the change
     */
    protected abstract void handleChange(K8sClient client, Response<O> change);
}