본문으로 건너뛰기

oneshim-storage

로컬 데이터 저장을 담당하는 크레이트. SQLite, 스키마 마이그레이션, 보존 정책을 관리합니다.


역할

  • 로컬 저장소: 오프라인 데이터 저장
  • 큐 관리: 미전송 이벤트/프레임 관리
  • 스키마 마이그레이션: 버전 관리
  • 보존 정책: 오래된 데이터 정리

디렉토리 구조

oneshim-storage/src/
├── lib.rs # 크레이트 루트
├── sqlite.rs # SqliteStorage - 저장소 구현
└── migration.rs # 스키마 마이그레이션

주요 컴포넌트

SqliteStorage (sqlite.rs)

SQLite 기반 로컬 저장소 (StorageService 포트 구현):

pub struct SqliteStorage {
pool: Pool<SqliteConnectionManager>,
}

impl SqliteStorage {
pub fn new(db_path: &str) -> Result<Self, CoreError> {
let manager = SqliteConnectionManager::file(db_path)
.with_init(|conn| {
// WAL 모드 활성화
conn.execute_batch("PRAGMA journal_mode=WAL;")?;
conn.execute_batch("PRAGMA synchronous=NORMAL;")?;
Ok(())
});

let pool = Pool::builder()
.max_size(10)
.build(manager)?;

// 스키마 마이그레이션 실행
Migration::run(&pool)?;

Ok(Self { pool })
}
}

#[async_trait]
impl StorageService for SqliteStorage {
async fn save_event(&self, event: &ContextEvent) -> Result<(), CoreError> {
let conn = self.pool.get()?;

conn.execute(
"INSERT INTO events (event_id, session_id, event_type, timestamp, application, window_title, metadata, sent)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, 0)",
params![
event.event_id,
event.session_id,
serde_json::to_string(&event.event_type)?,
event.timestamp.to_rfc3339(),
event.application,
event.window_title,
serde_json::to_string(&event.metadata)?,
],
)?;

Ok(())
}

async fn save_frame(&self, frame: &ProcessedFrame) -> Result<(), CoreError> {
let conn = self.pool.get()?;

conn.execute(
"INSERT INTO frames (frame_id, timestamp, payload_type, payload_data, sent)
VALUES (?1, ?2, ?3, ?4, 0)",
params![
uuid::Uuid::new_v4().to_string(),
frame.timestamp.to_rfc3339(),
self.payload_type_str(&frame.payload),
self.serialize_payload(&frame.payload)?,
],
)?;

Ok(())
}

async fn get_pending_events(&self) -> Result<Vec<ContextEvent>, CoreError> {
let conn = self.pool.get()?;

let mut stmt = conn.prepare(
"SELECT event_id, session_id, event_type, timestamp, application, window_title, metadata
FROM events WHERE sent = 0 ORDER BY timestamp ASC LIMIT 100"
)?;

let events = stmt.query_map([], |row| {
Ok(ContextEvent {
event_id: row.get(0)?,
session_id: row.get(1)?,
event_type: serde_json::from_str(&row.get::<_, String>(2)?).unwrap(),
timestamp: DateTime::parse_from_rfc3339(&row.get::<_, String>(3)?).unwrap().with_timezone(&Utc),
application: row.get(4)?,
window_title: row.get(5)?,
metadata: serde_json::from_str(&row.get::<_, String>(6)?).unwrap(),
})
})?.collect::<Result<Vec<_>, _>>()?;

Ok(events)
}

async fn mark_as_sent(&self, event_ids: &[String]) -> Result<(), CoreError> {
if event_ids.is_empty() {
return Ok(());
}

let conn = self.pool.get()?;
let placeholders = event_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
let sql = format!("UPDATE events SET sent = 1 WHERE event_id IN ({})", placeholders);

conn.execute(&sql, rusqlite::params_from_iter(event_ids))?;
Ok(())
}

async fn cleanup_old_data(&self, before: DateTime<Utc>) -> Result<usize, CoreError> {
let conn = self.pool.get()?;
let before_str = before.to_rfc3339();

let events_deleted = conn.execute(
"DELETE FROM events WHERE timestamp < ?1 AND sent = 1",
params![before_str],
)?;

let frames_deleted = conn.execute(
"DELETE FROM frames WHERE timestamp < ?1 AND sent = 1",
params![before_str],
)?;

Ok(events_deleted + frames_deleted)
}
}

Migration (migration.rs)

스키마 버전 관리:

pub struct Migration;

impl Migration {
pub fn run(pool: &Pool<SqliteConnectionManager>) -> Result<(), CoreError> {
let conn = pool.get()?;

// 버전 테이블 생성
conn.execute_batch("
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER PRIMARY KEY
);
INSERT OR IGNORE INTO schema_version (version) VALUES (0);
")?;

let current_version: i32 = conn.query_row(
"SELECT version FROM schema_version",
[],
|row| row.get(0),
)?;

// 마이그레이션 실행
if current_version < 1 {
Self::migrate_v1(&conn)?;
}

Ok(())
}

fn migrate_v1(conn: &Connection) -> Result<(), CoreError> {
conn.execute_batch("
-- 이벤트 테이블
CREATE TABLE IF NOT EXISTS events (
event_id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
event_type TEXT NOT NULL,
timestamp TEXT NOT NULL,
application TEXT,
window_title TEXT,
metadata TEXT,
sent INTEGER DEFAULT 0,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
);

-- 프레임 테이블
CREATE TABLE IF NOT EXISTS frames (
frame_id TEXT PRIMARY KEY,
timestamp TEXT NOT NULL,
payload_type TEXT NOT NULL,
payload_data BLOB,
sent INTEGER DEFAULT 0,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
);

-- 인덱스
CREATE INDEX IF NOT EXISTS idx_events_sent ON events(sent);
CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp);
CREATE INDEX IF NOT EXISTS idx_frames_sent ON frames(sent);
CREATE INDEX IF NOT EXISTS idx_frames_timestamp ON frames(timestamp);

-- 버전 업데이트
UPDATE schema_version SET version = 1;
")?;

Ok(())
}
}

