/*
 * VM-Operator
 * Copyright (C) 2023,2024 Michael N. Lipp
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.jdrupes.vmoperator.manager;

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.util.Watch;
import io.kubernetes.client.util.generic.options.ListOptions;
import java.io.IOException;
import java.util.Optional;
import java.util.Set;
import java.util.logging.Level;
import java.util.stream.Collectors;
import static org.jdrupes.vmoperator.common.Constants.VM_OP_GROUP;
import org.jdrupes.vmoperator.common.K8s;
import org.jdrupes.vmoperator.common.K8sClient;
import org.jdrupes.vmoperator.common.K8sDynamicStub;
import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
import org.jdrupes.vmoperator.common.K8sV1ConfigMapStub;
import org.jdrupes.vmoperator.common.K8sV1PodStub;
import org.jdrupes.vmoperator.common.K8sV1StatefulSetStub;
import org.jdrupes.vmoperator.common.VmDefinitionModel;
import org.jdrupes.vmoperator.common.VmDefinitionModels;
import org.jdrupes.vmoperator.common.VmDefinitionStub;
import static org.jdrupes.vmoperator.manager.Constants.APP_NAME;
import static org.jdrupes.vmoperator.manager.Constants.VM_OP_KIND_VM;
import static org.jdrupes.vmoperator.manager.Constants.VM_OP_NAME;
import org.jdrupes.vmoperator.manager.events.VmChannel;
import org.jdrupes.vmoperator.manager.events.VmDefChanged;
import org.jdrupes.vmoperator.util.GsonPtr;
import org.jgrapes.core.Channel;

/**
 * Watches for changes of VM definitions.
 *
 * For each change reported by the watch, the definition is augmented
 * with some dynamic data (see
 * {@link #addDynamicData(K8sClient, VmDefinitionModel, VmDefinitionModel)}),
 * stored in the VM's channel and published as a {@link VmDefChanged}
 * event on the channel's pipeline.
 */
