본문 바로가기
[개발] 프레임워크/Spring

[Spring F/W] Spring Batch

by Devsong26 2023. 10. 23.

스프링 배치 가이드 프로젝트를 클론하여 공부해 보려고 합니다.

 

설정

 

스펙은 Java 17, HSQLDB, Spring Batch, Gradle 입니다.

(참고, HSQLDB는 인메모리 데이터베이스)

https://start.spring.io/ 에서 dependencies에 HyperSQL Database, Spring Batch를 추가합니다.

zip 파일의 압축을 해제하고 IDE로 프로젝트를 엽니다.

 

스프링 공식 홈페이지에서 제공하는 Spring batch guide 프로젝트를 클론하여 참고합니다.

git clone https://github.com/spring-guides/gs-batch-processing.git

 

학습용 더미 데이터는 가이드 프로젝트의 src/main/resources/sample-data.csv 에 있습니다.

 

가이드 프로젝트는 스프링 부트이며 기동시 자동으로 아래 커맨드를 수행하여 

src/main/resources/schema-all.sql을 실행한다고 합니다.

schema-@@platform@@.sql

 

비지니스 클래스 생성하기

입출력 데이터 포맷용 레코드인 Person이 있습니다.

Person record의 생성자의 파라미터로 firstName, lastName을 입력하여 객체를 생성할 수 있습니다.

package com.example.batchprocessing;

public record Person(String firstName, String lastName) {

}

 


Intermediate Processor 생성하기

배치 처리의 일반적인 패러다임은 데이터를 취득하고, 변환하고, 파이프를 통해 다른 곳으로 전송하는 것입니다.

이름을 대문자로 변환하는 간단한 변환기를 보여줍니다.

package com.example.batchprocessing;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.item.ItemProcessor;

public class PersonItemProcessor implements ItemProcessor<Person, Person> {

  private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);

  @Override
  public Person process(final Person person) {
    final String firstName = person.firstName().toUpperCase();
    final String lastName = person.lastName().toUpperCase();

    final Person transformedPerson = new Person(firstName, lastName);

    log.info("Converting (" + person + ") into (" + transformedPerson + ")");

    return transformedPerson;
  }

}

PersonItemProcessor는 Spring Batch의 ItemProcessor 인터페이스를 구현합니다.

이것은 배치 잡에 코드를 연결하기 쉽게 만듭니다.

인터페이스의 따르면 Person 객체를 입력받아 성과 이름을 모두 대문자로 변환한 후 새로운 Person 객체를 반환합니다.

 

참고로 입력와 출력의 데이터 타입은 같을 필요가 없습니다.

가끔은 데이터 소스를 읽은 후에 애플리케이션 데이터 플로우는 다른 타입을 필요로 합니다.

 


배치 잡과 함께 두기

스프링 배치는 비지니스 로직에 초점을 맞출 수 있도록, 커스텀 코드 작성을 줄일 수 있는 많은 기능성 클래스를 제공합니다. 잡을 설정하기 위해 src/main/java/com/example/batchprocessing/BatchConfiguration.java 클래스처럼 @Configuration 빈을 정의해야 합니다. 가이드의 예제는 인메모리 데이터베이스를 사용하며 휘발성 데이터를 다룹니다.

 

@Configuration 빈에는 reader, processor, writer를 정의합니다.

@Bean
public FlatFileItemReader<Person> reader() {
  return new FlatFileItemReaderBuilder<Person>()
    .name("personItemReader")
    .resource(new ClassPathResource("sample-data.csv"))
    .delimited()
    .names("firstName", "lastName")
    .targetType(Person.class)
    .build();
}

@Bean
public PersonItemProcessor processor() {
  return new PersonItemProcessor();
}

@Bean
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
  return new JdbcBatchItemWriterBuilder<Person>()
    .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
    .dataSource(dataSource)
    .beanMapped()
    .build();
}

reader()

- ItemReader 객체를 생성합니다.

- sample-data.csv 파일을 읽고, 각 줄을 Person 객체로 파싱합니다.

 

processor()

- 앞서 정의했던 PersonItemProcessor 객체를 생성합니다.

- 문자열을 대문자로 변환합니다.

 

writer(DataSource)

- ItemWriter 객체를 생성합니다.

- JDBC로 연동되는 관계형 데이터베이스를 대상으로, Spring Boot에 의해 생성된 dataSource 복사본을 자동으로 얻습니다. 이것은 단일 Person을 삽입하기 위해 필요한 SQL 문을 포함하며 Java 레코드 컴포넌트에 의해 구동됩니다.

 

 

같은 @Configuration의 마지막 코드로는 Job과 Step을 정의합니다.

@Bean
public Job importUserJob(JobRepository jobRepository,Step step1, JobCompletionNotificationListener listener) {
  return new JobBuilder("importUserJob", jobRepository)
    .listener(listener)
    .start(step1)
    .build();
}

@Bean
public Step step1(JobRepository jobRepository, DataSourceTransactionManager transactionManager,
          FlatFileItemReader<Person> reader, PersonItemProcessor processor, JdbcBatchItemWriter<Person> writer) {
  return new StepBuilder("step1", jobRepository)
    .<Person, Person> chunk(3, transactionManager)
    .reader(reader)
    .processor(processor)
    .writer(writer)
    .build();
}

job은 Step들로 구성되며 Step은 reader, processor, writer를 포함할 수 있습니다.

