001/*
002 * VM-Operator
003 * Copyright (C) 2023,2024 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.manager;
020
021import com.google.gson.JsonArray;
022import com.google.gson.JsonObject;
023import io.kubernetes.client.openapi.ApiException;
024import io.kubernetes.client.openapi.models.V1ObjectMeta;
025import io.kubernetes.client.util.Watch;
026import io.kubernetes.client.util.generic.options.ListOptions;
027import java.io.IOException;
028import java.util.Optional;
029import java.util.Set;
030import java.util.logging.Level;
031import java.util.stream.Collectors;
032import static org.jdrupes.vmoperator.common.Constants.VM_OP_GROUP;
033import org.jdrupes.vmoperator.common.K8s;
034import org.jdrupes.vmoperator.common.K8sClient;
035import org.jdrupes.vmoperator.common.K8sDynamicStub;
036import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
037import org.jdrupes.vmoperator.common.K8sV1ConfigMapStub;
038import org.jdrupes.vmoperator.common.K8sV1PodStub;
039import org.jdrupes.vmoperator.common.K8sV1StatefulSetStub;
040import org.jdrupes.vmoperator.common.VmDefinitionModel;
041import org.jdrupes.vmoperator.common.VmDefinitionModels;
042import org.jdrupes.vmoperator.common.VmDefinitionStub;
043import static org.jdrupes.vmoperator.manager.Constants.APP_NAME;
044import static org.jdrupes.vmoperator.manager.Constants.VM_OP_KIND_VM;
045import static org.jdrupes.vmoperator.manager.Constants.VM_OP_NAME;
046import org.jdrupes.vmoperator.manager.events.ChannelManager;
047import org.jdrupes.vmoperator.manager.events.VmChannel;
048import org.jdrupes.vmoperator.manager.events.VmDefChanged;
049import org.jdrupes.vmoperator.util.GsonPtr;
050import org.jgrapes.core.Channel;
051import org.jgrapes.core.Event;
052
053/**
054 * Watches for changes of VM definitions.
055 */
056@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", "PMD.ExcessiveImports" })
057public class VmMonitor extends
058        AbstractMonitor<VmDefinitionModel, VmDefinitionModels, VmChannel> {
059
060    private final ChannelManager<String, VmChannel, ?> channelManager;
061
062    /**
063     * Instantiates a new VM definition watcher.
064     *
065     * @param componentChannel the component channel
066     * @param channelManager the channel manager
067     */
068    public VmMonitor(Channel componentChannel,
069            ChannelManager<String, VmChannel, ?> channelManager) {
070        super(componentChannel, VmDefinitionModel.class,
071            VmDefinitionModels.class);
072        this.channelManager = channelManager;
073    }
074
075    @Override
076    protected void prepareMonitoring() throws IOException, ApiException {
077        client(new K8sClient());
078
079        // Get all our API versions
080        var ctx = K8s.context(client(), VM_OP_GROUP, "", VM_OP_KIND_VM);
081        if (ctx.isEmpty()) {
082            logger.severe(() -> "Cannot get CRD context.");
083            return;
084        }
085        context(ctx.get());
086
087        // Remove left over resources
088        purge();
089    }
090
091    @SuppressWarnings("PMD.CognitiveComplexity")
092    private void purge() throws ApiException {
093        // Get existing CRs (VMs)
094        var known = K8sDynamicStub.list(client(), context(), namespace())
095            .stream().map(stub -> stub.name()).collect(Collectors.toSet());
096        ListOptions opts = new ListOptions();
097        opts.setLabelSelector(
098            "app.kubernetes.io/managed-by=" + VM_OP_NAME + ","
099                + "app.kubernetes.io/name=" + APP_NAME);
100        for (var context : Set.of(K8sV1StatefulSetStub.CONTEXT,
101            K8sV1ConfigMapStub.CONTEXT)) {
102            for (var resStub : K8sDynamicStub.list(client(), context,
103                namespace(), opts)) {
104                String instance = resStub.model()
105                    .map(m -> m.metadata().getName()).orElse("(unknown)");
106                if (!known.contains(instance)) {
107                    resStub.delete();
108                }
109            }
110        }
111    }
112
113    @Override
114    protected void handleChange(K8sClient client,
115            Watch.Response<VmDefinitionModel> response) {
116        V1ObjectMeta metadata = response.object.getMetadata();
117        VmChannel channel = channelManager.channelGet(metadata.getName());
118
119        // Get full definition and associate with channel as backup
120        var vmDef = response.object;
121        if (vmDef.data() == null) {
122            // ADDED event does not provide data, see
123            // https://github.com/kubernetes-client/java/issues/3215
124            vmDef = getModel(client, vmDef);
125        }
126        if (vmDef.data() != null) {
127            // New data, augment and save
128            addDynamicData(channel.client(), vmDef, channel.vmDefinition());
129            channel.setVmDefinition(vmDef);
130        } else {
131            // Reuse cached
132            vmDef = channel.vmDefinition();
133        }
134        if (vmDef == null) {
135            logger.warning(
136                () -> "Cannot get model for " + response.object.getMetadata());
137            return;
138        }
139        if (ResponseType.valueOf(response.type) == ResponseType.DELETED) {
140            channelManager.remove(metadata.getName());
141        }
142
143        // Create and fire changed event. Remove channel from channel
144        // manager on completion.
145        channel.pipeline()
146            .fire(Event.onCompletion(
147                new VmDefChanged(ResponseType.valueOf(response.type),
148                    channel.setGeneration(
149                        response.object.getMetadata().getGeneration()),
150                    vmDef),
151                e -> {
152                    if (e.type() == ResponseType.DELETED) {
153                        channelManager
154                            .remove(e.vmDefinition().metadata().getName());
155                    }
156                }), channel);
157    }
158
159    private VmDefinitionModel getModel(K8sClient client,
160            VmDefinitionModel vmDef) {
161        try {
162            return VmDefinitionStub.get(client, context(), namespace(),
163                vmDef.metadata().getName()).model().orElse(null);
164        } catch (ApiException e) {
165            return null;
166        }
167    }
168
169    private void addDynamicData(K8sClient client, VmDefinitionModel vmState,
170            VmDefinitionModel prevState) {
171        var rootNode = GsonPtr.to(vmState.data()).get(JsonObject.class);
172
173        // Maintain (or initialize) the resetCount
174        rootNode.addProperty("resetCount", Optional.ofNullable(prevState)
175            .map(ps -> GsonPtr.to(ps.data()))
176            .flatMap(d -> d.getAsLong("resetCount")).orElse(0L));
177
178        // Add defaults in case the VM is not running
179        rootNode.addProperty("nodeName", "");
180        rootNode.addProperty("nodeAddress", "");
181
182        // VM definition status changes before the pod terminates.
183        // This results in pod information being shown for a stopped
184        // VM which is irritating. So check condition first.
185        var isRunning = GsonPtr.to(rootNode).to("status", "conditions")
186            .get(JsonArray.class)
187            .asList().stream().filter(el -> "Running"
188                .equals(((JsonObject) el).get("type").getAsString()))
189            .findFirst().map(el -> "True"
190                .equals(((JsonObject) el).get("status").getAsString()))
191            .orElse(false);
192        if (!isRunning) {
193            return;
194        }
195        var podSearch = new ListOptions();
196        podSearch.setLabelSelector("app.kubernetes.io/name=" + APP_NAME
197            + ",app.kubernetes.io/component=" + APP_NAME
198            + ",app.kubernetes.io/instance=" + vmState.getMetadata().getName());
199        try {
200            var podList
201                = K8sV1PodStub.list(client, namespace(), podSearch);
202            for (var podStub : podList) {
203                var nodeName = podStub.model().get().getSpec().getNodeName();
204                rootNode.addProperty("nodeName", nodeName);
205                logger.fine(() -> "Added node name " + nodeName
206                    + " to VM info for " + vmState.getMetadata().getName());
207                @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
208                var addrs = new JsonArray();
209                podStub.model().get().getStatus().getPodIPs().stream()
210                    .map(ip -> ip.getIp()).forEach(addrs::add);
211                rootNode.add("nodeAddresses", addrs);
212                logger.fine(() -> "Added node addresses " + addrs
213                    + " to VM info for " + vmState.getMetadata().getName());
214            }
215        } catch (ApiException e) {
216            logger.log(Level.WARNING, e,
217                () -> "Cannot access node information: " + e.getMessage());
218        }
219    }
220}