반응형

이전 단계에서 모든 처리를 마치고 이제 메시지만 전송해주면된다. 

그런데 메시지를 전송할때 수신자가 현재 웹소켓에 연결되어있지않을때에 대한 처리를 해주어야했다.

 

getOnlineSession

private List<WebSocketSession> getOnlineSession(List<UserEntity> userEntities) {
    return userEntities.stream()
            .map(receiver -> userSessionMap.get(receiver.getEmail()))
            .filter(Objects::nonNull)
            .toList();
}

 

 

getOfflineUsers

private List<UserEntity> getOfflineUsers(List<UserEntity> userEntities) {
    List<UserEntity> offlineReceivers = userEntities.stream()
            .filter(receiver -> userSessionMap.get(receiver.getEmail()) == null)
            .toList();
    return offlineReceivers;
}

 

위 메서드를 이용해서 온라인인 유저는 세션으로 받아오고, 오프라인인 유저도 찾았다.

private void sendMessageAndkeep(List<UserEntity> userEntities, ChatMessageDto chatMessageDto, UserEntity sender) {
    //연결중인 세션만 찾아서 문자를전송
    getOnlineSession(userEntities).forEach(
            receiverSession -> sendMessage(receiverSession, chatMessageDto.getMessage()));

    //연결중이지않은 유저는 해당 기록을 저장.
    saveOfflineChat(userEntities, chatMessageDto, sender);
}

 

연결되어있는세션은  바로 chatMessegeDto에있는 message를 전송해주고 

연결중이지않은 유저는 유저정보와 메시지를 저장해준다.

 

saveOfflineChat

private void saveOfflineChat(List<UserEntity> userEntities, ChatMessageDto chatMessageDto, UserEntity sender) {
    List<UserEntity> offlineReceivers = getOfflineUsers(userEntities);

    offlineReceivers.forEach(offlineUser -> {
        OfflineChatHistoryEntity offlineChat = OfflineChatHistoryEntity.builder()
                .senderId(sender.getId())
                .receiverId(offlineUser.getId())
                .content(chatMessageDto.getMessage())
                .createdAt(LocalDateTime.now())
                .build();
        offlineChatHistoryRepository.save(offlineChat);
    });
}

 

이렇게 전송되지못한메시지는 저장해두었다가 이후에 해당유저가 웹소켓에 연결될때 다시 전송해준다.

웹소켓에 연결될때 실행되는 afterConnectionEstablished 메서드에 해당 기능을 추가해준다.

@Override
@Transactional
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
    //유저의 이메일을 얻어온다.
    String userId = getUserId(session);

    UserEntity userEntity = userRepository.findByEmail(userId)
            .orElseThrow(() -> new ApiException(ErrorCode.BAD_REQUEST, "없는 유저입니다."));


    // 오프라인 유저 리스트에 해당 유저가 있는지 확인
    if (offlineChatHistoryRepository.existsByReceiverId(userEntity.getId())) {
        // chatRepository에서 해당 유저의 오프라인 메시지 정보 조회
        List<OfflineChatHistoryEntity> offlineMessages = offlineChatHistoryRepository.findByReceiverId(
                userEntity.getId());

        // 조회한 메시지 정보를 새로 연결된 세션을 통해 전송
        for (OfflineChatHistoryEntity message : offlineMessages) {
            sendMessage(session, message.getContent());
        }

        // 오프라인 유저 리스트에서 해당 유저 제거
        offlineChatHistoryRepository.deleteAllByReceiverId(userEntity.getId());
    }

    //유저의 이메일과 세션을 매핑해서 서버에서 관리.
    userSessionMap.put(userId, session);
    log.info("{} 환영합니다", userId);
}

 

이렇게 해주면 웹소켓에 연결될때 받지못한 메시지를 받을수있다.

반응형

'공부 임시 저장소' 카테고리의 다른 글

스프링 부트 - spring-security-jwt  (0) 2024.06.25
스프링부트 - aws s3파일저장  (0) 2024.06.25
스프링부트 - 파일 저장  (0) 2024.06.24
사이드 프로젝트) 채팅앱  (0) 2024.06.24
스프링 웹소켓  (1) 2024.06.03
반응형

