본문 바로가기

카테고리 없음

Spring Batch로 역정규화 해보기

0. 개요

 

 

현재 프로젝트의 ERD부터 살펴보면 Item table을 중심으로 연관된 테이블들이 무수히 많다. 

 

 

Item 별로 검색과 필터링에 필요하기 때문에 아래 코드와 같이

 

@Override
public Page<ItemSearchResponseDto> searchItem(ItemSearchCondition searchCondition, Member member, Pageable pageable) {
    BooleanExpression isLiked = member != null ? new CaseBuilder().when(itemLike.member.eq(member)).then(true).otherwise(false) : Expressions.asBoolean(false);

    BooleanBuilder booleanBuilder = new BooleanBuilder();

    JPAQuery<ItemSearchResponseDto> mainSearchQuery = 
    		...
            .where(booleanBuilder);


    JPAQuery<Long> total = queryDslConfig.jpaQueryFactory()
            ...
            .where(booleanBuilder);

    if (searchCondition.getCategoryName() != null && !searchCondition.getCategoryName().isEmpty()) {
        ...
    }

    if (isNotEmpty(searchCondition.getJobName())) {
    	...
    }

    if (isNotEmpty(searchCondition.getSituationName()) || isNotEmpty(searchCondition.getGender()) || isNotEmpty(searchCondition.getEmotionName())) {
        ...
        );

        // 키워드가 category에 속한 경우 조인 수행
        if (itemCategory.category.name.contains(keyword) != null || category.name.contains(keyword) != null) {
            ...
        }

        // 키워드가 itemJob에 속한 경우 조인 수행
        if (itemJob.name.contains(keyword) != null) {
            ...
        }

        // 키워드가 itemFilter에 속한 경우 조인 수행
        if (itemFilter.name.contains(keyword)!= null) {
            ...
        }
    }

    ...

    return PageableExecutionUtils.getPage(content, pageable, total::fetchOne);
}

@Override
public Page<SortSearchResponseDto> sortItem(SortCondition sortCondition, Pageable pageable) {
    ...

    // 페이지 처리
    return PageableExecutionUtils.getPage(content, pageable, total::fetchOne);
}

 

searchItem 동적 쿼리에서 여러 테이블을 JOIN 하고 있고

 

필터링 조건이 추가될때마다 BooleanBuilder가 복잡해지면서

 

쿼리 실행 속도가 느려지고 있었다.

 

 

 

따라서 Item table을 중심으로 연결된 수많은 연관관계와 계층 구조를

@Entity
@ToString
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class ItemSearch {
    @Id
    Integer id;
    String name;
    String brand;
    String category;
    String itemCategory;
    Integer price;
    String brandUrl;
    String kakaoUrl;
    String coupangUrl;
    String naverUrl;
    String description;
    String jobName;
    String jobChildName;
    String age;
    String situation;
    String emotion;
    String gender;
    String type;
    String relation;
}

 

 

위의 ItemSearch 단일 테이블

 

검색과 필터링 대상이 되는 SearchCondition 필드들을 모두 압축시킨 역정규화를 진행해보았다.

 

 

1. Spring Batch를 도입한 이유

 

                      +----------------------------+
                      |      Spring Batch Job      |
                      +----------------------------+
                                |
        +--------------------------------------------------+
        |            Step (Chunk 기반 처리)                 |
        +--------------------------------------------------+
                                |
        +--------------------------------------------------+
        |          ItemReader / ItemProcessor / ItemWriter  |
        +--------------------------------------------------+
                                |
        +------------------+   +------------------+   +------------------+
        | FlatFileReader   | → |  ItemProcessor   | → | JpaItemWriter    | (CSV 파일 읽고 parsing 후 DB 저장)
        | CsvItemReader    | → |  (CsvItemProcessor) | | FlatFileWriter   | 
        +------------------+   +------------------+   +------------------+

 

저번 프로젝트에서 Open API 데이터를 Quartz Scheduler로 활용해 insert 해봐서

 

Spring Batch와 Scheduler 활용 중 고민이 있었는데,

 

  • 대량 데이터 처리 및 효율적인 데이터 변환
  • 복잡한 데이터 흐름 처리 (검색, 필터링, 필드 매핑 등)
  • 트랜잭션 관리 및 안정성

등을 고려해서 주기적 작업 실행에 유리한 스케줄러보다는 Spring Batch를 선택하게 되었다.

 

 

 

기존 JDBC Insert를 쓰면서 insert 도중 에러가 발생하면

 

CSV 파일을 파싱하면서 발생한 error 인지,

 

CSV file 을 불러오는데 발생하는 에러인지 

 

Log를 남기긴 했지만

 

메타테이블들

 

 

Spring Batch의 메타테이블들을 통해

 

현재 진행 중인 배치와 성공 실패 여부, 수행 시간 등 많은 정보를

 

Log보다 체계적으로 모니터링이가능했다.

 

 

위와 같이 특정 Job을 실행하다 에러가 발생하면 EXIT_MESSAGE로 바로 조회가 가능하게끔 관리가 가능해

 

Log 파일을 열어보는 것보다 모니터링도 수월했고

 

리스너를 추가해 오류 발생시 개발팀 Slack 채널에 자동으로 알림이 가도록 개발하면 좋겠다는 생각도 들었다.

 

 

 

<기존 JDBC BATCH INSERT>

public class DataParserV2 {

    private final JdbcTemplate jdbcTemplate;

