본문 바로가기

project

[spring] Websocket Stomp 두번의 프로젝트 회고

 

의도치 않게 두번의 프로젝트 안에서 실시간 처리를 담당하게 되면서

Spring 프로젝트에서 Websocket Stomp를 적용해본 트러블 슈팅 과정

 

세세하게 적어보겠습니다 !

 


1. 플레이리스트 공유 플랫폼 프로젝트

 

 

방에 들어간 이후

✓ 오른쪽에 있는 플레이리스트 재생 버튼을 누르면 -> 플레이리스트에 있는 노래가 재생되고 -> 같은 방에 있는 인원들은 실시간 채팅이 가능하게 하는 구현을 맡았었습니다.

 

 

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws") // apic 접속 url ->  ws://localhost:8080/ws
                .setAllowedOriginPatterns("*")
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setUserDestinationPrefix("/sub"); //message 받을때 subscriber
        registry.setApplicationDestinationPrefixes("/pub"); //message 보낼때 publisher
    }

}

 

✓ WebSocket Broker Config 설정은 다음과 같이 크게 어렵지 않습니다.

   

   1. implementsWebSocketMessageBrokerConfigurer

- 웹 소켓 연결을 구성하기 위한 메서드를 구현하고 제공합니다.

 

    2. registerStompEndpoints 메서드

- 클라이언트가 웹 소켓 서버에 연결하는 데 사용할 웹 소켓 엔드 포인트를 등록합니다.

  엔드포인트 구성에 withSockJS ()를 사용합니다.

  SockJS는 웹 소켓을 지원하지 않는 브라우저에 폴백 옵션(어떤 기능이 약해지거나 제대로 동작하지 않을 때, 이에 대처하는 기능 또는 동작)을 활성화하는 데 사용됩니다.

 

STOMP를 사용하는 이유? 
WebSocket은 통신 프로토콜 일뿐입니다. 특정 주제를 구독한 사용자에게만 메시지를 보내는 방법 또는 특정 사용자에게 메시지를 보내는 방법과 같은 내용은 정의하지 않습니다. 이러한 기능을 위해서는 STOMP가 필요합니다.

   

    3. configureMessageBroker 메서드

- 한 클라이언트에서 다른 클라이언트로 메시지를 라우팅 하는 데 사용될 메시지 브로커를 구성하고 있습니다.

 

   4. registry.setApplicationDestinationPrefixes("/sub");

- "/sub" 시작되는 메시지가 message-handling methods으로 라우팅 되어야 한다는 것을 명시합니다.

 

    5. registry.enableSimpleBroker("/pub");

"/pub" 시작되는 메시지가 메시지 브로커로 라우팅 되도록 정의합니다.

  메시지 브로커는 특정 주제를 구독 한 연결된 모든 클라이언트에게 메시지를 broadcast 합니다.

해당 메세지를 어느 주소로 보낼지 prefix를 원하는 것 지정해주시고 convertAndSend해서 메세지를 발행할 Destination Prefix도 configureMessageBroker 메소드에서 지정해주시면 끝입니다. 

 

// 유저 퇴장 시에는 EventListener 을 통해서 유저 퇴장을 확인
@EventListener
public void webSocketDisconnectListener(SessionDisconnectEvent event) {
    log.info("DisConnEvent {}", event);

    StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());

    String memberName = (String) headerAccessor.getSessionAttributes().get("MemberName");
    String roomId = (String) headerAccessor.getSessionAttributes().get("roomId");

    // 채팅방 유저 -1
    ChatRoom room = chatService.findVerifiedRoomId(roomId);
    room.getUserlist().remove(memberName);
    chatRoomRepository.save(room);

    log.info("headAccessor {}", headerAccessor);

    ChatMessage chatMessage = ChatMessage.builder()
            .type(ChatMessage.MessageType.LEAVE)
            .memberName(memberName)
            .roomId(roomId)
            .message(memberName + "님 퇴장하셨습니다.")
            .build();

    template.convertAndSend("/sub/chat/room/" + chatMessage.getRoomId(), chatMessage);
}

 

