Programming/Spring

[Spring] SSeEitter 사용하기 SSE(Server-Sent-Event)

rw- 2024. 11. 29. 16:13
728x90

SE(Server-Sent-Event)

SSE  정의에 따르면, 웹 애플리케이션이 단방향 이벤트 스트림을 처리하고 서버가 데이터를 방출할 때마다 업데이트를 수신할 수 있도록 하는 http 표준입니다. 간단히 말해서, 단방향 이벤트 스트리밍을 위한 메커니즘입니다.

전통적으로 웹 페이지는 서버에 요청을 보내서 새로운 데이터를 수신해야 합니다. 즉, 페이지가 서버에 데이터를 요청하는 것입니다. 서버에서 보낸 이벤트를 사용하면 서버가 웹 페이지에 메시지를 푸시하여 언제든지 웹 페이지에 새로운 데이터를 보낼 수 있습니다.

 

 

 

Javascript에서 SSE 연결하기

/connect 를 호출하여 EventSource 객체로 SSE 연결을 시도한다.

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<%@ taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core" %>
<!DOCTYPE html>
<html>
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
    <title>Hello Spring</title>
    <script src="/js/jquery-1.8.3.min.js"></script>
</head>
<body>
    <h1>Server-Sent Events (SSE) Example</h1>
    <input type="text" id="userId" value="test@gm.com"/>
    <button type="button" onClick="connect()">connect</button>
    <button type="button" onClick="disconnect()">disconnect</button>
    <div id="messages"></div>

    <script type="text/javascript">

    var eventSource = "";

    function connect() {
        let userId = $("#userId").val();
        let connectUrl = "/connect/" + userId;
        // EventSource 객체로 SSE 연결
        eventSource = new EventSource(connectUrl);

        // 이벤트가 발생할 때마다 메시지를 처리하는 이벤트 리스너
        eventSource.addEventListener('message', function(event) {
            console.log(event);
            // data으로 설정한 값 받기
            const data = JSON.parse(event.data);
            console.log(data);
            var message = event.data;
            var messagesDiv = document.getElementById('messages');
            var newMessage = document.createElement('p');
            newMessage.textContent = message;
            messagesDiv.appendChild(newMessage);
        });

        // 연결 오류 처리
        eventSource.onerror = function(event) {
            console.error('SSE connection error', event);
        };
    }

    function disconnect() {
      //eventSource.close();
      console.log(eventSource);
      console.log('connection is closed');
    }
    </script>
</body>
</html>

 

 

NotificationDTO 작성

@Getter
@Setter
public class NotificationDTO {
  String eventId;
  String data;
}

 

 

Controller 작성

connect: client와 sse 연결

/snedAll: 호출 시 연결되어 있는 모든 eventSource에 SEND

/sendToJson: 호출 시 특정 eventId 값을 가지고 있는 eventSource에 SEND

@RestController
public class SseController {

  @Autowired
  NotificationService notificationService;

  @GetMapping(value = "/connect/{userId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  @ResponseBody
  public SseEmitter connect(@PathVariable String userId, @RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId)
      throws IOException, InterruptedException {
    return notificationService.connect(userId, lastEventId);
  }

  @PostMapping(value = "/sendAll")
  @ResponseBody
  public void send(@RequestBody NotificationDTO notificationDTO) {
    notificationService.send(notificationDTO);
  }

  @PostMapping(value = "/sendToJson")
  @ResponseBody
  public void sendToJson(@RequestBody NotificationDTO notificationDTO) throws IOException {
    notificationService.sendToJson(notificationDTO);
  }
}

 

 

Service 작성

@Service
@RequiredArgsConstructor
@Log4j2
public class NotificationService {

  static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

  public SseEmitter connect(final String userId, final String lastEventId)
      throws IOException, InterruptedException {
    // 매 연결마다 고유의 이벤트 ID를 부여
    String eventId = userId + "_" + System.currentTimeMillis();

    // SseEmitter 인스턴스 생성 후 Map에 저장
    SseEmitter emitter = new SseEmitter(System.currentTimeMillis());
    sseEmitterMap.put(eventId, emitter);

    // 이벤트 전송 시
    emitter.onCompletion(() -> {
      log.info("onCompletion callback");
    });

    // 이벤트 스트림 연결 끊길 시
    emitter.onTimeout(() -> {
      log.info("onTimeout callback");
      emitter.complete();
    });

    // 첫 연결 시 503 Service Unavailable 방지용 더미 Event 발송
    NotificationDTO notificationDTO = new NotificationDTO();
    notificationDTO.setEventId(eventId);
    notificationDTO.setData("알림 서버 연결 성공. [userId: " + userId + "]");
    emitter.send(SseEmitter.event().name("message").id(eventId).data(notificationDTO));
    return emitter;
  }

  public void send(NotificationDTO notificationDTO) {
    String eventId = notificationDTO.getEventId();

    sseEmitterMap.forEach((key, emitter) -> {
      try {
        emitter.send(SseEmitter.event().name("message").id(eventId).data(notificationDTO));
      } catch (Exception e) {
        log.error("Failed to send notification", e);
      }
    });
  }

  public void sendToJson(NotificationDTO notificationDTO) throws IOException {
    String eventId = notificationDTO.getEventId();
    String data = notificationDTO.getData();
    try {
      SseEmitter emitter = sseEmitterMap.get(eventId);
      emitter.send(SseEmitter.event().name("message").id(eventId).data(notificationDTO));
    } catch (IOException e) {
      throw new RuntimeException("알림 서버 연결 오류");
    }
  }
}

 

 

참고
https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
https://grapeup.com/blog/how-to-build-real-time-notification-service-using-server-sent-events-sse/#
https://afuew.tistory.com/22#⭐ Repository-1
https://blablacoding.tistory.com/101

 

728x90
반응형