ThreadLocal, Project Reactor Context és a Java 25 ScopedValue

Előfordul, hogy az egyik metódusból értéket kell átadnunk egy másik metódusnak anélkül, hogy azt paraméterként adnánk át. Lehet ez amiatt, mert a két metódus között nincs közvetlen hívás, és nincs ráhatásunk a közbülső metódusokra.

Ilyen lehet pl. a bejelentkezett felhasználó, a tranzakció, vagy a log MDC. Ezeket gyakran környezetnek (contextnek) nevezik, pl. Security Context, Transaction Context, Mapped Diagnostic Context.

Másképp kell ezt megoldani Servlet API-ra épülő megoldásnál (ThreadLocal), pl. Spring MVC-nél, és máshogy reaktív módon, pl. a Project Reactorra épülő Spring WebFlux esetén (context). Sőt, a 25-ös Javaban megjelent a ScopedValue osztály a ThreadLocal osztály kiváltására.

A poszthoz tartozó példaprogram elérhető a GitHubon.

A poszt még két videót tartalmaz.

ThreadLocal használata

A Servlet API-ra épülő keretrendszerek, pl. a Spring MVC esetén kihasználható, hogy egy HTTP kérést egy szál szolgál ki. Pl. a Spring is szálhoz kapcsolja a bejelentkezett felhasználót, a tranzakciót, vagy a JPA persistence context-et. Sőt, ha request scope-pal definiálunk egy bean-t, az is ehhez a szálhoz kerül letárolásra. A szálat indító http kérést is bárhol le tudjuk kérdezni.

Erre megoldás a ThreadLocal osztály, mely egy olyan adatstruktúra, mely lehetővé teszi, hogy szálanként különböző értékeket tároljunk.

Sőt, a ThreadLocal használható akkor is, ha egy osztály nem szálbiztos, viszont létrehozása túl költséges ahhoz, hogy minden használat esetén újra példányosítsuk. Ilyen például a Random osztály, melyhez kapcsolódik egy ThreadLocalRandom osztály is.

A ThreadLocal elsősorban keretrendszer fejlesztőknek hasznos, de ismeretével jobban megérthetők az említett mechanizmusok.

Deklarálok két metódust, a hívó neve legyen processOrder(), a hívott neve saveOrder(). Az előbbi generál egy véletlen UUID-t, melyre a másodiknak szüksége van.

A metódusokat három szálon hívom párhuzamosan. Ehhez létrehozok egy thread pool-t, és három Callable példányt. Ahhoz, hogy a hívások különböző sorrendben történjenek, teszek bele egy véletlen várakozást.

A 19-es Java-tól az ExecutorService implementálja az AutoCloseable interfészt.

Majd bevezetem a ThreadLocal-t, melyben a processOrder() elhelyezi az értéket. A saveOrder() pedig kiveszi.

Lefuttatva ugyanahhoz az szálhoz ugyanaz a request id kapcsolódik.

@Slf4j
public class ThreadLocalApplication {

    private final Random random = new Random();

    private final ThreadLocal<String> requestId = new ThreadLocal<>();

    static void main() {
        new ThreadLocalApplication().run();
    }

    @SneakyThrows
    private void run() {
        try (ExecutorService executor = Executors.newFixedThreadPool(2)) {
            executor.invokeAll(
                    IntStream.range(0, 3)
                            .mapToObj(i -> Executors.callable(this::processOrder))
                            .toList()
            );
        }
    }

    @SneakyThrows
    private void processOrder() {
        String id = UUID.randomUUID().toString();
        log.info("process: {}, {}", id, requestId.get());
        requestId.set(id);
        Thread.sleep(random.nextInt(1000));
        saveOrder();
    }

    private void saveOrder() {
        String id = requestId.get();
        log.info("save: {}", id);
        // requestId.remove(); // Thread pool miatt
    }
}

Thread pool használata esetén, mivel a szálakat újrafelhasználja, a szálhoz tartozó érték megmarad.

