001/* 002 * VM-Operator 003 * Copyright (C) 2024 Michael N. Lipp 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <https://www.gnu.org/licenses/>. 017 */ 018 019package org.jdrupes.vmoperator.manager; 020 021import io.kubernetes.client.Discovery.APIResource; 022import io.kubernetes.client.common.KubernetesListObject; 023import io.kubernetes.client.common.KubernetesObject; 024import io.kubernetes.client.openapi.ApiException; 025import io.kubernetes.client.util.Watch.Response; 026import io.kubernetes.client.util.generic.options.ListOptions; 027import java.io.IOException; 028import java.nio.file.Files; 029import java.nio.file.Path; 030import java.util.Optional; 031import java.util.concurrent.atomic.AtomicInteger; 032import java.util.logging.Level; 033import org.jdrupes.vmoperator.common.K8s; 034import org.jdrupes.vmoperator.common.K8sClient; 035import org.jdrupes.vmoperator.common.K8sObserver; 036import org.jdrupes.vmoperator.common.K8sObserver.ResponseType; 037import org.jdrupes.vmoperator.manager.events.ChannelManager; 038import org.jdrupes.vmoperator.manager.events.Exit; 039import org.jgrapes.core.Channel; 040import org.jgrapes.core.Component; 041import org.jgrapes.core.Components; 042import org.jgrapes.core.annotation.Handler; 043import org.jgrapes.core.events.Start; 044import org.jgrapes.core.events.Stop; 045import org.jgrapes.util.events.ConfigurationUpdate; 046 047/** 048 * A base class for monitoring VM related resources. 049 * 050 * @param <O> the object type for the context 051 * @param <L> the object list type for the context 052 */ 053@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis" }) 054public abstract class AbstractMonitor<O extends KubernetesObject, 055 L extends KubernetesListObject, C extends Channel> extends Component { 056 057 private final Class<O> objectClass; 058 private final Class<L> objectListClass; 059 private K8sClient client; 060 private APIResource context; 061 private String namespace; 062 private ListOptions options = new ListOptions(); 063 private final AtomicInteger observerCounter = new AtomicInteger(0); 064 private ChannelManager<String, C, ?> channelManager; 065 private boolean channelManagerMaster; 066 067 /** 068 * Initializes the instance. 069 * 070 * @param componentChannel the component channel 071 */ 072 protected AbstractMonitor(Channel componentChannel, Class<O> objectClass, 073 Class<L> objectListClass) { 074 super(componentChannel); 075 this.objectClass = objectClass; 076 this.objectListClass = objectListClass; 077 } 078 079 /** 080 * Return the client. 081 * 082 * @return the client 083 */ 084 public K8sClient client() { 085 return client; 086 } 087 088 /** 089 * Sets the client to be used. 090 * 091 * @param client the client 092 * @return the abstract monitor 093 */ 094 public AbstractMonitor<O, L, C> client(K8sClient client) { 095 this.client = client; 096 return this; 097 } 098 099 /** 100 * Return the observed namespace. 101 * 102 * @return the namespace 103 */ 104 public String namespace() { 105 return namespace; 106 } 107 108 /** 109 * Sets the namespace to be observed. 110 * 111 * @param namespace the namespaceToWatch to set 112 * @return the abstract monitor 113 */ 114 public AbstractMonitor<O, L, C> namespace(String namespace) { 115 this.namespace = namespace; 116 return this; 117 } 118 119 /** 120 * Returns the options for selecting the objects to observe. 121 * 122 * @return the options 123 */ 124 public ListOptions options() { 125 return options; 126 } 127 128 /** 129 * Sets the options for selecting the objects to observe. 130 * 131 * @param options the options to set 132 * @return the abstract monitor 133 */ 134 public AbstractMonitor<O, L, C> options(ListOptions options) { 135 this.options = options; 136 return this; 137 } 138 139 /** 140 * Returns the observed context. 141 * 142 * @return the context 143 */ 144 public APIResource context() { 145 return context; 146 } 147 148 /** 149 * Sets the context to observe. 150 * 151 * @param context the context 152 * @return the abstract monitor 153 */ 154 public AbstractMonitor<O, L, C> context(APIResource context) { 155 this.context = context; 156 return this; 157 } 158 159 /** 160 * Returns the channel manager. 161 * 162 * @return the context 163 */ 164 public ChannelManager<String, C, ?> channelManager() { 165 return channelManager; 166 } 167 168 /** 169 * Sets the channel manager. 170 * 171 * @param channelManager the channel manager 172 * @return the abstract monitor 173 */ 174 public AbstractMonitor<O, L, C> 175 channelManager(ChannelManager<String, C, ?> channelManager) { 176 this.channelManager = channelManager; 177 return this; 178 } 179 180 /** 181 * Looks for a key "namespace" in the configuration and, if found, 182 * sets the namespace to its value. 183 * 184 * @param event the event 185 */ 186 @Handler 187 public void onConfigurationUpdate(ConfigurationUpdate event) { 188 event.structured(Components.manager(parent()).componentPath()) 189 .ifPresent(c -> { 190 if (c.containsKey("namespace")) { 191 namespace = (String) c.get("namespace"); 192 } 193 }); 194 } 195 196 /** 197 * Handle the start event. Configures the namespace invokes 198 * {@link #prepareMonitoring()} and starts the observers. 199 * 200 * @param event the event 201 */ 202 @Handler(priority = 10) 203 @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops") 204 public void onStart(Start event) { 205 try { 206 // Get namespace 207 if (namespace == null) { 208 var path = Path 209 .of("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); 210 if (Files.isReadable(path)) { 211 namespace 212 = Files.lines(path).findFirst().orElse(null); 213 } 214 } 215 216 // Additional preparations by derived class 217 prepareMonitoring(); 218 assert client != null; 219 assert context != null; 220 assert namespace != null; 221 logger.fine(() -> "Observing " + K8s.toString(context) 222 + " objects in " + namespace); 223 224 // Monitor all versions 225 for (var version : context.getVersions()) { 226 createObserver(version); 227 } 228 registerAsGenerator(); 229 } catch (IOException | ApiException e) { 230 logger.log(Level.SEVERE, e, 231 () -> "Cannot watch VMs, terminating."); 232 event.cancel(true); 233 fire(new Exit(1)); 234 } 235 } 236 237 private void createObserver(String version) { 238 observerCounter.incrementAndGet(); 239 new K8sObserver<>(objectClass, objectListClass, client, 240 K8s.preferred(context, version), namespace, options) 241 .handler((c, r) -> { 242 handleChange(c, r); 243 if (ResponseType.valueOf(r.type) == ResponseType.DELETED 244 && channelManagerMaster) { 245 channelManager.remove(r.object.getMetadata().getName()); 246 } 247 }).onTerminated((o, t) -> { 248 if (observerCounter.decrementAndGet() == 0) { 249 unregisterAsGenerator(); 250 } 251 // Exception has been logged already 252 if (t != null) { 253 fire(new Stop()); 254 } 255 }).start(); 256 } 257 258 /** 259 * Invoked by {@link #onStart(Start)} after the namespace has 260 * been configured and before starting the observer. 261 * 262 * @throws IOException Signals that an I/O exception has occurred. 263 * @throws ApiException the api exception 264 */ 265 @SuppressWarnings("PMD.EmptyMethodInAbstractClassShouldBeAbstract") 266 protected void prepareMonitoring() throws IOException, ApiException { 267 // To be overridden by derived class. 268 } 269 270 /** 271 * Handle an observed change. 272 * 273 * @param client the client 274 * @param change the change 275 */ 276 protected abstract void handleChange(K8sClient client, Response<O> change); 277 278 /** 279 * Returns the {@link Channel} for the given name. 280 * 281 * @param name the name 282 * @return the channel used for events related to the specified object 283 */ 284 protected Optional<C> channel(String name) { 285 return channelManager.getChannel(name); 286 } 287}