001/*
002 * VM-Operator
003 * Copyright (C) 2024 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.common;
020
021import io.kubernetes.client.Discovery.APIResource;
022import io.kubernetes.client.common.KubernetesListObject;
023import io.kubernetes.client.common.KubernetesObject;
024import io.kubernetes.client.openapi.ApiException;
025import io.kubernetes.client.util.Watch.Response;
026import io.kubernetes.client.util.generic.GenericKubernetesApi;
027import io.kubernetes.client.util.generic.options.ListOptions;
028import java.time.Duration;
029import java.time.Instant;
030import java.util.function.BiConsumer;
031import java.util.logging.Level;
032import java.util.logging.Logger;
033import org.jgrapes.core.Components;
034
035/**
036 * An observer that watches namespaced resources in a given context and
037 * invokes a handler on changes.
038 *
039 * @param <O> the object type for the context
040 * @param <L> the object list type for the context
041 */
042public class K8sObserver<O extends KubernetesObject,
043        L extends KubernetesListObject> {
044
045    /**
046     * The type of change reported by {@link Response} as enum.
047     */
048    public enum ResponseType {
049        ADDED, MODIFIED, DELETED
050    }
051
052    @SuppressWarnings("PMD.FieldNamingConventions")
053    protected final Logger logger = Logger.getLogger(getClass().getName());
054
055    protected final K8sClient client;
056    protected final GenericKubernetesApi<O, L> api;
057    protected final APIResource context;
058    protected final String namespace;
059    protected final ListOptions options;
060    protected final Thread thread;
061    protected BiConsumer<K8sClient, Response<O>> handler;
062    protected BiConsumer<K8sObserver<O, L>, Throwable> onTerminated;
063
064    /**
065     * Create and start a new observer for objects in the given context 
066     * (using preferred version) and namespace with the given options.
067     *
068     * @param objectClass the object class
069     * @param objectListClass the object list class
070     * @param client the client
071     * @param context the context
072     * @param namespace the namespace
073     * @param options the options
074     */
075    @SuppressWarnings({ "PMD.AvoidBranchingStatementAsLastInLoop",
076        "PMD.UseObjectForClearerAPI", "PMD.AvoidCatchingThrowable",
077        "PMD.CognitiveComplexity", "PMD.AvoidCatchingGenericException" })
078    public K8sObserver(Class<O> objectClass, Class<L> objectListClass,
079            K8sClient client, APIResource context, String namespace,
080            ListOptions options) {
081        this.client = client;
082        this.context = context;
083        this.namespace = namespace;
084        this.options = options;
085
086        api = new GenericKubernetesApi<>(objectClass, objectListClass,
087            context.getGroup(), context.getPreferredVersion(),
088            context.getResourcePlural(), client);
089        thread = (Components.useVirtualThreads() ? Thread.ofVirtual()
090            : Thread.ofPlatform()).unstarted(() -> {
091                try {
092                    logger
093                        .config(() -> "Watching " + context.getResourcePlural()
094                            + " (" + context.getPreferredVersion() + ")"
095                            + " in " + namespace);
096
097                    // Watch sometimes terminates without apparent reason.
098                    while (!Thread.currentThread().isInterrupted()) {
099                        Instant startedAt = Instant.now();
100                        try {
101                            @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
102                            var changed
103                                = api.watch(namespace, options).iterator();
104                            while (changed.hasNext()) {
105                                handler.accept(client, changed.next());
106                            }
107                        } catch (ApiException | RuntimeException e) {
108                            logger.log(Level.FINE, e, () -> "Problem watching"
109                                + " (will retry): " + e.getMessage());
110                            delayRestart(startedAt);
111                        }
112                    }
113                    if (onTerminated != null) {
114                        onTerminated.accept(this, null);
115                    }
116                } catch (Throwable e) {
117                    logger.log(Level.SEVERE, e, () -> "Probem watching: "
118                        + e.getMessage());
119                    if (onTerminated != null) {
120                        onTerminated.accept(this, e);
121                    }
122                }
123            });
124    }
125
126    @SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
127    private void delayRestart(Instant started) {
128        var runningFor = Duration
129            .between(started, Instant.now()).toMillis();
130        if (runningFor < 5000) {
131            logger.log(Level.FINE, () -> "Waiting... ");
132            try {
133                Thread.sleep(5000 - runningFor);
134            } catch (InterruptedException e1) { // NOPMD
135                // Retry
136            }
137            logger.log(Level.FINE, () -> "Retrying");
138        }
139    }
140
141    /**
142     * Sets the handler.
143     *
144     * @param handler the handler
145     * @return the observer
146     */
147    public K8sObserver<O, L>
148            handler(BiConsumer<K8sClient, Response<O>> handler) {
149        this.handler = handler;
150        return this;
151    }
152
153    /**
154     * Sets a function to invoke if the observer terminates. First argument
155     * is this observer, the second is the throwable that caused the
156     * abnormal termination or `null` if the observer was terminated
157     * by {@link #stop()}.
158     *
159     * @param onTerminated the on terminated
160     * @return the observer
161     */
162    public K8sObserver<O, L> onTerminated(
163            BiConsumer<K8sObserver<O, L>, Throwable> onTerminated) {
164        this.onTerminated = onTerminated;
165        return this;
166    }
167
168    /**
169     * Start the observer.
170     *
171     * @return the observer
172     */
173    public K8sObserver<O, L> start() {
174        if (handler == null) {
175            throw new IllegalStateException("No handler defined");
176        }
177        thread.start();
178        return this;
179    }
180
181    /**
182     * Stops the observer.
183     *
184     * @return the observer
185     */
186    public K8sObserver<O, L> stop() {
187        thread.interrupt();
188        return this;
189    }
190
191    /**
192     * Returns the client.
193     *
194     * @return the client
195     */
196    public K8sClient client() {
197        return client;
198    }
199
200    /**
201     * Returns the context.
202     * 
203     * @return the context
204     */
205    public APIResource context() {
206        return context;
207    }
208
209    /**
210     * Returns the observed namespace.
211     * 
212     * @return the namespace
213     */
214    public String getNamespace() {
215        return namespace;
216    }
217
218    /**
219     * Returns the options for object selection.
220     *
221     * @return the list options
222     */
223    public ListOptions options() {
224        return options;
225    }
226
227    @Override
228    @SuppressWarnings("PMD.UseLocaleWithCaseConversions")
229    public String toString() {
230        return "Observer for " + K8s.toString(context) + " " + namespace;
231    }
232
233}