Ha leveszem a thread pool méretét 2-re, akkor látszik, hogy egy korábbi szál értéke benne marad a ThreadLocal-ban. Ezért mindenképp figyelni kell arra, hogy a munka végeztével ürítsük a ThreadLocal értékét. Ez a remove() metódussal érhető el.

A ThreadLocal érdekessége, hogy úgy működik, hogy a Thread osztályban van egy ThreadLocal.ThreadLocalMap threadLocals attribútum, mely a ThreadLocal-hoz tárolja egy egyedi map adatszerkezetben az értéket.

Context használata Project Reactorban

Project Reactor esetén is tudunk az egyik metódusból értéket átadni a másik metódusnak anélkül, hogy ezt paraméterként deklarálnánk.

Itt azonban nem támaszkodhatunk a szálakra, ugyanis a Project Reactor concurrency-agnostic, ami azt jelenti, hogy nem erőltet ránk semmilyen párhuzamossági modellt. Ezért elképzelhető, hogy bizonyos operátorok, mint például a flatMap, publishOn, subscribeOn és delayElement operátorok, szálat váltanak. Sőt, a Project Reactorra épülő Spring WebFlux minden kérést magonként ugyanazon a szálon szolgál ki.

A Project Reactor 3.1.0 verziójában jelent meg a Context, mely egy Map-szerű adatszerkezet, mely streamenként képes különböző értékeket tárolni, melyekhez az operátorok hozzáférhetnek.

Deklarálok két metódust, a hívó neve legyen processOrder(), a hívott neve saveOrder(). Az előbbi generál egy véletlen UUID-t, melyre a másodiknak szüksége van.

A contextWrite() metódussal írok egy értéket, amit a deferContextual() metódus hívásával tudom kiolvasni.

@Slf4j
public class ReactiveContextApplication {

    static void main() {
        new ReactiveContextApplication().run();
    }

    private void run() {
        Flux.range(0, 3)
                        .flatMap(i -> processOrder())
                                .subscribe();
    }

    private Mono<Void> processOrder() {
        String id = UUID.randomUUID().toString();
        log.info("process: {}", id);
        return saveOrder()
                .contextWrite(Context.of("id", id));
    }

    private Mono<Void> saveOrder() {
        return Mono.deferContextual(ctx -> {
            log.info("save: {}", ctx.get("id").toString());
            return Mono.empty();
        });
    }

}

Java 25 ScopedValue

A Java 25-ben jelent meg a ScopedValue osztály, melyet a ThreadLocal leváltására vezettek be. A cél itt is az, hogy az egyik metódus értéket tudjon átadni a másik metódusnak anélkül, hogy paraméterként definiálni kéne.

A ScopedValue előnye, hogy jobban illeszkedik a structured concurrency-hez, és a virtual threadekhez, valamint az érték módosíthatatlan, és a hívás után automatikusan eltűnik, nem kell explicit módon takarítani.

Ez a háttérben szintén a szálakra épül.

Deklarálok két metódust, a hívó neve legyen processOrder(), a hívott neve saveOrder(). Az előbbi generál egy véletlen UUID-t, melyre a másodiknak szüksége van.

A ScopedValue.where hívásnál tárolom le az értéket, melyet a get() metódussal tudom kiolvasni.

@Slf4j
public class ScopedValueApplication {

    private final Random random = new Random();

    private final ScopedValue<String> requestId = ScopedValue.newInstance();

    static void main() {
        new ScopedValueApplication().run();
    }

    @SneakyThrows
    private void run() {
        try (ExecutorService executor = Executors.newFixedThreadPool(2)) {
            executor.invokeAll(IntStream.range(0, 3)
                    .mapToObj(i -> Executors.callable(this::processOrder))
                    .toList()
            );
        }
    }

    @SneakyThrows
    private void processOrder() {
        String id = UUID.randomUUID().toString();
        log.info("process: {}", id);
        Thread.sleep(random.nextInt(1000));
        ScopedValue.where(requestId, id).run(this::saveOrder);
    }

    private void saveOrder() {
        String id = requestId.get();        
        log.info("save: {}", id);
    }
}