본문으로 건너뛰기

oneshim-network

네트워크 통신을 담당하는 어댑터 크레이트. gRPC, HTTP/SSE/WebSocket, 압축, 인증, 배치 업로드를 처리합니다.

gRPC 우선 통신

v0.3.0부터 gRPC를 기본 통신 프로토콜로 사용하며, HTTP/2 차단 환경에서는 REST API로 자동 fallback합니다.


역할

  • gRPC 클라이언트: 인증/세션/컨텍스트 서비스 (tonic + prost)
  • JWT 인증: 로그인/토큰 갱신/로그아웃
  • REST API: 컨텍스트/배치 업로드, 피드백 전송 (fallback)
  • SSE 스트림: 서버 제안 실시간 수신 (REST fallback)
  • WebSocket: 양방향 실시간 통신
  • 압축: gzip/zstd/lz4 자동 선택
  • 배치 업로드: Lock-free 큐 + 동적 배치 크기

디렉토리 구조

oneshim-network/src/
├── lib.rs # 크레이트 루트
├── auth.rs # TokenManager - JWT 인증
├── http_client.rs # HttpApiClient - REST API
├── sse_client.rs # SseStreamClient - SSE 스트림
├── ws_client.rs # WebSocketClient - 양방향 통신
├── compression.rs # AdaptiveCompressor - 압축
├── batch_uploader.rs # BatchUploader - Lock-free 배치 업로드
├── ai_llm_client.rs # RemoteLlmProvider — AI LLM 의도 해석
├── ai_ocr_client.rs # RemoteOcrProvider — AI OCR 요소 추출
└── grpc/ # gRPC 클라이언트 (feature = "grpc")
├── mod.rs # 모듈 export + GrpcConfig
├── auth_client.rs # GrpcAuthClient - 인증 RPC
├── session_client.rs # GrpcSessionClient - 세션 RPC
├── context_client.rs # GrpcContextClient - 컨텍스트 RPC
└── unified_client.rs # UnifiedClient - gRPC + REST 통합

주요 컴포넌트

TokenManager (auth.rs)

JWT 토큰 관리 및 자동 갱신:

pub struct TokenManager {
base_url: String,
credentials: Credentials,
token_state: RwLock<TokenState>,
http_client: reqwest::Client,
}

struct TokenState {
access_token: Option<String>,
refresh_token: Option<String>,
expires_at: Option<DateTime<Utc>>,
}

impl TokenManager {
// 로그인
pub async fn login(&self) -> Result<(), CoreError>;

// 토큰 갱신 (만료 5분 전 자동)
pub async fn refresh_if_needed(&self) -> Result<(), CoreError>;

// 현재 토큰 반환
pub async fn get_token(&self) -> Result<String, CoreError>;

// 로그아웃
pub async fn logout(&self) -> Result<(), CoreError>;
}

토큰 갱신 전략:

  • 만료 5분 전 자동 갱신
  • 갱신 실패 시 재로그인 시도
  • RwLock으로 동시 접근 관리

HttpApiClient (http_client.rs)

REST API 클라이언트 (ApiClient 포트 구현):

pub struct HttpApiClient {
base_url: String,
token_manager: Arc<TokenManager>,
compressor: Arc<AdaptiveCompressor>,
http_client: reqwest::Client,
}

#[async_trait]
impl ApiClient for HttpApiClient {
async fn upload_context(&self, payload: &ContextPayload) -> Result<(), CoreError> {
let compressed = self.compressor.compress(payload)?;

self.http_client
.post(&format!("{}/user_context/", self.base_url))
.header("Authorization", format!("Bearer {}", self.token_manager.get_token().await?))
.header("Content-Encoding", compressed.algorithm.as_str())
.body(compressed.data)
.send()
.await?;

Ok(())
}

async fn upload_batch(&self, batch: &BatchPayload) -> Result<BatchResponse, CoreError>;
async fn send_feedback(&self, suggestion_id: &str, accepted: bool) -> Result<(), CoreError>;
async fn send_heartbeat(&self, session_id: &str) -> Result<(), CoreError>;
}

API 엔드포인트 (REST 표준):

메서드경로설명
POST/api/v1/auth/tokens토큰 생성 (로그인)
DELETE/api/v1/auth/tokens토큰 폐기 (로그아웃)
POST/api/v1/auth/tokens/refresh토큰 갱신
GET/api/v1/auth/tokens/verify토큰 검증
POST/user_context/contexts컨텍스트 업로드
POST/user_context/batches배치 업로드
POST/user_context/suggestions/feedback피드백 전송
POST/user_context/sessions/{id}/heartbeat세션 하트비트

