Airflow에서의 heavy data 처리에 대한 단게적 개선
· 8 min read
외부 데이터 소스로부터 가져온 raw-data들에 대해 중복 데이터 제거 및 데이터 셋간의 관계 설정 및 데이터 클랜징 처리등의 것들을 하면서 대용량 데이터의 처리에 대해 단계적으로 개선한 내용을 간략히 정리하여 공유 합니다. 본 글의 내용은 성능 병목을 개선하기 위한 단계별 전략을 일반적인 케이스로 정리한 가이드입니다. 각 단계는 실제로 성능 향상에 효과적인 접근법을 순차적으로 나열한 것입니다.
1단계: 기본 ORM 기반 배치 Upsert
- ORM을 사용해 한 번에 여러 개의 레코드를 upsert 처리
- 예시: SQLAlchemy + PostgreSQL
ON CONFLICT - 문제점:
- 대량의 데이터를 처리할 경우 SQL 파라미터 개수 제한 초과 (
32767) - ORM overhead로 인한 느린 처리 속도
- 대량의 데이터를 처리할 경우 SQL 파라미터 개수 제한 초과 (
Samples
# imports
import io
import logging
import re
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.engine import Result
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import class_mapper
from sqlalchemy.sql import Executable
from typing import List, Union
@dataclass
class NormalizedHeavyRecord:
...
async def upsert_many(self, records: List[DrugDocumentParagraphRecord]) -> int:
if not records:
return 0
total_upserted = 0
batch_size = 50 # first need to try with small size, and then increase it as you check working fine.
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
stmt = self._build_upsert_stmt(batch)
result = await self._execute_statement(stmt)
total_upserted += self._get_row_count(result)
return total_upserted
def _get_row_count(self, result: Result) -> int:
# return len(result.fetchall())
return len(result.scalars().all())
def _build_upsert_stmt(
self, records: Union[NormalizedHeavyRecord, List[NormalizedHeavyRecord]]
) -> Executable:
is_bulk = isinstance(records, list)
model = NormalizedHeavyRecord
# Extract all column keys except the primary key (`id`)
# content_hash is unique key for the data, so you need to build and save it with the data
mapper = class_mapper(model)
column_keys = [col.key for col in mapper.columns if col.key not in ("id", "content_hash")]
insert_stmt = pg_insert(model)
if is_bulk:
values = [vars(record) for record in records]
else:
values = vars(records)
# Remove _sa_instance_state (added by SQLAlchemy ORM)
if is_bulk:
for v in values:
v.pop("_sa_instance_state", None)
else:
values.pop("_sa_instance_state", None)
update_stmt = insert_stmt.on_conflict_do_update(
index_elements=["content_hash"],
set_={key: getattr(insert_stmt.excluded, key) for key in column_keys}
).returning(NormalizedHeavyRecord.id)
return update_stmt.values(values)
async def _execute_statement(self, stmt: Executable) -> Result:
result: Result = await self.session.execute(stmt)
await self.session.commit()
return result
