JAVA/Application

[Springboot] 스케줄러 동적 등록/삭제/변경

히어로맛쿠키 2024. 12. 2. 12:43

■ Intro

동적으로 스케줄링 정보를 변경해야 할 일이 있다. 예시로 스케줄링 주기를 중간에 바꿔야 한다든가, 스케줄 내용을 변경해야 한다든가 말이다.
 
값이 동적으로 들어오는 위치를 DB라면, DB의 데이터 변경을 자동으로 알아차린 후 스케줄러를 변경시켜야 한다. 그래서 지난 번에는 어떻게 변경감지를 할지 고민했었고, 폴링으로 감지하기로 결정했다.
 
이번에는 데이터 변경이 감지된 후 스케줄러를 동적으로 변경하는 방법을 정리하겠다.
 


■ 샘플 구조

예시로, 1분마다 DB에서 '주기 값'이 변경되었는지 확인하고, 스케줄러에 반영하는 간단한 샘플 구조를 작성했다. 각자의 비스니스에 맞게 커스텀해서 사용하면 될 것이다.
 

@Component
public class SampleDynamicScheduler {
    private static final Logger logger = LoggerFactory.getLogger(SampleDynamicScheduler.class);
    private final Map<String, SampleObject> cache = new ConcurrentHashMap<>();
    private final Map<String, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
    private final SampleRepository sampleRepository;
    private ThreadPoolTaskScheduler scheduler;

    public SampleDynamicScheduler(SampleRepository sampleRepository) {
        this.sampleRepository = sampleRepository;
    }

    @PostConstruct
    public void init() {
        // 서버에 초기 데이터 저장
        List<SampleObject> newData = sampleRepository.findAll();
        newData.forEach(data -> cache.put(data.getName(), data));

        // 스케줄러 활성화
        scheduler = new ThreadPoolTaskScheduler();
        scheduler.initialize();

        // 서버 데이터를 가지고 초기 스케줄러 등록
        cache.forEach((key, data) -> scheduleTask(key, data.getInterval()));

        logger.info("scheduler initialized");
    }

    @PreDestroy
    public void destroy() {
        scheduler.shutdown();
    }

    @Scheduled(fixedRate = 60000) // 1 min period polling
    private void pollUpdates() {
        // DB 데이터 확인
        List<SampleObject> newData = sampleRepository.findAll();

        // DB랑 서버 데이터 비교해서 변경사항 있다면 스케줄러 등록 시작
        for(SampleObject data : newData) {
            if (isUpdated(data)) {
                applyScheduler(data);
            }
        }
    }

    private void applyScheduler(SampleObject target) {
        // 서버에 변경된 DB 데이터 업데이트
        cache.put(target.getName(), target);

        // 해당 스케줄러 삭제 후 재등록
        removeScheduleTask(target.getName());
        scheduleTask(target.getName(), target.getInterval());
    }

    private void removeScheduleTask(String key) {
        ScheduledFuture<?> future = scheduledTasks.get(key);
        if (future != null) {
            future.cancel(true);
            scheduledTasks.remove(key);
            logger.info("scheduler removed {}", key);
        }
    }

    private void scheduleTask(String key, String interval) {
        // 스케줄러 주기 결정하기 (예시: 크론식)
        String cron = convertIntervalToCron(interval);

        // 스케줄러 로직 작성
        Runnable task = () -> {
            logger.info("running,, {}", key);
        };

        // 스케줄러 등록
        Trigger trigger = new CronTrigger(cron);
        ScheduledFuture<?> schedule = scheduler.schedule(task, trigger);
        scheduledTasks.put(key, schedule);
        logger.info("schedule task added: {}", key);
    }

    private String convertIntervalToCron(String interval) {
        // 자체 interval 표현을 cron 표현식으로 변경하는 코드
        return "* * * * * *";
    }
    private boolean isUpdated(SampleObject data) {
        // 서버 데이터와, DB 데이터 비교하여
        // 새로 추가/삭제/변경된 건이 있다면 T/F 리턴
        logger.info("A change has been detected for: {}", data.getName());
        return true;
    }
}

 
 


■ 스케줄링 관련 타입

위 구조에서 스케줄러를 관리할 때 사용한 두가지 객체를 확인해보자.
 

□ ThreadPoolScheduler

Spring 프레임워크에서 제공하는 스케줄링 도구로, TaskScheduler 인터페이스의 구현체이다. 스레드 풀을 관리하여 멀티스레드 Task를 예약하고 실행할 수 있도록 설계되었다. 스케줄링 관리를 유연하게 할 수 있도록 도와줘서, 이번에 동적으로 스케줄링 작업을 추가하고 제거할 때 사용하였다. 내 예시 코드에서는 Cron을 사용하였지만 특정 시간, 간격으로 설정할 수도 있다.
 
ThreadPoolScheduler에는 다음과 같은 기능이 있다.
 
- initialize(): 스레드 풀 초기화
- shutdown() 스케줄러 종료
 
- schedule(Runnable task, Trigger trigger): 사용자 정의 트리거를 기반으로 작업을 실행한다. 예를 들어 크론 표현식을 넣으려면 new CronTrigger(cronExpression)을 사용할 수 있다.

@Nullable
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
    ScheduledExecutorService executor = this.getScheduledExecutor();

    try {
        ErrorHandler errorHandler = this.errorHandler;
        if (errorHandler == null) {
            errorHandler = TaskUtils.getDefaultErrorHandler(true);
        }

        return (new ReschedulingRunnable(task, trigger, this.clock, executor, errorHandler)).schedule();
    } catch (RejectedExecutionException var5) {
        RejectedExecutionException ex = var5;
        throw new TaskRejectedException(executor, task, ex);
    }
}

 
- schedule(Runnable task, Instance startTime): 특정 시간에 작업을 실행한다. 예를 들면 LocalDateTime을 두번째 파라미터로 넣어서 스케줄러 동작 시간을 지정할 수 있다.

public ScheduledFuture<?> schedule(Runnable task, Instant startTime) {
    ScheduledExecutorService executor = this.getScheduledExecutor();
    Duration delay = Duration.between(this.clock.instant(), startTime);

    try {
        return executor.schedule(this.errorHandlingTask(task, false), NANO.convert(delay), NANO);
    } catch (RejectedExecutionException var6) {
        RejectedExecutionException ex = var6;
        throw new TaskRejectedException(executor, task, ex);
    }
}

 
 

□ interface ScheduledFuture

Task를 스케줄링한 결과이다. cancle, get, isCancelled, isDone 기능을 사용할 수 있다. 위에 작성한 샘플 코드에서 보면, scheduler.schedule(task, trigger)한 결과값인 ScheduledFuture를 별도의 map에 보관해서, 나중에 이 task를 취소해야 할 때 map을 통해 ScheduledFuture 인스턴스에 접근해 cancel해주는 용도로 사용했다.

package java.util.concurrent;

/**
 * A delayed result-bearing action that can be cancelled.
 * Usually a scheduled future is the result of scheduling
 * a task with a {@link ScheduledExecutorService}.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this Future
 */
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}

 
 


 
 

반응형