소켓에서는 SessionConnectEvent와 SessionDisconnectEvent 가 발생하는데 @EventListener 어노테이션이 적용되어 있는 메서드의 인수로 SessionConnectedEvent SessionDisconnectEvent를 받으시면 됩니다.

 

✓ 여기에서 이슈가 발생했는데  

브라우저에서 메세지 전달 즉 웹소켓이 브라우저에서 3분이 지나면 websocket.onclose 함수가 지속적으로 호출되고 있었다는 점입니다.

 

사용자가 채팅방에 입장한 후 다른 탭으로 옮겨가거나 채팅방안에서 아무런 활동을 하지 않으면 3분 마다 웹소켓이 자동으로 끊어지고 있었는데요.

 

 

 

이 끊김을 해결하고자 Socket class를 살펴보니 endpoint와 timeout 설정하는게 있어서 

 

 

아래와 같이 저희 인스턴스의 ip와 port, 4000초로 timeout을 설정해줬지만 해결되지 않았습니다. 

 

다시 살펴보니 해당 connect 메서드에서 timeout는 연결 시도에 대한 타임아웃 값을 나타내는 것 같았어요.

 

연결 시도가 해당 시간 내에 성공하지 않으면 SocketTimeoutException 예외가 발생하는

 

즉, timeout 값은 연결 시도의 최대 지속 시간을 나타내는거라 유휴 제한 시간을 늘리고 싶었던 저에게는 솔루션이 되지 못했습니다 !

 

 

✓ 더 찾아보니 ALB를 통해 유휴 제한 시간을 직접 늘려주는 방향이 있었습니다. 

 

기본 설정이 60초인데 60초마다 통신하지 않는 클라이언트의 소켓을 끊어버리기 때문에 지속적인 SessionDisconnectEvent 가 발생했던거죠.

 

4000 초로 늘리니 더이상 해당 이슈가 발생하지 않았습니다. 

 


2. 원하는 일을 하며 삶을 사랑하자! 대화 및 피드백 통계 서비스 134talk 프로젝트

 

 

 

두번째 프로젝트는 온라인/오프라인 워크샵에서 쓰이는 대화 및 통계 피드백 서비스를 구현한 프로젝트인데요

✓ 여기에서는 기존 rest api로 진행했던 콜을 모두 Websocket Stomp로 처리해주는 여러번의 리팩토링이 있었습니다.

 

 

 

 

키워드 선택이나 질문 순서 등록은 충분히 rest api로 진행해도 되지 않을까 생각했었지만

 

그렇게 되면 실시간 처리를 세세하게 관리할 수가 없었습니다.

 

 

대화방 소켓 연결

 

실시간 처리라고 언급한 것은 이 방을 나갔다가 다시 들어올 경우 "type" 필드를 'RE_ENTER'로 넘겨줘야 했고 "socketFlag"는 총 6단계를 설정했는데 아래와 같습니다.


 

//0 → 첫 대기화면 진입

//1 → 자기소개 모두 참여 (activeFlag 모두 true로 판별), 1 response return + activeFlag 다 false로 초기화

//2 → 자기소개 화면

//3 → (activeFlag 모두 true로 판별)


 

자기 자신만 자기소개를 완료하는 것만 프론트 단에서 처리해주면 될 경우 rest api로 충분히 구현 가능하지만,

저희는 다른 사람이 자기소개를 완료했다는 것도 실시간으로 다 받아서 모두의 화면에 동일하게 뿌려줘야 했기 때문에 socketFlag 라는 변수를 만들어 모든 step을 스탬프 찍듯이 기록하며 프론트단과 데이터를 주고 받았습니다.

 

 

✓ 아 그리고 또 입장 | 재입장 시

해당 사용자가 어느 단계까지 process를 마쳤는지 socketFlag 번호로 단계를 확인 후 화면 처리도 해줘야 했기 때문에 flag의 역할이 중요했습니다.

 

 

키워드 선택

 

