RabbitMQ használata Spring Boottal
Amennyiben aszinkron üzenetküldést szeretnénk megvalósítani különböző rendszerek vagy microservice-ek között, a RabbitMQ egy jó választásnak tűnik. Nyílt forráskódú, széleskörben elterjedt, kellően pehelysúlyú, könnyen telepíthető különböző környezetekben, több protokollt és programozási nyelvet is támogat, és könnyen illeszthető Spring Boothoz. Clusterezhető és monitorozható. Ezt fogom bemutatni azzal együtt, hogy hogy lehet unit és integrációst teszteket írni. Ez utóbbihoz a Testcontainers projektet fogom használni, melynek segítségével JUnit tesztesetekből fogok Docker konténert indítani.
A poszthoz a GitHubon egy példaprojekt is tartozik a jtechlog-rabbitmq néven.
Az egyszerűség kedvéért indítsunk el a RabbitMQ-t egy Docker konténerben a következő paranccsal.
docker run -d --hostname my-rabbit --name my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
Ezzel egy olyan konténer indul el. A 5672
-es porton lehet hozzá kapcsolódni, és igénybe
venni az üzenetküldési funkciókat, míg a 15672
-es porton a management felülethez
lehet hozzáférni.
Szintén az egyszerűség jegyében nem két alkalmazás fog kommunikálni egymással, hanem az alkalmazáson belül az egyik komponens küld egy üzenetet és a másik fogadja.
Az üzenetek tárolása a RabbitMQ-n belül is sorokban történik, mint a legtöbb messaging rendszer esetén. Az üzenet küldője a producer, az üzenet fogadója a consumer.
Az alkalmazás felépítését a következő ábra mutatja. Az EmployeesController
hívja az EmployeesService
service-t, mely elküld egy üzenetet az employeees.queue
sorba. Az EmployeesQueueListener
figyeli a sort, és az kerül automatikusan
meghívásra, ha a sorba üzenet érkezik. Ez hívja tovább az EventsService
service-t.
A RabbitMQ-t az AMQP protokollon szólítjuk meg. Az AMQP egy egyszerű, nyílt, platform és programozási nyelv független protokoll, melyen message-oriented middleware-ekhez lehet kapcsolódni. (Szemben pl. a JMS-sel, ami Javaban használható csak.) Szerencsére a protokollt a Spring Boot elrejti előlünk, sokat nem kell vele foglalkozni.
Használatához vegyük fel a ṗom.xml
fájlban a következő függőséget:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Ez tranzítív be fogja hozni a RabbitMQ AMQP kliens könyvtárat is (com.rabbitmq:amqp-client
).
A RabbitMQ érdekessége, hogy kliensből tudunk létrehozni sort a RabbitMQ szerveren. Ehhez adjuk meg a következőt:
@Configuration
public class RabbitMqConfig {
@Bean
public Queue queue() {
return new Queue("employees.queue");
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
Ez egyrészt létrehozza a employees.queue
sort a RabbitMQ szerveren, valamint létrehoz egy
Jackson MessageConverter
-t. Ekkor ha küldéskor átadunk egy Java objektumot, azt
automatikusan Jacksonnal JSON-né konvertálja.
A kapcsolódás automatikusan a localhost
címre a 5672
porton történik.
Ezt konfigurálni az application.properties
-ben lehet a spring.rabbitmq.host
és
spring.rabbitmq.port
konfigurációs paraméterek megadásával.
A küldéshez injektáljunk egy RabbitTemplate
-et,
és ennek metódusaival tudunk üzenetet küldeni.
rabbitTemplate.convertAndSend("employees.queue", new EmployeeHasCreatedEvent(command.getName()));
Ahol az első paraméter a sor neve (erre még később visszatérünk, mert nem ilyen egyszerű a helyzet), a második pedig az objektum. Ez az objektum kerül átkonvertálásra JSON formátumba.
Az üzenetküldést úgy tudjuk meghívni, hogy a http://localhost:8080/api/employees
címre
postoljuk a következő JSON-t:
{
"name": "John Doe"
}
Az üzenet fogadásához a metódusra, melyet szeretnénk, hogy az
üzenet fogadásakor meghívásra kerüljön, rá kell tennünk a @RabbitListener
annotációt.
@RabbitListener(queues = "employees.queue")
public void receiveEvent(EmployeeHasCreatedEvent event) {
// ...
}
Az annotáció paraméterekor meg kell adni a sor nevét. A bejövő üzenet tartalmát
Jacksonnal megpróbálja JSON-ből EmployeeHasCreatedEvent
objektummá alakítani.
Az alkalmazás indításakor hozzákapcsolódik a RabbitMQ-hoz, erről a következő log üzenet tájékoztat:
o.s.a.r.c.CachingConnectionFactory :
Created new connection: rabbitConnectionFactory#2954b5ea:0/SimpleConnection@6cd64b3f [delegate=amqp://guest@127.0.0.1:5672/, localPort= 52106]
A RabbitMQ admin felülete elérhető a http://localhost:15672
címen, alapértelmezett felhasználónév/jelszó
a guest/guest
. Bejelentkezve látható a létrehozott sor és hogy egy alkalmazás kapcsolódott hozzá.
A RabbitMQ azonban nem csak point-to-point kommunikációt tesz lehetővé, hanem támogatja a Publish/Subscribe módot is, hogy egy producer több consumernek küld üzenetet. Másik konfiguráció a Work queues, ahol egy sorból több consumer veszi ki az üzenetet, egyszerre csak az egyik. Ezzel gyorsan lehet pl. egy terheléselosztást implementálni. Ezen kívül lehet ún. üzenetirányítást (Routing) is konfigurálni, mely az üzenet valamilyen tulajdonsága (pl. címzés alapján) dobálja szét az üzeneteket a különböző sorok között.
Ezek támogatására egy plusz absztrakciós szintet vezettek be, amivel mindegyik mód
egyszerűen konfigurálható. Az üzenet ugyanis nem egy sorba történik, hanem egy ún.
Exchange-be, ami dobálja tovább az üzeneteket a sorokba. Azonban mivel
támogatja azt is, hogy az üzenet tulajdonsága alapján döntsön, hogy melyik sorba
továbbítsa, ezért az üzenetnek át lehet adni egy Routing Key
-t, mely
ebben segít. Ez az üzenethez tartozik, azt minősíti.
És az előbbi, point-to-point üzenet esetén nem adtuk meg hogy melyik Exchange-be
menjen az üzenet, ezért az ún. Default Exchange-be került átküldésre, mely
úgy működik, hogy abba a sorba továbbítja az üzenetet, mely a Routing Key-be meg van adva.
Tehát az előbb a convertAndSend()
metódus első parmétere nem a sor neve,
hanem a Routing Key, mely jelen esetben megegyezik a sor nevével.
A Binding az a mechanizmus, ami megmondja, hogy az Exchange-ből a Routing Key alapján melyik sorba is kerüljön az üzenet továbbításra.
Ha pl. egy Routingot akarunk megvalósítani, akkor az Exchange-hez orange
Routing Key-jel bindoljuk a Q1
sort, míg a black
Routing Key-jel a
Q2
sort. Így nem kell az alkalmazásnak tudnia, hogy melyik sorba küldje az üzenetet,
csak az üzenetről kell megmondania (küldéskor), hogy az orange
vagy black
.
A küldés unit teszteléséhez csak mockoljuk ki a RabbitTemplate
-et.
@Test
void testSend() {
employeesService.createEmployee(new CreateEmployeeCommand("John Doe"));
verify(rabbitTemplate).convertAndSend(eq("employees.queue"),
argThat((Object e) -> ((EmployeeHasCreatedEvent)e).getName().equals("John Doe")));
}
Itt a rabbitTemplate
Mockitoval mockolt.
A fogadás unit teszteléséhez hívjuk meg a Listener megfelelő metódusát, mintha csak a Spring Boot tette volna. És nézzük meg, hogy megfelelően hív tovább. Ezért érdemes egy Listener osztályba különválasztani az üzenet fogadását, mely kizárólag fogadja, és ha kell konvertálja az üzenetet.
Az integrációs teszteléshez használjuk a Testcontainers projektet. Fel kell venni a következő függőségeket:
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
A teszteseten használjuk a @Testcontainers
annotációt.
Egy Docker konténert a következőképp lehet elindítani:
@SpringBootTest
@Testcontainers
public class EmployeesIT {
@Container
static GenericContainer rabbit = new GenericContainer("rabbitmq:3")
.withExposedPorts(5672);
A @Container
annotáció hatására a Testcontainers elindít egy
új Docker konténert a rabbitmq:3
image alapján, és kihozza a 5672
-es
portját. A teszteset lefutása után le is állítja és törli azt.
A probléma abból adódik, hogy egy random szabad portot választ, ahova
az 5672
-es portot kihozza. És ezt át kell adni a Spring Bootnak
induláskor. Ehhez Spring Boot integrációs tesztnél egy ApplicationContextInitializer
interfészt kell implementálni, és az osztályt megadni a @ContextConfiguration
annotáció paramétereként.
@SpringBootTest
@Testcontainers
@ContextConfiguration(initializers = EmployeesIT.Initializer.class)
public class EmployeesIT {
@Container
static GenericContainer rabbit = new GenericContainer("rabbitmq:3")
.withExposedPorts(5672);
public static class Initializer implements
ApplicationContextInitializer<ConfigurableApplicationContext> {
@Override
public void initialize(ConfigurableApplicationContext configurableApplicationContext) {
TestPropertyValues values = TestPropertyValues.of(
"spring.rabbitmq.host=" + rabbit.getContainerIpAddress(),
"spring.rabbitmq.port=" + rabbit.getMappedPort(5672)
);
values.applyTo(configurableApplicationContext);
}
}
}
A teszteset felülírja az EventsService
service-t egy mock service-zel (@MockBean
annotációval),
és azt ellenőrzi, hogy a metódusa meghívásra került-e. Igen ám, de itt az az
izgalmas, hogy a hívás nem szinkron történik, ugyanis az üzenet átkerül a RabbitMQ-ba,
és majd az valamikor kézbesíti. Ezt a Mockitonak meg lehet adni, és ő
képes várni a hívás tényére, sőt még timeoutot is tudunk megadni:
@Test
void testSendAndReceive() {
employeesController.createEmployee(new CreateEmployeeCommand("John Doe"));
verify(eventsService, timeout(4000).times(1))
.processEvent(argThat(e -> e.getName().equals("John Doe")));
}
Az említett kód vár maximum 4 másodpercet, amíg az eventsService
processEvent()
metódusa
meghívásra nem kerül.