RabbitMQ használata Spring Boottal

Amennyiben aszinkron üzenetküldést szeretnénk megvalósítani különböző rendszerek vagy microservice-ek között, a RabbitMQ egy jó választásnak tűnik. Nyílt forráskódú, széleskörben elterjedt, kellően pehelysúlyú, könnyen telepíthető különböző környezetekben, több protokollt és programozási nyelvet is támogat, és könnyen illeszthető Spring Boothoz. Clusterezhető és monitorozható. Ezt fogom bemutatni azzal együtt, hogy hogy lehet unit és integrációst teszteket írni. Ez utóbbihoz a Testcontainers projektet fogom használni, melynek segítségével JUnit tesztesetekből fogok Docker konténert indítani.

A poszthoz a GitHubon egy példaprojekt is tartozik a jtechlog-rabbitmq néven.

Az egyszerűség kedvéért indítsunk el a RabbitMQ-t egy Docker konténerben a következő paranccsal.

docker run -d --hostname my-rabbit --name my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Ezzel egy olyan konténer indul el. A 5672-es porton lehet hozzá kapcsolódni, és igénybe venni az üzenetküldési funkciókat, míg a 15672-es porton a management felülethez lehet hozzáférni.

Szintén az egyszerűség jegyében nem két alkalmazás fog kommunikálni egymással, hanem az alkalmazáson belül az egyik komponens küld egy üzenetet és a másik fogadja.

Az üzenetek tárolása a RabbitMQ-n belül is sorokban történik, mint a legtöbb messaging rendszer esetén. Az üzenet küldője a producer, az üzenet fogadója a consumer.

Az alkalmazás felépítését a következő ábra mutatja. Az EmployeesController hívja az EmployeesService service-t, mely elküld egy üzenetet az employeees.queue sorba. Az EmployeesQueueListener figyeli a sort, és az kerül automatikusan meghívásra, ha a sorba üzenet érkezik. Ez hívja tovább az EventsService service-t.

Alkalmazás

A RabbitMQ-t az AMQP protokollon szólítjuk meg. Az AMQP egy egyszerű, nyílt, platform és programozási nyelv független protokoll, melyen message-oriented middleware-ekhez lehet kapcsolódni. (Szemben pl. a JMS-sel, ami Javaban használható csak.) Szerencsére a protokollt a Spring Boot elrejti előlünk, sokat nem kell vele foglalkozni.

Használatához vegyük fel a ṗom.xml fájlban a következő függőséget:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Ez tranzítív be fogja hozni a RabbitMQ AMQP kliens könyvtárat is (com.rabbitmq:amqp-client).

A RabbitMQ érdekessége, hogy kliensből tudunk létrehozni sort a RabbitMQ szerveren. Ehhez adjuk meg a következőt:

@Configuration
public class RabbitMqConfig {

    @Bean
    public Queue queue() {
        return new Queue("employees.queue");
    }

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

}

Ez egyrészt létrehozza a employees.queue sort a RabbitMQ szerveren, valamint létrehoz egy Jackson MessageConverter-t. Ekkor ha küldéskor átadunk egy Java objektumot, azt automatikusan Jacksonnal JSON-né konvertálja.

A kapcsolódás automatikusan a localhost címre a 5672 porton történik. Ezt konfigurálni az application.properties-ben lehet a spring.rabbitmq.host és spring.rabbitmq.port konfigurációs paraméterek megadásával.

A küldéshez injektáljunk egy RabbitTemplate-et, és ennek metódusaival tudunk üzenetet küldeni.

rabbitTemplate.convertAndSend("employees.queue", new EmployeeHasCreatedEvent(command.getName()));

Ahol az első paraméter a sor neve (erre még később visszatérünk, mert nem ilyen egyszerű a helyzet), a második pedig az objektum. Ez az objektum kerül átkonvertálásra JSON formátumba.

Az üzenetküldést úgy tudjuk meghívni, hogy a http://localhost:8080/api/employees címre postoljuk a következő JSON-t:

{
    "name": "John Doe"
}

Az üzenet fogadásához a metódusra, melyet szeretnénk, hogy az üzenet fogadásakor meghívásra kerüljön, rá kell tennünk a @RabbitListener annotációt.

@RabbitListener(queues = "employees.queue")
public void receiveEvent(EmployeeHasCreatedEvent event) {
    // ...
}

Az annotáció paraméterekor meg kell adni a sor nevét. A bejövő üzenet tartalmát Jacksonnal megpróbálja JSON-ből EmployeeHasCreatedEvent objektummá alakítani.

Az alkalmazás indításakor hozzákapcsolódik a RabbitMQ-hoz, erről a következő log üzenet tájékoztat:

o.s.a.r.c.CachingConnectionFactory       : 
  Created new connection: rabbitConnectionFactory#2954b5ea:0/SimpleConnection@6cd64b3f [delegate=amqp://guest@127.0.0.1:5672/, localPort= 52106]

A RabbitMQ admin felülete elérhető a http://localhost:15672 címen, alapértelmezett felhasználónév/jelszó a guest/guest. Bejelentkezve látható a létrehozott sor és hogy egy alkalmazás kapcsolódott hozzá.

A RabbitMQ azonban nem csak point-to-point kommunikációt tesz lehetővé, hanem támogatja a Publish/Subscribe módot is, hogy egy producer több consumernek küld üzenetet. Másik konfiguráció a Work queues, ahol egy sorból több consumer veszi ki az üzenetet, egyszerre csak az egyik. Ezzel gyorsan lehet pl. egy terheléselosztást implementálni. Ezen kívül lehet ún. üzenetirányítást (Routing) is konfigurálni, mely az üzenet valamilyen tulajdonsága (pl. címzés alapján) dobálja szét az üzeneteket a különböző sorok között.

