/*
 * VM-Operator
 * Copyright (C) 2023 Michael N. Lipp
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.jdrupes.vmoperator.runner.qemu;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.io.Writer;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.UnixDomainSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.LinkedList;
import java.util.Queue;
import java.util.logging.Level;
import org.jdrupes.vmoperator.runner.qemu.commands.QmpCapabilities;
import org.jdrupes.vmoperator.runner.qemu.commands.QmpCommand;
import org.jdrupes.vmoperator.runner.qemu.commands.QmpPowerdown;
import org.jdrupes.vmoperator.runner.qemu.events.ConfigureQemu;
import org.jdrupes.vmoperator.runner.qemu.events.MonitorCommand;
import org.jdrupes.vmoperator.runner.qemu.events.MonitorEvent;
import org.jdrupes.vmoperator.runner.qemu.events.MonitorReady;
import org.jdrupes.vmoperator.runner.qemu.events.MonitorResult;
import org.jdrupes.vmoperator.runner.qemu.events.PowerdownEvent;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Component;
import org.jgrapes.core.Components;
import org.jgrapes.core.Components.Timer;
import org.jgrapes.core.EventPipeline;
import org.jgrapes.core.annotation.Handler;
import org.jgrapes.core.events.Start;
import org.jgrapes.core.events.Stop;
import org.jgrapes.io.events.Closed;
import org.jgrapes.io.events.ConnectError;
import org.jgrapes.io.events.Input;
import org.jgrapes.io.events.OpenSocketConnection;
import org.jgrapes.io.util.ByteBufferWriter;
import org.jgrapes.io.util.LineCollector;
import org.jgrapes.net.SocketIOChannel;
import org.jgrapes.net.events.ClientConnected;
import org.jgrapes.util.events.ConfigurationUpdate;
import org.jgrapes.util.events.FileChanged;
import org.jgrapes.util.events.WatchFile;

/**
 * A component that handles the communication over the Qemu monitor
 * socket.
 *
 * If the log level for this class is set to fine, the messages
 * exchanged on the monitor socket are logged.
 */
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
public class QemuMonitor extends Component {

    private static final ObjectMapper mapper = new ObjectMapper();

    /** The pipeline obtained from the start event, used for firing. */
    private EventPipeline rep;
    /** The path of the monitor socket, see {@link #configure}. */
    private Path socketPath;
    /** The powerdown timeout in seconds, see {@link #configure}. */
    private int powerdownTimeout;
    /** The channel of the open monitor connection or {@code null}. */
    private SocketIOChannel monitorChannel;
    /**
     * The commands that have been written to the monitor socket and
     * for which no result has been received yet. Guarded by its
     * own monitor.
     */
    private final Queue<QmpCommand> executing = new LinkedList<>();
    // The following fields are guarded by "this".
    private Instant powerdownStartedAt;
    private Stop suspendedStop;
    private Timer powerdownTimer;
    private boolean powerdownConfirmed;

    /**
     * Instantiates a new qemu monitor and attaches the controllers
     * for RAM, CPU, display and CD media as child components.
     *
     * @param componentChannel the component channel
     * @param configDir the config dir
     * @throws IOException Signals that an I/O exception has occurred.
     */
    @SuppressWarnings({ "PMD.AssignmentToNonFinalStatic",
        "PMD.ConstructorCallsOverridableMethod" })
    public QemuMonitor(Channel componentChannel, Path configDir)
            throws IOException {
        super(componentChannel);
        attach(new RamController(channel()));
        attach(new CpuController(channel()));
        attach(new DisplayController(channel(), configDir));
        attach(new CdMediaController(channel()));
    }

    /**
     * As the initial configuration of this component depends on the
     * configuration of the {@link Runner}, it doesn't have a handler
     * for the {@link ConfigurationUpdate} event. The values are
     * forwarded from the {@link Runner} instead.
     *
     * @param socketPath the path of the monitor socket
     * @param powerdownTimeout the time (in seconds, counted from the
     * start of the powerdown attempt) to wait for the connection to
     * be closed after a confirmed powerdown
     */
    /* default */ void configure(Path socketPath, int powerdownTimeout) {
        this.socketPath = socketPath;
        this.powerdownTimeout = powerdownTimeout;
    }

    /**
     * Handle the start event. Saves the event pipeline associated
     * with the event. If a socket path has been configured, removes
     * a left over socket file and starts watching the path.
     *
     * @param event the event
     * @throws IOException Signals that an I/O exception has occurred.
     */
    @Handler
    public void onStart(Start event) throws IOException {
        rep = event.associated(EventPipeline.class).get();
        if (socketPath == null) {
            return;
        }
        Files.deleteIfExists(socketPath);
        fire(new WatchFile(socketPath));
    }

    /**
     * Watch for the creation of the monitor socket and open a
     * connection to it when it has been created.
     *
     * @param event the event
     */
    @Handler
    public void onFileChanged(FileChanged event) {
        if (event.change() == FileChanged.Kind.CREATED
            && event.path().equals(socketPath)) {
            // qemu running, open socket
            fire(new OpenSocketConnection(
                UnixDomainSocketAddress.of(socketPath))
                    .setAssociated(QemuMonitor.class, this));
        }
    }

    /**
     * Check if this is from opening the monitor socket and if true,
     * remember the channel, associate this component, a writer and
     * a line collector (which forwards each line to the monitor
     * input processing) with the channel. Then send the initial
     * message (capabilities negotiation) to the socket.
     *
     * @param event the event
     * @param channel the channel
     */
    @SuppressWarnings("resource")
    @Handler
    public void onClientConnected(ClientConnected event,
            SocketIOChannel channel) {
        event.openEvent().associated(QemuMonitor.class).ifPresent(qm -> {
            monitorChannel = channel;
            channel.setAssociated(QemuMonitor.class, this);
            channel.setAssociated(Writer.class, new ByteBufferWriter(
                channel).nativeCharset());
            channel.setAssociated(LineCollector.class,
                new LineCollector()
                    .consumer(line -> {
                        try {
                            processMonitorInput(line);
                        } catch (IOException e) {
                            throw new UndeclaredThrowableException(e);
                        }
                    }));
            fire(new MonitorCommand(new QmpCapabilities()));
        });
    }

    /**
     * Called when a connection attempt fails. If the failed attempt
     * was the attempt to connect to the monitor socket, a
     * {@link Stop} event is fired.
     *
     * @param event the event
     * @param channel the channel
     */
    @Handler
    public void onConnectError(ConnectError event, SocketIOChannel channel) {
        event.event().associated(QemuMonitor.class).ifPresent(qm -> {
            rep.fire(new Stop());
        });
    }

    /**
     * Handle data from qemu monitor connection. Input from channels
     * that are not associated with a monitor is ignored.
     *
     * @param event the event
     * @param channel the channel
     */
    @Handler
    public void onInput(Input<?> event, SocketIOChannel channel) {
        if (channel.associated(QemuMonitor.class).isEmpty()) {
            return;
        }
        channel.associated(LineCollector.class).ifPresent(collector -> {
            collector.feed(event);
        });
    }

    /**
     * Processes a line received from the monitor. The line is parsed
     * as JSON object. The greeting ("QMP") results in a
     * {@link MonitorReady} event, a "return" or "error" in a
     * {@link MonitorResult} for the oldest pending command and
     * an "event" in the corresponding {@link MonitorEvent} (if any).
     *
     * @param line the line
     * @throws IOException if the line cannot be parsed
     */
    private void processMonitorInput(String line)
            throws IOException {
        logger.fine(() -> "monitor(in): " + line);
        try {
            var response = mapper.readValue(line, ObjectNode.class);
            if (response.has("QMP")) {
                rep.fire(new MonitorReady());
                return;
            }
            if (response.has("return") || response.has("error")) {
                // Use the same lock as is used when adding commands,
                // the queue implementation is not thread-safe.
                final QmpCommand executed;
                synchronized (executing) {
                    executed = executing.poll();
                }
                logger.fine(
                    () -> String.format("(Previous \"monitor(in)\" is result "
                        + "from executing %s)", executed));
                rep.fire(MonitorResult.from(executed, response));
                return;
            }
            if (response.has("event")) {
                MonitorEvent.from(response).ifPresent(rep::fire);
            }
        } catch (JsonProcessingException e) {
            throw new IOException(e);
        }
    }

    /**
     * On closed. If the closed channel is the monitor connection,
     * forgets the channel, cancels a pending powerdown timer and
     * resumes the handling of a suspended {@link Stop} event.
     *
     * @param event the event
     * @param channel the channel
     */
    @Handler
    @SuppressWarnings({ "PMD.AvoidSynchronizedStatement",
        "PMD.AvoidDuplicateLiterals" })
    public void onClosed(Closed<?> event, SocketIOChannel channel) {
        channel.associated(QemuMonitor.class).ifPresent(qm -> {
            monitorChannel = null;
            synchronized (this) {
                if (powerdownTimer != null) {
                    powerdownTimer.cancel();
                    // Don't keep a reference to the cancelled timer
                    powerdownTimer = null;
                }
                if (suspendedStop != null) {
                    suspendedStop.resumeHandling();
                    suspendedStop = null;
                }
            }
        });
    }

    /**
     * On monitor command. Serializes the command and writes it to
     * the monitor socket. The command is added to the queue of
     * pending commands. If there is no monitor connection, the
     * command is dropped and a warning is logged.
     *
     * @param event the event
     */
    @Handler
    @SuppressWarnings({ "PMD.AvoidLiteralsInIfCondition",
        "PMD.AvoidSynchronizedStatement" })
    public void onExecQmpCommand(MonitorCommand event) {
        var command = event.command();
        logger.fine(() -> "monitor(out): " + command.toString());
        String asText;
        try {
            asText = command.asText();
        } catch (JsonProcessingException e) {
            logger.log(Level.SEVERE, e,
                () -> "Cannot serialize Json: " + e.getMessage());
            return;
        }
        synchronized (executing) {
            // Use a snapshot, the field is reset in onClosed.
            var channel = monitorChannel;
            if (channel == null) {
                logger.warning(() -> "No monitor connection, cannot execute "
                    + command);
                return;
            }
            channel.associated(Writer.class).ifPresent(writer -> {
                try {
                    executing.add(command);
                    writer.append(asText).append('\n').flush();
                } catch (IOException e) {
                    // Cannot happen, but...
                    logger.log(Level.WARNING, e, e::getMessage);
                }
            });
        }
    }

    /**
     * Shutdown the VM. If there is a monitor connection, the handling
     * of the event is suspended and a powerdown command is sent.
     * The handling is resumed when the connection is closed or when
     * one of the timeouts is reached.
     *
     * @param event the event
     */
    @Handler(priority = 100)
    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
    public void onStop(Stop event) {
        if (monitorChannel != null) {
            // We have a connection to Qemu, attempt ACPI shutdown.
            event.suspendHandling();
            synchronized (this) {
                suspendedStop = event;

                // Attempt powerdown command. If not confirmed, assume
                // "hanging" qemu process.
                powerdownTimer = Components.schedule(t -> {
                    // Powerdown not confirmed
                    logger.fine(() -> "QMP powerdown command has not effect.");
                    synchronized (this) {
                        powerdownTimer = null;
                        if (suspendedStop != null) {
                            suspendedStop.resumeHandling();
                            suspendedStop = null;
                        }
                    }
                }, Duration.ofSeconds(1));
                logger.fine(() -> "Attempting QMP powerdown.");
                powerdownStartedAt = Instant.now();
            }
            fire(new MonitorCommand(new QmpPowerdown()));
        }
    }

    /**
     * On powerdown event. Replaces the short confirmation timeout
     * with a timer that expires when the configured powerdown
     * timeout has elapsed.
     *
     * @param event the event
     */
    @Handler
    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
    public void onPowerdownEvent(PowerdownEvent event) {
        synchronized (this) {
            // Cancel confirmation timeout
            if (powerdownTimer != null) {
                powerdownTimer.cancel();
            }

            // The event may arrive without onStop having been invoked
            // before. Make sure that there is a base for the timeout.
            if (powerdownStartedAt == null) {
                powerdownStartedAt = Instant.now();
            }

            // (Re-)schedule timer as fallback
            logger.fine(() -> "QMP powerdown confirmed, waiting...");
            powerdownTimer = Components.schedule(t -> {
                logger.fine(() -> "Powerdown timeout reached.");
                synchronized (this) {
                    if (suspendedStop != null) {
                        suspendedStop.resumeHandling();
                        suspendedStop = null;
                    }
                }
            }, powerdownStartedAt.plusSeconds(powerdownTimeout));
            powerdownConfirmed = true;
        }
    }

    /**
     * On configure qemu. Updates the powerdown timeout and, if a
     * confirmed powerdown is in progress, reschedules the timer
     * accordingly.
     *
     * @param event the event
     */
    @Handler
    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
    public void onConfigureQemu(ConfigureQemu event) {
        int newTimeout = event.configuration().vm.powerdownTimeout;
        if (powerdownTimeout != newTimeout) {
            powerdownTimeout = newTimeout;
            synchronized (this) {
                if (powerdownTimer != null && powerdownConfirmed) {
                    powerdownTimer
                        .reschedule(powerdownStartedAt.plusSeconds(newTimeout));
                }
            }
        }
    }

}