https://playground.sopt.org/projects/179
SOPT | 프로젝트 둘러보기
자세한 내용이 궁금하신가요?
playground.sopt.org
다음과 같은 Product에 참여를 해 서버 개발자로 참여를 했었는데요.!
그런데 여기서 의문점이 하나 있었는데
데이트 코스를 생성하는 과정에서 다음과 같은 과정을 거쳐야 했었습니다
1. 데이트 코스 생성 [CREATE]
2. 코스를 생성하면 User에게 코스를 생성했으니 Point값 추가 [ UPDATE ]
3. 그후 Point 내역을 보여주기 위한 Point 테이블에 Point 생성 [CREATE]
근데 여기서 의문점이였던게 어라라???
POST API한개로 다른 테이블을 조회해서 UPDATE와 CREATE를 전부 수행해도 되는것인가...?에 대한 의문이였습니다.
Restful한 API를 설계하고싶었고 그에 따라서 하나의 요청에 CRUD중 하나만 수행하도록 만들고싶었기도 했습니다.
결론적으로 하나의 POST API로 여러 테이블에 대한 CREATE와 UPDATE를 수행하는것은 가능하고 트랜잭션관리나 용이성등을 고려해서 잘 짜게 되면 효율적인 설계로 할수 있었지만 결론적으로 하나의 API에서 하나의 기능을 수행하는 담당역할을 진행하고 싶어 Point를 유저에 대한 Update내용을 Event기반으로 작성하기로 했습니다.
기존에 Redis를 사용하기도 했고 Redis Stream에서 작동하는 복구성으로 인해 Redis Stream을 사용하기로 정했습니다.
여기서 Redis Stream은 서버가 다운되더라도 기존에 message를 받아 놓게 된다면 다시 재시작되어도 기존에 읽지 못했던 부분부터 다시 실행시켜 복구를 시킬수가 있어 복구성이 좋다고 판단하였고 Point내역을 보여주는 부분은 알람과 같이 그렇게 중요하지 않은 부분이라 생각해 Redis Stream을 사용해 Event 를 사용해보자! 라고 판단했습니다 ㅎㅅㅎ
하지만 Event기반 방식은 추적하기 어렵고 테스트하기 어렵기 때문에 주의해서 코드를 작성하시기 바랍니다!!
1. 의존성 추가
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
다음과 같은 의존성을 gradle에 추가해줍니다.
2. Redis Stream Config 작성
@Configuration
@RequiredArgsConstructor
public class RedisStreamConfig {
private final PointEventListener pointEventListener;
private final FreeEventListener freeEventListener;
@Value("${spring.data.redis.host}")
private String host;
@Value("${spring.data.redis.port}")
private int port;
@Bean
public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory());
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
return redisTemplate;
}
@Bean
public RedisConnectionFactory redisConnectionFactory() {
return new LettuceConnectionFactory(host, port);
}
public void createStreamConsumerGroup(final String streamKey, final String consumerGroupName) {
if (Boolean.FALSE.equals(redisTemplate().hasKey(streamKey))) {
RedisAsyncCommands<String, String> commands = (RedisAsyncCommands<String, String>) Objects.requireNonNull(
redisTemplate()
.getConnectionFactory())
.getConnection()
.getNativeConnection();
CommandArgs<String, String> args = new CommandArgs<>(StringCodec.UTF8)
.add(CommandKeyword.CREATE)
.add(streamKey)
.add(consumerGroupName)
.add("0")
.add("MKSTREAM");
// MKSTREAM 옵션을 사용하여 스트림과 그룹을 생성
commands.dispatch(CommandType.XGROUP, new StatusOutput<>(StringCodec.UTF8), args).toCompletableFuture()
.join();
}
// Stream 존재 시, ConsumerGroup 존재 여부 확인 후 ConsumerGroup을 생성
else {
if (!isStreamConsumerGroupExist(streamKey, consumerGroupName)) {
redisTemplate().opsForStream().createGroup(streamKey, ReadOffset.from("0"), consumerGroupName);
}
}
}
// ConsumerGroup 존재 여부 확인
public boolean isStreamConsumerGroupExist(final String streamKey, final String consumerGroupName) {
Iterator<StreamInfo.XInfoGroup> iterator = redisTemplate()
.opsForStream().groups(streamKey).stream().iterator();
while (iterator.hasNext()) {
StreamInfo.XInfoGroup xInfoGroup = iterator.next();
if (xInfoGroup.groupName().equals(consumerGroupName)) {
return true;
}
}
return false;
}
@Bean
public Subscription pointSubscription() {
createStreamConsumerGroup("coursePoint", "courseGroup");
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder().pollTimeout(Duration.ofMillis(100)).build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(
redisConnectionFactory(),
containerOptions);
Subscription subscription = container.receiveAutoAck(Consumer.from("courseGroup", "instance-1"),
StreamOffset.create("coursePoint", ReadOffset.lastConsumed()), pointEventListener);
container.start();
return subscription;
}
@Bean
public Subscription freeSubscription() {
createStreamConsumerGroup("courseFree", "courseGroup");
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder().pollTimeout(Duration.ofMillis(100)).build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(
redisConnectionFactory(),
containerOptions);
Subscription subscription = container.receiveAutoAck(Consumer.from("courseGroup", "instance-2"),
StreamOffset.create("courseFree", ReadOffset.lastConsumed()), freeEventListener);
container.start();
return subscription;
}
}
위 코드는 다음과 같은 내용으로 이루어져 있습니다!
RedisTemplate
@Bean
public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory());
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
return redisTemplate;
}
- RedisTemplate: Redis와 상호작용하기 위한 템플릿입니다. 여기서 String 키와 Object 값을 사용하는 템플릿을 생성합니다.
- setConnectionFactory: Redis에 연결하기 위한 팩토리를 설정합니다.
- setKeySerializer와 setValueSerializer: Redis에 저장할 때 사용할 직렬화 방식을 설정합니다. 여기서는 문자열 직렬화를 사용합니다.
필드
- PointEventListener와 FreeEventListener: Redis Streams에서 수신한 메시지를 처리하는 리스너입니다. 이 클래스에서 메시지를 수신하면 해당 리스너가 호출됩니다.
- host와 port: Redis 서버의 호스트와 포트를 설정하기 위한 필드입니다. @Value 어노테이션을 사용하여 application.properties 또는 application.yml에서 값을 주입받습니다.
RedisConnectionFactory
@Bean
public RedisConnectionFactory redisConnectionFactory() {
return new LettuceConnectionFactory(host, port);
}
- RedisConnectionFactory: Redis에 연결하기 위한 팩토리입니다. Lettuce 클라이언트를 사용하여 Redis에 연결합니다.
Stream Consumer Group 생성
public void createStreamConsumerGroup(final String streamKey, final String consumerGroupName) {
// ...
}
- 이 메서드는 주어진 스트림 키와 소비자 그룹 이름으로 소비자 그룹을 생성합니다.
- 먼저, 스트림이 존재하지 않으면 Redis에 새로운 스트림과 소비자 그룹을 생성합니다.
- MKSTREAM 옵션을 사용하여 새로운 스트림을 생성할 때 사용합니다.
- 스트림이 존재하는 경우, 소비자 그룹이 존재하는지 확인하고 필요에 따라 소비자 그룹을 생성합니다.
소비자 그룹 존재 여부 확인
public boolean isStreamConsumerGroupExist(final String streamKey, final String consumerGroupName) {
// ...
}
- 이 메서드는 주어진 스트림 키에 대해 소비자 그룹이 존재하는지 확인합니다.
- Redis의 스트림 정보를 조회하여 소비자 그룹 이름과 비교합니다.
Subscription 설정
@Bean
public Subscription pointSubscription() {
// ...
}
- pointSubscription 메서드는 "coursePoint" 스트림에 대한 소비자 그룹을 생성하고 메시지를 수신할 수 있는 구독을 설정합니다.
- StreamMessageListenerContainer를 사용하여 Redis 스트림에서 메시지를 수신하고 처리하는 컨테이너를 생성합니다.
- receiveAutoAck 메서드를 사용하여 자동으로 메시지를 확인하고, 메시지를 수신하는 리스너(pointEventListener)를 등록합니다.
Free Subscription 설정
@Bean
public Subscription freeSubscription() {
// ...
}
- freeSubscription 메서드는 "courseFree" 스트림에 대한 소비자 그룹을 생성하고 메시지를 수신할 수 있는 구독을 설정합니다.
- pointSubscription과 유사하게 스트림 메시지를 수신하고 처리하는 리스너(freeEventListener)를 등록합니다.
다음과 같은 과정을 통해서 Redis Stream과 Redis Stream에 Subscription을 등록할수 있습니다!
이제 이벤트의 Publisher와 Listnener는 어떻게 구현하는 지 봐야겠죠
EventListener
@Component
@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
public class PointEventListener implements StreamListener<String, MapRecord<String, String, String>> {
private final UserRepository userRepository;
private final PointRepository pointRepository;
@Override
@Transactional
public void onMessage(final MapRecord<String, String, String> message) {
Map<String, String> map = message.getValue();
Long userId = Long.valueOf(map.get("userId"));
TransactionType type = TransactionType.valueOf(map.get("type"));
User user = getUser(userId);
int point = Integer.parseInt(map.get("point"));
String description = map.get("description");
switch (type) {
case POINT_GAINED:
user.setTotalPoint(user.getTotalPoint() + point);
break;
case POINT_USED:
user.setTotalPoint(user.getTotalPoint() - point);
break;
default:
throw new UnauthorizedException(FailureCode.INVALID_TRANSACTION_TYPE);
}
pointRepository.save(Point.create(user,point,type,description));
userRepository.save(user);
}
private User getUser(Long userId) {
return userRepository.findById(userId).orElseThrow(
() -> new EntityNotFoundException(FailureCode.USER_NOT_FOUND)
);
}
}
다음과 같은 과정을 통해 읽은 메세지를 읽어들인후 user의 상태를 update하게 됩니다. 물론 여기다가 Point의 Save도 넣어놓긴했는데 기존에 생각했던 방식과 다른점은 Point를 Save하는 과정이 Listener에 추가된것인데 PointListener를 만들었기 때문에 Point에 관련한 로직은 PointListener에서 담당하는것이 맞다고 생각해 이렇게 변경하게 되었습니다!
Event Publisher
public void publishEvenUserPoint(final Long userId, PointUseReq pointUseReq) {
Map<String, Object> fieldMap = new HashMap<>();
fieldMap.put("userId", userId.toString());
fieldMap.put("point", Integer.toString(pointUseReq.getPoint()));
fieldMap.put("type", pointUseReq.getType().toString());
fieldMap.put("description", pointUseReq.getDescription());
redisTemplate.opsForStream().add("coursePoint", fieldMap);
}
다음과 같이 Redis의 String Template을 사용해 String으로 값을 전달해주고 Listnener에서 그 값을 읽어들여 Point 관련 로직을 처리하도록 구현했습니다.
이렇게 구현을 하게 되면 후에 EDA기반 아키텍쳐로 변경할때 좀더 쉽게 작성할수 있다고 판단을 했고 이러한 과정에서
Redis Stream을 사용해 Event작동 방식을 알아보고 좀더 유연한 대처가 가능할것으로 판단했습니다
2024-10-18 기준 Redis Cluster + Redis Stream EventListener를 구현해서 사용하실때 Listener가 동작하지 않는다면 아래 블로그를 참고해주세요
https://kiru-dev-study.tistory.com/19
[Spring] Redis기반의 EventStreamListener가 동작하지 않는 문제
대부분의 Redis Stream기반의 EventListener를 구현하는 블로그들은 많지만 그중에서 Listener가 동작하지 않는 문제를 다룬 글은 없길래 한번 작성을 해본다. 필자도 Redis Stream을 토대로한 EventListener를
kiru-dev-study.tistory.com
혹시 Redis Stream에 더 궁금하시다면 아래 사이트에서 확인해보시는것도 좋을 것 같습니다
https://redis.io/docs/latest/develop/data-types/streams/
Redis Streams
Introduction to Redis streams
redis.io
'Spring' 카테고리의 다른 글
[Spring] 응? 이게 왜 롤백이 안되지? - 비동기와 @Transaction (4) | 2024.10.21 |
---|---|
[Spring] Redis기반의 EventStreamListener가 동작하지 않는 문제 (2) | 2024.10.15 |
Swagger로 사랑받는 개발자 되기 [ Spring ] (0) | 2024.08.01 |
Spring 검색조회 필터링 구현 방법 [JPA Specification] (0) | 2024.07.31 |
편리한 객체간 매핑을 위한 MapStruct 적용기 .feat 당근클론코딩 (1) | 2024.05.08 |