회원가입은 다른 프로젝트와 크게 다른부분이없어서 spring-security만 정리할때 따로 올려보겠다.

우선은 회원가입 기능이 끝난후에 채팅 기능을 바로 만들어주었다.

 

WebSocketConfig

@Configuration
@EnableWebSocket
@RequiredArgsConstructor
public class WebSocketConfig implements WebSocketConfigurer {

    private final SocketTextHandler socketTextHandler;

	@Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(socketTextHandler,"/chat/rooms")
                .setAllowedOrigins("*");
    }
}

 

서버의 /chat/rooms 로 연결하면 웹소켓에 연결되도록 엔드포인트를 만들어주었다.

따로 중간에 인터셉터를 addInterceptor() 로 추가할수있는데 현재는 필요하지않아서 cors 처리만 해주었다.

 

SocketTextHandler

@Component
@RequiredArgsConstructor
@Slf4j
public class SocketTextHandler extends TextWebSocketHandler {
    private final Map<String, WebSocketSession> userSessionMap = new ConcurrentHashMap<>();

    private final UserRepository userRepository;
    private final ChatRoomRepository roomRepository;
    private final OfflineChatHistoryRepository offlineChatHistoryRepository;
    private final UserValidate userValidate;
    private final JwtUtil jwtUtil;
    private final ObjectMapper mapper;
    private final ChatRoomValidator chatRoomValidator;

    @Override
    @Transactional
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        // 전송할 수신자에 대한 session과 정보.
        ChatMessageDto chatMessageDto = getMessageDto(message);
        //전송유저
        UserEntity senderUser = getUserEntity(session);

        if (chatMessageDto.getChatRoomId() == null) {
            if (chatMessageDto.getReceiverEmail().isEmpty()){
                throw new ApiException(ErrorCode.BAD_REQUEST,"채팅방 또는 수신자 정보가 필요합니다.");
            }
            List<UserEntity> receivers = getReceivers(chatMessageDto);
            // 2. 송신자와 수신자를 해당 방에 가입.
            List<UserEntity> allUser = new ArrayList<>(receivers) {{ add(senderUser); }};
            ChatRoom room = getChatRoom(allUser, chatMessageDto);
            roomRepository.save(room);
            sendMessageAndkeep(receivers, chatMessageDto, senderUser);
        } else {
            // room 이 있는경우 .
            ChatRoom chatRoom = chatRoomValidator.validateChatRoom(roomRepository.findById(chatMessageDto.getChatRoomId()));
            List<ChatRoomUsersEntity> chatRoomUsers = chatRoom.getChatRoomUsers();

            List<UserEntity> currentRoomUsers = chatRoomUsers.stream()
                    .map(ChatRoomUsersEntity::getUser)
                    .filter(user -> !Objects.equals(user, senderUser)).toList();

            sendMessageAndkeep(currentRoomUsers, chatMessageDto, senderUser);
        }
    }

    private static ChatRoom getChatRoom(List<UserEntity> allUser,
                                        ChatMessageDto chatMessageDto) {
        ChatRoom room = new ChatRoom();
        List<ChatRoomUsersEntity> chatRoomUsersList = allUser.stream()
                .map(user -> ChatRoomUsersEntity.builder()
                        .user(user)
                        .joinedAt(LocalDateTime.now())
                        .chatRoom(room)
                        .build())
                .collect(toList());
        room.setChatRoom(chatRoomUsersList,chatMessageDto.getRoomName());
        return room;
    }

    private List<UserEntity> getReceivers(ChatMessageDto chatMessageDto) {
        return chatMessageDto.getReceiverEmail().stream().map(receiver ->
                userValidate.validUser(userRepository.findByEmail(receiver))
        ).toList();
    }

    @Override
    @Transactional
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        //유저의 이메일을 얻어온다.
        String userId = getUserId(session);

        UserEntity userEntity = userRepository.findByEmail(userId)
                .orElseThrow(() -> new ApiException(ErrorCode.BAD_REQUEST, "없는 유저입니다."));

        // 오프라인 유저 리스트에 해당 유저가 있는지 확인
        if (offlineChatHistoryRepository.existsByReceiverId(userEntity.getId())) {
            // chatRepository에서 해당 유저의 오프라인 메시지 정보 조회
            List<OfflineChatHistoryEntity> offlineMessages = offlineChatHistoryRepository.findByReceiverId(
                    userEntity.getId());

            // 조회한 메시지 정보를 새로 연결된 세션을 통해 전송
            for (OfflineChatHistoryEntity message : offlineMessages) {
                sendMessage(session, message.getContent());
            }

            // 오프라인 유저 리스트에서 해당 유저 제거
            offlineChatHistoryRepository.deleteAllByReceiverId(userEntity.getId());
        }

        //유저의 이메일과 세션을 매핑해서 서버에서 관리.
        userSessionMap.put(userId, session);
        log.info("{} 환영합니다", userId);
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        String userId = getUserId(session);
        userSessionMap.remove(userId);
        log.info("클라이언트와 연결이 해제 되었습니다.");
    }


    private String getUserId(WebSocketSession session) {
        return
                jwtUtil.getUsername(
                        Objects.requireNonNull(session.getHandshakeHeaders().get("Authorization")).get(0)
                                .split(" ")[1]);
    }

    private UserEntity getUserEntity(WebSocketSession session) {
        return userRepository.findByEmail(getUserId(session))
                .orElseThrow(() -> new ApiException(ErrorCode.BAD_REQUEST));
    }

    private ChatMessageDto getMessageDto(TextMessage message) throws JsonProcessingException {
        return mapper.readValue(message.getPayload(), ChatMessageDto.class);
    }

    private void sendMessageAndkeep(List<UserEntity> userEntities, ChatMessageDto chatMessageDto, UserEntity sender) {
        //연결중인 세션만 찾아서 문자를전송
        getOnlineSession(userEntities).forEach(
                receiverSession -> sendMessage(receiverSession, chatMessageDto.getMessage()));

        //연결중이지않은 유저는 해당 기록을 저장.
        saveOfflineChat(userEntities, chatMessageDto, sender);
    }

    private void saveOfflineChat(List<UserEntity> userEntities, ChatMessageDto chatMessageDto, UserEntity sender) {
        List<UserEntity> offlineReceivers = userEntities.stream()
                .filter(receiver -> userSessionMap.get(receiver.getEmail()) == null)
                .toList();

        offlineReceivers.forEach(offlineUser -> {
            OfflineChatHistoryEntity offlineChat = OfflineChatHistoryEntity.builder()
                    .senderId(sender.getId())
                    .receiverId(offlineUser.getId())
                    .content(chatMessageDto.getMessage())
                    .createdAt(LocalDateTime.now())
                    .build();
            offlineChatHistoryRepository.save(offlineChat);
        });
    }

    private List<WebSocketSession> getOnlineSession(List<UserEntity> userEntities) {
        return userEntities.stream()
                .map(receiver -> userSessionMap.get(receiver.getEmail()))
                .filter(Objects::nonNull)
                .toList();
    }


    public <T> void sendMessage(WebSocketSession session, T message) {
        try {
            session.sendMessage(new TextMessage(mapper.writeValueAsString(message)));
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
    }
}

 

여기서 웹소켓에 일어나는 모든 일들을 처리해주어야 한다. 하나씩 뜯어서 보자.

 

WebSocketHandler - afterConnectionEstablished(WebSocketSession session)

웹소켓 연결시 

@Override
@Transactional
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
    //유저의 이메일을 얻어온다.
    String userId = getUserId(session);

    UserEntity userEntity = userRepository.findByEmail(userId)
            .orElseThrow(() -> new ApiException(ErrorCode.BAD_REQUEST, "없는 유저입니다."));

    //유저의 이메일과 세션을 매핑해서 서버에서 관리.
    userSessionMap.put(userId, session);
    log.info("{} 환영합니다", userId);
}

 

getUserId

private String getUserId(WebSocketSession session) {
    return
            jwtUtil.getUsername(
                    Objects.requireNonNull(session.getHandshakeHeaders().get("Authorization")).get(0)
                            .split(" ")[1]);
}

 