Step은 여러개를 정의할 수 있습니다만 이 가이드는 한가지 뿐입니다. 

 

스텝에는 한 번에 얼마나 많은 데이터를 쓸 수 있는지 정의합니다.

한가지 예시로 한 번에 3개의 레코드를 사용합니다.

스텝에서 사용되는 reader, processor, writer는 먼저 정의가 돼야 bean을 주입받을 수 있습니다.

 

chunk는 제네릭 메서드이므로 <I, O> 입력과 출력에 타입을 <Person, Person>처럼 선언해야 합니다.

이것은 각 '청크'의 처리에 대한 입력과 출력 타입을 나타내며 ItemReader<Person>과 ItemWriter<Person>라는 의미입니다.

 

Job이 완료됐을 때 알림을 받을 수 있는 방법으로 JobCompletionNotificationListener를 정의합니다.

위치: src/main/java/com/example/batchprocessing/JobCompletionNotificationListener.java

package com.example.batchprocessing;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.jdbc.core.DataClassRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class JobCompletionNotificationListener implements JobExecutionListener {

  private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);

  private final JdbcTemplate jdbcTemplate;

  public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
    this.jdbcTemplate = jdbcTemplate;
  }

  @Override
  public void afterJob(JobExecution jobExecution) {
    if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
      log.info("!!! JOB FINISHED! Time to verify the results");

      jdbcTemplate
          .query("SELECT first_name, last_name FROM people", new DataClassRowMapper<>(Person.class))
          .forEach(person -> log.info("Found <{{}}> in the database.", person));
    }
  }
}

 

JobCompletionNotificationListener는 Job이 BatchStatus.COMPLETE가 됐을 때를 리스닝하고 jdbcTemplate으로 결과를 조사한다. 

 


실행할 수 있는 애플리케이션 만들기

배치 프로세싱이 웹앱과 war파일에 내장시킬 수도 있지만 standalone 애플리케이션을 만들어 간단하게 시연할 수 있습니다. 간단한 예시는 별도의 수정없이도 기동할 수 있습니다. 

위치: src/main/java/com/example/batchprocessing/BatchProcessingApplication.java

package com.example.batchprocessing;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BatchProcessingApplication {

  public static void main(String[] args) {
    System.exit(SpringApplication.exit(SpringApplication.run(BatchProcessingApplication.class, args)));
  }
}

@SpringBootApplication은 아래의 사항들을 추가하는 간편한 어노테이션입니다.

- @Configuration: 애플리케이션 컨테스트를 위한 빈 정의 소스 클래스

- @EnableAutoConfiguration: 클래스패스 셋팅에 기반한 빈, 다른 빈, 다양한 프로퍼티 셋팅을 추가한다고 스프링 부트에 알리는 설정입니다.

- @ComponentScan: 다른 컴포넌트, 설정, 루트 패키지 안에 위치한 서비스,  컨트롤러를 찾으라고 스프링에게 알리는 설정입니다.

 

main() 메서드는 애플리케이션을 기동하기 위해 SpringApplication.run() 메서드를 사용합니다.

이 웹 애플리케이션은 순수 자바로 구성되었으며 어떠한 기반 시설이나 인프라, XML도 사용되지 않았습니다.

 

SpringApplication.exit(), System.exit() 메서드는 Job 완료 시 JVM이 종료되도록 보장합니다.

 

시연 목적으로 jdbcTemplate, 데이터베이스 쿼리를 생성하고 Job Insert시 사람 이름을 콘솔에 출력합니다.

 

 

실행가능한 Jar 빌드하기

빌드 툴의 명령어를 이용하여 애플리케이션을 실행할 수 있습니다. 필요한 의존성, 클래스, resources를 모두 포함하는

단일 실행가능한 jar 파일도 만들고 실행할 수 있습니다. 

실행 가능한 JAR 파일을 생성하면 개발 생명 주기 동안, 다양한 환경에서 서비스를 애플리케이션으로 배포, 버전 관리, 출시하기가 훨씬 쉬워집니다.

 

ㄴGradle을 사용하므로 ./gradlew bootRun 으로 애플케이션을 직접 실행하거나 ./gradlew build를 통해 jar파일을 만든 후 아래의 명령어로 jar 파일을 실행할 수 있습니다.

java -jar build/libs/gs-batch-processing-0.1.0.jar

 

정상적으로 기동이 된다면 아래처럼 콘솔에 출력할 것입니다.

Converting (Person[firstName=Jill, lastName=Doe]) into (Person[firstName=JILL, lastName=DOE])
Converting (Person[firstName=Joe, lastName=Doe]) into (Person[firstName=JOE, lastName=DOE])
Converting (Person[firstName=Justin, lastName=Doe]) into (Person[firstName=JUSTIN, lastName=DOE])
Converting (Person[firstName=Jane, lastName=Doe]) into (Person[firstName=JANE, lastName=DOE])
Converting (Person[firstName=John, lastName=Doe]) into (Person[firstName=JOHN, lastName=DOE])
Found <{Person[firstName=JILL, lastName=DOE]}> in the database.
Found <{Person[firstName=JOE, lastName=DOE]}> in the database.
Found <{Person[firstName=JUSTIN, lastName=DOE]}> in the database.
Found <{Person[firstName=JANE, lastName=DOE]}> in the database.
Found <{Person[firstName=JOHN, lastName=DOE]}> in the database.