키워드 선택 또한 키워드 선택 -> 질문 카드 순서 등록 -> 모든 사용자들의 순서 등록 완료했을때의 socketFlag를 아래와 같이 구분해서 명시했습니다.


 

//4 → 키워드 선택 완료

//5 → 질문 카드 순서 등록

//6 → 모든 채팅방 안에 사용자들 카드 순성 등록 완료 (allRegistered true일때 )


처음에는 사용자 개인별 rest 콜을 하면 되지 않을까 했지만 그렇게 되면 누가 키워드 선택까지 완료했고, 누구는 질문 카드 순서 등록까지 완료했고를 다 파악하기 어려웠기 때문에,

모든 step 에 flag 를 명시하고 화면 처리를 해줘야 했습니다.

 

아무래도 단순 채팅 구현이 아닌 단계별 처리를 실시간으로 화면에 반영했기 때문에 구현에 까다로움이 있었던 것 같습니다 !

 

 

그럼 이제 websocket config 쪽으로 넘어가서 코드 리팩토링 과정을 남겨볼게요 !
@Configuration
@RequiredArgsConstructor
@EnableWebSocket
@EnableWebSocketMessageBroker
public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer {

    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(1);
        taskScheduler.initialize();
        return taskScheduler;
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws")
                .setAllowedOrigins(
                        "http://localhost:3000",
                        "https://134.works"
                )
                .setAllowedOriginPatterns("*")
                .addInterceptors(new HttpHandShakeInterceptor())
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/sub")
                .setTaskScheduler(taskScheduler())
                .setHeartbeatValue(new long[]{10000, 10000});
        registry.setApplicationDestinationPrefixes("/pub");
    }

    /**
     * Web socket transport.
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setMessageSizeLimit(160 * 64 * 1024);    // default : 64 * 1024
        registration.setSendTimeLimit(20 * 10000);            // default : 10 * 10000
        registration.setSendBufferSizeLimit(10 * 512 * 1024); // default : 512 * 1024
    }

}

 

첫번째 프로젝트에서 사용한 WebSocketBrokerConfig에서

  • .addInterceptors(new HttpHandShakeInterceptor())로 handshake interceptor를 MessageBroker에 추가한 부분이랑
  • configureWebSocketTransport 메서드를 오버라이드한 부분이 조금 추가되었는데요. 

 

✓ ConfigureWebSocketTransport 

  1. setMessageSizeLimit(160 * 64 * 1024): WebSocket 메시지 크기 제한을 설정합니다. 이 설정은 WebSocket 메시지의 최대 크기를 나타내며, 여기서는 160MB로 설정되어 있습니다. 이 크기를 초과하는 메시지가 수신되면 WebSocket 연결이 닫힐 수 있습니다. 메시지 크기를 제한함으로써 서버 리소스 과부하를 방지할 수 있습니다.
  2. setSendTimeLimit(20 * 10000): WebSocket 메시지 송신 시간 제한을 설정합니다. 이 설정은 메시지를 송신하는 데 걸리는 최대 시간을 나타냅니다. 여기서는 20초로 설정되어 있으며, 이 시간 내에 메시지를 송신하지 못하면 송신 시간 초과 예외가 발생할 수 있습니다. 이 설정을 통해 장시간 메시지 전송으로 인한 블록을 방지할 수 있습니다.
  3. setSendBufferSizeLimit(10 * 512 * 1024): WebSocket 송신 버퍼 크기 제한을 설정합니다. 이 설정은 WebSocket 메시지를 송신하는 데 사용되는 버퍼의 최대 크기를 나타냅니다. 여기서는 10MB로 설정되어 있으며, 이 크기를 초과하는 메시지를 송신하려고 시도하면 송신 버퍼 초과 예외가 발생할 수 있습니다. 이 설정을 통해 과도한 메모리 사용을 방지할 수 있습니다.

 

socket 자체가 주기적으로 끊기고 default limit이 있기 때문에 default 설정보다 조금 늘려서 설정했습니다.

 

✓ 다음으로는 HttpHandShakeInterceptorSession Handler를 생성해줬는데요.

 

웹 소켓이 연결이 되었을 때, 연결 요청에 대한 전처리 및 후처리를 담당하는 핸드셰이크 인터셉터를 만들어 요청의 세션 정보를 같이 넘겨주도록 하는 class 입니다.

 

 

@Component
@Slf4j
public class SessionHandler extends StompSessionHandlerAdapter {

    private StompSession session;

    /**
     * exception
     *
     * @param session   the session
     * @param command   the command
     * @param headers   the headers
     * @param payload   the payload
     * @param exception the exception
     */
    @Override
    public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
        log.error("exception occured -> Reason : {}", exception.getMessage());
        log.error("stomp headers : [{}], Payload : [{}]", headers, new String(payload));
    }

    /**
     * subscribed url
     */
    public void subscribe(String destination) {
        session.subscribe(destination, this);
        log.debug("[ {} ] subscribed", destination);
    }

    @Override
    public Type getPayloadType(StompHeaders headers) {
        if (headers.getDestination().equals("/pub/enter")) {
            return ChatEnterResponseDto.class;
        } else if (headers.getDestination().equals("/pub/select/keyword")) {
            return SocketFlagResponseDto.class;
        } else if (headers.getDestination().equals("/pub/question-order")) {
            return AllRegisteredDto.class;
        }
        return Object.class;
    }

}

 

