□ Intro
Springboot와 Redis를 통하여 메시지 비동기 처리를 할 때 StreamListener와 Consumer를 만나게 된다.
내가 원하는 구독자시스템, Pub/Sub, 메시지 처리 구조를 만들기 위해서는 StreamListenerContainer와 StreamListener, 그리고 Consumer 객체 사이의 관계를 이해하는 것이 필요하다.
□ StreamListener와 Consumer의 관계 이해하기
StreamListener와 Consumer는 1:1로 연결을 맺고 있는 구조다.
StreamListener와 Consumer가 관계지어지면 Subscription 객체를 생성한다.
아래 구조는,
각 Consumer가 서로 다른 StreamListener와 맺어져 있고,
이에 따라 서로 다른 처리를 해야 하는 경우의 구조이다.
위와 달리 단 하나의 StreamListener에 여러 Consumer가 등록되어있을 경우에는,
1. 모든 Conumser가 받은 데이터는 onMessage의 동일한 처리를 거치게 된다.
2. Subscription 객체는 Consumer와 StreamListener연결당 하나씩 생성된다.
아래 그림과 같다.
■ 구독자 클래스 Overview
이제 알아볼 것은 이런 설정이 코드상으로 어떻게 되어있냐는 것이다.
코드에서 구독자 클래스를 확인해보면 (=StreamListener 구현체) 아마 InitializingBean과 DisposableBean도 구현하도록 되어있을 것이다. 즉 세가지를 재정의하게 된다.
1. onMessage() : 레디스 스트림에서 수신한 데이터를 처리하는 곳
2. destroy() : 빈 소멸시의 처리코드
3. afterPropertiesSet() : 빈 생성 후 의존관계 주입이 끝난 뒤의 처리코드
핵심을 정리하자면 이렇다.
구현하려는 구독자 클래스에서, implements StreamListener, InitializingBean, DisposableBean을 하고 메서드 3개를 재정의하자. onMessage에는 수신 데이터 처리를, afterPropertiesSet()에는 Consumer 객체 생성 및 listener과의 연결하여 Subscription 관계 맺기를 하자. 그리고 마지막으로 destroy()에서는 모든 Subscription을 해제하고 listenerContainer.stop() 처리해주면 구독자 클래스를 종료시키는 것이다.
■ afterPropertiesSet()에서 Consumer 및 Listener 초기세팅
1. ListenerContainer를 생성한다.
조만간 이 ListenerContainer에 receive를 등록하면서 Consumer랑 StreamListener간의 관계를 맺어주게 된다.
public StreamMessageListenerContainer createStreamMessageListenerContainer() {
return StreamMessageListenerContainer.create(connectionFactory, 옵션정보);
}
* 참고: connectionFactory는 RedisConfig에 빈으로 등록하도록 한다. 자세한 것은 RedisConfig 설정 방법을 따로 찾아보면 된다.
2. Consumer 객체를 생성한다.
메시지 수신을 하는 Consumer 객체다. 그룹키와 소비자키를 사용해 Consumer 객체를 생성한다.
Consumer consumer = Consumer.from(groupKey, consumerKey);
첫번째 인자의 groupKey는, 동일한 groupKey를 가진 여러 Consumer들을 하나의 논리적 그룹으로 묶는 키다.
두번째 인자의 consumerKey는 개별 Consumer를 식별하는 키다.
참고:
Consumer의 처리 로직이 서로 달라야 할때는 listener와 groupKey를 다르게 해야 한다는 것은 직관적으로 받아들여질 것이다. 그런데 listener와 groupKey가 동일한 여러 Consumer를 두는 상황은 왜 둘까?
- 부하 분산: 데이터가 하나의 Consumer에 몰리지 않도록 다수의 Consumer에 분할하여 전달.
- 장애 대응 및 재처리: 한 Consumer가 장애발생해도 다른 Consumer가 받아서 처리 가능
3. receive 작업 (=Subscription 객체 생성)
리스너 컨테이너에 메시지 수신을 설정한다.
그 전에, 배경 지식으로 receive와 ack를 알도록 하자.
https://docs.spring.io/spring-data/redis/docs/3.1.9/reference/html/#redis.streams.acknowledge
일단 receive 이후에, 구현된 구독자가 수신을 시작하고, onMessage에서 수신을 하더라도 별도의 ack 처리를 하지 않으면 pending 상태로 남아있게 된다. redis의 내부 구조에는 PEL(pending entries list) 구조가 있다. 여기에 아직 ack처리 되지 않은 데이터들이 남아있게 된다. 만약 onMessage에서 ack처리를 잊는다면, 메모리 이슈로 레디스 서버가 다운될 수 있다는 점 유념하자.
여기서는 간편하게 receiveAutoAck()를 보려고 한다.
인자는 3개다. 순서대로 consumer, offset, listener다.
Subscription s = listenerContainer.receiveAutoAck(consumer, streamOffset, this);
* consumer: 앞서 생성한 consumer
* streamOffset: 스트림키, 그리고 메시지를 읽기위한 오프셋(메시지를 읽기 위한)을 지정하여 만들 수 있는 streamOffset 객체다. 예를 들어서 StreamOffset streamOffset = StreamOffset.create(strKey, ReadOffset.lastConsumed()); 과 같다. 이 코드는 마지막으로 소비된 메시지 이후부터 메시지를 읽기위한 오프셋이다. 이 오프셋을 생성할 때 스트림키 지정이 들어감을 기억해두자.
* this:: 꼭 this는 아니고, StreamListener가 들어가면 된다. 이때 this로 설정한 이유는, 이 구독자가 StreamListener를 구현하고 있으며 onMessage에 모든 구독처리 동작이 기재되어 있기 때문이다. 다시 말해 이 클래스에 구현해둔 onMessage로 들어올 데이터를 담당할 Consumer와 Listener를 이 클래스 자체에 기술하기 위함이다.
receive 동작이 반환하는 것은 Subscription 객체로, 메시지 수신이 성공했는지 여부를 나타낸다.
5. 마지막으로, Subscription이 수립되기를 대기한다.
만약 다음과 같이 지정한다면, 1초 내에 Subscription이 정상 수행되도록 대기하는 것이다.
만약 1초 안에 이벤트가 이루어지지 않는다면 TimeoutException이 발생한다.
(예외 처리로는 재시도할 수도 있겠고.. 모니터링할 수도 있고..)
s.await(Duration.ofSeconds(1L));
6. listenerContainer.start()
생성해서 StreamListener에 등록한 Consumer가 수신을 시작할 수 있다!
* recall:
위에서 Subscription s = listenerContainer.receiveAutoAck(consumer, streamOffset, this); 했던 것을 기억하자
■ destroy()
적절하게, subscription.cancel()처리랑 listenerContainer.stop()처리를 해주면 된다.
■ onMessage()
생략한다! 파라미터로 받아온 데이터를, 원하는 대로 처리해주면 된다.
docs에서 가져온 예시만 투척함!
class ExampleStreamListener implements StreamListener<String, MapRecord<String, String, String>> {
@Override
public void onMessage(MapRecord<String, String, String> message) {
System.out.println("MessageId: " + message.getId());
System.out.println("Stream: " + message.getStream());
System.out.println("Body: " + message.getValue());
}
}
□ 요약
위와 같이 listenerContainer에 설정된 consumer-streamListener를 통해 해당 streamListener의 onMessage에서 메시지를 수신하게 된다.
□ 달성가능한 것
1. StreamListenerContainer가 스프링 빈으로 등록됨을 이해한다.
2. StreamListenerContainer에 consumer와 listener를 짝지어 등록함을 이해한다.
3. Consumer들을 서로 다른 listener에 등록하여 저마다의 onMessage에서 처리할 수 있다.
4. 동일한 listener에 등록된 Consumer의 group key별로 서로 다른 로직을 구현할 수 있다.
.
.
Ref.
https://docs.spring.io/spring-data/redis/docs/current/api/org/springframework/data/redis/
https://docs.spring.io/spring-data/redis/docs/3.1.9/reference/html/#redis.streams.acknowledge
https://kingjakeu.github.io/springboot/2022/02/10/spring-boot-redis-stream/
'미분류글' 카테고리의 다른 글
Springboot 3 Migration (from 2.6) (2) | 2025.01.06 |
---|---|
Redis Stream의 구조적 특징(Message Queue) (2) | 2024.08.26 |
[IntelliJ] 폐쇄망 개발환경 세팅 (0) | 2024.08.21 |
[Web][Docker] 동일 ip, 서로 다른 도메인 80포트에 대한 포트포워딩 설정 (1) | 2024.07.16 |
java.rmi.server.ExportException: Listen failed on port: 0; (0) | 2024.06.14 |