스키마 구조

events 테이블

컬럼타입설명
event_idTEXT (PK)이벤트 고유 ID
session_idTEXT세션 ID
event_typeTEXT이벤트 타입 (JSON)
timestampTEXTISO 8601 타임스탬프
applicationTEXT애플리케이션 이름
window_titleTEXT윈도우 제목
metadataTEXT추가 메타데이터 (JSON)
sentINTEGER전송 여부 (0/1)
created_atTEXT생성 시간

frames 테이블

컬럼타입설명
frame_idTEXT (PK)프레임 고유 ID
timestampTEXTISO 8601 타임스탬프
payload_typeTEXTfull/delta/thumbnail/metadata
payload_dataBLOB압축된 이미지 데이터
sentINTEGER전송 여부 (0/1)
created_atTEXT생성 시간

Edge Intelligence 테이블 (V6-V7)

work_sessions

컬럼타입설명
idINTEGER (PK)세션 ID
started_atTEXT시작 시간
ended_atTEXT종료 시간
primary_appTEXT주요 앱
categoryTEXT앱 카테고리
stateTEXTactive/completed
interruption_countINTEGER인터럽션 횟수
deep_work_secsINTEGER깊은 작업 시간 (초)
duration_secsINTEGER총 지속 시간 (초)

interruptions

컬럼타입설명
idINTEGER (PK)인터럽션 ID
interrupted_atTEXT중단 시간
from_appTEXT이전 앱
from_categoryTEXT이전 카테고리
to_appTEXT전환 앱
to_categoryTEXT전환 카테고리
snapshot_frame_idINTEGER스크린샷 ID
resumed_atTEXT복귀 시간
resumed_to_appTEXT복귀 앱

focus_metrics

컬럼타입설명
dateTEXT (PK)날짜 (YYYY-MM-DD)
total_active_secsINTEGER총 활동 시간
deep_work_secsINTEGER깊은 작업 시간
communication_secsINTEGER소통 시간
context_switchesINTEGER컨텍스트 전환 횟수
interruption_countINTEGER인터럽션 횟수
focus_scoreREAL집중 점수 (0-100)

local_suggestions

컬럼타입설명
idINTEGER (PK)제안 ID
suggestion_typeTEXT제안 타입
payloadTEXTJSON 페이로드
created_atTEXT생성 시간
shown_atTEXT표시 시간
dismissed_atTEXT무시 시간
acted_atTEXT실행 시간

보존 정책

impl SqliteStorage {
/// 오래된 데이터 정리 (이미 전송된 것만)
pub async fn apply_retention_policy(&self, config: &StorageConfig) -> Result<(), CoreError> {
let cutoff = Utc::now() - Duration::days(config.retention_days as i64);
let deleted = self.cleanup_old_data(cutoff).await?;

tracing::info!("보존 정책 적용: {}개 레코드 삭제", deleted);
Ok(())
}

/// 저장소 크기 확인
pub async fn get_storage_size_mb(&self) -> Result<u64, CoreError> {
let conn = self.pool.get()?;

let page_count: i64 = conn.query_row(
"SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
[],
|row| row.get(0),
)?;

Ok((page_count / 1024 / 1024) as u64)
}
}

기본 설정:

  • retention_days: 30일
  • max_storage_mb: 500MB
  • 미전송 데이터는 보존 정책에서 제외

데이터 흐름


성능 최적화

PRAGMA 설정

설정효과
journal_mode=WALWrite-Ahead Logging동시 읽기/쓰기
synchronous=NORMAL일반 동기화쓰기 성능 향상
cache_size=80008000 페이지메모리 캐시 확대
temp_store=MEMORY메모리임시 테이블 성능
mmap_size=268435456256MBI/O 감소

인덱스 최적화 (V7)

인덱스컬럼용도
idx_events_sent_timestamp(is_sent, timestamp)미전송 이벤트 조회 +25%
idx_work_sessions_state_started(state, started_at)활성 세션 조회
idx_interruptions_resumed(resumed_at) WHERE NULL미복귀 인터럽션

N+1 쿼리 최적화

RETURNING clause를 사용하여 SELECT+UPDATE를 1개 쿼리로 결합:

-- Before (2 queries):
SELECT started_at FROM work_sessions WHERE id = ?1;
UPDATE work_sessions SET ended_at = ?1, duration_secs = ?2 WHERE id = ?3;

-- After (1 query with RETURNING):
UPDATE work_sessions
SET ended_at = ?1,
state = 'completed',
duration_secs = CAST((julianday(?1) - julianday(started_at)) * 86400 AS INTEGER)
WHERE id = ?2
RETURNING duration_secs;

적용된 메서드:

  • end_work_session(): +50% 성능
  • end_idle_period(): +50% 성능

배치 저장 API

/// 여러 이벤트를 한 트랜잭션으로 저장
pub fn save_events_batch(&self, events: &[Event]) -> Result<usize, CoreError>

의존성

크레이트용도
rusqliteSQLite 바인딩
r2d2커넥션 풀
r2d2_sqliteSQLite 커넥션 매니저
uuid프레임 ID 생성

관련 문서: