001/* 002 * VM-Operator 003 * Copyright (C) 2023 Michael N. Lipp 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <https://www.gnu.org/licenses/>. 017 */ 018 019package org.jdrupes.vmoperator.manager.events; 020 021import java.lang.ref.WeakReference; 022import java.util.Collection; 023import java.util.Map; 024import java.util.Optional; 025import java.util.Set; 026import java.util.concurrent.ConcurrentHashMap; 027import org.jgrapes.core.Channel; 028 029/** 030 * A channel manager that tracks mappings from a key to a channel using 031 * "add/remove" (or "open/close") events and the channels on which they 032 * are delivered. 033 * 034 * @param <K> the key type 035 * @param <C> the channel type 036 * @param <A> the type of the associated data 037 */ 038public class ChannelCache<K, C extends Channel, A> { 039 040 private final Map<K, Data<C, A>> channels = new ConcurrentHashMap<>(); 041 042 /** 043 * Helper 044 */ 045 @SuppressWarnings("PMD.ShortClassName") 046 private static class Data<C extends Channel, A> { 047 public final WeakReference<C> channel; 048 public A associated; 049 050 /** 051 * Instantiates a new value. 052 * 053 * @param channel the channel 054 */ 055 public Data(C channel) { 056 this.channel = new WeakReference<>(channel); 057 } 058 } 059 060 /** 061 * Combines the channel and the associated data. 062 * 063 * @param <C> the generic type 064 * @param <A> the generic type 065 */ 066 @SuppressWarnings("PMD.ShortClassName") 067 public static class Both<C extends Channel, A> { 068 069 /** The channel. */ 070 public C channel; 071 072 /** The associated. */ 073 public A associated; 074 075 /** 076 * Instantiates a new both. 077 * 078 * @param channel the channel 079 * @param associated the associated 080 */ 081 public Both(C channel, A associated) { 082 super(); 083 this.channel = channel; 084 this.associated = associated; 085 } 086 } 087 088 /** 089 * Returns the channel and associates data registered for the key 090 * or an empty optional if no mapping exists. 091 * 092 * @param key the key 093 * @return the result 094 */ 095 public Optional<Both<C, A>> both(K key) { 096 synchronized (channels) { 097 var value = channels.get(key); 098 if (value == null) { 099 return Optional.empty(); 100 } 101 var channel = value.channel.get(); 102 if (channel == null) { 103 // Cleanup old reference 104 channels.remove(key); 105 return Optional.empty(); 106 } 107 return Optional.of(new Both<>(channel, value.associated)); 108 } 109 } 110 111 /** 112 * Store the given data. 113 * 114 * @param key the key 115 * @param channel the channel 116 * @param associated the associated 117 * @return the channel manager 118 */ 119 public ChannelCache<K, C, A> put(K key, C channel, A associated) { 120 Data<C, A> data = new Data<>(channel); 121 data.associated = associated; 122 channels.put(key, data); 123 return this; 124 } 125 126 /** 127 * Store the given data. 128 * 129 * @param key the key 130 * @param channel the channel 131 * @return the channel manager 132 */ 133 public ChannelCache<K, C, A> put(K key, C channel) { 134 put(key, channel, null); 135 return this; 136 } 137 138 /** 139 * Returns the channel registered for the key or an empty optional 140 * if no mapping exists. 141 * 142 * @param key the key 143 * @return the optional 144 */ 145 public Optional<C> channel(K key) { 146 return both(key).map(b -> b.channel); 147 } 148 149 /** 150 * Associate the entry for the channel with the given data. The entry 151 * for the channel must already exist. 152 * 153 * @param key the key 154 * @param data the data 155 * @return the channel manager 156 */ 157 public ChannelCache<K, C, A> associate(K key, A data) { 158 synchronized (channels) { 159 Optional.ofNullable(channels.get(key)) 160 .ifPresent(v -> v.associated = data); 161 } 162 return this; 163 } 164 165 /** 166 * Return the data associated with the entry for the channel. 167 * 168 * @param key the key 169 * @return the data 170 */ 171 public Optional<A> associated(K key) { 172 return both(key).map(b -> b.associated); 173 } 174 175 /** 176 * Returns all associated data. 177 * 178 * @return the collection 179 */ 180 public Collection<A> associated() { 181 synchronized (channels) { 182 return channels.values().stream() 183 .filter(v -> v.channel.get() != null && v.associated != null) 184 .map(v -> v.associated).toList(); 185 } 186 } 187 188 /** 189 * Removes the channel with the given name. 190 * 191 * @param name the name 192 */ 193 public void remove(String name) { 194 synchronized (channels) { 195 channels.remove(name); 196 } 197 } 198 199 /** 200 * Returns all known keys. 201 * 202 * @return the sets the 203 */ 204 public Set<K> keys() { 205 return channels.keySet(); 206 } 207}