Danila Rassokhin

📧 Email: danilarassokhin@gmail.com
💻 GitHub: @CrissNamon
📑 LinkedIn: Danila Rassokhin
🐦 Twitter: @KpekepSalt
📚 Medium: @danilarassokhin
📜 Блог

17 January 2023

Уведомления на Spring WebFlux и SSE

by Danila Rassokhin

Иногда может возникнуть необходимость отправлять пользователю данные в реальном времени, например уведомления на Frontend приложении. Для этого можно использовать следующий простой способ с использованием Flux из Project Reactor и ServerSentEvents из Spring.

Для начала необходимо создать класс для хранения уведомления. Для простоты у него будут только два поля: id и message.

public class Notification {

  private final Integer id;
  private final String message;

  public Notification(Integer id, String message) {
    this.id = id;
    this.message = message;
  }

  public Integer getId() {
    return id;
  }
}

Далее создадим сервис, который будет управлять подписками и уведомлениями. Определять, кому принадлежит уведомление будем по id пользователя.

Будем генерировать случайный id для каждого уведомления. Ограничим наш Random в промежутке [0, 3].

private Integer generateId() {
	return RANDOM.nextInt(4);
}

Вместо реальных данных будем генерировать простые уведомления раз в 2 секунды.

private ServerSentEvent<Notification> generateNotification() {
    return ServerSentEvent.<Notification>builder()
        .data(new Notification(generateId(), "Notification"))
        .build();
}

private void generateNotifications(FluxSink<ServerSentEvent<Notification>> sink) {
    Flux.interval(Duration.ofSeconds(2)) // Создаем уведомления каждые 2 секунды
        .map(i -> generateNotification())
        .doOnNext(serverSentEvent -> {
          sink.next(serverSentEvent); // Отправляем уведомления в глобальный Flux через его FluxSink
          log.info("Sent for {}", serverSentEvent.data().getId());
        })
        .doFinally(signalType -> log.info("Notification flux closed")) // Логируем закрытие нашего генератора
        .takeWhile(notification -> !sink.isCancelled()) // Генерируем сообщения пока не закрыт глобальный Flux
        .subscribe();
}

Если мы начнем отправлять уведомления сразу из генератора, то можем столкнуться с проблемой таймаута и разрывом соединения. Сейчас уведомления создаются раз в 2 секунды, но в реальной системе уведомления могут создаваться минуты и даже часы. Если в течение определенного времени (обычно пары минут) в открытое соединение не будет отправлено никаких данных, то оно автоматически закроется.

Чтобы избежать этого, создадим heartbeat - дополнительный поток из пустых комментариев, которые будут отправляться в открытое соединения, чтобы поддерживать его открытым. О его закрытии позаботится сам Spring и закроет наш heartbeat автоматически.

private <T> Flux keepAlive(Duration duration, Flux<T> data, Integer id) {
    Flux<ServerSentEvent<T>> heartBeat = Flux.interval(duration) // Создаем Flux с определенным интервалом
        .map(
            e -> ServerSentEvent.<T>builder() // Создаем новый объект SSE с комментарием и пустым телом
                .comment("keep alive for: " + id)
                .build()
        )
        .doFinally(signalType -> log.info("Heartbeat closed for id: {}", id));
    return Flux.merge(heartBeat, data);
}

Теперь напишем простой метод для подписки на на уведомления по из id.

public Flux<ServerSentEvent<Notification>> subscribe(int id) {
    return keepAlive(Duration.ofSeconds(3),
                     notificationFlux.filter(notification -> notification.data() == null || 
                         notification.data().getId() == id),
                     id);
}

В конструкторе сервиса создадим глобальный Flux.

private final Flux<ServerSentEvent<Notification>> notificationFlux;

public NotificationService() {
    notificationFlux = Flux.push(this::generateNotifications);
}

Теперь создадим RestController и простой endpoint для подписки на уведомления.

@RestController
public class NotificationController {

  private final NotificationService notificationService;

  @Autowired
  public NotificationController(NotificationService notificationService) {
    this.notificationService = notificationService;
  }

  @GetMapping(value = "/subscribe/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  public Flux<ServerSentEvent<Notification>> subscribe(@PathVariable Integer id) {
    return notificationService.subscribe(id);
  }
}

Если открыть в браузере http://localhost:8080/subscribe/1 можно получить следующий вывод. Теперь можно остановить загрузку страницы или закрыть ее.

:keep alive for: 1

:keep alive for: 1

data:{"id":1,"message":"Notification"}

:keep alive for: 1

:keep alive for: 1

data:{"id":1,"message":"Notification"}

Вывод в консоли, конечно из-за случайных id будет разный. Я получил такой:

Sent for 3
Sent for 3
Sent for 1
Sent for 0
Sent for 3
Sent for 2
Sent for 1
Heartbeat closed for id: 1
Sent for 3
Notification flux closed

Получать уведомления на Frontend приложении можно с помощью встроенных средств JavaScript или с помощью сторонних библиотек. Я использовал sse.js и JQuery

<!DOCTYPE HTML>
<html>
    <head>
        <script src="jquery.min.js"></script>
    </head>
    <body>
        <h1>Watcher</h1>
        <div class="container"></div>
        <script src="sse.js"></script>
        <script>
            window.onload = function () {
                const source = new SSE("http://localhost:8080/subscribe/1");
                source.addEventListener('message', function (e) {
                if (e.data) {
                    const payload = JSON.parse(e.data);
                    $(".container").append('<p>' + payload.action + '</p>')
                }});
                source.stream();
            };
        </script>
    </body>
</html>