001/* 002 * VM-Operator 003 * Copyright (C) 2023,2024 Michael N. Lipp 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <https://www.gnu.org/licenses/>. 017 */ 018 019package org.jdrupes.vmoperator.manager; 020 021import com.google.gson.JsonArray; 022import com.google.gson.JsonObject; 023import io.kubernetes.client.openapi.ApiException; 024import io.kubernetes.client.openapi.models.V1ObjectMeta; 025import io.kubernetes.client.util.Watch; 026import io.kubernetes.client.util.generic.options.ListOptions; 027import java.io.IOException; 028import java.util.Optional; 029import java.util.Set; 030import java.util.logging.Level; 031import java.util.stream.Collectors; 032import static org.jdrupes.vmoperator.common.Constants.VM_OP_GROUP; 033import org.jdrupes.vmoperator.common.K8s; 034import org.jdrupes.vmoperator.common.K8sClient; 035import org.jdrupes.vmoperator.common.K8sDynamicStub; 036import org.jdrupes.vmoperator.common.K8sObserver.ResponseType; 037import org.jdrupes.vmoperator.common.K8sV1ConfigMapStub; 038import org.jdrupes.vmoperator.common.K8sV1PodStub; 039import org.jdrupes.vmoperator.common.K8sV1StatefulSetStub; 040import org.jdrupes.vmoperator.common.VmDefinitionModel; 041import org.jdrupes.vmoperator.common.VmDefinitionModels; 042import org.jdrupes.vmoperator.common.VmDefinitionStub; 043import static org.jdrupes.vmoperator.manager.Constants.APP_NAME; 044import static org.jdrupes.vmoperator.manager.Constants.VM_OP_KIND_VM; 045import static org.jdrupes.vmoperator.manager.Constants.VM_OP_NAME; 046import org.jdrupes.vmoperator.manager.events.ChannelManager; 047import org.jdrupes.vmoperator.manager.events.VmChannel; 048import org.jdrupes.vmoperator.manager.events.VmDefChanged; 049import org.jdrupes.vmoperator.util.GsonPtr; 050import org.jgrapes.core.Channel; 051import org.jgrapes.core.Event; 052 053/** 054 * Watches for changes of VM definitions. 055 */ 056@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", "PMD.ExcessiveImports" }) 057public class VmMonitor extends 058 AbstractMonitor<VmDefinitionModel, VmDefinitionModels, VmChannel> { 059 060 private final ChannelManager<String, VmChannel, ?> channelManager; 061 062 /** 063 * Instantiates a new VM definition watcher. 064 * 065 * @param componentChannel the component channel 066 * @param channelManager the channel manager 067 */ 068 public VmMonitor(Channel componentChannel, 069 ChannelManager<String, VmChannel, ?> channelManager) { 070 super(componentChannel, VmDefinitionModel.class, 071 VmDefinitionModels.class); 072 this.channelManager = channelManager; 073 } 074 075 @Override 076 protected void prepareMonitoring() throws IOException, ApiException { 077 client(new K8sClient()); 078 079 // Get all our API versions 080 var ctx = K8s.context(client(), VM_OP_GROUP, "", VM_OP_KIND_VM); 081 if (ctx.isEmpty()) { 082 logger.severe(() -> "Cannot get CRD context."); 083 return; 084 } 085 context(ctx.get()); 086 087 // Remove left over resources 088 purge(); 089 } 090 091 @SuppressWarnings("PMD.CognitiveComplexity") 092 private void purge() throws ApiException { 093 // Get existing CRs (VMs) 094 var known = K8sDynamicStub.list(client(), context(), namespace()) 095 .stream().map(stub -> stub.name()).collect(Collectors.toSet()); 096 ListOptions opts = new ListOptions(); 097 opts.setLabelSelector( 098 "app.kubernetes.io/managed-by=" + VM_OP_NAME + "," 099 + "app.kubernetes.io/name=" + APP_NAME); 100 for (var context : Set.of(K8sV1StatefulSetStub.CONTEXT, 101 K8sV1ConfigMapStub.CONTEXT)) { 102 for (var resStub : K8sDynamicStub.list(client(), context, 103 namespace(), opts)) { 104 String instance = resStub.model() 105 .map(m -> m.metadata().getName()).orElse("(unknown)"); 106 if (!known.contains(instance)) { 107 resStub.delete(); 108 } 109 } 110 } 111 } 112 113 @Override 114 protected void handleChange(K8sClient client, 115 Watch.Response<VmDefinitionModel> response) { 116 V1ObjectMeta metadata = response.object.getMetadata(); 117 VmChannel channel = channelManager.channelGet(metadata.getName()); 118 119 // Get full definition and associate with channel as backup 120 var vmDef = response.object; 121 if (vmDef.data() == null) { 122 // ADDED event does not provide data, see 123 // https://github.com/kubernetes-client/java/issues/3215 124 vmDef = getModel(client, vmDef); 125 } 126 if (vmDef.data() != null) { 127 // New data, augment and save 128 addDynamicData(channel.client(), vmDef, channel.vmDefinition()); 129 channel.setVmDefinition(vmDef); 130 } else { 131 // Reuse cached 132 vmDef = channel.vmDefinition(); 133 } 134 if (vmDef == null) { 135 logger.warning( 136 () -> "Cannot get model for " + response.object.getMetadata()); 137 return; 138 } 139 if (ResponseType.valueOf(response.type) == ResponseType.DELETED) { 140 channelManager.remove(metadata.getName()); 141 } 142 143 // Create and fire changed event. Remove channel from channel 144 // manager on completion. 145 channel.pipeline() 146 .fire(Event.onCompletion( 147 new VmDefChanged(ResponseType.valueOf(response.type), 148 channel.setGeneration( 149 response.object.getMetadata().getGeneration()), 150 vmDef), 151 e -> { 152 if (e.type() == ResponseType.DELETED) { 153 channelManager 154 .remove(e.vmDefinition().metadata().getName()); 155 } 156 }), channel); 157 } 158 159 private VmDefinitionModel getModel(K8sClient client, 160 VmDefinitionModel vmDef) { 161 try { 162 return VmDefinitionStub.get(client, context(), namespace(), 163 vmDef.metadata().getName()).model().orElse(null); 164 } catch (ApiException e) { 165 return null; 166 } 167 } 168 169 private void addDynamicData(K8sClient client, VmDefinitionModel vmState, 170 VmDefinitionModel prevState) { 171 var rootNode = GsonPtr.to(vmState.data()).get(JsonObject.class); 172 173 // Maintain (or initialize) the resetCount 174 rootNode.addProperty("resetCount", Optional.ofNullable(prevState) 175 .map(ps -> GsonPtr.to(ps.data())) 176 .flatMap(d -> d.getAsLong("resetCount")).orElse(0L)); 177 178 // Add defaults in case the VM is not running 179 rootNode.addProperty("nodeName", ""); 180 rootNode.addProperty("nodeAddress", ""); 181 182 // VM definition status changes before the pod terminates. 183 // This results in pod information being shown for a stopped 184 // VM which is irritating. So check condition first. 185 var isRunning = GsonPtr.to(rootNode).to("status", "conditions") 186 .get(JsonArray.class) 187 .asList().stream().filter(el -> "Running" 188 .equals(((JsonObject) el).get("type").getAsString())) 189 .findFirst().map(el -> "True" 190 .equals(((JsonObject) el).get("status").getAsString())) 191 .orElse(false); 192 if (!isRunning) { 193 return; 194 } 195 var podSearch = new ListOptions(); 196 podSearch.setLabelSelector("app.kubernetes.io/name=" + APP_NAME 197 + ",app.kubernetes.io/component=" + APP_NAME 198 + ",app.kubernetes.io/instance=" + vmState.getMetadata().getName()); 199 try { 200 var podList 201 = K8sV1PodStub.list(client, namespace(), podSearch); 202 for (var podStub : podList) { 203 var nodeName = podStub.model().get().getSpec().getNodeName(); 204 rootNode.addProperty("nodeName", nodeName); 205 logger.fine(() -> "Added node name " + nodeName 206 + " to VM info for " + vmState.getMetadata().getName()); 207 @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops") 208 var addrs = new JsonArray(); 209 podStub.model().get().getStatus().getPodIPs().stream() 210 .map(ip -> ip.getIp()).forEach(addrs::add); 211 rootNode.add("nodeAddresses", addrs); 212 logger.fine(() -> "Added node addresses " + addrs 213 + " to VM info for " + vmState.getMetadata().getName()); 214 } 215 } catch (ApiException e) { 216 logger.log(Level.WARNING, e, 217 () -> "Cannot access node information: " + e.getMessage()); 218 } 219 } 220}