001/* 002 * VM-Operator 003 * Copyright (C) 2023,2024 Michael N. Lipp 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <https://www.gnu.org/licenses/>. 017 */ 018 019package org.jdrupes.vmoperator.manager; 020 021import io.kubernetes.client.openapi.ApiException; 022import io.kubernetes.client.openapi.models.V1ObjectMeta; 023import io.kubernetes.client.util.Watch; 024import io.kubernetes.client.util.generic.options.ListOptions; 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.Collections; 028import java.util.List; 029import java.util.Map; 030import java.util.Optional; 031import java.util.Set; 032import java.util.logging.Level; 033import java.util.stream.Collectors; 034import static org.jdrupes.vmoperator.common.Constants.VM_OP_GROUP; 035import org.jdrupes.vmoperator.common.K8s; 036import org.jdrupes.vmoperator.common.K8sClient; 037import org.jdrupes.vmoperator.common.K8sDynamicStub; 038import org.jdrupes.vmoperator.common.K8sObserver.ResponseType; 039import org.jdrupes.vmoperator.common.K8sV1ConfigMapStub; 040import org.jdrupes.vmoperator.common.K8sV1PodStub; 041import org.jdrupes.vmoperator.common.K8sV1StatefulSetStub; 042import org.jdrupes.vmoperator.common.VmDefinition; 043import org.jdrupes.vmoperator.common.VmDefinitionModel; 044import org.jdrupes.vmoperator.common.VmDefinitionModels; 045import org.jdrupes.vmoperator.common.VmDefinitionStub; 046import static org.jdrupes.vmoperator.manager.Constants.APP_NAME; 047import static org.jdrupes.vmoperator.manager.Constants.VM_OP_KIND_VM; 048import static org.jdrupes.vmoperator.manager.Constants.VM_OP_NAME; 049import org.jdrupes.vmoperator.manager.events.ChannelManager; 050import org.jdrupes.vmoperator.manager.events.VmChannel; 051import org.jdrupes.vmoperator.manager.events.VmDefChanged; 052import org.jdrupes.vmoperator.util.DataPath; 053import org.jgrapes.core.Channel; 054import org.jgrapes.core.Event; 055 056/** 057 * Watches for changes of VM definitions. 058 */ 059@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", "PMD.ExcessiveImports" }) 060public class VmMonitor extends 061 AbstractMonitor<VmDefinitionModel, VmDefinitionModels, VmChannel> { 062 063 private final ChannelManager<String, VmChannel, ?> channelManager; 064 065 /** 066 * Instantiates a new VM definition watcher. 067 * 068 * @param componentChannel the component channel 069 * @param channelManager the channel manager 070 */ 071 public VmMonitor(Channel componentChannel, 072 ChannelManager<String, VmChannel, ?> channelManager) { 073 super(componentChannel, VmDefinitionModel.class, 074 VmDefinitionModels.class); 075 this.channelManager = channelManager; 076 } 077 078 @Override 079 protected void prepareMonitoring() throws IOException, ApiException { 080 client(new K8sClient()); 081 082 // Get all our API versions 083 var ctx = K8s.context(client(), VM_OP_GROUP, "", VM_OP_KIND_VM); 084 if (ctx.isEmpty()) { 085 logger.severe(() -> "Cannot get CRD context."); 086 return; 087 } 088 context(ctx.get()); 089 090 // Remove left over resources 091 purge(); 092 } 093 094 @SuppressWarnings("PMD.CognitiveComplexity") 095 private void purge() throws ApiException { 096 // Get existing CRs (VMs) 097 var known = K8sDynamicStub.list(client(), context(), namespace()) 098 .stream().map(stub -> stub.name()).collect(Collectors.toSet()); 099 ListOptions opts = new ListOptions(); 100 opts.setLabelSelector( 101 "app.kubernetes.io/managed-by=" + VM_OP_NAME + "," 102 + "app.kubernetes.io/name=" + APP_NAME); 103 for (var context : Set.of(K8sV1StatefulSetStub.CONTEXT, 104 K8sV1ConfigMapStub.CONTEXT)) { 105 for (var resStub : K8sDynamicStub.list(client(), context, 106 namespace(), opts)) { 107 String instance = resStub.model() 108 .map(m -> m.metadata().getName()).orElse("(unknown)"); 109 if (!known.contains(instance)) { 110 resStub.delete(); 111 } 112 } 113 } 114 } 115 116 @Override 117 protected void handleChange(K8sClient client, 118 Watch.Response<VmDefinitionModel> response) { 119 V1ObjectMeta metadata = response.object.getMetadata(); 120 VmChannel channel = channelManager.channelGet(metadata.getName()); 121 122 // Remove from channel manager if deleted 123 if (ResponseType.valueOf(response.type) == ResponseType.DELETED) { 124 channelManager.remove(metadata.getName()); 125 } 126 127 // Get full definition and associate with channel as backup 128 var vmModel = response.object; 129 if (vmModel.data() == null) { 130 // ADDED event does not provide data, see 131 // https://github.com/kubernetes-client/java/issues/3215 132 vmModel = getModel(client, vmModel); 133 } 134 VmDefinition vmDef = null; 135 if (vmModel.data() != null) { 136 // New data, augment and save 137 vmDef = client.getJSON().getGson().fromJson(vmModel.data(), 138 VmDefinition.class); 139 addDynamicData(channel.client(), vmDef, channel.vmDefinition()); 140 channel.setVmDefinition(vmDef); 141 } 142 if (vmDef == null) { 143 // Reuse cached (e.g. if deleted) 144 vmDef = channel.vmDefinition(); 145 } 146 if (vmDef == null) { 147 logger.warning(() -> "Cannot get defintion for " 148 + response.object.getMetadata()); 149 return; 150 } 151 152 // Create and fire changed event. Remove channel from channel 153 // manager on completion. 154 channel.pipeline() 155 .fire(Event.onCompletion( 156 new VmDefChanged(ResponseType.valueOf(response.type), 157 channel.setGeneration(response.object.getMetadata() 158 .getGeneration()), 159 vmDef), 160 e -> { 161 if (e.type() == ResponseType.DELETED) { 162 channelManager.remove(e.vmDefinition().name()); 163 } 164 }), channel); 165 } 166 167 private VmDefinitionModel getModel(K8sClient client, 168 VmDefinitionModel vmDef) { 169 try { 170 return VmDefinitionStub.get(client, context(), namespace(), 171 vmDef.metadata().getName()).model().orElse(null); 172 } catch (ApiException e) { 173 return null; 174 } 175 } 176 177 @SuppressWarnings("PMD.AvoidDuplicateLiterals") 178 private void addDynamicData(K8sClient client, VmDefinition vmDef, 179 VmDefinition prevState) { 180 // Maintain (or initialize) the resetCount 181 vmDef.extra("resetCount", 182 Optional.ofNullable(prevState).map(d -> d.extra("resetCount")) 183 .orElse(0L)); 184 185 // Node information 186 // Add defaults in case the VM is not running 187 vmDef.extra("nodeName", ""); 188 vmDef.extra("nodeAddress", ""); 189 190 // VM definition status changes before the pod terminates. 191 // This results in pod information being shown for a stopped 192 // VM which is irritating. So check condition first. 193 @SuppressWarnings("PMD.LambdaCanBeMethodReference") 194 var isRunning 195 = vmDef.<List<Map<String, Object>>> fromStatus("conditions") 196 .orElse(Collections.emptyList()).stream() 197 .filter(cond -> DataPath.get(cond, "type") 198 .map(t -> "Running".equals(t)).orElse(false)) 199 .findFirst().map(cond -> DataPath.get(cond, "status") 200 .map(s -> "True".equals(s)).orElse(false)) 201 .orElse(false); 202 if (!isRunning) { 203 return; 204 } 205 var podSearch = new ListOptions(); 206 podSearch.setLabelSelector("app.kubernetes.io/name=" + APP_NAME 207 + ",app.kubernetes.io/component=" + APP_NAME 208 + ",app.kubernetes.io/instance=" + vmDef.name()); 209 try { 210 var podList 211 = K8sV1PodStub.list(client, namespace(), podSearch); 212 for (var podStub : podList) { 213 var nodeName = podStub.model().get().getSpec().getNodeName(); 214 vmDef.extra("nodeName", nodeName); 215 logger.fine(() -> "Added node name " + nodeName 216 + " to VM info for " + vmDef.name()); 217 @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops") 218 var addrs = new ArrayList<String>(); 219 podStub.model().get().getStatus().getPodIPs().stream() 220 .map(ip -> ip.getIp()).forEach(addrs::add); 221 vmDef.extra("nodeAddresses", addrs); 222 logger.fine(() -> "Added node addresses " + addrs 223 + " to VM info for " + vmDef.name()); 224 } 225 } catch (ApiException e) { 226 logger.log(Level.WARNING, e, 227 () -> "Cannot access node information: " + e.getMessage()); 228 } 229 } 230}