001/*
002 * VM-Operator
003 * Copyright (C) 2024 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.common;
020
021import io.kubernetes.client.Discovery.APIResource;
022import io.kubernetes.client.common.KubernetesListObject;
023import io.kubernetes.client.common.KubernetesObject;
024import io.kubernetes.client.openapi.ApiException;
025import io.kubernetes.client.util.Watch.Response;
026import io.kubernetes.client.util.generic.GenericKubernetesApi;
027import io.kubernetes.client.util.generic.options.ListOptions;
028import java.time.Duration;
029import java.time.Instant;
030import java.util.function.BiConsumer;
031import java.util.logging.Level;
032import java.util.logging.Logger;
033
034/**
035 * An observer that watches namespaced resources in a given context and
036 * invokes a handler on changes.
037 *
038 * @param <O> the object type for the context
039 * @param <L> the object list type for the context
040 */
041public class K8sObserver<O extends KubernetesObject,
042        L extends KubernetesListObject> {
043
044    /**
045     * The type of change reported by {@link Response} as enum.
046     */
047    public enum ResponseType {
048        ADDED, MODIFIED, DELETED
049    }
050
051    @SuppressWarnings("PMD.FieldNamingConventions")
052    protected final Logger logger = Logger.getLogger(getClass().getName());
053
054    protected final K8sClient client;
055    protected final GenericKubernetesApi<O, L> api;
056    protected final APIResource context;
057    protected final String namespace;
058    protected final ListOptions options;
059    protected final Thread thread;
060    protected BiConsumer<K8sClient, Response<O>> handler;
061    protected BiConsumer<K8sObserver<O, L>, Throwable> onTerminated;
062
063    /**
064     * Create and start a new observer for objects in the given context 
065     * (using preferred version) and namespace with the given options.
066     *
067     * @param objectClass the object class
068     * @param objectListClass the object list class
069     * @param client the client
070     * @param context the context
071     * @param namespace the namespace
072     * @param options the options
073     */
074    @SuppressWarnings({ "PMD.AvoidBranchingStatementAsLastInLoop",
075        "PMD.UseObjectForClearerAPI", "PMD.AvoidCatchingThrowable",
076        "PMD.CognitiveComplexity" })
077    public K8sObserver(Class<O> objectClass, Class<L> objectListClass,
078            K8sClient client, APIResource context, String namespace,
079            ListOptions options) {
080        this.client = client;
081        this.context = context;
082        this.namespace = namespace;
083        this.options = options;
084
085        api = new GenericKubernetesApi<>(objectClass, objectListClass,
086            context.getGroup(), context.getPreferredVersion(),
087            context.getResourcePlural(), client);
088        thread = new Thread(() -> {
089            try {
090                logger.config(() -> "Watching " + context.getResourcePlural()
091                    + " (" + context.getPreferredVersion() + ")"
092                    + " in " + namespace);
093
094                // Watch sometimes terminates without apparent reason.
095                while (!Thread.currentThread().isInterrupted()) {
096                    Instant startedAt = Instant.now();
097                    try {
098                        @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
099                        var changed = api.watch(namespace, options).iterator();
100                        while (changed.hasNext()) {
101                            handler.accept(client, changed.next());
102                        }
103                    } catch (ApiException e) {
104                        logger.log(Level.FINE, e, () -> "Problem watching"
105                            + " (will retry): " + e.getMessage());
106                        delayRestart(startedAt);
107                    }
108                }
109                if (onTerminated != null) {
110                    onTerminated.accept(this, null);
111                }
112            } catch (Throwable e) {
113                logger.log(Level.SEVERE, e, () -> "Probem watching: "
114                    + e.getMessage());
115                if (onTerminated != null) {
116                    onTerminated.accept(this, e);
117                }
118            }
119        });
120        thread.setDaemon(true);
121    }
122
123    @SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
124    private void delayRestart(Instant started) {
125        var runningFor = Duration
126            .between(started, Instant.now()).toMillis();
127        if (runningFor < 5000) {
128            logger.log(Level.FINE, () -> "Waiting... ");
129            try {
130                Thread.sleep(5000 - runningFor);
131            } catch (InterruptedException e1) { // NOPMD
132                // Retry
133            }
134            logger.log(Level.FINE, () -> "Retrying");
135        }
136    }
137
138    /**
139     * Sets the handler.
140     *
141     * @param handler the handler
142     * @return the observer
143     */
144    public K8sObserver<O, L>
145            handler(BiConsumer<K8sClient, Response<O>> handler) {
146        this.handler = handler;
147        return this;
148    }
149
150    /**
151     * Sets a function to invoke if the observer terminates. First argument
152     * is this observer, the second is the throwable that caused the
153     * abnormal termination or `null` if the observer was terminated
154     * by {@link #stop()}.
155     *
156     * @param onTerminated the on terminated
157     * @return the observer
158     */
159    public K8sObserver<O, L> onTerminated(
160            BiConsumer<K8sObserver<O, L>, Throwable> onTerminated) {
161        this.onTerminated = onTerminated;
162        return this;
163    }
164
165    /**
166     * Start the observer.
167     *
168     * @return the observer
169     */
170    public K8sObserver<O, L> start() {
171        if (handler == null) {
172            throw new IllegalStateException("No handler defined");
173        }
174        thread.start();
175        return this;
176    }
177
178    /**
179     * Stops the observer.
180     *
181     * @return the observer
182     */
183    public K8sObserver<O, L> stop() {
184        thread.interrupt();
185        return this;
186    }
187
188    /**
189     * Returns the client.
190     *
191     * @return the client
192     */
193    public K8sClient client() {
194        return client;
195    }
196
197    /**
198     * Returns the context.
199     * 
200     * @return the context
201     */
202    public APIResource context() {
203        return context;
204    }
205
206    /**
207     * Returns the observed namespace.
208     * 
209     * @return the namespace
210     */
211    public String getNamespace() {
212        return namespace;
213    }
214
215    /**
216     * Returns the options for object selection.
217     *
218     * @return the list options
219     */
220    public ListOptions options() {
221        return options;
222    }
223
224    @Override
225    @SuppressWarnings("PMD.UseLocaleWithCaseConversions")
226    public String toString() {
227        return "Observer for " + K8s.toString(context) + " " + namespace;
228    }
229
230}