SseStreamClient (sse_client.rs)

SSE 스트림 클라이언트 (SseClient 포트 구현):

pub struct SseStreamClient {
base_url: String,
token_manager: Arc<TokenManager>,
max_retry_secs: u64,
http_client: reqwest::Client,
}

#[async_trait]
impl SseClient for SseStreamClient {
async fn connect(
&self,
session_id: &str,
tx: mpsc::Sender<SseEvent>,
) -> Result<(), CoreError> {
let url = format!("{}/suggestions/stream/{}", self.base_url, session_id);

loop {
let token = self.token_manager.get_token().await?;
let request = self.http_client.get(&url)
.header("Authorization", format!("Bearer {token}"));

let mut es = EventSource::new(request)?;

while let Some(event) = es.next().await {
match event {
Ok(Event::Message(msg)) => {
let sse_event = self.parse_event(&msg)?;
tx.send(sse_event).await?;
}
Err(e) => {
// 재연결 로직
break;
}
}
}

// Exponential backoff 재연결
self.wait_before_retry(retry_count).await;
}
}
}

재연결 전략:

  • Exponential backoff: 1초 → 2초 → 4초 → ... → 최대 30초
  • 네트워크 오류 시 자동 재연결
  • 토큰 만료 시 갱신 후 재연결

AdaptiveCompressor (compression.rs)

데이터 크기에 따른 자동 압축 알고리즘 선택:

pub struct AdaptiveCompressor {
thresholds: CompressionThresholds,
}

#[derive(Clone)]
pub struct CompressionThresholds {
pub skip_below_bytes: usize, // 1KB 미만: 압축 안 함
pub gzip_below_bytes: usize, // 100KB 미만: gzip
pub zstd_below_bytes: usize, // 1MB 미만: zstd
// 1MB 이상: lz4
}

impl Compressor for AdaptiveCompressor {
fn compress(&self, data: &[u8]) -> Result<CompressedData, CoreError> {
let algorithm = self.select_algorithm(data.len());
let compressed = match algorithm {
Algorithm::None => data.to_vec(),
Algorithm::Gzip => self.compress_gzip(data)?,
Algorithm::Zstd => self.compress_zstd(data)?,
Algorithm::Lz4 => self.compress_lz4(data)?,
};
Ok(CompressedData { data: compressed, algorithm })
}

fn decompress(&self, data: &CompressedData) -> Result<Vec<u8>, CoreError>;
}

알고리즘 선택 기준:

데이터 크기알고리즘특징
< 1KBNone압축 오버헤드 > 이득
1KB ~ 100KBgzip범용, 호환성
100KB ~ 1MBzstd높은 압축률
> 1MBlz4빠른 속도

BatchUploader (batch_uploader.rs)

이벤트/프레임 배치 업로드 + 재시도:

pub struct BatchUploader {
api_client: Arc<dyn ApiClient>,
/// Lock-free 큐 — 여러 producer에서 동시 push 가능 (crossbeam SegQueue)
queue: Arc<SegQueue<Event>>,
/// 큐 크기 (lock-free 카운터)
queue_size: AtomicUsize,
session_id: String,
max_batch_size: usize,
max_retries: u32,
/// 동적 배치 크기 활성화
dynamic_batch: bool,
}

배치 전략:

  • crossbeam::SegQueue 기반 lock-free MPSC 큐 — 여러 스레드에서 무경합 enqueue
  • AtomicUsize로 락 없이 큐 크기 추적
  • 동적 배치 크기: 큐 10개 미만이면 즉시 전송, 50개 초과이면 2배 배치로 빠른 처리
  • 실패 시 이벤트 자동 재큐잉, exponential backoff 재시도 (최대 3회)

AI 클라이언트

외부 AI API를 호출하는 두 클라이언트. 모두 oneshim-coreAiProviderType enum으로 제공자를 분기한다.

// oneshim-core/config.rs
pub enum AiProviderType {
Anthropic, // Claude API
OpenAi, // OpenAI 호환 API
Generic, // 범용 JSON 엔드포인트
}

RemoteLlmProvider (ai_llm_client.rs)