1. 세션의 헤더에있는 jwt토큰을 받아와서 유저의 아이디를 읽는다.

2. 유저의 아이디로 db에있는 유저정보를 찾아온다. 

3. 유저의 아이디와,유저의 세션을 map형식으로 저장해서 관리.

 

이제 유저가 웹소켓에 연결되었으니 문자를 보내거나 받을수있다.

 

WebSocketHandler - handleTextMessage(WebSocketSession session,TextMessage message)

@Override
    @Transactional
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        // 전송할 수신자에 대한 session과 정보.
        ChatMessageDto chatMessageDto = getMessageDto(message);
        //전송유저
        UserEntity senderUser = getUserEntity(session);

        if (chatMessageDto.getChatRoomId() == null) {
            if (chatMessageDto.getReceiverEmail().isEmpty()){
                throw new ApiException(ErrorCode.BAD_REQUEST,"채팅방 또는 수신자 정보가 필요합니다.");
            }
            List<UserEntity> receivers = getReceivers(chatMessageDto);
            // 2. 송신자와 수신자를 해당 방에 가입.
            List<UserEntity> allUser = new ArrayList<>(receivers) {{ add(senderUser); }};
            ChatRoomEntity room = getChatRoom(allUser, chatMessageDto);
            roomRepository.save(room);
            sendMessageAndkeep(receivers, chatMessageDto, senderUser);
        } else {
            // room 이 있는경우 .
            ChatRoomEntity chatRoomEntity = chatRoomValidator.validateChatRoom(roomRepository.findById(chatMessageDto.getChatRoomId()));
            List<ChatRoomUsersEntity> chatRoomUsers = chatRoomEntity.getChatRoomUsers();

            List<UserEntity> currentRoomUsers = chatRoomUsers.stream()
                    .map(ChatRoomUsersEntity::getUser)
                    .filter(user -> !Objects.equals(user, senderUser)).toList();

            sendMessageAndkeep(currentRoomUsers, chatMessageDto, senderUser);
        }
    }

 

ChatMessageDto

public class ChatMessageDto {
    // 메시지  타입 : 입장, 채팅
    public enum MessageType {
        ENTER, TALK, CREATE
    }

    private MessageType messageType; // 메시지 타입
    private Long chatRoomId; // 방 번호
    private String roomName;
    private String message; // 메시지
    private List<String> receiverEmail; // 수신자 이메일
}

getMessageDto

private ChatMessageDto getMessageDto(TextMessage message) throws JsonProcessingException {
    return mapper.readValue(message.getPayload(), ChatMessageDto.class);
}

 

1. 유저가보낸 메시지를 messageDto 로 변환한다. 

 

사용자가 메시지를 보낼때 json 형태로 보내주어야한다. 

{

     "messageType" : "TALK",     

    "chatRoomId" : "2",

    "roomName":"방이름",

    "message":"Hello world!",

    "receiverEmail": ["친구1","친구2","친구3"]

}

이 부분은 프론트에서 사용자가 방을 만드는건지, 들어가는지와 누구에게보내는지등의 정보를 수집해서 json으로 만들어서 웹소켓으로 보내주어야한다.

 

2. 서버에서 messgeDto에 있는 정보로 메시지와 채팅방 정보를 처리해준다. 

메시지를 보낼때 보내는 대상이 필요한데 채팅방이나 수신자 두개중에 하나의 정보는 필수로 필요하다.

채팅방이 없는경우 수신자의 정보가 있게되는데  수신자의 정보로 db에서 해당 유저를 받아오고.

새로 채팅방을 만들어서 송신자와 수신자를 넣어준다. 

해당 방을 db에 저장하고 메시지내용을 수신자에게 전송해준다.

 

getChatRoom

  private static ChatRoomEntity getChatRoom(List<UserEntity> allUser,
                                              ChatMessageDto chatMessageDto) {
        ChatRoomEntity room = new ChatRoomEntity();
        List<ChatRoomUsersEntity> chatRoomUsersList = allUser.stream()
                .map(user -> ChatRoomUsersEntity.builder()
                        .user(user)
                        .joinedAt(LocalDateTime.now())
                        .chatRoomEntity(room)
                        .build())
                .collect(toList());
        room.setChatRoom(chatRoomUsersList,chatMessageDto.getRoomName());
        return room;
    }

 

@Entity
@Table(name = "chat_room")
public class ChatRoomEntity extends BaseEntity {
    private String name;
    private LocalDateTime createdAt;

    @OneToMany(mappedBy = "chatRoom",cascade = CascadeType.ALL)
    private List<ChatRoomUsersEntity> chatRoomUsers;

    public void setChatRoom(List<ChatRoomUsersEntity> chatRoomUsers,String name){
        this.chatRoomUsers = chatRoomUsers;
        this.name = name;
        this.createdAt = LocalDateTime.now();
    }
}

채팅방을 만들고 

유저의 엔티티를 어떤 유저가 어떤채팅방에 가입되어있는지 저장하는 chatRoomUsersEntity로 변경해준다. 

이 값을 chatRoom에 set 해주고 이렇게 만든 room을 저장해주게된다. 

 

3. 채팅방이 있는경우

messageDto가 포함한 roomId로 room엔티티를 받아온다.

해당 room에 있는 유저를 받아오고 받아온유저에서 메시지를 보내는 송신자는 제외해준다.

수신자에게 메시지를 보낸다.

 

entity정

 

chatRoomEntity

@Entity
@Table(name = "chat_room")
public class ChatRoomEntity extends BaseEntity {
    private String name;
    private LocalDateTime createdAt;

    @OneToMany(mappedBy = "chatRoom",cascade = CascadeType.ALL)
    private List<ChatRoomUsersEntity> chatRoomUsers;

    public void setChatRoom(List<ChatRoomUsersEntity> chatRoomUsers,String name){
        this.chatRoomUsers = chatRoomUsers;
        this.name = name;
        this.createdAt = LocalDateTime.now();
    }
}

 

생성된 채팅방의 정보를 저장.

 

chatRoomUsersEntity

@Entity
@Table(name = "chat_room_users")
public class ChatRoomUsersEntity extends BaseEntity {
    @ManyToOne
    @JoinColumn(name="userId")
    private UserEntity user;

    @ManyToOne(fetch = FetchType.LAZY)
    @JoinColumn(name = "chatRoomId")
    private ChatRoomEntity chatRoomEntity;
    private LocalDateTime joinedAt;
}

 

어떤 유저가 어떤 방에 가입되어있는지에대해서 저장

외래키로 chat_room_id를 가지고있고 이 외래키로 room에 접근할수있다.

 

다음에는 메시지를 보낼때 해당 수신자가 연결되어있지않을때에 대한 처리를 해주겠다.

반응형
반응형

이전에 웹소켓을 사용해봤던 경험으로 이번에 사이드프로젝트로 프론트한분과 같이 채팅앱을 구현해보기로했다.

간단히 웹소켓을 만들고 채팅을 연결만 했던것에서 발전시켜서 채팅방을 만들고 채팅 기록을 남길수있도록 처리해주었다.

 

크게 만들어야할 기능들을 정리해보겠다.

1. 채팅기능 

2. 채팅방 

3. 채팅내용기록

4. 회원가입

 

이를 위해서 데이터베이스도 따로 erd로 정리를 해보았다.

 

 

회원가입 친구등록 그외 필요한 기능들을 위해서 db를 정의해 놓았다. 

 

라이브러리

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    implementation 'org.springframework.boot:spring-boot-starter-security'
    implementation 'org.springframework.boot:spring-boot-starter-validation'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-websocket'
    compileOnly 'org.projectlombok:lombok'
    runtimeOnly 'com.mysql:mysql-connector-j'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.security:spring-security-test'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
    implementation 'org.springdoc:springdoc-openapi-starter-webmvc-ui:2.2.0'
    implementation 'io.jsonwebtoken:jjwt-api:0.12.3'
    implementation 'io.jsonwebtoken:jjwt-impl:0.12.3'
    implementation 'io.jsonwebtoken:jjwt-jackson:0.12.3'
}

 

jpa, spring-security , validation , web, websocket , mysql을 사용하고있다.

 

이제 본격적으로 코드를 하나씩 작성해보자.

반응형

+ Recent posts