001/*
002 * VM-Operator
003 * Copyright (C) 2023 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.runner.qemu;
020
021import com.fasterxml.jackson.core.JsonProcessingException;
022import com.fasterxml.jackson.databind.ObjectMapper;
023import com.fasterxml.jackson.databind.node.ObjectNode;
024import java.io.IOException;
025import java.io.Writer;
026import java.lang.reflect.UndeclaredThrowableException;
027import java.net.UnixDomainSocketAddress;
028import java.nio.file.Files;
029import java.nio.file.Path;
030import java.time.Duration;
031import java.time.Instant;
032import java.util.LinkedList;
033import java.util.Queue;
034import java.util.logging.Level;
035import org.jdrupes.vmoperator.runner.qemu.commands.QmpCapabilities;
036import org.jdrupes.vmoperator.runner.qemu.commands.QmpCommand;
037import org.jdrupes.vmoperator.runner.qemu.commands.QmpPowerdown;
038import org.jdrupes.vmoperator.runner.qemu.events.ConfigureQemu;
039import org.jdrupes.vmoperator.runner.qemu.events.MonitorCommand;
040import org.jdrupes.vmoperator.runner.qemu.events.MonitorEvent;
041import org.jdrupes.vmoperator.runner.qemu.events.MonitorReady;
042import org.jdrupes.vmoperator.runner.qemu.events.MonitorResult;
043import org.jdrupes.vmoperator.runner.qemu.events.PowerdownEvent;
044import org.jgrapes.core.Channel;
045import org.jgrapes.core.Component;
046import org.jgrapes.core.Components;
047import org.jgrapes.core.Components.Timer;
048import org.jgrapes.core.EventPipeline;
049import org.jgrapes.core.annotation.Handler;
050import org.jgrapes.core.events.Start;
051import org.jgrapes.core.events.Stop;
052import org.jgrapes.io.events.Closed;
053import org.jgrapes.io.events.ConnectError;
054import org.jgrapes.io.events.Input;
055import org.jgrapes.io.events.OpenSocketConnection;
056import org.jgrapes.io.util.ByteBufferWriter;
057import org.jgrapes.io.util.LineCollector;
058import org.jgrapes.net.SocketIOChannel;
059import org.jgrapes.net.events.ClientConnected;
060import org.jgrapes.util.events.ConfigurationUpdate;
061import org.jgrapes.util.events.FileChanged;
062import org.jgrapes.util.events.WatchFile;
063
064/**
065 * A component that handles the communication over the Qemu monitor
066 * socket.
067 * 
068 * If the log level for this class is set to fine, the messages 
069 * exchanged on the monitor socket are logged.
070 */
071@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
072public class QemuMonitor extends Component {
073
074    private static ObjectMapper mapper = new ObjectMapper();
075
076    private EventPipeline rep;
077    private Path socketPath;
078    private int powerdownTimeout;
079    private SocketIOChannel monitorChannel;
080    private final Queue<QmpCommand> executing = new LinkedList<>();
081    private Instant powerdownStartedAt;
082    private Stop suspendedStop;
083    private Timer powerdownTimer;
084    private boolean powerdownConfirmed;
085
086    /**
087     * Instantiates a new qemu monitor.
088     *
089     * @param componentChannel the component channel
090     * @param configDir the config dir
091     * @throws IOException Signals that an I/O exception has occurred.
092     */
093    @SuppressWarnings({ "PMD.AssignmentToNonFinalStatic",
094        "PMD.ConstructorCallsOverridableMethod" })
095    public QemuMonitor(Channel componentChannel, Path configDir)
096            throws IOException {
097        super(componentChannel);
098        attach(new RamController(channel()));
099        attach(new CpuController(channel()));
100        attach(new DisplayController(channel(), configDir));
101        attach(new CdMediaController(channel()));
102    }
103
104    /**
105     * As the initial configuration of this component depends on the 
106     * configuration of the {@link Runner}, it doesn't have a handler 
107     * for the {@link ConfigurationUpdate} event. The values are 
108     * forwarded from the {@link Runner} instead.
109     *
110     * @param socketPath the socket path
111     * @param powerdownTimeout 
112     */
113    /* default */ void configure(Path socketPath, int powerdownTimeout) {
114        this.socketPath = socketPath;
115        this.powerdownTimeout = powerdownTimeout;
116    }
117
118    /**
119     * Handle the start event.
120     *
121     * @param event the event
122     * @throws IOException Signals that an I/O exception has occurred.
123     */
124    @Handler
125    public void onStart(Start event) throws IOException {
126        rep = event.associated(EventPipeline.class).get();
127        if (socketPath == null) {
128            return;
129        }
130        Files.deleteIfExists(socketPath);
131        fire(new WatchFile(socketPath));
132    }
133
134    /**
135     * Watch for the creation of the swtpm socket and start the
136     * qemu process if it has been created.
137     *
138     * @param event the event
139     */
140    @Handler
141    public void onFileChanged(FileChanged event) {
142        if (event.change() == FileChanged.Kind.CREATED
143            && event.path().equals(socketPath)) {
144            // qemu running, open socket
145            fire(new OpenSocketConnection(
146                UnixDomainSocketAddress.of(socketPath))
147                    .setAssociated(QemuMonitor.class, this));
148        }
149    }
150
151    /**
152     * Check if this is from opening the monitor socket and if true,
153     * save the socket in the context and associate the channel with
154     * the context. Then send the initial message to the socket.
155     *
156     * @param event the event
157     * @param channel the channel
158     */
159    @SuppressWarnings("resource")
160    @Handler
161    public void onClientConnected(ClientConnected event,
162            SocketIOChannel channel) {
163        event.openEvent().associated(QemuMonitor.class).ifPresent(qm -> {
164            monitorChannel = channel;
165            channel.setAssociated(QemuMonitor.class, this);
166            channel.setAssociated(Writer.class, new ByteBufferWriter(
167                channel).nativeCharset());
168            channel.setAssociated(LineCollector.class,
169                new LineCollector()
170                    .consumer(line -> {
171                        try {
172                            processMonitorInput(line);
173                        } catch (IOException e) {
174                            throw new UndeclaredThrowableException(e);
175                        }
176                    }));
177            fire(new MonitorCommand(new QmpCapabilities()));
178        });
179    }
180
181    /**
182     * Called when a connection attempt fails.
183     *
184     * @param event the event
185     * @param channel the channel
186     */
187    @Handler
188    public void onConnectError(ConnectError event, SocketIOChannel channel) {
189        event.event().associated(QemuMonitor.class).ifPresent(qm -> {
190            rep.fire(new Stop());
191        });
192    }
193
194    /**
195     * Handle data from qemu monitor connection.
196     *
197     * @param event the event
198     * @param channel the channel
199     */
200    @Handler
201    public void onInput(Input<?> event, SocketIOChannel channel) {
202        if (channel.associated(QemuMonitor.class).isEmpty()) {
203            return;
204        }
205        channel.associated(LineCollector.class).ifPresent(collector -> {
206            collector.feed(event);
207        });
208    }
209
210    private void processMonitorInput(String line)
211            throws IOException {
212        logger.fine(() -> "monitor(in): " + line);
213        try {
214            var response = mapper.readValue(line, ObjectNode.class);
215            if (response.has("QMP")) {
216                rep.fire(new MonitorReady());
217                return;
218            }
219            if (response.has("return") || response.has("error")) {
220                QmpCommand executed = executing.poll();
221                logger.fine(
222                    () -> String.format("(Previous \"monitor(in)\" is result "
223                        + "from executing %s)", executed));
224                rep.fire(MonitorResult.from(executed, response));
225                return;
226            }
227            if (response.has("event")) {
228                MonitorEvent.from(response).ifPresent(rep::fire);
229            }
230        } catch (JsonProcessingException e) {
231            throw new IOException(e);
232        }
233    }
234
235    /**
236     * On closed.
237     *
238     * @param event the event
239     */
240    @Handler
241    public void onClosed(Closed<?> event, SocketIOChannel channel) {
242        channel.associated(QemuMonitor.class).ifPresent(qm -> {
243            monitorChannel = null;
244            synchronized (this) {
245                if (powerdownTimer != null) {
246                    powerdownTimer.cancel();
247                }
248                if (suspendedStop != null) {
249                    suspendedStop.resumeHandling();
250                    suspendedStop = null;
251                }
252            }
253        });
254    }
255
256    /**
257     * On monitor command.
258     *
259     * @param event the event
260     */
261    @Handler
262    @SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
263    public void onExecQmpCommand(MonitorCommand event) {
264        var command = event.command();
265        logger.fine(() -> "monitor(out): " + command.toString());
266        String asText;
267        try {
268            asText = command.asText();
269        } catch (JsonProcessingException e) {
270            logger.log(Level.SEVERE, e,
271                () -> "Cannot serialize Json: " + e.getMessage());
272            return;
273        }
274        synchronized (executing) {
275            monitorChannel.associated(Writer.class).ifPresent(writer -> {
276                try {
277                    executing.add(command);
278                    writer.append(asText).append('\n').flush();
279                } catch (IOException e) {
280                    // Cannot happen, but...
281                    logger.log(Level.WARNING, e, e::getMessage);
282                }
283            });
284        }
285    }
286
287    /**
288     * Shutdown the VM.
289     *
290     * @param event the event
291     */
292    @Handler(priority = 100)
293    public void onStop(Stop event) {
294        if (monitorChannel != null) {
295            // We have a connection to Qemu, attempt ACPI shutdown.
296            event.suspendHandling();
297            suspendedStop = event;
298
299            // Attempt powerdown command. If not confirmed, assume
300            // "hanging" qemu process.
301            powerdownTimer = Components.schedule(t -> {
302                // Powerdown not confirmed
303                logger.fine(() -> "QMP powerdown command has not effect.");
304                synchronized (this) {
305                    powerdownTimer = null;
306                    if (suspendedStop != null) {
307                        suspendedStop.resumeHandling();
308                        suspendedStop = null;
309                    }
310                }
311            }, Duration.ofSeconds(1));
312            logger.fine(() -> "Attempting QMP powerdown.");
313            powerdownStartedAt = Instant.now();
314            fire(new MonitorCommand(new QmpPowerdown()));
315        }
316    }
317
318    /**
319     * On powerdown event.
320     *
321     * @param event the event
322     */
323    @Handler
324    public void onPowerdownEvent(PowerdownEvent event) {
325        synchronized (this) {
326            // Cancel confirmation timeout
327            if (powerdownTimer != null) {
328                powerdownTimer.cancel();
329            }
330
331            // (Re-)schedule timer as fallback
332            logger.fine(() -> "QMP powerdown confirmed, waiting...");
333            powerdownTimer = Components.schedule(t -> {
334                logger.fine(() -> "Powerdown timeout reached.");
335                synchronized (this) {
336                    if (suspendedStop != null) {
337                        suspendedStop.resumeHandling();
338                        suspendedStop = null;
339                    }
340                }
341            }, powerdownStartedAt.plusSeconds(powerdownTimeout));
342            powerdownConfirmed = true;
343        }
344    }
345
346    /**
347     * On configure qemu.
348     *
349     * @param event the event
350     */
351    @Handler
352    public void onConfigureQemu(ConfigureQemu event) {
353        int newTimeout = event.configuration().vm.powerdownTimeout;
354        if (powerdownTimeout != newTimeout) {
355            powerdownTimeout = newTimeout;
356            synchronized (this) {
357                if (powerdownTimer != null && powerdownConfirmed) {
358                    powerdownTimer
359                        .reschedule(powerdownStartedAt.plusSeconds(newTimeout));
360                }
361
362            }
363        }
364    }
365
366}