001/*
002 * VM-Operator
003 * Copyright (C) 2024 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.common;
020
021import io.kubernetes.client.Discovery.APIResource;
022import io.kubernetes.client.common.KubernetesListObject;
023import io.kubernetes.client.common.KubernetesObject;
024import io.kubernetes.client.openapi.ApiException;
025import io.kubernetes.client.util.Watch.Response;
026import io.kubernetes.client.util.generic.GenericKubernetesApi;
027import io.kubernetes.client.util.generic.options.ListOptions;
028import java.time.Duration;
029import java.time.Instant;
030import java.util.function.BiConsumer;
031import java.util.logging.Level;
032import java.util.logging.Logger;
033
034/**
035 * An observer that watches namespaced resources in a given context and
036 * invokes a handler on changes.
037 *
038 * @param <O> the object type for the context
039 * @param <L> the object list type for the context
040 */
041public class K8sObserver<O extends KubernetesObject,
042        L extends KubernetesListObject> {
043
044    /**
045     * The type of change reported by {@link Response} as enum.
046     */
047    public enum ResponseType {
048        ADDED, MODIFIED, DELETED
049    }
050
051    @SuppressWarnings("PMD.FieldNamingConventions")
052    protected final Logger logger = Logger.getLogger(getClass().getName());
053
054    protected final K8sClient client;
055    protected final GenericKubernetesApi<O, L> api;
056    protected final APIResource context;
057    protected final String namespace;
058    protected final ListOptions options;
059    protected final Thread thread;
060    protected BiConsumer<K8sClient, Response<O>> handler;
061    protected BiConsumer<K8sObserver<O, L>, Throwable> onTerminated;
062
063    /**
064     * Create and start a new observer for objects in the given context 
065     * (using preferred version) and namespace with the given options.
066     *
067     * @param objectClass the object class
068     * @param objectListClass the object list class
069     * @param client the client
070     * @param context the context
071     * @param namespace the namespace
072     * @param options the options
073     */
074    @SuppressWarnings({ "PMD.AvoidBranchingStatementAsLastInLoop",
075        "PMD.UseObjectForClearerAPI", "PMD.AvoidCatchingThrowable",
076        "PMD.CognitiveComplexity" })
077    public K8sObserver(Class<O> objectClass, Class<L> objectListClass,
078            K8sClient client, APIResource context, String namespace,
079            ListOptions options) {
080        this.client = client;
081        this.context = context;
082        this.namespace = namespace;
083        this.options = options;
084
085        api = new GenericKubernetesApi<>(objectClass, objectListClass,
086            context.getGroup(), context.getPreferredVersion(),
087            context.getResourcePlural(), client);
088        thread = Thread.ofVirtual().unstarted(() -> {
089            try {
090                logger.config(() -> "Watching " + context.getResourcePlural()
091                    + " (" + context.getPreferredVersion() + ")"
092                    + " in " + namespace);
093
094                // Watch sometimes terminates without apparent reason.
095                while (!Thread.currentThread().isInterrupted()) {
096                    Instant startedAt = Instant.now();
097                    try {
098                        @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
099                        var changed = api.watch(namespace, options).iterator();
100                        while (changed.hasNext()) {
101                            handler.accept(client, changed.next());
102                        }
103                    } catch (ApiException | RuntimeException e) {
104                        logger.log(Level.FINE, e, () -> "Problem watching"
105                            + " (will retry): " + e.getMessage());
106                        delayRestart(startedAt);
107                    }
108                }
109                if (onTerminated != null) {
110                    onTerminated.accept(this, null);
111                }
112            } catch (Throwable e) {
113                logger.log(Level.SEVERE, e, () -> "Probem watching: "
114                    + e.getMessage());
115                if (onTerminated != null) {
116                    onTerminated.accept(this, e);
117                }
118            }
119        });
120    }
121
122    @SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
123    private void delayRestart(Instant started) {
124        var runningFor = Duration
125            .between(started, Instant.now()).toMillis();
126        if (runningFor < 5000) {
127            logger.log(Level.FINE, () -> "Waiting... ");
128            try {
129                Thread.sleep(5000 - runningFor);
130            } catch (InterruptedException e1) { // NOPMD
131                // Retry
132            }
133            logger.log(Level.FINE, () -> "Retrying");
134        }
135    }
136
137    /**
138     * Sets the handler.
139     *
140     * @param handler the handler
141     * @return the observer
142     */
143    public K8sObserver<O, L>
144            handler(BiConsumer<K8sClient, Response<O>> handler) {
145        this.handler = handler;
146        return this;
147    }
148
149    /**
150     * Sets a function to invoke if the observer terminates. First argument
151     * is this observer, the second is the throwable that caused the
152     * abnormal termination or `null` if the observer was terminated
153     * by {@link #stop()}.
154     *
155     * @param onTerminated the on terminated
156     * @return the observer
157     */
158    public K8sObserver<O, L> onTerminated(
159            BiConsumer<K8sObserver<O, L>, Throwable> onTerminated) {
160        this.onTerminated = onTerminated;
161        return this;
162    }
163
164    /**
165     * Start the observer.
166     *
167     * @return the observer
168     */
169    public K8sObserver<O, L> start() {
170        if (handler == null) {
171            throw new IllegalStateException("No handler defined");
172        }
173        thread.start();
174        return this;
175    }
176
177    /**
178     * Stops the observer.
179     *
180     * @return the observer
181     */
182    public K8sObserver<O, L> stop() {
183        thread.interrupt();
184        return this;
185    }
186
187    /**
188     * Returns the client.
189     *
190     * @return the client
191     */
192    public K8sClient client() {
193        return client;
194    }
195
196    /**
197     * Returns the context.
198     * 
199     * @return the context
200     */
201    public APIResource context() {
202        return context;
203    }
204
205    /**
206     * Returns the observed namespace.
207     * 
208     * @return the namespace
209     */
210    public String getNamespace() {
211        return namespace;
212    }
213
214    /**
215     * Returns the options for object selection.
216     *
217     * @return the list options
218     */
219    public ListOptions options() {
220        return options;
221    }
222
223    @Override
224    @SuppressWarnings("PMD.UseLocaleWithCaseConversions")
225    public String toString() {
226        return "Observer for " + K8s.toString(context) + " " + namespace;
227    }
228
229}