oneshim-network
네트워크 통신을 담당하는 어댑터 크레이트. gRPC, HTTP/SSE/WebSocket, 압축, 인증, 배치 업로드를 처리합니다.
gRPC 우선 통신
v0.3.0부터 gRPC를 기본 통신 프로토콜로 사용하며, HTTP/2 차단 환경에서는 REST API로 자동 fallback합니다.
역할
- gRPC 클라이언트: 인증/세션/컨텍스트 서비스 (tonic + prost)
- JWT 인증: 로그인/토큰 갱신/로그아웃
- REST API: 컨텍스트/배치 업로드, 피드백 전송 (fallback)
- SSE 스트림: 서버 제안 실시간 수신 (REST fallback)
- WebSocket: 양방향 실시간 통신
- 압축: gzip/zstd/lz4 자동 선택
- 배치 업로드: Lock-free 큐 + 동적 배치 크기
디렉토리 구조
oneshim-network/src/
├── lib.rs # 크레이트 루트
├── auth.rs # TokenManager - JWT 인증
├── http_client.rs # HttpApiClient - REST API
├── sse_client.rs # SseStreamClient - SSE 스트림
├── ws_client.rs # WebSocketClient - 양방향 통신
├── compression.rs # AdaptiveCompressor - 압축
├── batch_uploader.rs # BatchUploader - Lock-free 배치 업로드
├── ai_llm_client.rs # RemoteLlmProvider — AI LLM 의도 해석
├── ai_ocr_client.rs # RemoteOcrProvider — AI OCR 요소 추출
└── grpc/ # gRPC 클라이언트 (feature = "grpc")
├── mod.rs # 모듈 export + GrpcConfig
├── auth_client.rs # GrpcAuthClient - 인증 RPC
├── session_client.rs # GrpcSessionClient - 세션 RPC
├── context_client.rs # GrpcContextClient - 컨텍스트 RPC
└── unified_client.rs # UnifiedClient - gRPC + REST 통합
주요 컴포넌트
TokenManager (auth.rs)
JWT 토큰 관리 및 자동 갱신:
pub struct TokenManager {
base_url: String,
credentials: Credentials,
token_state: RwLock<TokenState>,
http_client: reqwest::Client,
}
struct TokenState {
access_token: Option<String>,
refresh_token: Option<String>,
expires_at: Option<DateTime<Utc>>,
}
impl TokenManager {
// 로그인
pub async fn login(&self) -> Result<(), CoreError>;
// 토큰 갱신 (만료 5분 전 자동)
pub async fn refresh_if_needed(&self) -> Result<(), CoreError>;
// 현재 토큰 반환
pub async fn get_token(&self) -> Result<String, CoreError>;
// 로그아웃
pub async fn logout(&self) -> Result<(), CoreError>;
}
토큰 갱신 전략:
- 만료 5분 전 자동 갱신
- 갱신 실패 시 재로그인 시도
RwLock으로 동시 접근 관리
HttpApiClient (http_client.rs)
REST API 클라이언트 (ApiClient 포트 구현):
pub struct HttpApiClient {
base_url: String,
token_manager: Arc<TokenManager>,
compressor: Arc<AdaptiveCompressor>,
http_client: reqwest::Client,
}
#[async_trait]
impl ApiClient for HttpApiClient {
async fn upload_context(&self, payload: &ContextPayload) -> Result<(), CoreError> {
let compressed = self.compressor.compress(payload)?;
self.http_client
.post(&format!("{}/user_context/", self.base_url))
.header("Authorization", format!("Bearer {}", self.token_manager.get_token().await?))
.header("Content-Encoding", compressed.algorithm.as_str())
.body(compressed.data)
.send()
.await?;
Ok(())
}
async fn upload_batch(&self, batch: &BatchPayload) -> Result<BatchResponse, CoreError>;
async fn send_feedback(&self, suggestion_id: &str, accepted: bool) -> Result<(), CoreError>;
async fn send_heartbeat(&self, session_id: &str) -> Result<(), CoreError>;
}
API 엔드포인트 (REST 표준):
| 메서드 | 경로 | 설명 |
|---|---|---|
| POST | /api/v1/auth/tokens | 토큰 생성 (로그인) |
| DELETE | /api/v1/auth/tokens | 토큰 폐기 (로그아웃) |
| POST | /api/v1/auth/tokens/refresh | 토큰 갱신 |
| GET | /api/v1/auth/tokens/verify | 토큰 검증 |
| POST | /user_context/contexts | 컨텍스트 업로드 |
| POST | /user_context/batches | 배치 업로드 |
| POST | /user_context/suggestions/feedback | 피드백 전송 |
| POST | /user_context/sessions/{id}/heartbeat | 세션 하트비트 |
SseStreamClient (sse_client.rs)
SSE 스트림 클라이언트 (SseClient 포트 구현):
pub struct SseStreamClient {
base_url: String,
token_manager: Arc<TokenManager>,
max_retry_secs: u64,
http_client: reqwest::Client,
}
#[async_trait]
impl SseClient for SseStreamClient {
async fn connect(
&self,
session_id: &str,
tx: mpsc::Sender<SseEvent>,
) -> Result<(), CoreError> {
let url = format!("{}/suggestions/stream/{}", self.base_url, session_id);
loop {
let token = self.token_manager.get_token().await?;
let request = self.http_client.get(&url)
.header("Authorization", format!("Bearer {token}"));
let mut es = EventSource::new(request)?;
while let Some(event) = es.next().await {
match event {
Ok(Event::Message(msg)) => {
let sse_event = self.parse_event(&msg)?;
tx.send(sse_event).await?;
}
Err(e) => {
// 재연결 로직
break;
}
}
}
// Exponential backoff 재연결
self.wait_before_retry(retry_count).await;
}
}
}
재연결 전략:
- Exponential backoff: 1초 → 2초 → 4초 → ... → 최대 30초
- 네트워크 오류 시 자동 재연결
- 토큰 만료 시 갱신 후 재연결
AdaptiveCompressor (compression.rs)
데이터 크기에 따른 자동 압축 알고리즘 선택:
pub struct AdaptiveCompressor {
thresholds: CompressionThresholds,
}
#[derive(Clone)]
pub struct CompressionThresholds {
pub skip_below_bytes: usize, // 1KB 미만: 압축 안 함
pub gzip_below_bytes: usize, // 100KB 미만: gzip
pub zstd_below_bytes: usize, // 1MB 미만: zstd
// 1MB 이상: lz4
}
impl Compressor for AdaptiveCompressor {
fn compress(&self, data: &[u8]) -> Result<CompressedData, CoreError> {
let algorithm = self.select_algorithm(data.len());
let compressed = match algorithm {
Algorithm::None => data.to_vec(),
Algorithm::Gzip => self.compress_gzip(data)?,
Algorithm::Zstd => self.compress_zstd(data)?,
Algorithm::Lz4 => self.compress_lz4(data)?,
};
Ok(CompressedData { data: compressed, algorithm })
}
fn decompress(&self, data: &CompressedData) -> Result<Vec<u8>, CoreError>;
}
알고리즘 선택 기준:
| 데이터 크기 | 알고리즘 | 특징 |
|---|---|---|
| < 1KB | None | 압축 오버헤드 > 이득 |
| 1KB ~ 100KB | gzip | 범용, 호환성 |
| 100KB ~ 1MB | zstd | 높은 압축률 |
| > 1MB | lz4 | 빠른 속도 |
BatchUploader (batch_uploader.rs)
이벤트/프레임 배치 업로드 + 재시도:
pub struct BatchUploader {
api_client: Arc<dyn ApiClient>,
/// Lock-free 큐 — 여러 producer에서 동시 push 가능 (crossbeam SegQueue)
queue: Arc<SegQueue<Event>>,
/// 큐 크기 (lock-free 카운터)
queue_size: AtomicUsize,
session_id: String,
max_batch_size: usize,
max_retries: u32,
/// 동적 배치 크기 활성화
dynamic_batch: bool,
}
배치 전략:
crossbeam::SegQueue기반 lock-free MPSC 큐 — 여러 스레드에서 무경합 enqueueAtomicUsize로 락 없이 큐 크기 추적- 동적 배치 크기: 큐 10개 미만이면 즉시 전송, 50개 초과이면 2배 배치로 빠른 처리
- 실패 시 이벤트 자동 재큐잉, exponential backoff 재시도 (최대 3회)
AI 클라이언트
외부 AI API를 호출하는 두 클라이언트. 모두 oneshim-core의 AiProviderType enum으로 제공자를 분기한다.
// oneshim-core/config.rs
pub enum AiProviderType {
Anthropic, // Claude API
OpenAi, // OpenAI 호환 API
Generic, // 범용 JSON 엔드포인트
}
RemoteLlmProvider (ai_llm_client.rs)
LlmProvider 포트 구현 — 세정된 텍스트를 외부 LLM API로 전송해 UI 자동화 의도를 해석한다.
pub struct RemoteLlmProvider {
http_client: reqwest::Client,
endpoint: String,
api_key: String,
model: String,
provider_type: AiProviderType,
timeout_secs: u64,
}
제공자별 분기:
Anthropic→POST /v1/messages,x-api-key헤더,content[0].text파싱OpenAi/Generic→POST /v1/chat/completions,Bearer토큰,choices[0].message.content파싱
RemoteOcrProvider (ai_ocr_client.rs)
OcrProvider 포트 구현 — 스크린샷(Base64)을 외부 AI Vision API로 전송해 텍스트와 바운딩 박스를 추출한다.
pub struct RemoteOcrProvider {
http_client: reqwest::Client,
endpoint: String,
api_key: String,
model: Option<String>,
provider_type: AiProviderType,
timeout_secs: u64,
}
제공자별 분기:
Anthropic→ Claude Vision 형식 (content[].text줄 단위 파싱)OpenAi/Generic→ 범용 JSON 형식 ({ "results": [...] })
데이터 흐름
gRPC 클라이언트 (Phase 36)
Feature Flag
gRPC 기능은 --features grpc 플래그로 활성화됩니다.
GrpcConfig
pub struct GrpcConfig {
pub enabled: bool, // gRPC 활성화 여부
pub endpoint: String, // gRPC 서버 엔드포인트 (기본: 50052)
pub fallback_ports: Vec<u16>, // 포트 폴백 목록 (기본: [50051, 50053])
pub use_tls: bool, // TLS 사용 여부
pub fallback_to_rest: bool, // 실패 시 REST fallback
}
포트 폴백 전략:
연결 시도 순서:
1. endpoint (50052) → 성공 시 사용
2. fallback_ports[0] (50051) → 1 실패 시 시도
3. fallback_ports[1] (50053) → 2 실패 시 시도
4. REST fallback (8000) → 모든 gRPC 실패 시
서비스별 클라이언트
| 클라이언트 | RPC | 설명 |
|---|---|---|
| GrpcAuthClient | Login, Logout, RefreshToken, ValidateToken | 인증 서비스 |
| GrpcSessionClient | CreateSession, EndSession, Heartbeat | 세션 관리 |
| GrpcContextClient | UploadBatch, SubscribeSuggestions, SendFeedback, ListSuggestions | 컨텍스트/제안 |
UnifiedClient (통합 클라이언트)
gRPC와 REST를 자동으로 전환하는 통합 클라이언트:
pub struct UnifiedClient {
grpc_config: GrpcConfig,
grpc_auth: Option<GrpcAuthClient>,
grpc_session: Option<GrpcSessionClient>,
grpc_context: Option<GrpcContextClient>,
rest_client: HttpApiClient,
}
impl UnifiedClient {
// gRPC 우선, 실패 시 REST fallback
pub async fn upload_batch(&self, batch: &BatchPayload) -> Result<BatchResponse, CoreError> {
if self.grpc_config.enabled {
match self.grpc_context.as_ref().unwrap().upload_batch(batch).await {
Ok(response) => return Ok(response),
Err(e) if self.grpc_config.fallback_to_rest => {
warn!("gRPC 실패, REST fallback: {}", e);
}
Err(e) => return Err(e),
}
}
self.rest_client.upload_batch(batch).await
}
// Server Streaming RPC (SSE 대체)
pub async fn subscribe_suggestions(&self, session_id: &str) -> Result<SuggestionStream, CoreError>;
}
Server Streaming RPC
SubscribeSuggestions는 Server Streaming RPC로 실시간 제안을 수신합니다:
// gRPC Server Streaming
let mut stream = grpc_context.subscribe_suggestions(session_id).await?;
while let Some(suggestion) = stream.message().await? {
tx.send(suggestion).await?;
}
산업 현장 지원:
NO_EMOJI=1환경변수로 ASCII 출력 모드 지원- HTTP/2 차단 환경에서 자동 REST fallback
의존성
| 크레이트 | 용도 |
|---|---|
tonic | gRPC 클라이언트 (feature = "grpc") |
prost | Protocol Buffers (feature = "grpc") |
reqwest | HTTP 클라이언트 |
reqwest-eventsource | SSE 클라이언트 |
tokio-tungstenite | WebSocket |
flate2 | gzip 압축 |
zstd | zstd 압축 |
lz4_flex | lz4 압축 |
serde_json | JSON 직렬화 |
crossbeam | Lock-free 큐 |
관련 문서:
- 클라이언트 개요
- oneshim-core - 포트 인터페이스
- oneshim-suggestion - SSE 이벤트 처리