001/*
002 * VM-Operator
003 * Copyright (C) 2023,2024 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.manager;
020
021import io.kubernetes.client.openapi.ApiException;
022import io.kubernetes.client.openapi.models.V1ObjectMeta;
023import io.kubernetes.client.util.Watch;
024import io.kubernetes.client.util.generic.options.ListOptions;
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.List;
029import java.util.Map;
030import java.util.Optional;
031import java.util.Set;
032import java.util.logging.Level;
033import java.util.stream.Collectors;
034import static org.jdrupes.vmoperator.common.Constants.VM_OP_GROUP;
035import org.jdrupes.vmoperator.common.K8s;
036import org.jdrupes.vmoperator.common.K8sClient;
037import org.jdrupes.vmoperator.common.K8sDynamicStub;
038import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
039import org.jdrupes.vmoperator.common.K8sV1ConfigMapStub;
040import org.jdrupes.vmoperator.common.K8sV1PodStub;
041import org.jdrupes.vmoperator.common.K8sV1StatefulSetStub;
042import org.jdrupes.vmoperator.common.VmDefinition;
043import org.jdrupes.vmoperator.common.VmDefinitionModel;
044import org.jdrupes.vmoperator.common.VmDefinitionModels;
045import org.jdrupes.vmoperator.common.VmDefinitionStub;
046import static org.jdrupes.vmoperator.manager.Constants.APP_NAME;
047import static org.jdrupes.vmoperator.manager.Constants.VM_OP_KIND_VM;
048import static org.jdrupes.vmoperator.manager.Constants.VM_OP_NAME;
049import org.jdrupes.vmoperator.manager.events.ChannelManager;
050import org.jdrupes.vmoperator.manager.events.VmChannel;
051import org.jdrupes.vmoperator.manager.events.VmDefChanged;
052import org.jdrupes.vmoperator.util.DataPath;
053import org.jgrapes.core.Channel;
054import org.jgrapes.core.Event;
055
056/**
057 * Watches for changes of VM definitions.
058 */
059@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", "PMD.ExcessiveImports" })
060public class VmMonitor extends
061        AbstractMonitor<VmDefinitionModel, VmDefinitionModels, VmChannel> {
062
063    private final ChannelManager<String, VmChannel, ?> channelManager;
064
065    /**
066     * Instantiates a new VM definition watcher.
067     *
068     * @param componentChannel the component channel
069     * @param channelManager the channel manager
070     */
071    public VmMonitor(Channel componentChannel,
072            ChannelManager<String, VmChannel, ?> channelManager) {
073        super(componentChannel, VmDefinitionModel.class,
074            VmDefinitionModels.class);
075        this.channelManager = channelManager;
076    }
077
078    @Override
079    protected void prepareMonitoring() throws IOException, ApiException {
080        client(new K8sClient());
081
082        // Get all our API versions
083        var ctx = K8s.context(client(), VM_OP_GROUP, "", VM_OP_KIND_VM);
084        if (ctx.isEmpty()) {
085            logger.severe(() -> "Cannot get CRD context.");
086            return;
087        }
088        context(ctx.get());
089
090        // Remove left over resources
091        purge();
092    }
093
094    @SuppressWarnings("PMD.CognitiveComplexity")
095    private void purge() throws ApiException {
096        // Get existing CRs (VMs)
097        var known = K8sDynamicStub.list(client(), context(), namespace())
098            .stream().map(stub -> stub.name()).collect(Collectors.toSet());
099        ListOptions opts = new ListOptions();
100        opts.setLabelSelector(
101            "app.kubernetes.io/managed-by=" + VM_OP_NAME + ","
102                + "app.kubernetes.io/name=" + APP_NAME);
103        for (var context : Set.of(K8sV1StatefulSetStub.CONTEXT,
104            K8sV1ConfigMapStub.CONTEXT)) {
105            for (var resStub : K8sDynamicStub.list(client(), context,
106                namespace(), opts)) {
107                String instance = resStub.model()
108                    .map(m -> m.metadata().getName()).orElse("(unknown)");
109                if (!known.contains(instance)) {
110                    resStub.delete();
111                }
112            }
113        }
114    }
115
116    @Override
117    protected void handleChange(K8sClient client,
118            Watch.Response<VmDefinitionModel> response) {
119        V1ObjectMeta metadata = response.object.getMetadata();
120        VmChannel channel = channelManager.channelGet(metadata.getName());
121
122        // Remove from channel manager if deleted
123        if (ResponseType.valueOf(response.type) == ResponseType.DELETED) {
124            channelManager.remove(metadata.getName());
125        }
126
127        // Get full definition and associate with channel as backup
128        var vmModel = response.object;
129        if (vmModel.data() == null) {
130            // ADDED event does not provide data, see
131            // https://github.com/kubernetes-client/java/issues/3215
132            vmModel = getModel(client, vmModel);
133        }
134        VmDefinition vmDef = null;
135        if (vmModel.data() != null) {
136            // New data, augment and save
137            vmDef = client.getJSON().getGson().fromJson(vmModel.data(),
138                VmDefinition.class);
139            addDynamicData(channel.client(), vmDef, channel.vmDefinition());
140            channel.setVmDefinition(vmDef);
141        }
142        if (vmDef == null) {
143            // Reuse cached (e.g. if deleted)
144            vmDef = channel.vmDefinition();
145        }
146        if (vmDef == null) {
147            logger.warning(() -> "Cannot get defintion for "
148                + response.object.getMetadata());
149            return;
150        }
151
152        // Create and fire changed event. Remove channel from channel
153        // manager on completion.
154        channel.pipeline()
155            .fire(Event.onCompletion(
156                new VmDefChanged(ResponseType.valueOf(response.type),
157                    channel.setGeneration(response.object.getMetadata()
158                        .getGeneration()),
159                    vmDef),
160                e -> {
161                    if (e.type() == ResponseType.DELETED) {
162                        channelManager.remove(e.vmDefinition().name());
163                    }
164                }), channel);
165    }
166
167    private VmDefinitionModel getModel(K8sClient client,
168            VmDefinitionModel vmDef) {
169        try {
170            return VmDefinitionStub.get(client, context(), namespace(),
171                vmDef.metadata().getName()).model().orElse(null);
172        } catch (ApiException e) {
173            return null;
174        }
175    }
176
177    @SuppressWarnings("PMD.AvoidDuplicateLiterals")
178    private void addDynamicData(K8sClient client, VmDefinition vmDef,
179            VmDefinition prevState) {
180        // Maintain (or initialize) the resetCount
181        vmDef.extra("resetCount",
182            Optional.ofNullable(prevState).map(d -> d.extra("resetCount"))
183                .orElse(0L));
184
185        // Node information
186        // Add defaults in case the VM is not running
187        vmDef.extra("nodeName", "");
188        vmDef.extra("nodeAddress", "");
189
190        // VM definition status changes before the pod terminates.
191        // This results in pod information being shown for a stopped
192        // VM which is irritating. So check condition first.
193        @SuppressWarnings("PMD.LambdaCanBeMethodReference")
194        var isRunning
195            = vmDef.<List<Map<String, Object>>> fromStatus("conditions")
196                .orElse(Collections.emptyList()).stream()
197                .filter(cond -> DataPath.get(cond, "type")
198                    .map(t -> "Running".equals(t)).orElse(false))
199                .findFirst().map(cond -> DataPath.get(cond, "status")
200                    .map(s -> "True".equals(s)).orElse(false))
201                .orElse(false);
202        if (!isRunning) {
203            return;
204        }
205        var podSearch = new ListOptions();
206        podSearch.setLabelSelector("app.kubernetes.io/name=" + APP_NAME
207            + ",app.kubernetes.io/component=" + APP_NAME
208            + ",app.kubernetes.io/instance=" + vmDef.name());
209        try {
210            var podList
211                = K8sV1PodStub.list(client, namespace(), podSearch);
212            for (var podStub : podList) {
213                var nodeName = podStub.model().get().getSpec().getNodeName();
214                vmDef.extra("nodeName", nodeName);
215                logger.fine(() -> "Added node name " + nodeName
216                    + " to VM info for " + vmDef.name());
217                @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
218                var addrs = new ArrayList<String>();
219                podStub.model().get().getStatus().getPodIPs().stream()
220                    .map(ip -> ip.getIp()).forEach(addrs::add);
221                vmDef.extra("nodeAddresses", addrs);
222                logger.fine(() -> "Added node addresses " + addrs
223                    + " to VM info for " + vmDef.name());
224            }
225        } catch (ApiException e) {
226            logger.log(Level.WARNING, e,
227                () -> "Cannot access node information: " + e.getMessage());
228        }
229    }
230}