도입하게 된 이유

 

이번 프로젝트에서의 소켓의 이유 없는 끊김이 기본 Websocket request-response 로그 필터로 찍히지 않았고,

저번 프로젝트에서 ALB로 유휴 제한 시간을 늘려줘서 해결하기에는 이번에는 ALB를 따로 생성하지 않았기 때문에

Stomp 세션의 에러 원인을 세션 연결 전처리 및 후처리를 담당하는 Session Handler Filter (+SocketEventListener)를 만들고 HealthCheck Api를 통해 프론트 측과 세션 끊어짐을 정기적으로 감지 가능하게 하여 능동적인 대응을 해봤습니다.

 

 

@Component
@Slf4j
@RequiredArgsConstructor
public class SocketEventListener {
    private final ChatService chatService;

    private static Map<String, WebSocketSession> sessionMap = new ConcurrentHashMap<>();

    /**
     * Handle session connected events.
     *
     * @param event the event
     */
    @EventListener
    public void handleWebSocketConnectListener(SessionConnectedEvent event) {
        StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());
        //GenericMessage msg = (GenericMessage) headerAccessor.getMessageHeaders().get("simpConnectMessage");

        log.info("Received a new web socket connection. Session ID : [{}]", headerAccessor.getSessionId());
    }

    /**
     * Handle session disconnected events.
     *
     * @param event the event
     */
    @EventListener
    public void handleWebSocketDisConnectListener(SessionDisconnectEvent event) {
        StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());

        String sessionId = findBrowserSessionId(headerAccessor.getSessionId());
        if(sessionId != null) {
            sessionMap.remove(headerAccessor.getSessionId());
        }

        Optional<Object> userIdOpt = Optional.ofNullable(headerAccessor.getSessionAttributes().get("userId"));
        Optional<Object> roomIdOpt = Optional.ofNullable(headerAccessor.getSessionAttributes().get("roomId"));
        if (userIdOpt.isPresent() && roomIdOpt.isPresent()) {
            long userId = (Long) userIdOpt.get();
            long roomId = (Long) roomIdOpt.get();
            chatService.disconnectUserSetFalse(userId, roomId);
            log.info("verify the user changed to false :: {}", chatService.userStatus(userId, roomId));
        }

        log.info("Web socket session closed. Message : [{}]", event.getMessage());
    }

    /**
     * Find session id by session id.
     *
     * @param sessionId
     * @return
     */
    public String findBrowserSessionId(String sessionId) {
        String session = null;

        for (Map.Entry<String, WebSocketSession> entry : sessionMap.entrySet()) {
            if (entry.getKey().equals(sessionId)) {
                session = entry.getKey();
            }
        }

        return session;
    }


    public MessageHeaders createHeaders(SimpMessageHeaderAccessor headerAccessor, long userId, long roomId) {
        headerAccessor.getSessionAttributes().put("userId", userId);
        headerAccessor.getSessionAttributes().put("roomId", roomId);
        headerAccessor.setLeaveMutable(true);

        log.info("Header Accessor log info :: {}", headerAccessor.getSessionAttributes());
        return headerAccessor.getMessageHeaders();
    }

}

 

 

