대용량 데이터 export 개선
400,000개의 data export시 청크&병렬 처리로 다운로드 속도를 10~15배 개선하였습니다.
최근 대용량 데이터 Export 기능을 개선하는 작업을 진행했습니다.
이 과정에서 발견한 문제점들과 해결 방법을 공유하고자 합니다.
배경
대용량 데이터 처리의 도전 과제
시스템에서 대용량 데이터를 Export하는 기능이 있었는데, 400,000개의 데이터를 처리해야 하는 상황이었습니다.
하지만 데이터베이스 쿼리에는 100,000개의 limit 제약이 있어서, 한 번에 모든 데이터를 가져올 수 없었습니다.
초기 해결 방안: 청크 처리
이 문제를 해결하기 위해 데이터를 작은 단위로 나누어 처리하는 청크(Chunk) 처리 방식을 도입했습니다.
- 전체 400,000개 데이터를 100,000개씩 4개의 청크로 분할
- 각 청크를 순차적으로 처리하여 limit 제약을 우회
하지만 이 방식에도 문제가 있었습니다.
순차 처리의 한계
- 4개의 청크를 순차적으로 처리하면 전체 처리 시간이 매우 길어짐
- 각 청크가 10분씩 걸린다면 총 40분이 소요됨
- 사용자는 긴 시간 동안 대기해야 함
병렬 처리 도입
속도 개선을 위해 병렬 처리를 도입했습니다. 구체적인 구현 방법은 다음과 같습니다.
1. 커서 기반 페이징으로 데이터 가져오기
DB 쿼리 limit 제약(100,000개)을 우회하기 위해 커서 기반 페이징을 사용합니다.
async def find_export_data_ids(data_query):
data_ids = []
cursor_id = 0
while True:
# 커서를 사용하여 배치 단위로 데이터 가져오기
batch = await data_repository.find_id_and_type_by_query(
data_query,
cursor_id=cursor_id,
limit=BATCH_SIZE # 예: 1000개씩
)
if not batch:
break
# 배치에서 데이터 ID 추출
for data_id, data_type in batch:
data_ids.append(data_id)
# 다음 배치를 위한 커서 업데이트
cursor_id = batch[-1][0]
# 마지막 배치인지 확인
if len(batch) < BATCH_SIZE:
break
return data_ids # 전체 400,000개 반환이 방식으로 limit 제약 없이 모든 데이터를 가져올 수 있습니다.
2. 데이터를 청크로 분할
가져온 전체 데이터 ID 리스트를 처리 단위로 나눕니다.
# 전체 데이터 ID를 가져옴 (커서 기반 페이징으로)
data_ids = await find_export_data_ids(data_query) # 400,000개
# 데이터를 청크 크기로 분할 (예: 100,000개씩)
chunks = ListUtil.split(data_ids, EXPORT_CHUNK_SIZE)
# 결과: [chunk1(100,000개), chunk2(100,000개), chunk3(100,000개), chunk4(100,000개)]3. 병렬 처리 구현
asyncio.gather를 사용하여 모든 청크를 병렬로 처리합니다.
# 각 청크를 처리하는 비동기 함수 정의
async def process_chunk(chunk: list[int]) -> None:
# 파일 쓰기 작업 수행
await write_file(chunk, ...)
# 진행 상황 업데이트
await update_progress(...)
# 모든 청크를 병렬로 실행
await asyncio.gather(*(process_chunk(chunk) for chunk in chunks))이렇게 하면 4개 청크가 동시에 처리되어 순차 처리 대비 시간이 단축됩니다.
4. 동시성 제어 (Semaphore)
하지만 모든 청크를 무제한으로 동시 실행하면 시스템 리소스(디스크 I/O, 메모리)가 부족할 수 있습니다. 따라서 asyncio.Semaphore를 사용하여 동시 실행 수를 제어합니다.
# 최대 3개까지 동시 실행 허용
chunk_semaphore = asyncio.Semaphore(MAX_CONCURRENT_CHUNKS) # 예: 3
async def process_chunk(chunk: list[int]) -> None:
# Semaphore로 동시 실행 수 제어
async with chunk_semaphore:
await write_file(chunk, ...)
# Semaphore 밖에서 진행 상황 업데이트
await update_progress(...)
# 모든 청크를 병렬로 실행 (최대 3개까지만 동시 실행)
await asyncio.gather(*(process_chunk(chunk) for chunk in chunks))동작 방식
- 처음 3개 청크가 동시에 실행됨
- 하나가 완료되면 대기 중인 다음 청크가 시작됨
- 이론적으로 4개 청크를 3개씩 병렬 처리하면 약 13분으로 단축 가능
5. 파일 쓰기 작업의 병렬 처리
각 청크 내부에서도 파일 쓰기 작업을 병렬로 처리합니다
async def write_file(data_ids: list[int], ...):
# 데이터를 처리하여 파일로 쓸 데이터 준비
data_export_bos = await process_export_data(data_ids, ...)
# 파일 쓰기 작업의 동시성 제어 (예: 최대 10개 동시 쓰기)
write_semaphore = asyncio.Semaphore(MAX_CONCURRENT_WRITES)
async def write_file_with_semaphore(content: str, path: str):
async with write_semaphore:
await file_util.write_string(content, path)
# 모든 파일 쓰기 작업을 병렬로 실행
write_tasks = []
for data_export_bo in data_export_bos:
write_tasks.append(
write_file_with_semaphore(content, path)
)
await asyncio.gather(*write_tasks)2단계 병렬 처리 구조
- 청크 레벨 병렬 처리: 여러 청크를 동시에 처리 (최대 3개)
- 파일 레벨 병렬 처리: 각 청크 내부에서 여러 파일을 동시에 쓰기 (최대 10개)
이렇게 하면 전체 처리 시간이 크게 단축됩니다.
하지만 병렬 처리를 구현하면서 예상치 못한 문제들이 발견되었습니다.
성능 개선 효과: 400,000개 데이터 처리 시나리오
설정값과 처리 규모
실제 운영 환경에서 사용하는 설정값은 다음과 같습니다.
- 청크 크기: 1,000개 데이터씩 하나의 청크로 처리
- 병렬 청크 수: 최대 10개 청크를 동시에 처리 (DB 연결 풀 크기의 20% 수준)
- 병렬 파일 쓰기: 각 청크 내부에서 최대 50개 파일을 동시에 쓰기
- 커서 페이징 배치: 50,000개씩 배치로 데이터 조회 (DB limit의 절반)
400,000개 데이터를 처리하는 경우를 예로 들면,
- 총 400개의 청크로 분할 (400,000 ÷ 1,000)
- 각 청크당 평균 2,000개의 파일 생성 (data.json + result.json)
- 전체 약 800,000개의 파일이 생성됨
이전 방식: 순차 처리
개선 전에는 모든 작업을 순차적으로 처리했습니다.
데이터 조회 단계
- 400,000개 데이터를 한 번에 메모리에 로드
- 소요 시간: 약 30-60초
- 메모리 사용량: 매우 높음 (400,000개 객체를 동시에 메모리에 보관)
청크 처리 단계
- 400개 청크를 하나씩 순차 처리
- 각 청크 처리 시간: 약 10초 (DB 쿼리 + 파일 쓰기)
- 총 소요 시간: 400개 × 10초 = 약 66.7분
파일 쓰기 단계
- 각 청크 내부에서도 파일을 순차적으로 쓰기
- 각 파일 쓰기 시간: 약 0.01초
- 총 소요 시간: 800,000개 × 0.01초 = 약 133분
전체 예상 시간: 약 200-220분 (3.3-3.7시간)
사용자는 3시간 이상 기다려야 했고, 메모리 부족으로 인한 시스템 불안정 문제도 발생할 수 있었습니다.
개선된 방식: 병렬 처리
개선 후에는 여러 단계에서 병렬 처리를 적용했습니다.
데이터 조회 단계
- 커서 기반 페이징으로 50,000개씩 8개 배치로 나누어 조회
- 소요 시간: 약 10-15초 (이전 대비 약 75% 감소)
- 메모리 사용량: 낮음 (배치 단위로만 메모리 사용)
청크 처리 단계
- 400개 청크를 10개씩 병렬로 처리
- 각 청크 처리 시간: 약 10초
- 배치 수: 400 ÷ 10 = 40배치
- 총 소요 시간: 40배치 × 10초 = 약 6.7분 (이전 대비 약 90% 감소)
파일 쓰기 단계
- 각 청크 내부에서 50개 파일씩 병렬로 쓰기
- 청크당 2,000개 파일을 50개씩 나누면 40배치
- 각 배치 처리 시간: 약 0.5초 (50개 파일 동시 쓰기)
- 청크당 소요 시간: 40배치 × 0.5초 = 약 20초
- 10개 청크가 동시에 처리되므로, 전체 시간: (400개 ÷ 10) × 20초 = 약 13.3분
전체 예상 시간: 약 15-20분
실제 성능 개선 효과
이론적 계산으로는 약 10배의 성능 향상이 예상되었지만, 실제 운영 환경에서 측정한 결과는 더욱 인상적이었습니다.
실제 브라우저에서 시간을 측정해본 결과, 약 20배의 성능 향상을 확인할 수 있었습니다.
이는 다음과 같은 요인들 때문으로 분석됩니다.
- I/O 대기 시간 감소: 병렬 처리로 디스크 I/O 대기 시간이 겹쳐서 실제 처리 시간 단축
- DB 연결 풀 활용도 향상: 여러 청크가 동시에 DB를 사용하여 연결 풀 활용도 증가
- 시스템 리소스 활용 극대화: CPU, 메모리, 디스크 I/O가 균형있게 활용됨
결과적으로 사용자는 3시간 이상 기다리던 작업을 15-20분 만에 완료할 수 있게 되었고, 시스템의 안정성도 크게 향상되었습니다.
마무리
대용량 데이터 처리 기능을 개선하면서, 단순히 기능을 구현하는 것을 넘어서 성능과 안정성을 고려한 코드 작성의 중요성을 다시 한번 느꼈습니다.
특히 다음과 같은 점들이 중요하다고 생각합니다.
- 동시성 제어의 범위를 최소화: 필요한 부분에만 적용하여 병목 현상 방지
- 부분 실패 허용: 전체 작업이 실패하지 않도록 설계
- 명확한 로직과 주석: 코드를 읽는 사람이 의도를 쉽게 이해할 수 있도록
- 로그 최적화: 필요한 정보만 포함하여 성능과 디버깅 효율 향상
앞으로도 이런 관점에서 코드를 지속적으로 개선해 나가고자 합니다.