@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", "PMD.ExcessiveImports" })
public class VmMonitor extends
    AbstractMonitor<VmDefinitionModel, VmDefinitionModels, VmChannel> {

    /**
     * Instantiates a new VM definition watcher.
     *
     * @param componentChannel the component channel
     */
    public VmMonitor(Channel componentChannel) {
        super(componentChannel, VmDefinitionModel.class,
            VmDefinitionModels.class);
    }

    /**
     * Creates the client, looks up the API context of the VM CRD and
     * removes left over resources. If the context cannot be obtained,
     * an error is logged and neither the context is set nor the
     * purge executed.
     *
     * @throws IOException if the client cannot be created
     * @throws ApiException if a request to the API server fails
     */
    @Override
    protected void prepareMonitoring() throws IOException, ApiException {
        client(new K8sClient());

        // Get all our API versions
        var ctx = K8s.context(client(), VM_OP_GROUP, "", VM_OP_KIND_VM);
        if (ctx.isEmpty()) {
            logger.severe(() -> "Cannot get CRD context.");
            return;
        }
        context(ctx.get());

        // Remove left over resources
        purge();
    }

    /**
     * Deletes all stateful sets and config maps carrying this
     * application's "managed-by" and "name" labels whose resource
     * name does not match the name of an existing VM definition.
     *
     * @throws ApiException if a request to the API server fails
     */
    @SuppressWarnings("PMD.CognitiveComplexity")
    private void purge() throws ApiException {
        // Get existing CRs (VMs)
        var known = K8sDynamicStub.list(client(), context(), namespace())
            .stream().map(stub -> stub.name()).collect(Collectors.toSet());
        ListOptions opts = new ListOptions();
        opts.setLabelSelector(
            "app.kubernetes.io/managed-by=" + VM_OP_NAME + ","
                + "app.kubernetes.io/name=" + APP_NAME);
        for (var context : Set.of(K8sV1StatefulSetStub.CONTEXT,
            K8sV1ConfigMapStub.CONTEXT)) {
            for (var resStub : K8sDynamicStub.list(client(), context,
                namespace(), opts)) {
                // A resource without a model is treated as unknown and
                // therefore removed as well.
                String instance = resStub.model()
                    .map(m -> m.metadata().getName()).orElse("(unknown)");
                if (!known.contains(instance)) {
                    resStub.delete();
                }
            }
        }
    }

    /**
     * Handles a change reported by the watch. Does nothing if no
     * channel is available for the VM. Otherwise makes sure that
     * a full definition is available (retrieving it if the response
     * has no data and falling back to the definition cached in the
     * channel if that fails) and fires a {@link VmDefChanged} event
     * on the channel's pipeline.
     *
     * @param client the client
     * @param response the watch response
     */
    @Override
    protected void handleChange(K8sClient client,
            Watch.Response<VmDefinitionModel> response) {
        V1ObjectMeta metadata = response.object.getMetadata();
        VmChannel channel = channel(metadata.getName()).orElse(null);
        if (channel == null) {
            return;
        }

        // Get full definition and associate with channel as backup
        var vmDef = response.object;
        if (vmDef.data() == null) {
            // ADDED event does not provide data, see
            // https://github.com/kubernetes-client/java/issues/3215
            vmDef = getModel(client, vmDef);
        }
        // getModel returns null if the definition cannot be retrieved,
        // so check the reference before accessing the data.
        if (vmDef != null && vmDef.data() != null) {
            // New data, augment and save
            addDynamicData(channel.client(), vmDef, channel.vmDefinition());
            channel.setVmDefinition(vmDef);
        } else {
            // Reuse cached
            vmDef = channel.vmDefinition();
        }
        if (vmDef == null) {
            logger.warning(
                () -> "Cannot get model for " + response.object.getMetadata());
            return;
        }

        // Create and fire event
        channel.pipeline()
            .fire(new VmDefChanged(ResponseType.valueOf(response.type),
                channel.setGeneration(
                    response.object.getMetadata().getGeneration()),
                vmDef), channel);
    }

    /**
     * Retrieves the VM definition with the same name as the given
     * one from the API server.
     *
     * @param client the client
     * @param vmDef the definition that provides the name
     * @return the definition or {@code null} if it is not available
     * or the request fails
     */
    private VmDefinitionModel getModel(K8sClient client,
            VmDefinitionModel vmDef) {
        try {
            return VmDefinitionStub.get(client, context(), namespace(),
                vmDef.metadata().getName()).model().orElse(null);
        } catch (ApiException e) {
            // The caller handles the missing model, but keep the
            // cause available for diagnosis.
            logger.log(Level.FINE, e, () -> "Cannot retrieve definition of "
                + vmDef.metadata().getName() + ": " + e.getMessage());
            return null;
        }
    }

    /**
     * Adds properties to the root of the VM definition's data that
     * are not part of the definition as delivered by the watch:
     * the "resetCount" (taken over from the previous state or
     * initialized to 0), the "nodeName" and, if the "Running"
     * condition is "True" and a matching pod is found, the
     * "nodeAddresses".
     *
     * @param client the client used to look up the pod
     * @param vmState the definition to augment
     * @param prevState the previously known definition, may be
     * {@code null}
     */
    private void addDynamicData(K8sClient client, VmDefinitionModel vmState,
            VmDefinitionModel prevState) {
        var rootNode = GsonPtr.to(vmState.data()).get(JsonObject.class);

        // Maintain (or initialize) the resetCount
        rootNode.addProperty("resetCount", Optional.ofNullable(prevState)
            .map(ps -> GsonPtr.to(ps.data()))
            .flatMap(d -> d.getAsLong("resetCount")).orElse(0L));

        // Add defaults in case the VM is not running
        // NOTE(review): the default uses the key "nodeAddress" while the
        // pod's addresses are added as "nodeAddresses" below. Kept as is
        // because consumers may rely on either key -- confirm.
        rootNode.addProperty("nodeName", "");
        rootNode.addProperty("nodeAddress", "");

        // VM definition status changes before the pod terminates.
        // This results in pod information being shown for a stopped
        // VM which is irritating. So check condition first.
        var isRunning = GsonPtr.to(rootNode).to("status", "conditions")
            .get(JsonArray.class)
            .asList().stream().filter(el -> "Running"
                .equals(((JsonObject) el).get("type").getAsString()))
            .findFirst().map(el -> "True"
                .equals(((JsonObject) el).get("status").getAsString()))
            .orElse(false);
        if (!isRunning) {
            return;
        }
        var podSearch = new ListOptions();
        podSearch.setLabelSelector("app.kubernetes.io/name=" + APP_NAME
            + ",app.kubernetes.io/component=" + APP_NAME
            + ",app.kubernetes.io/instance=" + vmState.getMetadata().getName());
        try {
            var podList
                = K8sV1PodStub.list(client, namespace(), podSearch);
            for (var podStub : podList) {
                // The pod may have vanished between listing and
                // retrieving its model.
                var pod = podStub.model().orElse(null);
                if (pod == null) {
                    continue;
                }
                var nodeName = pod.getSpec().getNodeName();
                rootNode.addProperty("nodeName", nodeName);
                logger.fine(() -> "Added node name " + nodeName
                    + " to VM info for " + vmState.getMetadata().getName());
                @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
                var addrs = new JsonArray();
                // Status or IPs may be missing (e.g. no address assigned
                // yet), this results in an empty list of addresses.
                Optional.ofNullable(pod.getStatus())
                    .map(status -> status.getPodIPs())
                    .ifPresent(ips -> ips.stream().map(ip -> ip.getIp())
                        .forEach(addrs::add));
                rootNode.add("nodeAddresses", addrs);
                logger.fine(() -> "Added node addresses " + addrs
                    + " to VM info for " + vmState.getMetadata().getName());
            }
        } catch (ApiException e) {
            logger.log(Level.WARNING, e,
                () -> "Cannot access node information: " + e.getMessage());
        }
    }
}