001/*
002 * VM-Operator
003 * Copyright (C) 2023 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.runner.qemu;
020
021import com.fasterxml.jackson.core.JsonProcessingException;
022import com.fasterxml.jackson.databind.ObjectMapper;
023import com.fasterxml.jackson.databind.node.ObjectNode;
024import java.io.IOException;
025import java.io.Writer;
026import java.lang.reflect.UndeclaredThrowableException;
027import java.net.UnixDomainSocketAddress;
028import java.nio.file.Files;
029import java.nio.file.Path;
030import java.time.Duration;
031import java.time.Instant;
032import java.util.LinkedList;
033import java.util.Queue;
034import java.util.logging.Level;
035import org.jdrupes.vmoperator.runner.qemu.commands.QmpCapabilities;
036import org.jdrupes.vmoperator.runner.qemu.commands.QmpCommand;
037import org.jdrupes.vmoperator.runner.qemu.commands.QmpPowerdown;
038import org.jdrupes.vmoperator.runner.qemu.events.ConfigureQemu;
039import org.jdrupes.vmoperator.runner.qemu.events.MonitorCommand;
040import org.jdrupes.vmoperator.runner.qemu.events.MonitorEvent;
041import org.jdrupes.vmoperator.runner.qemu.events.MonitorReady;
042import org.jdrupes.vmoperator.runner.qemu.events.MonitorResult;
043import org.jdrupes.vmoperator.runner.qemu.events.PowerdownEvent;
044import org.jgrapes.core.Channel;
045import org.jgrapes.core.Component;
046import org.jgrapes.core.Components;
047import org.jgrapes.core.Components.Timer;
048import org.jgrapes.core.EventPipeline;
049import org.jgrapes.core.annotation.Handler;
050import org.jgrapes.core.events.Start;
051import org.jgrapes.core.events.Stop;
052import org.jgrapes.io.events.Closed;
053import org.jgrapes.io.events.ConnectError;
054import org.jgrapes.io.events.Input;
055import org.jgrapes.io.events.OpenSocketConnection;
056import org.jgrapes.io.util.ByteBufferWriter;
057import org.jgrapes.io.util.LineCollector;
058import org.jgrapes.net.SocketIOChannel;
059import org.jgrapes.net.events.ClientConnected;
060import org.jgrapes.util.events.ConfigurationUpdate;
061import org.jgrapes.util.events.FileChanged;
062import org.jgrapes.util.events.WatchFile;
063
064/**
065 * A component that handles the communication over the Qemu monitor
066 * socket.
067 * 
068 * If the log level for this class is set to fine, the messages 
069 * exchanged on the monitor socket are logged.
070 */
071@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
072public class QemuMonitor extends Component {
073
074    private static ObjectMapper mapper = new ObjectMapper();
075
076    private EventPipeline rep;
077    private Path socketPath;
078    private int powerdownTimeout;
079    private SocketIOChannel monitorChannel;
080    private final Queue<QmpCommand> executing = new LinkedList<>();
081    private Instant powerdownStartedAt;
082    private Stop suspendedStop;
083    private Timer powerdownTimer;
084    private boolean powerdownConfirmed;
085
086    /**
087     * Instantiates a new qemu monitor.
088     *
089     * @param componentChannel the component channel
090     * @param configDir the config dir
091     * @throws IOException Signals that an I/O exception has occurred.
092     */
093    @SuppressWarnings({ "PMD.AssignmentToNonFinalStatic",
094        "PMD.ConstructorCallsOverridableMethod" })
095    public QemuMonitor(Channel componentChannel, Path configDir)
096            throws IOException {
097        super(componentChannel);
098        attach(new RamController(channel()));
099        attach(new CpuController(channel()));
100        attach(new DisplayController(channel(), configDir));
101        attach(new CdMediaController(channel()));
102    }
103
104    /**
105     * As the initial configuration of this component depends on the 
106     * configuration of the {@link Runner}, it doesn't have a handler 
107     * for the {@link ConfigurationUpdate} event. The values are 
108     * forwarded from the {@link Runner} instead.
109     *
110     * @param socketPath the socket path
111     * @param powerdownTimeout 
112     */
113    /* default */ void configure(Path socketPath, int powerdownTimeout) {
114        this.socketPath = socketPath;
115        this.powerdownTimeout = powerdownTimeout;
116    }
117
118    /**
119     * Handle the start event.
120     *
121     * @param event the event
122     * @throws IOException Signals that an I/O exception has occurred.
123     */
124    @Handler
125    public void onStart(Start event) throws IOException {
126        rep = event.associated(EventPipeline.class).get();
127        if (socketPath == null) {
128            return;
129        }
130        Files.deleteIfExists(socketPath);
131        fire(new WatchFile(socketPath));
132    }
133
134    /**
135     * Watch for the creation of the swtpm socket and start the
136     * qemu process if it has been created.
137     *
138     * @param event the event
139     */
140    @Handler
141    public void onFileChanged(FileChanged event) {
142        if (event.change() == FileChanged.Kind.CREATED
143            && event.path().equals(socketPath)) {
144            // qemu running, open socket
145            fire(new OpenSocketConnection(
146                UnixDomainSocketAddress.of(socketPath))
147                    .setAssociated(QemuMonitor.class, this));
148        }
149    }
150
151    /**
152     * Check if this is from opening the monitor socket and if true,
153     * save the socket in the context and associate the channel with
154     * the context. Then send the initial message to the socket.
155     *
156     * @param event the event
157     * @param channel the channel
158     */
159    @SuppressWarnings("resource")
160    @Handler
161    public void onClientConnected(ClientConnected event,
162            SocketIOChannel channel) {
163        event.openEvent().associated(QemuMonitor.class).ifPresent(qm -> {
164            monitorChannel = channel;
165            channel.setAssociated(QemuMonitor.class, this);
166            channel.setAssociated(Writer.class, new ByteBufferWriter(
167                channel).nativeCharset());
168            channel.setAssociated(LineCollector.class,
169                new LineCollector()
170                    .consumer(line -> {
171                        try {
172                            processMonitorInput(line);
173                        } catch (IOException e) {
174                            throw new UndeclaredThrowableException(e);
175                        }
176                    }));
177            fire(new MonitorCommand(new QmpCapabilities()));
178        });
179    }
180
181    /**
182     * Called when a connection attempt fails.
183     *
184     * @param event the event
185     * @param channel the channel
186     */
187    @Handler
188    public void onConnectError(ConnectError event, SocketIOChannel channel) {
189        event.event().associated(QemuMonitor.class).ifPresent(qm -> {
190            rep.fire(new Stop());
191        });
192    }
193
194    /**
195     * Handle data from qemu monitor connection.
196     *
197     * @param event the event
198     * @param channel the channel
199     */
200    @Handler
201    public void onInput(Input<?> event, SocketIOChannel channel) {
202        if (channel.associated(QemuMonitor.class).isEmpty()) {
203            return;
204        }
205        channel.associated(LineCollector.class).ifPresent(collector -> {
206            collector.feed(event);
207        });
208    }
209
210    private void processMonitorInput(String line)
211            throws IOException {
212        logger.fine(() -> "monitor(in): " + line);
213        try {
214            var response = mapper.readValue(line, ObjectNode.class);
215            if (response.has("QMP")) {
216                rep.fire(new MonitorReady());
217                return;
218            }
219            if (response.has("return") || response.has("error")) {
220                QmpCommand executed = executing.poll();
221                logger.fine(
222                    () -> String.format("(Previous \"monitor(in)\" is result "
223                        + "from executing %s)", executed));
224                rep.fire(MonitorResult.from(executed, response));
225                return;
226            }
227            if (response.has("event")) {
228                MonitorEvent.from(response).ifPresent(rep::fire);
229            }
230        } catch (JsonProcessingException e) {
231            throw new IOException(e);
232        }
233    }
234
235    /**
236     * On closed.
237     *
238     * @param event the event
239     */
240    @Handler
241    @SuppressWarnings({ "PMD.AvoidSynchronizedStatement",
242        "PMD.AvoidDuplicateLiterals" })
243    public void onClosed(Closed<?> event, SocketIOChannel channel) {
244        channel.associated(QemuMonitor.class).ifPresent(qm -> {
245            monitorChannel = null;
246            synchronized (this) {
247                if (powerdownTimer != null) {
248                    powerdownTimer.cancel();
249                }
250                if (suspendedStop != null) {
251                    suspendedStop.resumeHandling();
252                    suspendedStop = null;
253                }
254            }
255        });
256    }
257
258    /**
259     * On monitor command.
260     *
261     * @param event the event
262     */
263    @Handler
264    @SuppressWarnings({ "PMD.AvoidLiteralsInIfCondition",
265        "PMD.AvoidSynchronizedStatement" })
266    public void onExecQmpCommand(MonitorCommand event) {
267        var command = event.command();
268        logger.fine(() -> "monitor(out): " + command.toString());
269        String asText;
270        try {
271            asText = command.asText();
272        } catch (JsonProcessingException e) {
273            logger.log(Level.SEVERE, e,
274                () -> "Cannot serialize Json: " + e.getMessage());
275            return;
276        }
277        synchronized (executing) {
278            monitorChannel.associated(Writer.class).ifPresent(writer -> {
279                try {
280                    executing.add(command);
281                    writer.append(asText).append('\n').flush();
282                } catch (IOException e) {
283                    // Cannot happen, but...
284                    logger.log(Level.WARNING, e, e::getMessage);
285                }
286            });
287        }
288    }
289
290    /**
291     * Shutdown the VM.
292     *
293     * @param event the event
294     */
295    @Handler(priority = 100)
296    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
297    public void onStop(Stop event) {
298        if (monitorChannel != null) {
299            // We have a connection to Qemu, attempt ACPI shutdown.
300            event.suspendHandling();
301            suspendedStop = event;
302
303            // Attempt powerdown command. If not confirmed, assume
304            // "hanging" qemu process.
305            powerdownTimer = Components.schedule(t -> {
306                // Powerdown not confirmed
307                logger.fine(() -> "QMP powerdown command has not effect.");
308                synchronized (this) {
309                    powerdownTimer = null;
310                    if (suspendedStop != null) {
311                        suspendedStop.resumeHandling();
312                        suspendedStop = null;
313                    }
314                }
315            }, Duration.ofSeconds(1));
316            logger.fine(() -> "Attempting QMP powerdown.");
317            powerdownStartedAt = Instant.now();
318            fire(new MonitorCommand(new QmpPowerdown()));
319        }
320    }
321
322    /**
323     * On powerdown event.
324     *
325     * @param event the event
326     */
327    @Handler
328    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
329    public void onPowerdownEvent(PowerdownEvent event) {
330        synchronized (this) {
331            // Cancel confirmation timeout
332            if (powerdownTimer != null) {
333                powerdownTimer.cancel();
334            }
335
336            // (Re-)schedule timer as fallback
337            logger.fine(() -> "QMP powerdown confirmed, waiting...");
338            powerdownTimer = Components.schedule(t -> {
339                logger.fine(() -> "Powerdown timeout reached.");
340                synchronized (this) {
341                    if (suspendedStop != null) {
342                        suspendedStop.resumeHandling();
343                        suspendedStop = null;
344                    }
345                }
346            }, powerdownStartedAt.plusSeconds(powerdownTimeout));
347            powerdownConfirmed = true;
348        }
349    }
350
351    /**
352     * On configure qemu.
353     *
354     * @param event the event
355     */
356    @Handler
357    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
358    public void onConfigureQemu(ConfigureQemu event) {
359        int newTimeout = event.configuration().vm.powerdownTimeout;
360        if (powerdownTimeout != newTimeout) {
361            powerdownTimeout = newTimeout;
362            synchronized (this) {
363                if (powerdownTimer != null && powerdownConfirmed) {
364                    powerdownTimer
365                        .reschedule(powerdownStartedAt.plusSeconds(newTimeout));
366                }
367
368            }
369        }
370    }
371
372}