001/*
002 * VM-Operator
003 * Copyright (C) 2023,2024 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.manager;
020
021import com.google.gson.JsonArray;
022import com.google.gson.JsonObject;
023import io.kubernetes.client.openapi.ApiException;
024import io.kubernetes.client.openapi.models.V1ObjectMeta;
025import io.kubernetes.client.util.Watch;
026import io.kubernetes.client.util.generic.options.ListOptions;
027import java.io.IOException;
028import java.util.Optional;
029import java.util.Set;
030import java.util.logging.Level;
031import java.util.stream.Collectors;
032import static org.jdrupes.vmoperator.common.Constants.VM_OP_GROUP;
033import org.jdrupes.vmoperator.common.K8s;
034import org.jdrupes.vmoperator.common.K8sClient;
035import org.jdrupes.vmoperator.common.K8sDynamicStub;
036import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
037import org.jdrupes.vmoperator.common.K8sV1ConfigMapStub;
038import org.jdrupes.vmoperator.common.K8sV1PodStub;
039import org.jdrupes.vmoperator.common.K8sV1StatefulSetStub;
040import org.jdrupes.vmoperator.common.VmDefinitionModel;
041import org.jdrupes.vmoperator.common.VmDefinitionModels;
042import org.jdrupes.vmoperator.common.VmDefinitionStub;
043import static org.jdrupes.vmoperator.manager.Constants.APP_NAME;
044import static org.jdrupes.vmoperator.manager.Constants.VM_OP_KIND_VM;
045import static org.jdrupes.vmoperator.manager.Constants.VM_OP_NAME;
046import org.jdrupes.vmoperator.manager.events.VmChannel;
047import org.jdrupes.vmoperator.manager.events.VmDefChanged;
048import org.jdrupes.vmoperator.util.GsonPtr;
049import org.jgrapes.core.Channel;
050
051/**
052 * Watches for changes of VM definitions.
053 */
054@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", "PMD.ExcessiveImports" })
055public class VmMonitor extends
056        AbstractMonitor<VmDefinitionModel, VmDefinitionModels, VmChannel> {
057
058    /**
059     * Instantiates a new VM definition watcher.
060     *
061     * @param componentChannel the component channel
062     */
063    public VmMonitor(Channel componentChannel) {
064        super(componentChannel, VmDefinitionModel.class,
065            VmDefinitionModels.class);
066    }
067
068    @Override
069    protected void prepareMonitoring() throws IOException, ApiException {
070        client(new K8sClient());
071
072        // Get all our API versions
073        var ctx = K8s.context(client(), VM_OP_GROUP, "", VM_OP_KIND_VM);
074        if (ctx.isEmpty()) {
075            logger.severe(() -> "Cannot get CRD context.");
076            return;
077        }
078        context(ctx.get());
079
080        // Remove left over resources
081        purge();
082    }
083
084    @SuppressWarnings("PMD.CognitiveComplexity")
085    private void purge() throws ApiException {
086        // Get existing CRs (VMs)
087        var known = K8sDynamicStub.list(client(), context(), namespace())
088            .stream().map(stub -> stub.name()).collect(Collectors.toSet());
089        ListOptions opts = new ListOptions();
090        opts.setLabelSelector(
091            "app.kubernetes.io/managed-by=" + VM_OP_NAME + ","
092                + "app.kubernetes.io/name=" + APP_NAME);
093        for (var context : Set.of(K8sV1StatefulSetStub.CONTEXT,
094            K8sV1ConfigMapStub.CONTEXT)) {
095            for (var resStub : K8sDynamicStub.list(client(), context,
096                namespace(), opts)) {
097                String instance = resStub.model()
098                    .map(m -> m.metadata().getName()).orElse("(unknown)");
099                if (!known.contains(instance)) {
100                    resStub.delete();
101                }
102            }
103        }
104    }
105
106    @Override
107    protected void handleChange(K8sClient client,
108            Watch.Response<VmDefinitionModel> response) {
109        V1ObjectMeta metadata = response.object.getMetadata();
110        VmChannel channel = channel(metadata.getName()).orElse(null);
111        if (channel == null) {
112            return;
113        }
114
115        // Get full definition and associate with channel as backup
116        var vmDef = response.object;
117        if (vmDef.data() == null) {
118            // ADDED event does not provide data, see
119            // https://github.com/kubernetes-client/java/issues/3215
120            vmDef = getModel(client, vmDef);
121        }
122        if (vmDef.data() != null) {
123            // New data, augment and save
124            addDynamicData(channel.client(), vmDef, channel.vmDefinition());
125            channel.setVmDefinition(vmDef);
126        } else {
127            // Reuse cached
128            vmDef = channel.vmDefinition();
129        }
130        if (vmDef == null) {
131            logger.warning(
132                () -> "Cannot get model for " + response.object.getMetadata());
133            return;
134        }
135
136        // Create and fire event
137        channel.pipeline()
138            .fire(new VmDefChanged(ResponseType.valueOf(response.type),
139                channel.setGeneration(
140                    response.object.getMetadata().getGeneration()),
141                vmDef), channel);
142    }
143
144    private VmDefinitionModel getModel(K8sClient client,
145            VmDefinitionModel vmDef) {
146        try {
147            return VmDefinitionStub.get(client, context(), namespace(),
148                vmDef.metadata().getName()).model().orElse(null);
149        } catch (ApiException e) {
150            return null;
151        }
152    }
153
154    private void addDynamicData(K8sClient client, VmDefinitionModel vmState,
155            VmDefinitionModel prevState) {
156        var rootNode = GsonPtr.to(vmState.data()).get(JsonObject.class);
157
158        // Maintain (or initialize) the resetCount
159        rootNode.addProperty("resetCount", Optional.ofNullable(prevState)
160            .map(ps -> GsonPtr.to(ps.data()))
161            .flatMap(d -> d.getAsLong("resetCount")).orElse(0L));
162
163        // Add defaults in case the VM is not running
164        rootNode.addProperty("nodeName", "");
165        rootNode.addProperty("nodeAddress", "");
166
167        // VM definition status changes before the pod terminates.
168        // This results in pod information being shown for a stopped
169        // VM which is irritating. So check condition first.
170        var isRunning = GsonPtr.to(rootNode).to("status", "conditions")
171            .get(JsonArray.class)
172            .asList().stream().filter(el -> "Running"
173                .equals(((JsonObject) el).get("type").getAsString()))
174            .findFirst().map(el -> "True"
175                .equals(((JsonObject) el).get("status").getAsString()))
176            .orElse(false);
177        if (!isRunning) {
178            return;
179        }
180        var podSearch = new ListOptions();
181        podSearch.setLabelSelector("app.kubernetes.io/name=" + APP_NAME
182            + ",app.kubernetes.io/component=" + APP_NAME
183            + ",app.kubernetes.io/instance=" + vmState.getMetadata().getName());
184        try {
185            var podList
186                = K8sV1PodStub.list(client, namespace(), podSearch);
187            for (var podStub : podList) {
188                var nodeName = podStub.model().get().getSpec().getNodeName();
189                rootNode.addProperty("nodeName", nodeName);
190                logger.fine(() -> "Added node name " + nodeName
191                    + " to VM info for " + vmState.getMetadata().getName());
192                @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
193                var addrs = new JsonArray();
194                podStub.model().get().getStatus().getPodIPs().stream()
195                    .map(ip -> ip.getIp()).forEach(addrs::add);
196                rootNode.add("nodeAddresses", addrs);
197                logger.fine(() -> "Added node addresses " + addrs
198                    + " to VM info for " + vmState.getMetadata().getName());
199            }
200        } catch (ApiException e) {
201            logger.log(Level.WARNING, e,
202                () -> "Cannot access node information: " + e.getMessage());
203        }
204    }
205}