Ezek támogatására egy plusz absztrakciós szintet vezettek be, amivel mindegyik mód egyszerűen konfigurálható. Az üzenet ugyanis nem egy sorba történik, hanem egy ún. Exchange-be, ami dobálja tovább az üzeneteket a sorokba. Azonban mivel támogatja azt is, hogy az üzenet tulajdonsága alapján döntsön, hogy melyik sorba továbbítsa, ezért az üzenetnek át lehet adni egy Routing Key-t, mely ebben segít. Ez az üzenethez tartozik, azt minősíti.

És az előbbi, point-to-point üzenet esetén nem adtuk meg hogy melyik Exchange-be menjen az üzenet, ezért az ún. Default Exchange-be került átküldésre, mely úgy működik, hogy abba a sorba továbbítja az üzenetet, mely a Routing Key-be meg van adva. Tehát az előbb a convertAndSend() metódus első parmétere nem a sor neve, hanem a Routing Key, mely jelen esetben megegyezik a sor nevével.

A Binding az a mechanizmus, ami megmondja, hogy az Exchange-ből a Routing Key alapján melyik sorba is kerüljön az üzenet továbbításra.

Ha pl. egy Routingot akarunk megvalósítani, akkor az Exchange-hez orange Routing Key-jel bindoljuk a Q1 sort, míg a black Routing Key-jel a Q2 sort. Így nem kell az alkalmazásnak tudnia, hogy melyik sorba küldje az üzenetet, csak az üzenetről kell megmondania (küldéskor), hogy az orange vagy black.

Exchange

A küldés unit teszteléséhez csak mockoljuk ki a RabbitTemplate-et.

@Test
void testSend() {
    employeesService.createEmployee(new CreateEmployeeCommand("John Doe"));

    verify(rabbitTemplate).convertAndSend(eq("employees.queue"), 
      argThat((Object e) -> ((EmployeeHasCreatedEvent)e).getName().equals("John Doe")));
}

Itt a rabbitTemplate Mockitoval mockolt.

A fogadás unit teszteléséhez hívjuk meg a Listener megfelelő metódusát, mintha csak a Spring Boot tette volna. És nézzük meg, hogy megfelelően hív tovább. Ezért érdemes egy Listener osztályba különválasztani az üzenet fogadását, mely kizárólag fogadja, és ha kell konvertálja az üzenetet.

Az integrációs teszteléshez használjuk a Testcontainers projektet. Fel kell venni a következő függőségeket:

<dependency>
	<groupId>org.testcontainers</groupId>
	<artifactId>testcontainers</artifactId>
	<version>${testcontainers.version}</version>
	<scope>test</scope>
</dependency>

<dependency>
	<groupId>org.testcontainers</groupId>
	<artifactId>junit-jupiter</artifactId>
	<version>${testcontainers.version}</version>
	<scope>test</scope>
</dependency>

A teszteseten használjuk a @Testcontainers annotációt. Egy Docker konténert a következőképp lehet elindítani:

@SpringBootTest
@Testcontainers
public class EmployeesIT {

    @Container
    static GenericContainer rabbit = new GenericContainer("rabbitmq:3")
            .withExposedPorts(5672);

A @Container annotáció hatására a Testcontainers elindít egy új Docker konténert a rabbitmq:3 image alapján, és kihozza a 5672-es portját. A teszteset lefutása után le is állítja és törli azt.

A probléma abból adódik, hogy egy random szabad portot választ, ahova az 5672-es portot kihozza. És ezt át kell adni a Spring Bootnak induláskor. Ehhez Spring Boot integrációs tesztnél egy ApplicationContextInitializer interfészt kell implementálni, és az osztályt megadni a @ContextConfiguration annotáció paramétereként.

@SpringBootTest
@Testcontainers
@ContextConfiguration(initializers = EmployeesIT.Initializer.class)
public class EmployeesIT {
  
  @Container
  static GenericContainer rabbit = new GenericContainer("rabbitmq:3")
          .withExposedPorts(5672);

  public static class Initializer implements
              ApplicationContextInitializer<ConfigurableApplicationContext> {
          @Override
          public void initialize(ConfigurableApplicationContext configurableApplicationContext) {
              TestPropertyValues values = TestPropertyValues.of(
                      "spring.rabbitmq.host=" + rabbit.getContainerIpAddress(),
                      "spring.rabbitmq.port=" + rabbit.getMappedPort(5672)
              );
              values.applyTo(configurableApplicationContext);
          }
      }
}

A teszteset felülírja az EventsService service-t egy mock service-zel (@MockBean annotációval), és azt ellenőrzi, hogy a metódusa meghívásra került-e. Igen ám, de itt az az izgalmas, hogy a hívás nem szinkron történik, ugyanis az üzenet átkerül a RabbitMQ-ba, és majd az valamikor kézbesíti. Ezt a Mockitonak meg lehet adni, és ő képes várni a hívás tényére, sőt még timeoutot is tudunk megadni:

@Test
void testSendAndReceive() {
    employeesController.createEmployee(new CreateEmployeeCommand("John Doe"));

    verify(eventsService, timeout(4000).times(1))
      .processEvent(argThat(e -> e.getName().equals("John Doe")));
}

Az említett kód vár maximum 4 másodpercet, amíg az eventsService processEvent() metódusa meghívásra nem kerül.