    @PostConstruct
    public void parseAndInsertDataFromCsv() {
        try (InputStream inputStream = new ClassPathResource("data.csv").getInputStream();
             CSVReader csvReader = new CSVReader(new InputStreamReader(inputStream))) {
            ...
        } catch (IOException e) {
            ...
        }
    }

    @Transactional
    public void parseAndInsertData(List<DataRow> dataRows) {
        try {
            String insertSql = "INSERT INTO item (id, name, brand, price, description) VALUES (?, ?, ?, ?, ?)";
            String categorySql = "INSERT INTO category (name) VALUES (?) " + "ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name";
            String itemCategorySql = "INSERT INTO category (name) VALUES (?)";
            String itemUrlSql = "INSERT INTO item_url (item_id, url) VALUES (?, ?)";
            String itemJobSql = "INSERT INTO item_job (item_id, name, step) VALUES (?, ?, ?)";
            String filterSql = "INSERT INTO item_filter (item_id, filter_id, name) VALUES (?, ?, ?)";

            List<Object[]> itemBatchArgs;
            List<Object[]> categoryBatchArgs;
            List<Object[]> itemCategoryBatchArgs;
            List<Object[]> itemUrlBatchArgs;
            List<Object[]> itemJobBatchArgs = new ArrayList<>();
            List<Object[]> filterBatchArgs = new ArrayList<>();

            long situationId = 2L;
            long emotionId = 3L;
            long genderId = 4L;
            long preferenceId = 5L;
            long typeId = 6L;
            long relationId = 7L;

            itemBatchArgs = dataRows.stream()
                    .map(row -> {
                        ...
  			...
            jdbcTemplate.batchUpdate();

        } catch (Exception e) {
            ...
        }
    }
}

 

<SPRING BATCH 도입 : ITEM READER, JOB>

@Component
public class CsvItemReader extends FlatFileItemReader<CsvDVO> {

    public CsvItemReader() {
        this.setResource(new ClassPathResource("/sample/csvJob2_input.csv"));

        DefaultLineMapper<CsvDVO> dtoDefaultLineMapper = new DefaultLineMapper<>();

        DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();
        delimitedLineTokenizer.setNames("id", "name", "brand", "category","itemCategory","price",
                "brandUrl", "kakaoUrl", "coupangUrl", "naverUrl", "description", "jobName", "jobChildName",
                "age", "situation", "emotion", "gender", "type", "relation");
        delimitedLineTokenizer.setDelimiter(",");

        BeanWrapperFieldSetMapper<CsvDVO> beanWrapperFieldSetMapper = new BeanWrapperFieldSetMapper<>();
        beanWrapperFieldSetMapper.setTargetType(CsvDVO.class);
        dtoDefaultLineMapper.setLineTokenizer(delimitedLineTokenizer);
        dtoDefaultLineMapper.setFieldSetMapper(beanWrapperFieldSetMapper);
        this.setLineMapper(dtoDefaultLineMapper);
    }
}

 

@Bean
public Step csvJpa_batchStep1() throws Exception {
    return stepBuilderFactory.get("csvJpa_batchStep1")
            .<CsvDVO, ItemSearch>chunk(chunkSize)
            .reader(csvJpa_FileReader())
            .processor(csvItemProcessor)
            .writer(csvJpaJob_dbItemWriter())
            .build();
}

@Bean
public CsvItemReader csvJpa_FileReader() {
    return csvItemReader;
}


@Bean
public JpaItemWriter<ItemSearch> csvJpaJob_dbItemWriter() {
    JpaItemWriter<ItemSearch> jpaItemWriter = new JpaItemWriter<>();
    jpaItemWriter.setEntityManagerFactory(entityManagerFactory);
    return jpaItemWriter;
}

 

 

1.파일 읽기: CsvItemReader를 사용하여 CSV 파일을 읽고 CsvDVO 객체로 변환

 

2.데이터 변환: CsvItemProcessor는 CsvDVO 객체를 ItemSearch 객체로 변환

 

3. 데이터 저장: JpaItemWriter로 변환된 ItemSearch 객체들을 DB에 저장

 

 

기존 코드에서는 연관관계를 테이블 별로 batchUpdate를 여러 번 호출하며 각 테이블에 데이터를 삽입하기 때문에 조금 더 복잡하고 긴 실행 시간을 거쳐야하지만,

 

CsvItemReader는 FlatFileItemReader를 활용해 한 번에 CsvDVO 객체를 매핑하고 단순하게 처리가 가능하다.

 

 

 

2. 개선된 점 및 결론

< 1. 기존 search test >

 

<2. batch search test>

 

 

 

기존에는 여러개의 관계를 처리해야 하여 Join 연산이 지속적으로 발생하였지만

 

역정규화 후에는 단일 테이블 조회만으로 검색이 가능해져 검색 API 응답 시간이 403ms → 13ms로 크게 감소되었다.

 

 

하지만 역정규화를 한 만큼 

 

Item과 관련된 category, job, filter 등등을 한 번에 업데이트해야 할 때,

 

지금처럼 ItemSearch 단일 테이블에서 모든 관계를 수정해야 하기 때문에 유지보수의 복잡성이 증가할 수 있다는

 

트레이드오프도 분명 존재한다 ..!

 

 

그리고 현재는 1000여개 가량의 CSV 파일을 기반으로 Chunck Size 100으로 설정해두고 테스트해본 결과였는데

 

조금 더 대량의 데이터에서 Spring Batch의 병렬 처리를 활용하면 유의미한 최적화가 되지 않았을까 하는 아쉬움도 있었다!