Simple CacheVerticle design idea
A simple cache needs, first of all, a Map<String,Any> field that holds the cached key-value pairs.
To avoid paying for thread synchronization on every cache operation, we can deploy the cache as a Verticle under the worker model and expose its operations, such as adding and removing entries, to the outside through the EventBus.
With that, the pseudocode looks like this:
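import io.vertx.core.AbstractVerticle

class CacheVerticle : AbstractVerticle() {
    private val cache = mutableMapOf<String, Any>()

    override fun start() {
        val eventbus = vertx.eventBus()
        // The *_CACHE_ADDRESS constants are plain String addresses (see the full implementation).
        eventbus.localConsumer<Any>(GET_CACHE_ADDRESS) { /* TODO */ }
        eventbus.localConsumer<Any>(ADD_CACHE_ADDRESS) { /* TODO */ }
        eventbus.localConsumer<Any>(REMOVE_CACHE_ADDRESS) { /* TODO */ }
    }
}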
The next step is to fill in the handler of each localConsumer.
The add method
Because Kotlin has the infix function to, which builds a Pair and reads naturally, I decided to design the exposed API as
fun <T> addCache(pair: Pair<String,T>) = Vertx.currentContext().owner().eventBus().send(ADD_CACHE_ADDRESS,pair)
Usage is then simply addCache("key" to 12)
To pass the Pair over the EventBus, we can write a MessageCodec for Pair.
I originally designed a wrapper class AddWrapper (val addPair: Pair) for this, but the wrapper is unnecessary: just write a MessageCodec for Pair and register it with the eventbus instance.
So the handler can be implemented like this:
eventbus.localConsumer<Pair<String, Any>>(ADD_CACHE_ADDRESS) {
    val add = it.body()
    cache[add.first] = add.second
}
The remove method
Vert.x itself provides a MessageCodec for String, so the key can be sent directly
eventbus.localConsumer<String>(REMOVE_CACHE_ADDRESS){
val key = it.body()
cache.remove(key)
}
The get method
Objects placed in the cache do not necessarily have a corresponding MessageCodec, and writing a custom MessageCodec for each of them is not practical.
The EventBus uses Object::getClass of the message body as the key to look up the registered codec, and throws an exception if none is found. Both Java lambdas and Kotlin lambdas are essentially instances of classes generated to implement an interface, so there is no practical way to register a MessageCodec for a lambda.
So my solution is to wrap the content in an outer layer that can hold any kind of value. getClass on the outer wrapper always returns the same Class, so it is enough to register a MessageCodec for the wrapper class.
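The class-keyed lookup described above can be illustrated without Vert.x. This is only a minimal sketch, assuming a plain map keyed by Class stands in for the EventBus codec registry; CodecRegistry, lookup and hasCodec are hypothetical names, not Vert.x APIs.

```kotlin
// Hypothetical stand-in for a codec registry keyed by the message body's runtime class.
class CodecRegistry {
    private val codecs = mutableMapOf<Class<*>, String>()

    fun register(clazz: Class<*>, codecName: String) {
        codecs[clazz] = codecName
    }

    // An unregistered body class is treated as an error, as described in the text.
    fun lookup(body: Any): String =
        codecs[body.javaClass]
            ?: throw IllegalArgumentException("No codec for ${body.javaClass.name}")
}

// Returns true if a codec is found for the body, false if the lookup throws.
fun hasCodec(registry: CodecRegistry, body: Any): Boolean =
    try {
        registry.lookup(body)
        true
    } catch (e: IllegalArgumentException) {
        false
    }
```

With only Pair registered, a bare lambda has no codec because its runtime class is not Pair, while the same lambda wrapped as "key" to handler resolves to the Pair codec regardless of what the Pair contains.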
I want thread safety not only for reads and writes of the cache itself (the map instance) but also for operations on the objects stored in it (such as modifying their properties). The get message therefore carries the key together with a handler that receives the value; both can be sent over the EventBus as a Pair of the key and a function returning Unit.
This handler runs directly on the thread of the cache verticle, so there is no need to worry about thread safety when the same cached value is modified concurrently. Simple modifications of cached contents thereby become atomic, but a time-consuming operation inside the handler will hurt the performance of the cache.
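The effect of running every handler on the cache's own thread can be sketched without Vert.x. This is a minimal sketch, assuming a single-threaded executor stands in for the cache verticle; ConfinedCache and incrementConcurrently are hypothetical names used only for illustration.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for the cache verticle: every operation is queued
// onto one thread, so the plain map needs no locks.
class ConfinedCache {
    private val cache = mutableMapOf<String, Any>()
    private val owner = Executors.newSingleThreadExecutor()

    fun add(pair: Pair<String, Any>) {
        owner.execute { cache[pair.first] = pair.second }
    }

    // The handler runs on the owner thread, so it may mutate the cached value safely.
    fun get(key: String, handler: (Any?) -> Unit) {
        owner.execute { handler(cache[key]) }
    }

    fun shutdown() {
        owner.shutdown()
        owner.awaitTermination(5, TimeUnit.SECONDS)
    }
}

// Increments a non-thread-safe counter from several caller threads
// and returns the final value once all increments have run.
fun incrementConcurrently(callers: Int, perCaller: Int): Int {
    val cache = ConfinedCache()
    cache.add("counter" to intArrayOf(0))
    val threads = List(callers) {
        Thread {
            repeat(perCaller) {
                cache.get("counter") { value -> (value as IntArray)[0]++ }
            }
        }
    }
    threads.forEach { it.start() }
    threads.forEach { it.join() }
    var result = -1
    val done = CountDownLatch(1)
    // Queued after all increments, so it observes the final count.
    cache.get("counter") { value -> result = (value as IntArray)[0]; done.countDown() }
    done.await()
    cache.shutdown()
    return result
}
```

No increment is lost even though IntArray is not thread-safe, because all handlers execute one after another on the same thread. The cost is the one named above: a slow handler delays every operation queued behind it.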
The exposed API is
fun <T> getCache(key: String, handler: (T?) -> Unit) {
    Vertx.currentContext().owner().eventBus().send(GET_CACHE_ADDRESS, key to handler)
}
The handler takes a T? to cover the case where the key does not exist
eventbus.localConsumer<Pair<String,(Any?) ->Unit>>(GET_CACHE_ADDRESS){
val body = it.body()
body.second(cache[body.first])
}
The full implementation is attached below
import io.vertx.core.AbstractVerticle
import io.vertx.core.Vertx
import io.vertx.core.buffer.Buffer
import io.vertx.core.eventbus.MessageCodec

class CacheVerticle : AbstractVerticle() {
    companion object {
        private const val GET_CACHE_ADDRESS = "get_cache_address"
        private const val ADD_CACHE_ADDRESS = "add_cache_address"
        private const val REMOVE_CACHE_ADDRESS = "remove_cache_address"

        // These helpers must be called from a Vert.x context, for example inside another verticle.
        fun <T> addCache(pair: Pair<String, T>) =
            Vertx.currentContext().owner().eventBus().send(ADD_CACHE_ADDRESS, pair)

        fun <T> getCache(key: String, handler: (T?) -> Unit) {
            Vertx.currentContext().owner().eventBus().send(GET_CACHE_ADDRESS, key to handler)
        }

        fun remove(key: String) {
            Vertx.currentContext().owner().eventBus().send(REMOVE_CACHE_ADDRESS, key)
        }
    }

    private val cache = mutableMapOf<String, Any>()

    override fun start() {
        val eventbus = vertx.eventBus()
        // One codec for Pair covers both add (key to value) and get (key to handler) messages.
        eventbus.registerDefaultCodec(Pair::class.java, UnModifiableObjectCodec(Pair::class.java))
        eventbus.localConsumer<Pair<String, (Any?) -> Unit>>(GET_CACHE_ADDRESS) {
            val body = it.body()
            body.second(cache[body.first])
        }
        eventbus.localConsumer<Pair<String, Any>>(ADD_CACHE_ADDRESS) {
            val add = it.body()
            cache[add.first] = add.second
        }
        eventbus.localConsumer<String>(REMOVE_CACHE_ADDRESS) {
            val key = it.body()
            cache.remove(key)
        }
    }

    // Hands the message object over unchanged; the wire methods are left unimplemented
    // because the messages here are only delivered locally.
    class UnModifiableObjectCodec<T>(private val msgClass: Class<T>) : MessageCodec<T, T> {
        override fun encodeToWire(buffer: Buffer?, s: T) {
            TODO("Not yet implemented")
        }

        override fun decodeFromWire(pos: Int, buffer: Buffer?): T {
            TODO("Not yet implemented")
        }

        override fun transform(s: T) = s
        override fun name() = msgClass.name
        override fun systemCodecID(): Byte {
            return -1
        }
    }
}
Usage
import io.vertx.core.AbstractVerticle
import java.util.UUID

class NormalVerticle : AbstractVerticle() {
    override fun start() {
        CacheVerticle.addCache("string" to UUID.randomUUID())
        CacheVerticle.getCache<UUID>("string") { it?.let { println(it) } }
        CacheVerticle.remove("string")
    }
}