SocketEventListener class도 따로 생성을 해줘서 코드 리팩토링을 할 수 있었는데요.

 

✓ 여기서 sessionMap 에 관해서 동기화 관련 리팩토링 작업을 한번 진행해줬는데요. 

 

 

lielocks로 작성한 사람이 저입니다!

 

 

해당 class에서 팀원분이 코드리뷰를 남겨주셨는데, ConcurrentHashMap이 수정 작업에서는 synchro가 걸려있어서 멀티 스레드 내에서는 안전하다는 의견을 주셨습니다.

 

좋은 피드백을 남겨주신 덕분에 불필요한 동기화 작업을 제거해줘서 

private static Map<String, WebSocketSession> sessionMap = new ConcurrentHashMap<>();

 

해당으로 sessionMap을 생성해주었습니다.

 

 

다음으로는 

// SocketEventListener class 만들어 주기 전 Controller 코드
@RestController
@Slf4j
@RequiredArgsConstructor
public class ChatController {
    private final ChatService chatService;
    private final KeywordService keywordService;
    private final SimpMessagingTemplate template;

    @MessageMapping("/enter")
    public void message(@Payload ChatEnterDto chatEnterDto, SimpMessageHeaderAccessor headerAccessor) {
        try {
            ChatEnterResponseDto responseDto = chatService.sendChatMessage(chatEnterDto);
            template.convertAndSend(StompConstants.getOnlyRoomEnterDestination(chatEnterDto.getRoomId()), responseDto);
            log.info("response :: {}", responseDto);

            headerAccessor.getSessionAttributes().put("userId", chatEnterDto.getUserId());
            headerAccessor.getSessionAttributes().put("roomId", chatEnterDto.getRoomId());
            log.info("current header accessor attributes :: {}", headerAccessor.getSessionAttributes());
        }
        catch (CustomException e) {
            if (e.getCustomError() == CustomError.CHATROOM_DOES_NOT_EXIST) {
                chatroomNotExist(chatEnterDto.getRoomId());
            } else if (e.getCustomError() == CustomError.USER_DOES_NOT_EXIST) {
                userNotExist(chatEnterDto.getRoomId());
            } else if (e.getCustomError() == CustomError.CHATROOM_USER_ALREADY_JOINED) {
                blockSameUser(chatEnterDto.getRoomId());
            }
        }
    }

 

✓ 이게 SocketEventListener 클래스 DI 받기 전 코드인데 ,

headerAccessor.getSessionAttributes().put("userId", chatEnterDto.getUserId());
headerAccessor.getSessionAttributes().put("roomId", chatEnterDto.getRoomId());
  • 해당 코드를 controller 메서드마다 반복적으로 넣어줘야 했었는데 이걸 이벤트 기반으로 따로 빼니까 
listener.createHeaders(headerAccessor, chatEnterDto.getUserId(), chatEnterDto.getRoomId());

해당 코드로 수정할 수 있었습니다. 

 

 

그리고 

 

@EventListener
    public void handleWebSocketDisConnectListener(SessionDisconnectEvent event) {
        StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());

        String sessionId = findBrowserSessionId(headerAccessor.getSessionId());
        if(sessionId != null) {
            sessionMap.remove(headerAccessor.getSessionId());
        }

