[Springboot] 스케줄러 동적 등록/삭제/변경
■ Intro
동적으로 스케줄링 정보를 변경해야 할 일이 있다. 예시로 스케줄링 주기를 중간에 바꿔야 한다든가, 스케줄 내용을 변경해야 한다든가 말이다.
값이 동적으로 들어오는 위치를 DB라면, DB의 데이터 변경을 자동으로 알아차린 후 스케줄러를 변경시켜야 한다. 그래서 지난 번에는 어떻게 변경감지를 할지 고민했었고, 폴링으로 감지하기로 결정했다.
이번에는 데이터 변경이 감지된 후 스케줄러를 동적으로 변경하는 방법을 정리하겠다.
■ 샘플 구조
예시로, 1분마다 DB에서 '주기 값'이 변경되었는지 확인하고, 스케줄러에 반영하는 간단한 샘플 구조를 작성했다. 각자의 비스니스에 맞게 커스텀해서 사용하면 될 것이다.
@Component
public class SampleDynamicScheduler {
private static final Logger logger = LoggerFactory.getLogger(SampleDynamicScheduler.class);
private final Map<String, SampleObject> cache = new ConcurrentHashMap<>();
private final Map<String, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
private final SampleRepository sampleRepository;
private ThreadPoolTaskScheduler scheduler;
public SampleDynamicScheduler(SampleRepository sampleRepository) {
this.sampleRepository = sampleRepository;
}
@PostConstruct
public void init() {
// 서버에 초기 데이터 저장
List<SampleObject> newData = sampleRepository.findAll();
newData.forEach(data -> cache.put(data.getName(), data));
// 스케줄러 활성화
scheduler = new ThreadPoolTaskScheduler();
scheduler.initialize();
// 서버 데이터를 가지고 초기 스케줄러 등록
cache.forEach((key, data) -> scheduleTask(key, data.getInterval()));
logger.info("scheduler initialized");
}
@PreDestroy
public void destroy() {
scheduler.shutdown();
}
@Scheduled(fixedRate = 60000) // 1 min period polling
private void pollUpdates() {
// DB 데이터 확인
List<SampleObject> newData = sampleRepository.findAll();
// DB랑 서버 데이터 비교해서 변경사항 있다면 스케줄러 등록 시작
for(SampleObject data : newData) {
if (isUpdated(data)) {
applyScheduler(data);
}
}
}
private void applyScheduler(SampleObject target) {
// 서버에 변경된 DB 데이터 업데이트
cache.put(target.getName(), target);
// 해당 스케줄러 삭제 후 재등록
removeScheduleTask(target.getName());
scheduleTask(target.getName(), target.getInterval());
}
private void removeScheduleTask(String key) {
ScheduledFuture<?> future = scheduledTasks.get(key);
if (future != null) {
future.cancel(true);
scheduledTasks.remove(key);
logger.info("scheduler removed {}", key);
}
}
private void scheduleTask(String key, String interval) {
// 스케줄러 주기 결정하기 (예시: 크론식)
String cron = convertIntervalToCron(interval);
// 스케줄러 로직 작성
Runnable task = () -> {
logger.info("running,, {}", key);
};
// 스케줄러 등록
Trigger trigger = new CronTrigger(cron);
ScheduledFuture<?> schedule = scheduler.schedule(task, trigger);
scheduledTasks.put(key, schedule);
logger.info("schedule task added: {}", key);
}
private String convertIntervalToCron(String interval) {
// 자체 interval 표현을 cron 표현식으로 변경하는 코드
return "* * * * * *";
}
private boolean isUpdated(SampleObject data) {
// 서버 데이터와, DB 데이터 비교하여
// 새로 추가/삭제/변경된 건이 있다면 T/F 리턴
logger.info("A change has been detected for: {}", data.getName());
return true;
}
}
■ 스케줄링 관련 타입
위 구조에서 스케줄러를 관리할 때 사용한 두가지 객체를 확인해보자.
□ ThreadPoolScheduler
Spring 프레임워크에서 제공하는 스케줄링 도구로, TaskScheduler 인터페이스의 구현체이다. 스레드 풀을 관리하여 멀티스레드 Task를 예약하고 실행할 수 있도록 설계되었다. 스케줄링 관리를 유연하게 할 수 있도록 도와줘서, 이번에 동적으로 스케줄링 작업을 추가하고 제거할 때 사용하였다. 내 예시 코드에서는 Cron을 사용하였지만 특정 시간, 간격으로 설정할 수도 있다.
ThreadPoolScheduler에는 다음과 같은 기능이 있다.
- initialize(): 스레드 풀 초기화
- shutdown() 스케줄러 종료
- schedule(Runnable task, Trigger trigger): 사용자 정의 트리거를 기반으로 작업을 실행한다. 예를 들어 크론 표현식을 넣으려면 new CronTrigger(cronExpression)을 사용할 수 있다.
@Nullable
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
ScheduledExecutorService executor = this.getScheduledExecutor();
try {
ErrorHandler errorHandler = this.errorHandler;
if (errorHandler == null) {
errorHandler = TaskUtils.getDefaultErrorHandler(true);
}
return (new ReschedulingRunnable(task, trigger, this.clock, executor, errorHandler)).schedule();
} catch (RejectedExecutionException var5) {
RejectedExecutionException ex = var5;
throw new TaskRejectedException(executor, task, ex);
}
}
- schedule(Runnable task, Instance startTime): 특정 시간에 작업을 실행한다. 예를 들면 LocalDateTime을 두번째 파라미터로 넣어서 스케줄러 동작 시간을 지정할 수 있다.
public ScheduledFuture<?> schedule(Runnable task, Instant startTime) {
ScheduledExecutorService executor = this.getScheduledExecutor();
Duration delay = Duration.between(this.clock.instant(), startTime);
try {
return executor.schedule(this.errorHandlingTask(task, false), NANO.convert(delay), NANO);
} catch (RejectedExecutionException var6) {
RejectedExecutionException ex = var6;
throw new TaskRejectedException(executor, task, ex);
}
}
□ interface ScheduledFuture
Task를 스케줄링한 결과이다. cancle, get, isCancelled, isDone 기능을 사용할 수 있다. 위에 작성한 샘플 코드에서 보면, scheduler.schedule(task, trigger)한 결과값인 ScheduledFuture를 별도의 map에 보관해서, 나중에 이 task를 취소해야 할 때 map을 통해 ScheduledFuture 인스턴스에 접근해 cancel해주는 용도로 사용했다.
package java.util.concurrent;
/**
* A delayed result-bearing action that can be cancelled.
* Usually a scheduled future is the result of scheduling
* a task with a {@link ScheduledExecutorService}.
*
* @since 1.5
* @author Doug Lea
* @param <V> The result type returned by this Future
*/
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}