LlmProvider 포트 구현 — 세정된 텍스트를 외부 LLM API로 전송해 UI 자동화 의도를 해석한다.

pub struct RemoteLlmProvider {
http_client: reqwest::Client,
endpoint: String,
api_key: String,
model: String,
provider_type: AiProviderType,
timeout_secs: u64,
}

제공자별 분기:

  • AnthropicPOST /v1/messages, x-api-key 헤더, content[0].text 파싱
  • OpenAi / GenericPOST /v1/chat/completions, Bearer 토큰, choices[0].message.content 파싱

RemoteOcrProvider (ai_ocr_client.rs)

OcrProvider 포트 구현 — 스크린샷(Base64)을 외부 AI Vision API로 전송해 텍스트와 바운딩 박스를 추출한다.

pub struct RemoteOcrProvider {
http_client: reqwest::Client,
endpoint: String,
api_key: String,
model: Option<String>,
provider_type: AiProviderType,
timeout_secs: u64,
}

제공자별 분기:

  • Anthropic → Claude Vision 형식 (content[].text 줄 단위 파싱)
  • OpenAi / Generic → 범용 JSON 형식 ({ "results": [...] })

데이터 흐름


gRPC 클라이언트 (Phase 36)

Feature Flag

gRPC 기능은 --features grpc 플래그로 활성화됩니다.

GrpcConfig

pub struct GrpcConfig {
pub enabled: bool, // gRPC 활성화 여부
pub endpoint: String, // gRPC 서버 엔드포인트 (기본: 50052)
pub fallback_ports: Vec<u16>, // 포트 폴백 목록 (기본: [50051, 50053])
pub use_tls: bool, // TLS 사용 여부
pub fallback_to_rest: bool, // 실패 시 REST fallback
}

포트 폴백 전략:

연결 시도 순서:
1. endpoint (50052) → 성공 시 사용
2. fallback_ports[0] (50051) → 1 실패 시 시도
3. fallback_ports[1] (50053) → 2 실패 시 시도
4. REST fallback (8000) → 모든 gRPC 실패 시

서비스별 클라이언트

클라이언트RPC설명
GrpcAuthClientLogin, Logout, RefreshToken, ValidateToken인증 서비스
GrpcSessionClientCreateSession, EndSession, Heartbeat세션 관리
GrpcContextClientUploadBatch, SubscribeSuggestions, SendFeedback, ListSuggestions컨텍스트/제안

UnifiedClient (통합 클라이언트)

gRPC와 REST를 자동으로 전환하는 통합 클라이언트:

pub struct UnifiedClient {
grpc_config: GrpcConfig,
grpc_auth: Option<GrpcAuthClient>,
grpc_session: Option<GrpcSessionClient>,
grpc_context: Option<GrpcContextClient>,
rest_client: HttpApiClient,
}

impl UnifiedClient {
// gRPC 우선, 실패 시 REST fallback
pub async fn upload_batch(&self, batch: &BatchPayload) -> Result<BatchResponse, CoreError> {
if self.grpc_config.enabled {
match self.grpc_context.as_ref().unwrap().upload_batch(batch).await {
Ok(response) => return Ok(response),
Err(e) if self.grpc_config.fallback_to_rest => {
warn!("gRPC 실패, REST fallback: {}", e);
}
Err(e) => return Err(e),
}
}
self.rest_client.upload_batch(batch).await
}

// Server Streaming RPC (SSE 대체)
pub async fn subscribe_suggestions(&self, session_id: &str) -> Result<SuggestionStream, CoreError>;
}

Server Streaming RPC

SubscribeSuggestions는 Server Streaming RPC로 실시간 제안을 수신합니다:

// gRPC Server Streaming
let mut stream = grpc_context.subscribe_suggestions(session_id).await?;
while let Some(suggestion) = stream.message().await? {
tx.send(suggestion).await?;
}

산업 현장 지원:

  • NO_EMOJI=1 환경변수로 ASCII 출력 모드 지원
  • HTTP/2 차단 환경에서 자동 REST fallback

의존성

크레이트용도
tonicgRPC 클라이언트 (feature = "grpc")
prostProtocol Buffers (feature = "grpc")
reqwestHTTP 클라이언트
reqwest-eventsourceSSE 클라이언트
tokio-tungsteniteWebSocket
flate2gzip 압축
zstdzstd 압축
lz4_flexlz4 압축
serde_jsonJSON 직렬화
crossbeamLock-free 큐

관련 문서: