001/* 002 * VM-Operator 003 * Copyright (C) 2023 Michael N. Lipp 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <https://www.gnu.org/licenses/>. 017 */ 018 019package org.jdrupes.vmoperator.runner.qemu; 020 021import com.fasterxml.jackson.core.JsonProcessingException; 022import com.fasterxml.jackson.databind.ObjectMapper; 023import com.fasterxml.jackson.databind.node.ObjectNode; 024import java.io.IOException; 025import java.io.Writer; 026import java.lang.reflect.UndeclaredThrowableException; 027import java.net.UnixDomainSocketAddress; 028import java.nio.file.Files; 029import java.nio.file.Path; 030import java.time.Duration; 031import java.time.Instant; 032import java.util.LinkedList; 033import java.util.Queue; 034import java.util.logging.Level; 035import org.jdrupes.vmoperator.runner.qemu.commands.QmpCapabilities; 036import org.jdrupes.vmoperator.runner.qemu.commands.QmpCommand; 037import org.jdrupes.vmoperator.runner.qemu.commands.QmpPowerdown; 038import org.jdrupes.vmoperator.runner.qemu.events.ConfigureQemu; 039import org.jdrupes.vmoperator.runner.qemu.events.MonitorCommand; 040import org.jdrupes.vmoperator.runner.qemu.events.MonitorEvent; 041import org.jdrupes.vmoperator.runner.qemu.events.MonitorReady; 042import org.jdrupes.vmoperator.runner.qemu.events.MonitorResult; 043import org.jdrupes.vmoperator.runner.qemu.events.PowerdownEvent; 044import org.jgrapes.core.Channel; 045import org.jgrapes.core.Component; 046import org.jgrapes.core.Components; 047import org.jgrapes.core.Components.Timer; 048import org.jgrapes.core.EventPipeline; 049import org.jgrapes.core.annotation.Handler; 050import org.jgrapes.core.events.Start; 051import org.jgrapes.core.events.Stop; 052import org.jgrapes.io.events.Closed; 053import org.jgrapes.io.events.ConnectError; 054import org.jgrapes.io.events.Input; 055import org.jgrapes.io.events.OpenSocketConnection; 056import org.jgrapes.io.util.ByteBufferWriter; 057import org.jgrapes.io.util.LineCollector; 058import org.jgrapes.net.SocketIOChannel; 059import org.jgrapes.net.events.ClientConnected; 060import org.jgrapes.util.events.ConfigurationUpdate; 061import org.jgrapes.util.events.FileChanged; 062import org.jgrapes.util.events.WatchFile; 063 064/** 065 * A component that handles the communication over the Qemu monitor 066 * socket. 067 * 068 * If the log level for this class is set to fine, the messages 069 * exchanged on the monitor socket are logged. 070 */ 071@SuppressWarnings("PMD.DataflowAnomalyAnalysis") 072public class QemuMonitor extends Component { 073 074 private static ObjectMapper mapper = new ObjectMapper(); 075 076 private EventPipeline rep; 077 private Path socketPath; 078 private int powerdownTimeout; 079 private SocketIOChannel monitorChannel; 080 private final Queue<QmpCommand> executing = new LinkedList<>(); 081 private Instant powerdownStartedAt; 082 private Stop suspendedStop; 083 private Timer powerdownTimer; 084 private boolean powerdownConfirmed; 085 086 /** 087 * Instantiates a new qemu monitor. 088 * 089 * @param componentChannel the component channel 090 * @param configDir the config dir 091 * @throws IOException Signals that an I/O exception has occurred. 092 */ 093 @SuppressWarnings({ "PMD.AssignmentToNonFinalStatic", 094 "PMD.ConstructorCallsOverridableMethod" }) 095 public QemuMonitor(Channel componentChannel, Path configDir) 096 throws IOException { 097 super(componentChannel); 098 attach(new RamController(channel())); 099 attach(new CpuController(channel())); 100 attach(new DisplayController(channel(), configDir)); 101 attach(new CdMediaController(channel())); 102 } 103 104 /** 105 * As the initial configuration of this component depends on the 106 * configuration of the {@link Runner}, it doesn't have a handler 107 * for the {@link ConfigurationUpdate} event. The values are 108 * forwarded from the {@link Runner} instead. 109 * 110 * @param socketPath the socket path 111 * @param powerdownTimeout 112 */ 113 /* default */ void configure(Path socketPath, int powerdownTimeout) { 114 this.socketPath = socketPath; 115 this.powerdownTimeout = powerdownTimeout; 116 } 117 118 /** 119 * Handle the start event. 120 * 121 * @param event the event 122 * @throws IOException Signals that an I/O exception has occurred. 123 */ 124 @Handler 125 public void onStart(Start event) throws IOException { 126 rep = event.associated(EventPipeline.class).get(); 127 if (socketPath == null) { 128 return; 129 } 130 Files.deleteIfExists(socketPath); 131 fire(new WatchFile(socketPath)); 132 } 133 134 /** 135 * Watch for the creation of the swtpm socket and start the 136 * qemu process if it has been created. 137 * 138 * @param event the event 139 */ 140 @Handler 141 public void onFileChanged(FileChanged event) { 142 if (event.change() == FileChanged.Kind.CREATED 143 && event.path().equals(socketPath)) { 144 // qemu running, open socket 145 fire(new OpenSocketConnection( 146 UnixDomainSocketAddress.of(socketPath)) 147 .setAssociated(QemuMonitor.class, this)); 148 } 149 } 150 151 /** 152 * Check if this is from opening the monitor socket and if true, 153 * save the socket in the context and associate the channel with 154 * the context. Then send the initial message to the socket. 155 * 156 * @param event the event 157 * @param channel the channel 158 */ 159 @SuppressWarnings("resource") 160 @Handler 161 public void onClientConnected(ClientConnected event, 162 SocketIOChannel channel) { 163 event.openEvent().associated(QemuMonitor.class).ifPresent(qm -> { 164 monitorChannel = channel; 165 channel.setAssociated(QemuMonitor.class, this); 166 channel.setAssociated(Writer.class, new ByteBufferWriter( 167 channel).nativeCharset()); 168 channel.setAssociated(LineCollector.class, 169 new LineCollector() 170 .consumer(line -> { 171 try { 172 processMonitorInput(line); 173 } catch (IOException e) { 174 throw new UndeclaredThrowableException(e); 175 } 176 })); 177 fire(new MonitorCommand(new QmpCapabilities())); 178 }); 179 } 180 181 /** 182 * Called when a connection attempt fails. 183 * 184 * @param event the event 185 * @param channel the channel 186 */ 187 @Handler 188 public void onConnectError(ConnectError event, SocketIOChannel channel) { 189 event.event().associated(QemuMonitor.class).ifPresent(qm -> { 190 rep.fire(new Stop()); 191 }); 192 } 193 194 /** 195 * Handle data from qemu monitor connection. 196 * 197 * @param event the event 198 * @param channel the channel 199 */ 200 @Handler 201 public void onInput(Input<?> event, SocketIOChannel channel) { 202 if (channel.associated(QemuMonitor.class).isEmpty()) { 203 return; 204 } 205 channel.associated(LineCollector.class).ifPresent(collector -> { 206 collector.feed(event); 207 }); 208 } 209 210 private void processMonitorInput(String line) 211 throws IOException { 212 logger.fine(() -> "monitor(in): " + line); 213 try { 214 var response = mapper.readValue(line, ObjectNode.class); 215 if (response.has("QMP")) { 216 rep.fire(new MonitorReady()); 217 return; 218 } 219 if (response.has("return") || response.has("error")) { 220 QmpCommand executed = executing.poll(); 221 logger.fine( 222 () -> String.format("(Previous \"monitor(in)\" is result " 223 + "from executing %s)", executed)); 224 rep.fire(MonitorResult.from(executed, response)); 225 return; 226 } 227 if (response.has("event")) { 228 MonitorEvent.from(response).ifPresent(rep::fire); 229 } 230 } catch (JsonProcessingException e) { 231 throw new IOException(e); 232 } 233 } 234 235 /** 236 * On closed. 237 * 238 * @param event the event 239 */ 240 @Handler 241 public void onClosed(Closed<?> event, SocketIOChannel channel) { 242 channel.associated(QemuMonitor.class).ifPresent(qm -> { 243 monitorChannel = null; 244 synchronized (this) { 245 if (powerdownTimer != null) { 246 powerdownTimer.cancel(); 247 } 248 if (suspendedStop != null) { 249 suspendedStop.resumeHandling(); 250 suspendedStop = null; 251 } 252 } 253 }); 254 } 255 256 /** 257 * On monitor command. 258 * 259 * @param event the event 260 */ 261 @Handler 262 @SuppressWarnings("PMD.AvoidLiteralsInIfCondition") 263 public void onExecQmpCommand(MonitorCommand event) { 264 var command = event.command(); 265 logger.fine(() -> "monitor(out): " + command.toString()); 266 String asText; 267 try { 268 asText = command.asText(); 269 } catch (JsonProcessingException e) { 270 logger.log(Level.SEVERE, e, 271 () -> "Cannot serialize Json: " + e.getMessage()); 272 return; 273 } 274 synchronized (executing) { 275 monitorChannel.associated(Writer.class).ifPresent(writer -> { 276 try { 277 executing.add(command); 278 writer.append(asText).append('\n').flush(); 279 } catch (IOException e) { 280 // Cannot happen, but... 281 logger.log(Level.WARNING, e, e::getMessage); 282 } 283 }); 284 } 285 } 286 287 /** 288 * Shutdown the VM. 289 * 290 * @param event the event 291 */ 292 @Handler(priority = 100) 293 public void onStop(Stop event) { 294 if (monitorChannel != null) { 295 // We have a connection to Qemu, attempt ACPI shutdown. 296 event.suspendHandling(); 297 suspendedStop = event; 298 299 // Attempt powerdown command. If not confirmed, assume 300 // "hanging" qemu process. 301 powerdownTimer = Components.schedule(t -> { 302 // Powerdown not confirmed 303 logger.fine(() -> "QMP powerdown command has not effect."); 304 synchronized (this) { 305 powerdownTimer = null; 306 if (suspendedStop != null) { 307 suspendedStop.resumeHandling(); 308 suspendedStop = null; 309 } 310 } 311 }, Duration.ofSeconds(1)); 312 logger.fine(() -> "Attempting QMP powerdown."); 313 powerdownStartedAt = Instant.now(); 314 fire(new MonitorCommand(new QmpPowerdown())); 315 } 316 } 317 318 /** 319 * On powerdown event. 320 * 321 * @param event the event 322 */ 323 @Handler 324 public void onPowerdownEvent(PowerdownEvent event) { 325 synchronized (this) { 326 // Cancel confirmation timeout 327 if (powerdownTimer != null) { 328 powerdownTimer.cancel(); 329 } 330 331 // (Re-)schedule timer as fallback 332 logger.fine(() -> "QMP powerdown confirmed, waiting..."); 333 powerdownTimer = Components.schedule(t -> { 334 logger.fine(() -> "Powerdown timeout reached."); 335 synchronized (this) { 336 if (suspendedStop != null) { 337 suspendedStop.resumeHandling(); 338 suspendedStop = null; 339 } 340 } 341 }, powerdownStartedAt.plusSeconds(powerdownTimeout)); 342 powerdownConfirmed = true; 343 } 344 } 345 346 /** 347 * On configure qemu. 348 * 349 * @param event the event 350 */ 351 @Handler 352 public void onConfigureQemu(ConfigureQemu event) { 353 int newTimeout = event.configuration().vm.powerdownTimeout; 354 if (powerdownTimeout != newTimeout) { 355 powerdownTimeout = newTimeout; 356 synchronized (this) { 357 if (powerdownTimer != null && powerdownConfirmed) { 358 powerdownTimer 359 .reschedule(powerdownStartedAt.plusSeconds(newTimeout)); 360 } 361 362 } 363 } 364 } 365 366}