001/*
002 * VM-Operator
003 * Copyright (C) 2024 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.manager;
020
021import io.kubernetes.client.Discovery.APIResource;
022import io.kubernetes.client.common.KubernetesListObject;
023import io.kubernetes.client.common.KubernetesObject;
024import io.kubernetes.client.openapi.ApiException;
025import io.kubernetes.client.util.Watch.Response;
026import io.kubernetes.client.util.generic.options.ListOptions;
027import java.io.IOException;
028import java.nio.file.Files;
029import java.nio.file.Path;
030import java.util.Optional;
031import java.util.concurrent.atomic.AtomicInteger;
032import java.util.logging.Level;
033import org.jdrupes.vmoperator.common.K8s;
034import org.jdrupes.vmoperator.common.K8sClient;
035import org.jdrupes.vmoperator.common.K8sObserver;
036import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
037import org.jdrupes.vmoperator.manager.events.ChannelManager;
038import org.jdrupes.vmoperator.manager.events.Exit;
039import org.jgrapes.core.Channel;
040import org.jgrapes.core.Component;
041import org.jgrapes.core.Components;
042import org.jgrapes.core.annotation.Handler;
043import org.jgrapes.core.events.Start;
044import org.jgrapes.core.events.Stop;
045import org.jgrapes.util.events.ConfigurationUpdate;
046
047/**
048 * A base class for monitoring VM related resources.
049 * 
050 * @param <O> the object type for the context
051 * @param <L> the object list type for the context
052 */
053@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis" })
054public abstract class AbstractMonitor<O extends KubernetesObject,
055        L extends KubernetesListObject, C extends Channel> extends Component {
056
057    private final Class<O> objectClass;
058    private final Class<L> objectListClass;
059    private K8sClient client;
060    private APIResource context;
061    private String namespace;
062    private ListOptions options = new ListOptions();
063    private final AtomicInteger observerCounter = new AtomicInteger(0);
064    private ChannelManager<String, C, ?> channelManager;
065    private boolean channelManagerMaster;
066
067    /**
068     * Initializes the instance.
069     *
070     * @param componentChannel the component channel
071     */
072    protected AbstractMonitor(Channel componentChannel, Class<O> objectClass,
073            Class<L> objectListClass) {
074        super(componentChannel);
075        this.objectClass = objectClass;
076        this.objectListClass = objectListClass;
077    }
078
079    /**
080     * Return the client.
081     * 
082     * @return the client
083     */
084    public K8sClient client() {
085        return client;
086    }
087
088    /**
089     * Sets the client to be used.
090     *
091     * @param client the client
092     * @return the abstract monitor
093     */
094    public AbstractMonitor<O, L, C> client(K8sClient client) {
095        this.client = client;
096        return this;
097    }
098
099    /**
100     * Return the observed namespace.
101     * 
102     * @return the namespace
103     */
104    public String namespace() {
105        return namespace;
106    }
107
108    /**
109     * Sets the namespace to be observed.
110     *
111     * @param namespace the namespaceToWatch to set
112     * @return the abstract monitor
113     */
114    public AbstractMonitor<O, L, C> namespace(String namespace) {
115        this.namespace = namespace;
116        return this;
117    }
118
119    /**
120     * Returns the options for selecting the objects to observe.
121     * 
122     * @return the options
123     */
124    public ListOptions options() {
125        return options;
126    }
127
128    /**
129     * Sets the options for selecting the objects to observe.
130     *
131     * @param options the options to set
132     * @return the abstract monitor
133     */
134    public AbstractMonitor<O, L, C> options(ListOptions options) {
135        this.options = options;
136        return this;
137    }
138
139    /**
140     * Returns the observed context.
141     * 
142     * @return the context
143     */
144    public APIResource context() {
145        return context;
146    }
147
148    /**
149     * Sets the context to observe.
150     *
151     * @param context the context
152     * @return the abstract monitor
153     */
154    public AbstractMonitor<O, L, C> context(APIResource context) {
155        this.context = context;
156        return this;
157    }
158
159    /**
160     * Returns the channel manager.
161     * 
162     * @return the context
163     */
164    public ChannelManager<String, C, ?> channelManager() {
165        return channelManager;
166    }
167
168    /**
169     * Sets the channel manager.
170     *
171     * @param channelManager the channel manager
172     * @return the abstract monitor
173     */
174    public AbstractMonitor<O, L, C>
175            channelManager(ChannelManager<String, C, ?> channelManager) {
176        this.channelManager = channelManager;
177        return this;
178    }
179
180    /**
181     * Looks for a key "namespace" in the configuration and, if found,
182     * sets the namespace to its value.
183     *
184     * @param event the event
185     */
186    @Handler
187    public void onConfigurationUpdate(ConfigurationUpdate event) {
188        event.structured(Components.manager(parent()).componentPath())
189            .ifPresent(c -> {
190                if (c.containsKey("namespace")) {
191                    namespace = (String) c.get("namespace");
192                }
193            });
194    }
195
196    /**
197     * Handle the start event. Configures the namespace invokes
198     * {@link #prepareMonitoring()} and starts the observers.
199     *
200     * @param event the event
201     */
202    @Handler(priority = 10)
203    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
204    public void onStart(Start event) {
205        try {
206            // Get namespace
207            if (namespace == null) {
208                var path = Path
209                    .of("/var/run/secrets/kubernetes.io/serviceaccount/namespace");
210                if (Files.isReadable(path)) {
211                    namespace
212                        = Files.lines(path).findFirst().orElse(null);
213                }
214            }
215
216            // Additional preparations by derived class
217            prepareMonitoring();
218            assert client != null;
219            assert context != null;
220            assert namespace != null;
221            logger.fine(() -> "Observing " + K8s.toString(context)
222                + " objects in " + namespace);
223
224            // Monitor all versions
225            for (var version : context.getVersions()) {
226                createObserver(version);
227            }
228            registerAsGenerator();
229        } catch (IOException | ApiException e) {
230            logger.log(Level.SEVERE, e,
231                () -> "Cannot watch VMs, terminating.");
232            event.cancel(true);
233            fire(new Exit(1));
234        }
235    }
236
237    private void createObserver(String version) {
238        observerCounter.incrementAndGet();
239        new K8sObserver<>(objectClass, objectListClass, client,
240            K8s.preferred(context, version), namespace, options)
241                .handler((c, r) -> {
242                    handleChange(c, r);
243                    if (ResponseType.valueOf(r.type) == ResponseType.DELETED
244                        && channelManagerMaster) {
245                        channelManager.remove(r.object.getMetadata().getName());
246                    }
247                }).onTerminated((o, t) -> {
248                    if (observerCounter.decrementAndGet() == 0) {
249                        unregisterAsGenerator();
250                    }
251                    // Exception has been logged already
252                    if (t != null) {
253                        fire(new Stop());
254                    }
255                }).start();
256    }
257
258    /**
259     * Invoked by {@link #onStart(Start)} after the namespace has
260     * been configured and before starting the observer.
261     *
262     * @throws IOException Signals that an I/O exception has occurred.
263     * @throws ApiException the api exception
264     */
265    @SuppressWarnings("PMD.EmptyMethodInAbstractClassShouldBeAbstract")
266    protected void prepareMonitoring() throws IOException, ApiException {
267        // To be overridden by derived class.
268    }
269
270    /**
271     * Handle an observed change.
272     *
273     * @param client the client
274     * @param change the change
275     */
276    protected abstract void handleChange(K8sClient client, Response<O> change);
277
278    /**
279     * Returns the {@link Channel} for the given name.
280     *
281     * @param name the name
282     * @return the channel used for events related to the specified object
283     */
284    protected Optional<C> channel(String name) {
285        return channelManager.getChannel(name);
286    }
287}