대부분의 Redis Stream기반의 EventListener를 구현하는 블로그들은 많지만 그중에서 Listener가 동작하지 않는 문제를 다룬 글은 없길래 한번 작성을 해본다.
필자도 Redis Stream을 토대로 한 EventListener를 구현했는데 갑자기 작동을 하지 않았는데 왜 작동을 하지 않았는지, 어떻게 하면 작동시킬 수 있는지에 대한 글을 적어보려 한다.
다음은 문제 상황이다.
Redis Stream의 EventListener를 사용하다가 기존의 서버는 ec2에 따로 IP를 확인해서 막는등의 보안로직은 작성하지 않았다.
Spring Security를 써서 막아줬다고 생각해서 아무런 문제가 없을 것으로 생각했지만 현실을 그렇게 순순히 흘러가지 않았다.
문제 상황 1.
Possible SECURITY ATTACK detected. It looks like somebody is sending POST or Host:
commands to Redis.
This is likely due to an attacker attempting to use Cross Protocol Scripting
to compromise your Redis instance. Connection from 83.97.73.245:36770 aborted.
redis의 docker log를 확인해보면 누군가 무작위로 POST를 통해서 Redis 인스턴스에 접근한다고 알려주고 있다.
내 redis의 redis.conf 를 확인할 시 다음과 같은 상태이다.
port 7001
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 3000
appendonly yes
requirepass {password}
masterauth {password}
protected-mode no
bind 0.0.0.0
cluster-announce-ip {ip주소}
cluster-announce-port 7001
cluster-announce-bus-port 17001
다음과 같이 protected-mode를 통해서 내부적으로 레디스끼리 통신하도록 했고
그 뒤에 모든 ip를 적용할수있는 bind를 적용해 놓았지만 requirepass와 masterauth를 통해서 비밀번호를 통해서 접근할 수 있도록
할 수 있게 되어있었다.
그러나 계속해서 저런 로그가 떴는데 디스코드에 접근하는 모든 IP와 요청들에 대해서 Logging을 진행해 보았다.
[Time] 2024-10-06 08:08:33 [Request URI] "/" [사용자 IP] "198.235.24.20"
[Time] 2024-10-06 07:47:14 [Request URI] "/" [사용자 IP] "95.214.27.33"
[Time] 2024-10-06 07:19:40 [Request URI] "/" [사용자 IP] "93.174.93.12"
등등등.. 지속적으로 엄청난 숫자의 로깅이 찍히는 것을 확인할 수 있었다. 아마 해외에서 계속해서 아무 요청이나 보내면서
해킹을 시도하려는 것 같은데 이처럼 특정 ip에 접속해서 거의 사용되는 모든 포트로 요청을 보내면서 확인을 하는 것 같다.
이렇게 요청을 보내고 redis인스턴스가 있는 포트로 POST 등의 메서드를 날리게 되면 redis에서 악의적인 공격이라고 생각하고
잠깐 redis 인스턴스를 멈추고, 이는 내 StreamListener가 동작하지 않는 문제를 유발하게 되었다.
하지만 이런 경우가 아니더라도, 네트워크 오류, 레디스 자체의 문제 등등의 여러 가지 문제점에 있어서도
고가용성을 위해 redis cluster를 사용했고, 이에 기반한 redis Stream Listener를 구현했는데 redis Listener가 동작하지 않는다는 것은 말이 안 된다.
좀 더 자세히 들어가자면, Spring Application에서 동작하는 로직에서 redis Stream으로 메시지를 보내는 상황에서는
저렇게 공격이 들어와도 Redis Stream안에 메시지가 "잘 들어간다".
그러나 Listener의 입장에서 Redis가 문제가 조금이라도 생기게 된다면 바로 구독하고 있는 Stream에서 연결을 끊어버리게 된다.
그 이유를 찾아보다가 github에서 사람들이 건의한 2가지 issue를 보게 되었다.
https://github.com/spring-projects/spring-data-redis/issues/2833
`StreamMessageListenerContainer` stops consuming messages after connection timeout to Redis · Issue #2833 · spring-projects/sp
Problem I have been following this guide for implementing asynchronous message listeners for Redis streams. The listeners work as expected at first. However, if a temporary connection timeout occur...
github.com
요약하자면 다음과 같다.
마찬가지로 StreamMessageListenerContainer를 생성해서 구현을 했는데 문제 혹은 예외가 생긴 후
StreamMessageListenerContainer에서 메시지를 처리하지 못한다는 것이다.
그래서 다른 사용자들이 건의를 한 것이 있는데 바로 아래 댓글이다.
https://github.com/spring-projects/spring-data-redis/issues/2833#issuecomment-2178236036
`StreamMessageListenerContainer` stops consuming messages after connection timeout to Redis · Issue #2833 · spring-projects/sp
Problem I have been following this guide for implementing asynchronous message listeners for Redis streams. The listeners work as expected at first. However, if a temporary connection timeout occur...
github.com
해결하는 방법은 단순하다.
아래 코드에서 cancelOnError를 false로 변경하면 문제상황이 발생해도 구독을 취소하지 않는 것이다.
Subscription subscription =
listenerContainer.register(
StreamMessageListenerContainer.StreamReadRequest
.builder(StreamOffset
.create(REDIS_STREAM_KEY_REFRESH_RELATION, ReadOffset.lastConsumed()))
// here
.cancelOnError(t -> false)
.consumer(Consumer.from(REDIS_STREAM_KEY_REFRESH_RELATION_GROUP_NAME,
REDIS_STREAM_KEY_REFRESH_RELATION_CONSUMER_NAME))
.autoAcknowledge(true).build(),
streamListener);
그리고 이렇게 댓글을 적어놓은 사용자 또한 이러한 기본값이 true로, 즉 구독을 취소하게 되어있는데
false로 기본값이 바뀌어야 하는 것 아닌가?라고 생각을 적어놓았다. 위 issue에서는 이것을 만드신 개발자가
따로 언급하지 않았으니 같은 문제로 issue를 올린 다른 사용자를 가져와서 확인해 보았다.
https://github.com/spring-projects/spring-data-redis/issues/2919
Switch `StreamMessageListenerContainer` default to not unsubscribe on simple exception · Issue #2919 · spring-projects/spring-
While using Redis Stream, I encountered an issue where the consumer faced an exception while consuming, leading to the cancellation of subscription by the StreamMessageListenerContainer. I found th...
github.com
다음 issue에 들어가 보면 똑같이 예외가 발생 시 구독취소로 인해 동작하지 않으니, 구독을 취소하지 않도록 하는 방법은 어떤가에
대해서 언급한 내용이다.
여기서 개발에 참여한 마크 팔루치라는 분은 StreamMessageListenerContainer안에 StreamReadRequest 객체를 통해서
사용자 정의 클래스를 지정해서 해결을 하라고 언급한다.
StreamMessageListenerContainer
.StreamReadRequest
.builder(StreamOffset)
.errorHandler(…)
.cancelOnError(…)
.build();
하지만 여기서 궁금증이 생긴다. 왜 이분은 예외가 발생할 시 구독을 취소하는 것을 기본값으로 두었을까?
스트림 읽기로 인한 오류는 그룹 할당이 없는 등의 스트림 설정 오류를 의미하며,
이러한 가정을 염두에 두고 복구할 수 있는 방법이 없으므로 구독을 취소하기로 결정했습니다.라고 말하시고
구독자가 없으면 메시지가 사라지는 Pub/Sub와 달리 Stream에서는 계속해서 메세지가 쌓이게 되므로 예외를 무시하고 계속하는
상황은 가능하지만, 비즈니스 코드에는 이러한 방식이 좋지 않다는 것이다.
확실한 이해를 돕기 위해서 직접 구독을 취소하지 않는 방식, cacelOnError( e -> false )로 지정하고
실패를 해보기로 하자.
이런 경우 무제한 Polling으로 인해 과도한 polling으로 인해 리소스가 과도하게 소모되면서 결국 application속도부터 느려지게 된다. 아마 이런 경우를 대비해서 개발자 분이 기본값을 구독을 끊는 것으로 하고, 사용자가 적절히 처리하도록 설정하도록 하신 게 아닐까? 생각을 해본다.
그러면 이제 코드를 어떻게 작성해야 하는지 보자.
이전에 포스트 한 RedisStream에 대한 코드와 다르다.
먼저 Listener에서 각 Stream에 대한 Key와 Consumer Group을 지정해줘야 한다.
사실 저렇게 하드코딩식으로 이름을 지정하는 부분은 좋지 않은 방식이나, 프로젝트의 규모가 작아 대충 때려 넣은 점은 넘어가도록 하고 자신의 프로젝트 규모에 맞게 코드를 작성하도록 하고 유기적으로 변동시킬 수 있으면 있는 식으로 구성하도록 하자.
2. 해결 코드
@PostConstruct
public void createConsumer() {
createStreamConsumerGroup("coursePoint", "coursePointGroup");
createStreamConsumerGroup("courseFree", "courseFreeGroup");
}
public void createStreamConsumerGroup(final String streamKey, final String consumerGroupName) {
boolean streamExists = Boolean.TRUE.equals(redistemplateForCluster.hasKey(streamKey));
if (!streamExists) {
redistemplateForCluster.execute((RedisCallback<Void>) connection -> {
byte[] streamKeyBytes = streamKey.getBytes();
byte[] consumerGroupNameBytes = consumerGroupName.getBytes();
connection.execute("XGROUP", "CREATE".getBytes(), streamKeyBytes, consumerGroupNameBytes,
"0".getBytes(), "MKSTREAM".getBytes());
return null;
});
} else if (!isStreamConsumerGroupExist(streamKey, consumerGroupName)) {
redistemplateForCluster.opsForStream().createGroup(streamKey, ReadOffset.from("0"), consumerGroupName);
}
}
public boolean isStreamConsumerGroupExist(final String streamKey, final String consumerGroupName) {
return redistemplateForCluster
.opsForStream().groups(streamKey).stream()
.anyMatch(group -> group.groupName().equals(consumerGroupName));
}
위 코드는 Stream이 존재하는지 확인하고, 없으면 새로 생성하는 코드이다.
redis의 명령어 중 Stream 명령어인 XGROUP CREATE 명령어로 그룹이 존재하지 않을 시 생성하고 MKSTREAM으로
나중에 Stream을 구독할 시 구독한 후에 메시지를 읽어올 수 있도록 하는 코드를 실행한다.
@Bean
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> startPointListener() {
pointListenerContainer = createStreamSubscription(
"coursePoint", "coursePointGroup", "instance-1", pointEventListener
);
return pointListenerContainer;
}
그 후 StreamMessageListenerContainer를 만드는데 각 stream key, 그룹, consumer이름, Listener로 등록할 객체를 넣어주게 만들었다. 그 후 다음 방식으로 StreamMessageListenerContainer를 커스텀하게 된다.
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> createStreamSubscription(
String streamKey, String consumerGroup, String consumerName,
StreamListener<String, MapRecord<String, String, String>> eventListener) {
StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1L))
.errorHandler(e -> {
log.error("Error in listener: {}", e.getMessage());
restartSubscription(streamKey, consumerGroup, consumerName, eventListener);
}).build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
StreamMessageListenerContainer.create(redisConnectionFactoryForCluster, containerOptions);
container.register(
StreamMessageListenerContainer.StreamReadRequest.builder(
StreamOffset.create(streamKey, ReadOffset.lastConsumed()))
.cancelOnError(t -> true) // 오류 발생 시 구독 취소
.consumer(Consumer.from(consumerGroup, consumerName))
.autoAcknowledge(true)
.build(), eventListener);
container.start();
log.info("Listener container started for stream: {}", streamKey);
return container;
}
여기서 동작하지 않을 경우 설정해야 하는 부분은 StreamMessageListenerConatinerOptions에
자신만의 errorHandler를 추가하면 된다.
여기서는 재구독을 실행하도록 구성했는데 이때 무한 폴링이 되는 경우를 방지하기 위해서 restartSubscription을 다음과 같이
구현했다.
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private void restartSubscription(String streamKey, String consumerGroup, String consumerName,
StreamListener<String, MapRecord<String, String, String>> eventListener) {
scheduler.schedule(() -> {
log.info("Restarting subscription for stream: {}", streamKey);
stopContainer(streamKey);
createStreamSubscription(streamKey, consumerGroup, consumerName, eventListener).start();
}, 5, TimeUnit.SECONDS); // 일정 시간 후 재시작
}
private void stopContainer(String streamKey) {
if ("coursePoint".equals(streamKey) && pointListenerContainer != null && pointListenerContainer.isRunning()) {
pointListenerContainer.stop();
log.info("Stopped point listener container");
}
if ("courseFree".equals(streamKey) && freeListenerContainer != null && freeListenerContainer.isRunning()) {
freeListenerContainer.stop();
log.info("Stopped free listener container");
}
}
재구독하는 로직을 backoff로 시간을 정하고 시간마다 시작하고 싶어 scheduler 객체를 지정하고, 5초마다 다시 redis stream에
재구독하도록 지정했다. 마찬가지로 이런 하드코딩된 key값들은 알아서 지정해 보도록 하자.
Listener 클래스를 만드는 방식은
https://kiru-dev-study.tistory.com/17 다음에 있다.
Redis Stream을 사용한 EventPublisher, EventListener 구현
https://playground.sopt.org/projects/179 SOPT | 프로젝트 둘러보기자세한 내용이 궁금하신가요?playground.sopt.org다음과 같은 Product에 참여를 해 서버 개발자로 참여를 했었는데요.!그런데 여기서 의문점이
kiru-dev-study.tistory.com
다음과 같은 방식으로 구현을 하게 되면 적절하게 redis에 예외가 생겨 연결이 끊겨도 적절하게 구독할 수 있는 방법을 마련할 수 있고 레디스에 문제가 생겼을 경우 적절히 에러를 처리할 수 있다.
최종 코드는
https://github.com/TeamDATEROAD/DATEROAD-SERVER/blob/develop/dateroad-api/src/main/java/org/dateroad/config/RedisStreamSubscriber.java 다음에서 확인할 수 있다.
'Spring' 카테고리의 다른 글
가상 스레드 vs 반응형 프로그래밍 [Spring/Java] (2) | 2024.11.22 |
---|---|
[Spring] 응? 이게 왜 롤백이 안되지? - 비동기와 @Transaction (4) | 2024.10.21 |
[Spring] Redis Stream을 사용한 EventPublisher, EventListener 구현 (0) | 2024.08.05 |
Swagger로 사랑받는 개발자 되기 [ Spring ] (0) | 2024.08.01 |
Spring 검색조회 필터링 구현 방법 [JPA Specification] (0) | 2024.07.31 |