사용 기술

- WebSocket STOMP

- Redis

 

구현 목록

- WebSocketConfig

- HandShakeInterceptor

- DisconnectEventListener

- QueueController

- QueueService

 


build.gradle

implementation 'org.springframework.boot:spring-boot-starter-data-redis'
implementation 'org.springframework.boot:spring-boot-starter-websocket'

 

application.yml

spring:
  application:
    name: siljeun
  data:
    redis:
      host: localhost
      port: 6379

 

WebSocketConfig

클라이언트 <-> 서버 간 통신에 사용되는 엔드포인트 설정

@Configuration
@RequiredArgsConstructor
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

  private final JwtHandShakeInterceptor jwtHandShakeInterceptor;

  @Override
  public void registerStompEndpoints(StompEndpointRegistry registry) {
    registry.addEndpoint("/ws")
        .addInterceptors(jwtHandShakeInterceptor)
        .setAllowedOriginPatterns("*");
  }

  @Override
  public void configureMessageBroker(MessageBrokerRegistry registry) {
    registry.enableSimpleBroker("/topic");
    registry.setApplicationDestinationPrefixes("/app");
  }
}
더보기

@EnableWebSocketMessageBroker

- STOMP 기반 WebSocket 메시지 브로커 활성화

 

implements WebSocketMessageBrokerConfigurer

- WebSocket 설정 커스터마이징 (엔드포인트 등록, 브로커 설정)

 

@Override

public void registerStompEndpoints(StompEndpointRegistry registry){}

- 클라이언트가 WebSocket에 접속할 수 있는 엔드포인트 경로 설정

- 클라이언트는 여기에 등록한 경로를 url에 입력하여 소켓 연결 요청

- `ws://localhost:8080/ws`

 

.setAllowedOriginPatterns("*");

- WebSocket 통신 시 CORS(Cross-Origin Resource Sharing) 정책을 설정하는 메서드

- CORS : 다른 출처(Origin)의 웹 애플리케이션이 자원을 요청할 때 브라우저가 이를 제한하는 보안 정책

- *로 설정하면 모든 인증되지 않은 도메인에서 접근 가능

- 실서비스에서는 아래처럼 사용

.setAllowedOriginPatterns("https://your-frontend.com", "https://admin.your-frontend.com")

 

@Override

public void configureMessageBroker(MessageBrokerRegistry registry){}

- 메시지 브로커 설정 메서드

- 서버 <-> 클라이언트 간의 메시지 라우팅 경로 구성

 

registry.enableSimpleBroker("/topic");

- 내장 메시지 브로커 활성화

- 서버가 클라이언트로 메시지를 publish(발행)할 때 사용하는 경로 prefix

- 메시지 destination : `/topic/queue/schedule1/user1`

- 위 경로로 특정 구독자에게 메시지 전달 가능(scheduleId가 schedule1인 대기열의 user1에게 메시지 전달)

 

registry.setApplicationDestinationPrefixes("/app");

- 클라이언트가 서버로 메시지를 send(전송)할 때 사용하는 경로 prefix

- 메시지 destination : `/app/addQueue`

- 서버는 @MessageMapping("/addQueue")로 메시지 수신 가능

 

HandShakeInterceptor

- 클라이언트가 소켓 연결을 요청하면, 요청한 사용자의 토큰 검증 & 티켓팅하려는 schedule(공연 회차)의 유효성 검증

- 서버에 전달할 데이터를 attributes에 저장하여, 서버에서 어느 시점이든 꺼내쓸 수 있도록 처리

WebSocket 세션은 쿠키나 세션으로 상태를 유지하는 HTTP와 달리 상태가 없기 때문에 attributes로 상태를 유지할 수 있다. 

@Component
@RequiredArgsConstructor
public class JwtHandShakeInterceptor implements HandshakeInterceptor {

  private final JwtUtil jwtUtil;
  private final ScheduleRepository scheduleRepository;

  @Override
  public boolean beforeHandshake(
      ServerHttpRequest request,
      ServerHttpResponse response,
      WebSocketHandler wsHandler,
      Map<String, Object> attributes) throws Exception {

    HttpHeaders headers = request.getHeaders();
    URI uri = request.getURI();
    MultiValueMap<String, String> params = UriComponentsBuilder.fromUri(uri).build()
        .getQueryParams();

    String token = headers.getFirst("Authorization");
    String scheduleId = params.getFirst("scheduleId");

    if (!jwtUtil.validateToken(token)) {
      return false;
    }

    if (StringUtils.isBlank(scheduleId) || !scheduleRepository.existsById(
        Long.valueOf(scheduleId))) {
      return false;
    }

    String username = jwtUtil.getUsername(token);
    attributes.put("username", username);
    attributes.put("scheduleId", scheduleId);

    return true;
  }

  @Override
  public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
      WebSocketHandler wsHandler, Exception exception) {

  }
}
더보기

implements HandshakeInterceptor

- Spring WebSocket에서 제공하는 인터셉터 인터페이스로, 웹소켓 연결 초기 handshake 과정에서 동작

 

@Override
public boolean beforeHandshake(){}

