0. 개요
현재 프로젝트의 ERD부터 살펴보면 Item table을 중심으로 연관된 테이블들이 무수히 많다.
Item 별로 검색과 필터링에 필요하기 때문에 아래 코드와 같이
@Override
public Page<ItemSearchResponseDto> searchItem(ItemSearchCondition searchCondition, Member member, Pageable pageable) {
BooleanExpression isLiked = member != null ? new CaseBuilder().when(itemLike.member.eq(member)).then(true).otherwise(false) : Expressions.asBoolean(false);
BooleanBuilder booleanBuilder = new BooleanBuilder();
JPAQuery<ItemSearchResponseDto> mainSearchQuery =
...
.where(booleanBuilder);
JPAQuery<Long> total = queryDslConfig.jpaQueryFactory()
...
.where(booleanBuilder);
if (searchCondition.getCategoryName() != null && !searchCondition.getCategoryName().isEmpty()) {
...
}
if (isNotEmpty(searchCondition.getJobName())) {
...
}
if (isNotEmpty(searchCondition.getSituationName()) || isNotEmpty(searchCondition.getGender()) || isNotEmpty(searchCondition.getEmotionName())) {
...
);
// 키워드가 category에 속한 경우 조인 수행
if (itemCategory.category.name.contains(keyword) != null || category.name.contains(keyword) != null) {
...
}
// 키워드가 itemJob에 속한 경우 조인 수행
if (itemJob.name.contains(keyword) != null) {
...
}
// 키워드가 itemFilter에 속한 경우 조인 수행
if (itemFilter.name.contains(keyword)!= null) {
...
}
}
...
return PageableExecutionUtils.getPage(content, pageable, total::fetchOne);
}
@Override
public Page<SortSearchResponseDto> sortItem(SortCondition sortCondition, Pageable pageable) {
...
// 페이지 처리
return PageableExecutionUtils.getPage(content, pageable, total::fetchOne);
}
searchItem 동적 쿼리에서 여러 테이블을 JOIN 하고 있고
필터링 조건이 추가될때마다 BooleanBuilder가 복잡해지면서
쿼리 실행 속도가 느려지고 있었다.
따라서 Item table을 중심으로 연결된 수많은 연관관계와 계층 구조를
@Entity
@ToString
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class ItemSearch {
@Id
Integer id;
String name;
String brand;
String category;
String itemCategory;
Integer price;
String brandUrl;
String kakaoUrl;
String coupangUrl;
String naverUrl;
String description;
String jobName;
String jobChildName;
String age;
String situation;
String emotion;
String gender;
String type;
String relation;
}
위의 ItemSearch 단일 테이블에
검색과 필터링 대상이 되는 SearchCondition 필드들을 모두 압축시킨 역정규화를 진행해보았다.
1. Spring Batch를 도입한 이유
+----------------------------+
| Spring Batch Job |
+----------------------------+
|
+--------------------------------------------------+
| Step (Chunk 기반 처리) |
+--------------------------------------------------+
|
+--------------------------------------------------+
| ItemReader / ItemProcessor / ItemWriter |
+--------------------------------------------------+
|
+------------------+ +------------------+ +------------------+
| FlatFileReader | → | ItemProcessor | → | JpaItemWriter | (CSV 파일 읽고 parsing 후 DB 저장)
| CsvItemReader | → | (CsvItemProcessor) | | FlatFileWriter |
+------------------+ +------------------+ +------------------+
저번 프로젝트에서 Open API 데이터를 Quartz Scheduler로 활용해 insert 해봐서
Spring Batch와 Scheduler 활용 중 고민이 있었는데,
- 대량 데이터 처리 및 효율적인 데이터 변환
- 복잡한 데이터 흐름 처리 (검색, 필터링, 필드 매핑 등)
- 트랜잭션 관리 및 안정성
등을 고려해서 주기적 작업 실행에 유리한 스케줄러보다는 Spring Batch를 선택하게 되었다.
기존 JDBC Insert를 쓰면서 insert 도중 에러가 발생하면
CSV 파일을 파싱하면서 발생한 error 인지,
CSV file 을 불러오는데 발생하는 에러인지
Log를 남기긴 했지만
Spring Batch의 메타테이블들을 통해
현재 진행 중인 배치와 성공 실패 여부, 수행 시간 등 많은 정보를
Log보다 체계적으로 모니터링이가능했다.
위와 같이 특정 Job을 실행하다 에러가 발생하면 EXIT_MESSAGE로 바로 조회가 가능하게끔 관리가 가능해
Log 파일을 열어보는 것보다 모니터링도 수월했고
리스너를 추가해 오류 발생시 개발팀 Slack 채널에 자동으로 알림이 가도록 개발하면 좋겠다는 생각도 들었다.
<기존 JDBC BATCH INSERT>
public class DataParserV2 {
private final JdbcTemplate jdbcTemplate;
@PostConstruct
public void parseAndInsertDataFromCsv() {
try (InputStream inputStream = new ClassPathResource("data.csv").getInputStream();
CSVReader csvReader = new CSVReader(new InputStreamReader(inputStream))) {
...
} catch (IOException e) {
...
}
}
@Transactional
public void parseAndInsertData(List<DataRow> dataRows) {
try {
String insertSql = "INSERT INTO item (id, name, brand, price, description) VALUES (?, ?, ?, ?, ?)";
String categorySql = "INSERT INTO category (name) VALUES (?) " + "ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name";
String itemCategorySql = "INSERT INTO category (name) VALUES (?)";
String itemUrlSql = "INSERT INTO item_url (item_id, url) VALUES (?, ?)";
String itemJobSql = "INSERT INTO item_job (item_id, name, step) VALUES (?, ?, ?)";
String filterSql = "INSERT INTO item_filter (item_id, filter_id, name) VALUES (?, ?, ?)";
List<Object[]> itemBatchArgs;
List<Object[]> categoryBatchArgs;
List<Object[]> itemCategoryBatchArgs;
List<Object[]> itemUrlBatchArgs;
List<Object[]> itemJobBatchArgs = new ArrayList<>();
List<Object[]> filterBatchArgs = new ArrayList<>();
long situationId = 2L;
long emotionId = 3L;
long genderId = 4L;
long preferenceId = 5L;
long typeId = 6L;
long relationId = 7L;
itemBatchArgs = dataRows.stream()
.map(row -> {
...
...
jdbcTemplate.batchUpdate();
} catch (Exception e) {
...
}
}
}
<SPRING BATCH 도입 : ITEM READER, JOB>
@Component
public class CsvItemReader extends FlatFileItemReader<CsvDVO> {
public CsvItemReader() {
this.setResource(new ClassPathResource("/sample/csvJob2_input.csv"));
DefaultLineMapper<CsvDVO> dtoDefaultLineMapper = new DefaultLineMapper<>();
DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();
delimitedLineTokenizer.setNames("id", "name", "brand", "category","itemCategory","price",
"brandUrl", "kakaoUrl", "coupangUrl", "naverUrl", "description", "jobName", "jobChildName",
"age", "situation", "emotion", "gender", "type", "relation");
delimitedLineTokenizer.setDelimiter(",");
BeanWrapperFieldSetMapper<CsvDVO> beanWrapperFieldSetMapper = new BeanWrapperFieldSetMapper<>();
beanWrapperFieldSetMapper.setTargetType(CsvDVO.class);
dtoDefaultLineMapper.setLineTokenizer(delimitedLineTokenizer);
dtoDefaultLineMapper.setFieldSetMapper(beanWrapperFieldSetMapper);
this.setLineMapper(dtoDefaultLineMapper);
}
}
@Bean
public Step csvJpa_batchStep1() throws Exception {
return stepBuilderFactory.get("csvJpa_batchStep1")
.<CsvDVO, ItemSearch>chunk(chunkSize)
.reader(csvJpa_FileReader())
.processor(csvItemProcessor)
.writer(csvJpaJob_dbItemWriter())
.build();
}
@Bean
public CsvItemReader csvJpa_FileReader() {
return csvItemReader;
}
@Bean
public JpaItemWriter<ItemSearch> csvJpaJob_dbItemWriter() {
JpaItemWriter<ItemSearch> jpaItemWriter = new JpaItemWriter<>();
jpaItemWriter.setEntityManagerFactory(entityManagerFactory);
return jpaItemWriter;
}
1.파일 읽기: CsvItemReader를 사용하여 CSV 파일을 읽고 CsvDVO 객체로 변환
2.데이터 변환: CsvItemProcessor는 CsvDVO 객체를 ItemSearch 객체로 변환
3. 데이터 저장: JpaItemWriter로 변환된 ItemSearch 객체들을 DB에 저장
기존 코드에서는 연관관계를 테이블 별로 batchUpdate를 여러 번 호출하며 각 테이블에 데이터를 삽입하기 때문에 조금 더 복잡하고 긴 실행 시간을 거쳐야하지만,
CsvItemReader는 FlatFileItemReader를 활용해 한 번에 CsvDVO 객체를 매핑하고 단순하게 처리가 가능하다.
2. 개선된 점 및 결론
기존에는 여러개의 관계를 처리해야 하여 Join 연산이 지속적으로 발생하였지만
역정규화 후에는 단일 테이블 조회만으로 검색이 가능해져 검색 API 응답 시간이 403ms → 13ms로 크게 감소되었다.
하지만 역정규화를 한 만큼
Item과 관련된 category, job, filter 등등을 한 번에 업데이트해야 할 때,
지금처럼 ItemSearch 단일 테이블에서 모든 관계를 수정해야 하기 때문에 유지보수의 복잡성이 증가할 수 있다는
트레이드오프도 분명 존재한다 ..!
그리고 현재는 1000여개 가량의 CSV 파일을 기반으로 Chunck Size를 100으로 설정해두고 테스트해본 결과였는데
조금 더 대량의 데이터에서 Spring Batch의 병렬 처리를 활용하면 유의미한 최적화가 되지 않았을까 하는 아쉬움도 있었다!