001/*
002 * VM-Operator
003 * Copyright (C) 2024 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.manager;
020
021import io.kubernetes.client.Discovery.APIResource;
022import io.kubernetes.client.common.KubernetesListObject;
023import io.kubernetes.client.common.KubernetesObject;
024import io.kubernetes.client.openapi.ApiException;
025import io.kubernetes.client.util.Watch.Response;
026import io.kubernetes.client.util.generic.options.ListOptions;
027import java.io.IOException;
028import java.nio.file.Files;
029import java.nio.file.Path;
030import java.util.concurrent.atomic.AtomicInteger;
031import java.util.logging.Level;
032import org.jdrupes.vmoperator.common.K8s;
033import org.jdrupes.vmoperator.common.K8sClient;
034import org.jdrupes.vmoperator.common.K8sObserver;
035import org.jdrupes.vmoperator.manager.events.Exit;
036import org.jgrapes.core.Channel;
037import org.jgrapes.core.Component;
038import org.jgrapes.core.Components;
039import org.jgrapes.core.annotation.Handler;
040import org.jgrapes.core.events.Start;
041import org.jgrapes.core.events.Stop;
042import org.jgrapes.util.events.ConfigurationUpdate;
043
044/**
045 * A base class for monitoring VM related resources. When started,
046 * it creates observers for all versions of the the {@link APIResource}
047 * configured by {@link #context(APIResource)}. The APIResource is not
048 * passed to the constructor because in some cases it has to be
049 * evaluated lazily.
050 * 
051 * @param <O> the object type for the context
052 * @param <L> the object list type for the context
053 */
054@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis" })
055public abstract class AbstractMonitor<O extends KubernetesObject,
056        L extends KubernetesListObject, C extends Channel> extends Component {
057
058    private final Class<O> objectClass;
059    private final Class<L> objectListClass;
060    private K8sClient client;
061    private APIResource context;
062    private String namespace;
063    private ListOptions options = new ListOptions();
064    private final AtomicInteger observerCounter = new AtomicInteger(0);
065
066    /**
067     * Initializes the instance.
068     *
069     * @param componentChannel the component channel
070     * @param objectClass the class of the Kubernetes object to watch
071     * @param objectListClass the class of the list of Kubernetes objects
072     * to watch
073     */
074    protected AbstractMonitor(Channel componentChannel,
075            Class<O> objectClass, Class<L> objectListClass) {
076        super(componentChannel);
077        this.objectClass = objectClass;
078        this.objectListClass = objectListClass;
079    }
080
081    /**
082     * Return the client.
083     * 
084     * @return the client
085     */
086    public K8sClient client() {
087        return client;
088    }
089
090    /**
091     * Sets the client to be used.
092     *
093     * @param client the client
094     * @return the abstract monitor
095     */
096    public AbstractMonitor<O, L, C> client(K8sClient client) {
097        this.client = client;
098        return this;
099    }
100
101    /**
102     * Return the observed namespace.
103     * 
104     * @return the namespace
105     */
106    public String namespace() {
107        return namespace;
108    }
109
110    /**
111     * Sets the namespace to be observed.
112     *
113     * @param namespace the namespaceToWatch to set
114     * @return the abstract monitor
115     */
116    public AbstractMonitor<O, L, C> namespace(String namespace) {
117        this.namespace = namespace;
118        return this;
119    }
120
121    /**
122     * Returns the options for selecting the objects to observe.
123     * 
124     * @return the options
125     */
126    public ListOptions options() {
127        return options;
128    }
129
130    /**
131     * Sets the options for selecting the objects to observe.
132     *
133     * @param options the options to set
134     * @return the abstract monitor
135     */
136    public AbstractMonitor<O, L, C> options(ListOptions options) {
137        this.options = options;
138        return this;
139    }
140
141    /**
142     * Returns the observed context.
143     * 
144     * @return the context
145     */
146    public APIResource context() {
147        return context;
148    }
149
150    /**
151     * Sets the context to observe.
152     *
153     * @param context the context
154     * @return the abstract monitor
155     */
156    public AbstractMonitor<O, L, C> context(APIResource context) {
157        this.context = context;
158        return this;
159    }
160
161    /**
162     * Looks for a key "namespace" in the configuration and, if found,
163     * sets the namespace to its value.
164     *
165     * @param event the event
166     */
167    @Handler
168    public void onConfigurationUpdate(ConfigurationUpdate event) {
169        event.structured(Components.manager(parent()).componentPath())
170            .ifPresent(c -> {
171                if (c.containsKey("namespace")) {
172                    namespace = (String) c.get("namespace");
173                }
174            });
175    }
176
177    /**
178     * Handle the start event. Configures the namespace, invokes
179     * {@link #prepareMonitoring()} and starts the observers.
180     *
181     * @param event the event
182     */
183    @Handler(priority = 10)
184    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
185    public void onStart(Start event) {
186        try {
187            // Get namespace
188            if (namespace == null) {
189                var path = Path
190                    .of("/var/run/secrets/kubernetes.io/serviceaccount/namespace");
191                if (Files.isReadable(path)) {
192                    namespace
193                        = Files.lines(path).findFirst().orElse(null);
194                }
195            }
196
197            // Additional preparations by derived class
198            prepareMonitoring();
199            assert client != null;
200            assert context != null;
201            assert namespace != null;
202            logger.fine(() -> "Observing " + K8s.toString(context)
203                + " objects in " + namespace);
204
205            // Monitor all versions
206            for (var version : context.getVersions()) {
207                createObserver(version);
208            }
209            registerAsGenerator();
210        } catch (IOException | ApiException e) {
211            logger.log(Level.SEVERE, e,
212                () -> "Cannot watch VMs, terminating.");
213            event.cancel(true);
214            fire(new Exit(1));
215        }
216    }
217
218    private void createObserver(String version) {
219        observerCounter.incrementAndGet();
220        new K8sObserver<>(objectClass, objectListClass, client,
221            K8s.preferred(context, version), namespace, options)
222                .handler((c, r) -> {
223                    handleChange(c, r);
224                }).onTerminated((o, t) -> {
225                    if (observerCounter.decrementAndGet() == 0) {
226                        unregisterAsGenerator();
227                    }
228                    // Exception has been logged already
229                    if (t != null) {
230                        fire(new Stop());
231                    }
232                }).start();
233    }
234
235    /**
236     * Invoked by {@link #onStart(Start)} after the namespace has
237     * been configured and before starting the observer. This is
238     * the last opportunity to invoke {@link #context(APIResource)}.
239     *
240     * @throws IOException Signals that an I/O exception has occurred.
241     * @throws ApiException the api exception
242     */
243    @SuppressWarnings("PMD.EmptyMethodInAbstractClassShouldBeAbstract")
244    protected void prepareMonitoring() throws IOException, ApiException {
245        // To be overridden by derived class.
246    }
247
248    /**
249     * Handle an observed change.
250     *
251     * @param client the client
252     * @param change the change
253     */
254    protected abstract void handleChange(K8sClient client, Response<O> change);
255}