- 웹소켓 연결 요청이 들어왔을 때 가장 먼저 실행

- 내부 로직에서 반환값이 false이면 연결 거부

- attributes는 WebSocket 세션에 저장되어 이후 @MessageMapping에서 참조 가능

 

@Override
public void afterHandshake(){}

- handshake 끝난 후 호출

 

DisconnectEventListener

- 소켓 연결이 끊어졌을 때 해당 사용자를 큐에서 제거 (다음 사용자가 TTL을 기다리지 않고 바로 대기를 끝낼 수 있도록)

- 인터셉터에서 attributes에 저장한 데이터를 꺼내서 사용함으로써, 소켓 연결이 끊어진 시점에서도 어떤 사용자와의 연결이 끊어졌는지 파악 가능

@Component
@RequiredArgsConstructor
public class StompDisconnectEventListener {

  private final WaitingQueueService waitingQueueService;

  @EventListener
  public void handleDisconnect(SessionDisconnectEvent event) {
    StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage());

    String username = (String) accessor.getSessionAttributes().get("username");
    Long scheduleId = Long.valueOf((String) accessor.getSessionAttributes().get("scheduleId"));

    if (username != null && scheduleId != null) {
      waitingQueueService.deleteWaitingUser(scheduleId, username);
      waitingQueueService.deleteSelectingUser(scheduleId, username);
    }
  }
}
더보기

@EventListener

public void handleDisconnect(SessionDisconnectEvent event) {}

- @EventListener 어노테이션으로 SessionDisconnectEvent가 감지될 때 메서드를 자동 호출

- STOMP 연결을 담당하는 StompSubProtocolHandler는 WebSocketSession이 종료되면 SessionDisconnectEvent를 생성해 Spring에 발행

- WebSocketSession이 종료되는 경우는 클라이언트가 창을 닫거나, 네트워크가 끊기는 경우에 해당

 

StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage());

- 이벤트 객체에서 STOMP 메시지를 꺼내고, 헤더나 세션 정보를 쉽게 추출할 수 있도록 도와주는 유틸

- StompHeaderAccessor를 통해 소켓 연결 시에 데이터를 저장했던 attributes에 쉽게 접근할 수 있다.

 

StompHeaderAccessor를 사용하지 않고 attributes에서 데이터를 추출하려면 아래와 같은 복잡한 과정이 필요하다.

코드 길이는 비슷해보이지만 simpSessionAttributes와 같은 헤더 키를 알아야 하고, 형변환이 복잡하다는 단점이 있다.

MessageHeaders headers = event.getMessage().getHeaders();
Map<String, Object> sessionAttributes = (Map<String, Object>) headers.get("simpSessionAttributes");
String username = (String) sessionAttributes.get("username");

 

QueueController

- 소켓 연결 후 메시지 수신할 때 사용하는 컨트롤러

@RestController
@RequiredArgsConstructor
public class WaitingQueueController {

  private final WaitingQueueService waitingQueueService;

  @MessageMapping("/addQueue")
  public void addQueue(Message<?> message) {
    SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(message);

    String username = (String) accessor.getSessionAttributes().get("username");
    Long scheduleId = Long.valueOf((String) accessor.getSessionAttributes().get("scheduleId"));
    
    waitingQueueService.addWaitingQueue(scheduleId, username);
    System.out.println("연결 성공");
  }
}
더보기

@MessageMapping("/addQueue")

- 클라이언트가 `/app/addQueue` 경로로 STOMP 메시지를 보내면 메서드가 실행된다.(HTTP 프로토콜과 유사한 방식)

 

public void addQueue(Message<?> message) {}

- 메시지의 헤더 정보를 활용하기 위해 Message<?> message 타입으로 파라미터를 전달받는다.

- Message<?>는 body뿐만 아니라 세션과 헤더의 전체 정보를 감싼 래퍼 객체이고, 파라미터에 이렇게 타입을 명시해주면 스프링이 내부에서 Message<?> 객체로 감싸서 전달해준다.

- 만약 addQueue 메서드 안에서 세션 정보(attributes) 필요없이 메시지 본문만 필요하다면 아래와 같은 방식으로 파라미터를 전달받을 수 있다.

@MessageMapping("/reserveSeat")
public void reserveSeat(@Payload SeatRequest seatRequest) {
    // 메시지 본문(JSON)을 자동으로 매핑해줌
}

 

QueueService_기본 틀

@Slf4j
@Service
@RequiredArgsConstructor
@Transactional
public class WaitingQueueService {

  private final StringRedisTemplate redisTemplate;
  private final SimpMessagingTemplate messagingTemplate;
  private final ScheduleRepository scheduleRepository;

  private static final long ttlMillis = 900000L; // ttl 15분
  private static final long acceptedRank = 1000L; // 좌석 선택 최대 수용 인원 1000명
  public static final String prefixKeyForWaitingQueue = "waiting:schedule:";
  public static final String prefixKeyForSelecingQueue = "selecting:schedule:";
  
  ...

}
더보기

StringRedisTemplate

  • RedisTemplate<String, String>의 구현체
  • Redis에 문자열(key-value) 기반으로 데이터를 저장/조회할 때 사용
  • 내부적으로 StringSerializer를 기본으로 사용하여 데이터를 직렬화/역직렬화하지 않고 문자 그대로 처리하므로, 모든 key와 value가 String 타입인 경우 사용

SimpMessagingTemplate

  • Spring에서 서버 → 클라이언트(STOMP) 로 메시지를 보낼 때 사용하는 템플릿 클래스
  • STOMP 주소로 클라이언트에게 메시지 전송
messagingTemplate.convertAndSend("/topic/room/" + roomId, messageDto);

 

QueueService_Redis 연결 확인

  @PostConstruct
  public void testRedisConnection() {
    String pong = redisTemplate.getConnectionFactory().getConnection().ping();
    log.info("Redis 연결 상태: {}", pong);
  }

 

QueueService_addQueue

컨트롤러에서 호출하는 메서드로, 파라미터로 전달받은 사용자를 대기열에 저장

// 예매 대기 시작
public void addWaitingQueue(Long scheduleId, String username) {
  ZSetOperations<String, String> zSet = redisTemplate.opsForZSet();

  Schedule schedule = scheduleRepository.findById(scheduleId)
      .orElseThrow(() -> new CustomException(ErrorCode.NOT_FOUND_SCHEDULE));

  if (LocalDateTime.now().isBefore(schedule.getTicketingStartTime())) {
    throw new CustomException(ErrorCode.NOT_TICKETING_TIME);
  }

  String key = prefixKeyForWaitingQueue + scheduleId;
  long createdAt = System.currentTimeMillis();

  if (zSet.score(key, username) == null) {
    zSet.add(key, username, createdAt);
  }

  sendWaitingNumber(key, username, scheduleId);
}
더보기

ZSetOperations<String, String> zSet = redisTemplate.opsForZSet();

- Redis의 Sorted Set(ZSet) 자료구조를 다루기 위한 객체 생성

- ZSet은 (value, score) 형태로 저장되며, score에 따라 자동 정렬

 

String key = prefixKeyForWaitingQueue + scheduleId;

- (key, value)를 (waiting:schedule:1, username)으로 설정하여, scheduleId가 1인 공연의 대기열임을 key로 나타낸다.

 

if (zSet.score(key, username) == null) {
  zSet.add(key, username, createdAt);
}

- zSet.score(key, username)이 null이면 아직 등록되지 않은 상태, 즉 대기열에 들어온 적 없는 사용자이므로 이 경우에만 대기열에 등록하는 로직인 zSet.add()를 수행

- zSet에 등록할 때는 key, value, score를 파라미터로 데이터를 넣어준다.

- 입장 요청 시각(createdAt)을 score로 저장해 대기 순번 관리에 활용

 

 

QueueService_deleteUser

파라미터로 전달받은 사용자를 대기열에서 삭제

// 대기 끝 or 소켓 연결 해제되면 대기열에서 삭제
  public void deleteWaitingUser(Long scheduleId, String username) {
    String key = prefixKeyForWaitingQueue + scheduleId;
    redisTemplate.opsForZSet().remove(key, username);
  }
더보기

redisTemplate.opsForZSet().remove(key, username);

- 해당 ZSet의 key에 저장된 값 중 username을 삭제

 

QueueService_sendWaitingNumber

// 대기중인 특정 사용자에게 랭킹 및 대기번호 전송
  public void sendWaitingNumber(String key, String username, Long scheduleId) {
    ZSetOperations<String, String> zSet = redisTemplate.opsForZSet();

    Long rank = zSet.rank(key, username);

    if (rank == null) {
      throw new CustomException(ErrorCode.QUEUE_INSERT_FAIL);
    }

    rank = rank + 1;

    // 내 순위와 현재 좌석 선택 중인 사용자 수의 합이 수용 인원보다 적으면 대기 X
    Long selectingQueueSize = zSet.size(prefixKeyForSelecingQueue + scheduleId);
    selectingQueueSize = (selectingQueueSize == null) ? 0 : selectingQueueSize;

    if (rank + selectingQueueSize <= acceptedRank) {
      String destination = "/topic/queue/" + scheduleId + "/" + username;
      MyQueueInfoResponse response = new MyQueueInfoResponse(scheduleId, username, rank,
          true);
      messagingTemplate.convertAndSend(destination, response);

      addSelectingQueue(scheduleId, username);
      deleteWaitingUser(scheduleId, username);

      return;
    }

    String destination = "/topic/queue/" + scheduleId + "/" + username;
    MyQueueInfoResponse response = new MyQueueInfoResponse(scheduleId, username, rank,
        false);
    messagingTemplate.convertAndSend(destination, response);
  }
더보기

Long rank = zSet.rank(key, username);

- key에 저장된 username의 순위(0부터 시작) 조회

 

Long selectingQueueSize = zSet.size(prefixKeyForSelecingQueue + scheduleId);

- ZSet에 몇 명(몇 개의 데이터)가 있는지 반환하는 함수

 

String destination = "/topic/queue/" + scheduleId + "/" + username;

- 서버에서 메시지를 보낼 목적지 경로 설정

 

messagingTemplate.convertAndSend(destination, response);

- destination에 response 전송