oneshim-storage
로컬 데이터 저장을 담당하는 크레이트. SQLite, 스키마 마이그레이션, 보존 정책을 관리합니다.
역할
- 로컬 저장소: 오프라인 데이터 저장
- 큐 관리: 미전송 이벤트/프레임 관리
- 스키마 마이그레이션: 버전 관리
- 보존 정책: 오래된 데이터 정리
디렉토리 구조
oneshim-storage/src/
├── lib.rs # 크레이트 루트
├── sqlite.rs # SqliteStorage - 저장소 구현
└── migration.rs # 스키마 마이그레이션
주요 컴포넌트
SqliteStorage (sqlite.rs)
SQLite 기반 로컬 저장소 (StorageService 포트 구현):
pub struct SqliteStorage {
pool: Pool<SqliteConnectionManager>,
}
impl SqliteStorage {
pub fn new(db_path: &str) -> Result<Self, CoreError> {
let manager = SqliteConnectionManager::file(db_path)
.with_init(|conn| {
// WAL 모드 활성화
conn.execute_batch("PRAGMA journal_mode=WAL;")?;
conn.execute_batch("PRAGMA synchronous=NORMAL;")?;
Ok(())
});
let pool = Pool::builder()
.max_size(10)
.build(manager)?;
// 스키마 마이그레이션 실행
Migration::run(&pool)?;
Ok(Self { pool })
}
}
#[async_trait]
impl StorageService for SqliteStorage {
async fn save_event(&self, event: &ContextEvent) -> Result<(), CoreError> {
let conn = self.pool.get()?;
conn.execute(
"INSERT INTO events (event_id, session_id, event_type, timestamp, application, window_title, metadata, sent)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, 0)",
params![
event.event_id,
event.session_id,
serde_json::to_string(&event.event_type)?,
event.timestamp.to_rfc3339(),
event.application,
event.window_title,
serde_json::to_string(&event.metadata)?,
],
)?;
Ok(())
}
async fn save_frame(&self, frame: &ProcessedFrame) -> Result<(), CoreError> {
let conn = self.pool.get()?;
conn.execute(
"INSERT INTO frames (frame_id, timestamp, payload_type, payload_data, sent)
VALUES (?1, ?2, ?3, ?4, 0)",
params![
uuid::Uuid::new_v4().to_string(),
frame.timestamp.to_rfc3339(),
self.payload_type_str(&frame.payload),
self.serialize_payload(&frame.payload)?,
],
)?;
Ok(())
}
async fn get_pending_events(&self) -> Result<Vec<ContextEvent>, CoreError> {
let conn = self.pool.get()?;
let mut stmt = conn.prepare(
"SELECT event_id, session_id, event_type, timestamp, application, window_title, metadata
FROM events WHERE sent = 0 ORDER BY timestamp ASC LIMIT 100"
)?;
let events = stmt.query_map([], |row| {
Ok(ContextEvent {
event_id: row.get(0)?,
session_id: row.get(1)?,
event_type: serde_json::from_str(&row.get::<_, String>(2)?).unwrap(),
timestamp: DateTime::parse_from_rfc3339(&row.get::<_, String>(3)?).unwrap().with_timezone(&Utc),
application: row.get(4)?,
window_title: row.get(5)?,
metadata: serde_json::from_str(&row.get::<_, String>(6)?).unwrap(),
})
})?.collect::<Result<Vec<_>, _>>()?;
Ok(events)
}
async fn mark_as_sent(&self, event_ids: &[String]) -> Result<(), CoreError> {
if event_ids.is_empty() {
return Ok(());
}
let conn = self.pool.get()?;
let placeholders = event_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
let sql = format!("UPDATE events SET sent = 1 WHERE event_id IN ({})", placeholders);
conn.execute(&sql, rusqlite::params_from_iter(event_ids))?;
Ok(())
}
async fn cleanup_old_data(&self, before: DateTime<Utc>) -> Result<usize, CoreError> {
let conn = self.pool.get()?;
let before_str = before.to_rfc3339();
let events_deleted = conn.execute(
"DELETE FROM events WHERE timestamp < ?1 AND sent = 1",
params![before_str],
)?;
let frames_deleted = conn.execute(
"DELETE FROM frames WHERE timestamp < ?1 AND sent = 1",
params![before_str],
)?;
Ok(events_deleted + frames_deleted)
}
}
Migration (migration.rs)
스키마 버전 관리:
pub struct Migration;
impl Migration {
pub fn run(pool: &Pool<SqliteConnectionManager>) -> Result<(), CoreError> {
let conn = pool.get()?;
// 버전 테이블 생성
conn.execute_batch("
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER PRIMARY KEY
);
INSERT OR IGNORE INTO schema_version (version) VALUES (0);
")?;
let current_version: i32 = conn.query_row(
"SELECT version FROM schema_version",
[],
|row| row.get(0),
)?;
// 마이그레이션 실행
if current_version < 1 {
Self::migrate_v1(&conn)?;
}
Ok(())
}
fn migrate_v1(conn: &Connection) -> Result<(), CoreError> {
conn.execute_batch("
-- 이벤트 테이블
CREATE TABLE IF NOT EXISTS events (
event_id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
event_type TEXT NOT NULL,
timestamp TEXT NOT NULL,
application TEXT,
window_title TEXT,
metadata TEXT,
sent INTEGER DEFAULT 0,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
);
-- 프레임 테이블
CREATE TABLE IF NOT EXISTS frames (
frame_id TEXT PRIMARY KEY,
timestamp TEXT NOT NULL,
payload_type TEXT NOT NULL,
payload_data BLOB,
sent INTEGER DEFAULT 0,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
);
-- 인덱스
CREATE INDEX IF NOT EXISTS idx_events_sent ON events(sent);
CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp);
CREATE INDEX IF NOT EXISTS idx_frames_sent ON frames(sent);
CREATE INDEX IF NOT EXISTS idx_frames_timestamp ON frames(timestamp);
-- 버전 업데이트
UPDATE schema_version SET version = 1;
")?;
Ok(())
}
}
스키마 구조
events 테이블
| 컬럼 | 타입 | 설명 |
|---|---|---|
event_id | TEXT (PK) | 이벤트 고유 ID |
session_id | TEXT | 세션 ID |
event_type | TEXT | 이벤트 타입 (JSON) |
timestamp | TEXT | ISO 8601 타임스탬프 |
application | TEXT | 애플리케이션 이름 |
window_title | TEXT | 윈도우 제목 |
metadata | TEXT | 추가 메타데이터 (JSON) |
sent | INTEGER | 전송 여부 (0/1) |
created_at | TEXT | 생성 시간 |
frames 테이블
| 컬럼 | 타입 | 설명 |
|---|---|---|
frame_id | TEXT (PK) | 프레임 고유 ID |
timestamp | TEXT | ISO 8601 타임스탬프 |
payload_type | TEXT | full/delta/thumbnail/metadata |
payload_data | BLOB | 압축된 이미지 데이터 |
sent | INTEGER | 전송 여부 (0/1) |
created_at | TEXT | 생성 시간 |
Edge Intelligence 테이블 (V6-V7)
work_sessions
| 컬럼 | 타입 | 설명 |
|---|---|---|
id | INTEGER (PK) | 세션 ID |
started_at | TEXT | 시작 시간 |
ended_at | TEXT | 종료 시간 |
primary_app | TEXT | 주요 앱 |
category | TEXT | 앱 카테고리 |
state | TEXT | active/completed |
interruption_count | INTEGER | 인터럽션 횟수 |
deep_work_secs | INTEGER | 깊은 작업 시간 (초) |
duration_secs | INTEGER | 총 지속 시간 (초) |
interruptions
| 컬럼 | 타입 | 설명 |
|---|---|---|
id | INTEGER (PK) | 인터럽션 ID |
interrupted_at | TEXT | 중단 시간 |
from_app | TEXT | 이전 앱 |
from_category | TEXT | 이전 카테고리 |
to_app | TEXT | 전환 앱 |
to_category | TEXT | 전환 카테고리 |
snapshot_frame_id | INTEGER | 스크린샷 ID |
resumed_at | TEXT | 복귀 시간 |
resumed_to_app | TEXT | 복귀 앱 |
focus_metrics
| 컬럼 | 타입 | 설명 |
|---|---|---|
date | TEXT (PK) | 날짜 (YYYY-MM-DD) |
total_active_secs | INTEGER | 총 활동 시간 |
deep_work_secs | INTEGER | 깊은 작업 시간 |
communication_secs | INTEGER | 소통 시간 |
context_switches | INTEGER | 컨텍스트 전환 횟수 |
interruption_count | INTEGER | 인터럽션 횟수 |
focus_score | REAL | 집중 점수 (0-100) |
local_suggestions
| 컬럼 | 타입 | 설명 |
|---|---|---|
id | INTEGER (PK) | 제안 ID |
suggestion_type | TEXT | 제안 타입 |
payload | TEXT | JSON 페이로드 |
created_at | TEXT | 생성 시간 |
shown_at | TEXT | 표시 시간 |
dismissed_at | TEXT | 무시 시간 |
acted_at | TEXT | 실행 시간 |
보존 정책
impl SqliteStorage {
/// 오래된 데이터 정리 (이미 전송된 것만)
pub async fn apply_retention_policy(&self, config: &StorageConfig) -> Result<(), CoreError> {
let cutoff = Utc::now() - Duration::days(config.retention_days as i64);
let deleted = self.cleanup_old_data(cutoff).await?;
tracing::info!("보존 정책 적용: {}개 레코드 삭제", deleted);
Ok(())
}
/// 저장소 크기 확인
pub async fn get_storage_size_mb(&self) -> Result<u64, CoreError> {
let conn = self.pool.get()?;
let page_count: i64 = conn.query_row(
"SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
[],
|row| row.get(0),
)?;
Ok((page_count / 1024 / 1024) as u64)
}
}
기본 설정:
retention_days: 30일max_storage_mb: 500MB- 미전송 데이터는 보존 정책에서 제외
데이터 흐름
성능 최적화
PRAGMA 설정
| 설정 | 값 | 효과 |
|---|---|---|
journal_mode=WAL | Write-Ahead Logging | 동시 읽기/쓰기 |
synchronous=NORMAL | 일반 동기화 | 쓰기 성능 향상 |
cache_size=8000 | 8000 페이지 | 메모리 캐시 확대 |
temp_store=MEMORY | 메모리 | 임시 테이블 성능 |
mmap_size=268435456 | 256MB | I/O 감소 |
인덱스 최적화 (V7)
| 인덱스 | 컬럼 | 용도 |
|---|---|---|
idx_events_sent_timestamp | (is_sent, timestamp) | 미전송 이벤트 조회 +25% |
idx_work_sessions_state_started | (state, started_at) | 활성 세션 조회 |
idx_interruptions_resumed | (resumed_at) WHERE NULL | 미복귀 인터럽션 |
N+1 쿼리 최적화
RETURNING clause를 사용하여 SELECT+UPDATE를 1개 쿼리로 결합:
-- Before (2 queries):
SELECT started_at FROM work_sessions WHERE id = ?1;
UPDATE work_sessions SET ended_at = ?1, duration_secs = ?2 WHERE id = ?3;
-- After (1 query with RETURNING):
UPDATE work_sessions
SET ended_at = ?1,
state = 'completed',
duration_secs = CAST((julianday(?1) - julianday(started_at)) * 86400 AS INTEGER)
WHERE id = ?2
RETURNING duration_secs;
적용된 메서드:
end_work_session(): +50% 성능end_idle_period(): +50% 성능
배치 저장 API
/// 여러 이벤트를 한 트랜잭션으로 저장
pub fn save_events_batch(&self, events: &[Event]) -> Result<usize, CoreError>
의존성
| 크레이트 | 용도 |
|---|---|
rusqlite | SQLite 바인딩 |
r2d2 | 커넥션 풀 |
r2d2_sqlite | SQLite 커넥션 매니저 |
uuid | 프레임 ID 생성 |
관련 문서:
- 클라이언트 개요
- oneshim-core - StorageService 포트
- oneshim-network - 배치 업로드