""" CSV 가져오기 — 우리 표준 포맷. 표준 포맷 (v2.7.1+): date,clock_in,clock_out,lunch_minutes,dinner_minutes,memo 2026-04-01,09:00:00,18:30:00,60,0,"메모" - 헤더 첫 줄 필수 - date: YYYY-MM-DD - clock_in/out: HH:MM:SS 또는 HH:MM (clock_out이 clock_in보다 빠르면 익일로 간주) - lunch_minutes: 정수 (0이면 점심 미포함) - dinner_minutes: 정수 (옵션, 0/누락이면 저녁 미포함) - memo: 선택 (따옴표 가능) 기존 일자와 충돌 시 import 호출자가 'overwrite'/'skip' 정책 결정. """ from __future__ import annotations import csv from datetime import datetime from pathlib import Path from typing import List, Dict, Iterator, Tuple def parse_csv(path: str) -> List[Dict]: """CSV 파일을 dict 리스트로 파싱. 검증 실패 시 ValueError.""" rows = [] p = Path(path) if not p.exists(): raise FileNotFoundError(f"파일 없음: {path}") with open(p, 'r', encoding='utf-8-sig', newline='') as f: reader = csv.DictReader(f) required = {'date', 'clock_in'} if not required.issubset(reader.fieldnames or []): raise ValueError( f"헤더에 필수 필드 누락: {required - set(reader.fieldnames or [])}\n" f"필수 헤더: date,clock_in,clock_out,lunch_minutes,memo" ) for i, row in enumerate(reader, start=2): # 데이터 시작 줄 번호 (1=헤더) try: clean = _normalize_row(row) rows.append(clean) except ValueError as e: raise ValueError(f"줄 {i}: {e}") return rows def _parse_minutes(raw: str, field_name: str) -> int: """0 이상 정수 파싱. 빈 값이면 0.""" s = (raw or '').strip() if not s: return 0 try: v = int(s) if v < 0: raise ValueError except ValueError: raise ValueError(f"{field_name}는 0 이상 정수여야 함: '{raw}'") return v def _normalize_row(row: Dict) -> Dict: """단일 행 검증 + 정규화.""" date_str = (row.get('date') or '').strip() if not date_str: raise ValueError("date 비어있음") try: datetime.strptime(date_str, '%Y-%m-%d') except ValueError: raise ValueError(f"date 형식 오류: '{date_str}' (YYYY-MM-DD 필요)") ci = _normalize_time(row.get('clock_in', '').strip(), 'clock_in') co_raw = (row.get('clock_out') or '').strip() co = _normalize_time(co_raw, 'clock_out') if co_raw else None lunch = _parse_minutes(row.get('lunch_minutes', ''), 'lunch_minutes') dinner = _parse_minutes(row.get('dinner_minutes', ''), 'dinner_minutes') memo = (row.get('memo') or '').strip() return { 'date': date_str, 'clock_in': ci, 'clock_out': co, 'lunch_minutes': lunch, 'dinner_minutes': dinner, 'memo': memo, } def _normalize_time(s: str, field_name: str) -> str: """'HH:MM' 또는 'HH:MM:SS' → 'HH:MM:SS'.""" if not s: raise ValueError(f"{field_name} 비어있음") parts = s.split(':') if len(parts) == 2: s = f"{s}:00" elif len(parts) != 3: raise ValueError(f"{field_name} 형식 오류: '{s}' (HH:MM[:SS] 필요)") try: datetime.strptime(s, '%H:%M:%S') except ValueError: raise ValueError(f"{field_name} 시간 파싱 실패: '{s}'") return s def import_records(db, rows: List[Dict], on_conflict: str = 'skip') -> Tuple[int, int, int]: """파싱된 rows를 DB에 일괄 입력. Args: db: Database 인스턴스 rows: parse_csv 결과 on_conflict: 'skip' | 'overwrite' Returns: (added, updated, skipped) 수 """ if on_conflict not in ('skip', 'overwrite'): raise ValueError("on_conflict는 'skip' 또는 'overwrite'") added = updated = skipped = 0 from datetime import timedelta from core.time_calculator import TimeCalculator work_minutes = db.get_work_minutes() lunch_default = db.get_setting_int('lunch_duration_minutes', 60) dinner_default = db.get_setting_int('dinner_duration_minutes', 60) for row in rows: existing = db.get_work_record(row['date']) if existing and on_conflict == 'skip': skipped += 1 continue if existing and on_conflict == 'overwrite': # 기존 레코드 삭제 후 재추가 (단순화) conn = db.get_connection() cursor = conn.cursor() try: cursor.execute("DELETE FROM overtime_bank WHERE date = ?", (row['date'],)) cursor.execute("DELETE FROM break_records WHERE date = ?", (row['date'],)) cursor.execute("DELETE FROM work_records WHERE date = ?", (row['date'],)) conn.commit() except Exception: conn.rollback() raise finally: conn.close() updated += 1 else: added += 1 wid = db.add_work_record(row['date'], row['clock_in'], is_manual=True) if row.get('clock_out'): ci_dt = datetime.strptime(f"{row['date']} {row['clock_in']}", '%Y-%m-%d %H:%M:%S') co_dt = datetime.strptime(f"{row['date']} {row['clock_out']}", '%Y-%m-%d %H:%M:%S') # 자정 경계: 퇴근이 출근보다 빠르면 익일로 간주 if co_dt <= ci_dt: co_dt += timedelta(days=1) lunch_min = row.get('lunch_minutes') or 0 dinner_min = row.get('dinner_minutes') or 0 calc = TimeCalculator( work_minutes=work_minutes, lunch_duration_minutes=lunch_min or lunch_default, dinner_duration_minutes=dinner_min or dinner_default, ) include_lunch = lunch_min > 0 include_dinner = dinner_min > 0 total = (co_dt - ci_dt).total_seconds() / 3600 ot_actual, ot_earned = calc.calculate_overtime( ci_dt, co_dt, include_lunch=include_lunch, include_dinner=include_dinner, ) db.update_clock_out(row['date'], row['clock_out'], total, ot_actual, ot_earned) if include_lunch: db.update_lunch_break(row['date'], True) if include_dinner: db.update_dinner_break(row['date'], True) if ot_earned > 0: db.add_overtime_earned(wid, ot_earned, row['date']) return added, updated, skipped