        Optional<Object> userIdOpt = Optional.ofNullable(headerAccessor.getSessionAttributes().get("userId"));
        Optional<Object> roomIdOpt = Optional.ofNullable(headerAccessor.getSessionAttributes().get("roomId"));
        if (userIdOpt.isPresent() && roomIdOpt.isPresent()) {
            long userId = (Long) userIdOpt.get();
            long roomId = (Long) roomIdOpt.get();
            chatService.disconnectUserSetFalse(userId, roomId);
            log.info("verify the user changed to false :: {}", chatService.userStatus(userId, roomId));
        }

        log.info("Web socket session closed. Message : [{}]", event.getMessage());
    }

 

✓ SessionDisconnectEvent를 받아서 해당 세션의 정보를 활용해서 서비스 코드를 작성하려고 할때 주의할 점이 있는데요.

 

저는 "userId"와 "roomId"를 받아서 chatService 단에서 해당 정보를 false로 set하는 서비스 코드를 disconnectEvent handle 메서드에 넣어줬는데, 여기서 NPE 가 발생할 가능성이 매우 높으니 null 체크하시는 걸 추천드립니다.

 

SessionDisconnectEvent가 동일한 session 으로 여러번 반복이 되는 경우도 많아서요.

(사용자가 다른 탭으로 옮겨가거나, socket time limit을 초과하였거나, 사용자가 해당 브라우저 창에서 아무 활동도 하고 있지 않을때 등등의 상황에 DisconnectEvent 동일한 세션 내에서 계속 발생되면)

해당 session 의 headerAccessor에 sessionAttribute가 이미 비어있는데 또 DisconnectEvent 발생해버리면

-> 이미 session attribute가 비어있는 상태에서 getSessionAttributes 하면 NPE 발생

 

 


+ Optional 안티 패턴을 사용해서  정정합니다.

 

[ 올바른 Optional 사용법 가이드 ]

  • Optional 변수에 Null을 할당하지 말아라
  • 값이 없을 때 Optional.orElseX()로 기본 값을 반환하라
  • 단순히 값을 얻으려는 목적으로만 Optional을 사용하지 마라
  • 생성자, 수정자, 메소드 파라미터 등으로 Optional을 넘기지 마라
  • Collection의 경우 Optional이 아닌 빈 Collection을 사용하라
  • 반환 타입으로만 사용하라

 

해당 부분을 참고해서 handleWebSocketDisconnectListener 메서드의 Optional 안티 패턴을 수정해보겠습니다.

 

 

/**
 * Handle session disconnected events.
 *
 * @param event the event
 */
@EventListener
    public void handleWebSocketDisConnectListener(SessionDisconnectEvent event) {
        StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());
        Map<String, Object> sessionAttributes = headerAccessor.getSessionAttributes();

        String sessionId = findBrowserSessionId(headerAccessor.getSessionId());

        if(sessionId != null) {
            sessionMap.remove(headerAccessor.getSessionId());
        }

        if (sessionAttributes.get("userId") != null && sessionAttributes.get("roomId") != null) {
            chatService.disconnectUserSetFalse((Long) sessionAttributes.get("userId"), (Long) sessionAttributes.get("roomId"));
        }
        log.info("Web socket session closed. Message : [{}]", event.getMessage());
    }

 

  • 단순히 값을 얻으려는 목적으로만 Optional을 사용하지 마라
  • 반환 타입으로만 사용하라

이 부분을 중점으로 

userIdOptroomIdOpt 변수 선언하면서 값 가져오겠다고 Optional 쓰는 것 자체가 안티 패턴인 것 같아 

로컬 변수 선언을 생략하여 Optional 대신 null 체크로 리팩토링하였습니다.

 

 


 

이렇게 두번정도 Websocket Stomp를 활용한 에러 핸들링 회고를 남겨보았는데요.

 

단순 채팅이 아닌 모든 step 을 실시간으로 받고 넘겨줘야 하는 실시간 처리를 소켓으로 처리하는게 쉽지는 않은 것 같다고 느꼈습니다. ㅎㅎ

 

하지만 재미있었던 프로젝트들이라 꼭 회고로 남기고 싶었는데 드디어 발행해서 기분이 좋습니다!!

 

 

 

 

< reference : https://mangkyu.tistory.com/203 >