diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..f71929f --- /dev/null +++ b/.editorconfig @@ -0,0 +1,3 @@ +[*.*] +indent_style = space +indent_size = 2 diff --git a/README.md b/README.md index 2fadc9d..32582db 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ The best way to use @opik/ccsync is with **Claude Code hooks** for automatic syn ```bash npx @opik/ccsync config ``` - + This will guide you through an interactive setup process. Alternatively, you can set environment variables: ```bash export OPIK_API_KEY="your-api-key" @@ -78,6 +78,7 @@ Configure Opik connection using environment variables or a config file: export OPIK_API_KEY="your-api-key" export OPIK_BASE_URL="http://localhost:5173" # Optional export OPIK_PROJECT_NAME="your-project" # Optional +export OPIK_PROVIDER="anthropic" # Optional ``` ### Configuration File @@ -86,6 +87,7 @@ Create `~/.opik.config`: api_key = your-api-key url_override = http://localhost:5173 workspace = your-workspace +provider = anthropic ``` ## Commands @@ -286,7 +288,7 @@ Add hooks to your Claude Code settings file (`~/.claude/settings.json` or `.clau ], "Stop": [ { - "matcher": "*", + "matcher": "*", "hooks": [ { "type": "command", @@ -310,7 +312,7 @@ Add hooks to your Claude Code settings file (`~/.claude/settings.json` or `.clau "matcher": "*", "hooks": [ { - "type": "command", + "type": "command", "command": "jq -r '.session_id' | xargs -I {} npx @opik/ccsync sync --session {} --force" } ] diff --git a/src/api/opik-client.ts b/src/api/opik-client.ts index e23bfdf..6f7d643 100644 --- a/src/api/opik-client.ts +++ b/src/api/opik-client.ts @@ -1,5 +1,5 @@ import axios, { AxiosInstance, AxiosError } from 'axios'; -import { OpikConfig, OpikTrace } from '../types'; +import { OpikConfig, OpikSpan, OpikTrace } from '../types'; import { createLogger } from '../utils/logger'; export interface OpikCreateTracesRequest { @@ -14,6 +14,17 @@ export interface OpikCreateTracesResponse { [key: string]: any; // Allow additional properties } +export interface OpikCreateSpansRequest { + spans: OpikSpan[]; +} + +export interface OpikCreateSpansResponse { + spans?: Array<{ + id: string; + project_id?: string; + }>; +} + export class OpikApiClient { private client: AxiosInstance; private config: OpikConfig; @@ -22,7 +33,7 @@ export class OpikApiClient { constructor(config: OpikConfig) { this.config = config; this.logger = createLogger({ verbose: false }); - + // Normalize the base URL - remove trailing /api/ if present since we'll add the full path let baseURL = config.base_url; if (baseURL.endsWith('/api/')) { @@ -30,7 +41,7 @@ export class OpikApiClient { } else if (baseURL.endsWith('/api')) { baseURL = baseURL.slice(0, -4); // Remove '/api' } - + this.client = axios.create({ baseURL: baseURL, headers: { @@ -41,7 +52,7 @@ export class OpikApiClient { }, timeout: 30000 // 30 second timeout }); - + this.logger.debug(`Opik API client configured with base URL: ${baseURL}`); } @@ -50,12 +61,12 @@ export class OpikApiClient { if (traces.length === 0) { return { traces: [] }; } - + const request: OpikCreateTracesRequest = { traces }; - + try { this.logger.debug(`Sending ${traces.length} traces to Opik at ${this.config.base_url}`); - + const response = await this.client.post( '/api/v1/private/traces/batch', request @@ -68,12 +79,12 @@ export class OpikApiClient { } catch (error) { if (error instanceof AxiosError) { // If workspace doesn't exist, try with "default" - if (error.response?.status === 403 && - error.response?.data?.message?.includes('Workspace') && - this.config.workspace !== 'default') { - + if (error.response?.status === 403 && + error.response?.data?.message?.includes('Workspace') && + this.config.workspace !== 'default') { + this.logger.warning(`Workspace '${this.config.workspace}' not found, trying 'default'`); - + // Create a new client with default workspace const defaultClient = axios.create({ ...this.client.defaults, @@ -82,7 +93,7 @@ export class OpikApiClient { 'Comet-Workspace': 'default' } }); - + try { const response = await defaultClient.post( '/api/v1/private/traces/batch', @@ -96,14 +107,14 @@ export class OpikApiClient { this.logger.error(`Fallback to default workspace also failed`); } } - + this.logger.debug(`Request details:`, { url: error.config?.url, method: error.config?.method, baseURL: error.config?.baseURL, headers: error.config?.headers }); - + const status = error.response?.status || 'no response'; const statusText = error.response?.statusText || 'unknown error'; const errorMsg = `Failed to create traces: ${status} ${statusText}`; @@ -114,6 +125,85 @@ export class OpikApiClient { } } + async createSpans(spans: OpikSpan[]): Promise { + // Don't call API if there are no spans to create + if (spans.length === 0) { + return { spans: [] }; + } + + const limit = 1000; // Opik API prohibits more than 1000 spans at a time + const chunks = [...Array(Math.ceil(spans.length / limit))].map(_ => spans.splice(0, limit)); + const response: OpikCreateSpansResponse = { spans: [] }; + + for (const chunkSpans of chunks) { + const request: OpikCreateSpansRequest = { spans: chunkSpans }; + + try { + this.logger.debug(`Sending ${chunkSpans.length} spans to Opik at ${this.config.base_url}`); + + const chunkResponse = await this.client.post( + '/api/v1/private/spans/batch', + request + ); + + this.logger.debug(`Status Code: ${chunkResponse.status}`); + this.logger.debug(`API Response:`, chunkResponse.data); + this.logger.debug(`Successfully created ${chunkResponse.data?.spans?.length || 'unknown number of'} spans`); + + response.spans?.push(...(chunkResponse.data.spans ?? [])); + } catch (error) { + if (error instanceof AxiosError) { + // If workspace doesn't exist, try with "default" + if (error.response?.status === 403 && + error.response?.data?.message?.includes('Workspace') && + this.config.workspace !== 'default') { + + this.logger.warning(`Workspace '${this.config.workspace}' not found, trying 'default'`); + + // Create a new client with default workspace + const defaultClient = axios.create({ + ...this.client.defaults, + headers: { + ...this.client.defaults.headers, + 'Comet-Workspace': 'default' + } + }); + + try { + const chunkResponse = await defaultClient.post( + '/api/v1/private/spans/batch', + request + ); + this.logger.debug(`Fallback Status Code: ${chunkResponse.status}`); + this.logger.debug(`Fallback API Response:`, chunkResponse.data); + this.logger.debug(`Successfully created ${chunkResponse.data?.spans?.length || 'unknown number of'} spans in default workspace`); + + response.spans?.push(...(chunkResponse.data.spans ?? [])); + } catch (fallbackError) { + this.logger.error(`Fallback to default workspace also failed`); + } + } + + this.logger.debug(`Request details:`, { + url: error.config?.url, + method: error.config?.method, + baseURL: error.config?.baseURL, + headers: error.config?.headers + }); + + const status = error.response?.status || 'no response'; + const statusText = error.response?.statusText || 'unknown error'; + const errorMsg = `Failed to create spans: ${status} ${statusText}`; + const errorDetails = error.response?.data ? JSON.stringify(error.response.data, null, 2) : error.message; + throw new Error(`${errorMsg}\nDetails: ${errorDetails}`); + } + throw new Error(`Unexpected error creating spans: ${error instanceof Error ? error.message : error}`); + } + } + + return response; + } + async testConnection(): Promise { try { // Test with empty traces array to validate connection @@ -144,7 +234,7 @@ export class OpikApiClient { async updateTrace(traceId: string, trace: Partial): Promise { try { this.logger.debug(`Updating trace ${traceId} in Opik`); - + const response = await this.client.patch( `/api/v1/private/traces/${traceId}`, trace @@ -160,7 +250,7 @@ export class OpikApiClient { baseURL: error.config?.baseURL, headers: error.config?.headers }); - + const status = error.response?.status || 'no response'; const statusText = error.response?.statusText || 'unknown error'; const errorMsg = `Failed to update trace ${traceId}: ${status} ${statusText}`; @@ -195,7 +285,7 @@ export class OpikApiClient { }); const searchResponse = await this.client.get(`${searchEndpoint}?${searchParams}`); - + if (!searchResponse.data?.content?.[0]?.thread_model_id) { throw new Error(`Thread ${threadId} not found or no thread_model_id available`); } @@ -205,7 +295,7 @@ export class OpikApiClient { // Now update the thread tags using the thread_model_id const updateEndpoint = `/api/v1/private/traces/threads/${threadModelId}`; const payload = { tags }; - + await this.client.patch(updateEndpoint, payload); } catch (error) { if (error instanceof AxiosError) { @@ -216,7 +306,7 @@ export class OpikApiClient { headers: error.config?.headers, data: error.config?.data })}`); - + const status = error.response?.status || 'no response'; const statusText = error.response?.statusText || 'unknown error'; const errorMsg = `Failed to update thread ${threadId} tags: ${status} ${statusText}`; diff --git a/src/config/index.ts b/src/config/index.ts index 6cf56d2..93af51a 100644 --- a/src/config/index.ts +++ b/src/config/index.ts @@ -10,26 +10,27 @@ interface OpikConfigFile { workspace?: string; project_name?: string; is_local?: boolean; + provider?: string; } function parseOpikConfigFile(content: string): OpikConfigFile { const config: OpikConfigFile = {}; const lines = content.split('\n'); - + for (const line of lines) { const trimmed = line.trim(); - + // Skip empty lines and comments if (!trimmed || trimmed.startsWith('#') || trimmed.startsWith('[')) { continue; } - + // Parse key = value pairs const match = trimmed.match(/^(\w+)\s*=\s*(.+)$/); if (match) { const [, key, value] = match; const cleanValue = value.trim(); - + switch (key) { case 'api_key': config.api_key = cleanValue; @@ -46,22 +47,26 @@ function parseOpikConfigFile(content: string): OpikConfigFile { case 'is_local': config.is_local = cleanValue.toLowerCase() === 'true'; break; + case 'provider': + config.provider = cleanValue; + break; } } } - + return config; } export function getOpikConfig(): OpikConfig { const logger = createLogger({ verbose: false }); - + // Try environment variables first const apiKey = process.env.OPIK_API_KEY; const baseUrl = process.env.OPIK_BASE_URL || 'http://localhost:5173'; const projectName = process.env.OPIK_PROJECT_NAME; const workspace = process.env.OPIK_WORKSPACE; const isLocal = process.env.OPIK_IS_LOCAL?.toLowerCase() === 'true'; + const provider = process.env.OPIK_PROVIDER; if (apiKey || isLocal) { return { @@ -69,7 +74,8 @@ export function getOpikConfig(): OpikConfig { base_url: baseUrl, project_name: projectName || 'Claude Code', workspace: workspace || 'default', - is_local: isLocal + is_local: isLocal, + provider, }; } @@ -80,7 +86,7 @@ export function getOpikConfig(): OpikConfig { const configFile = readFileSync(configPath, 'utf8'); const config = parseOpikConfigFile(configFile); logger.debug(`Parsed config:`, config); - + if (!config.api_key && !config.is_local) { throw new Error('No API key found in config file (required for cloud instance)'); } @@ -90,14 +96,15 @@ export function getOpikConfig(): OpikConfig { base_url: config.url_override || baseUrl, project_name: config.project_name || 'Claude Code', workspace: config.workspace || 'default', - is_local: config.is_local || false + is_local: config.is_local || false, + provider: config.provider, }; - - logger.debug(`Using Opik config:`, { - ...finalConfig, - api_key: finalConfig.api_key ? '***configured***' : 'not set' + + logger.debug(`Using Opik config:`, { + ...finalConfig, + api_key: finalConfig.api_key ? '***configured***' : 'not set' }); - + return finalConfig; } catch (error) { logger.error(`Config error: ${error instanceof Error ? error.message : error}`); @@ -109,4 +116,4 @@ export function getOpikConfig(): OpikConfig { export function getClaudeDataDir(): string { return process.env.CLAUDE_DATA_DIR || join(homedir(), '.claude'); -} \ No newline at end of file +} diff --git a/src/parsers/opik.ts b/src/parsers/opik.ts index 974b4f3..e973737 100644 --- a/src/parsers/opik.ts +++ b/src/parsers/opik.ts @@ -1,4 +1,4 @@ -import { ClaudeMessage, OpikTrace } from '../types'; +import { ClaudeMessage, OpikSpan, OpikTrace } from '../types'; import { randomBytes } from 'crypto'; import { relative, resolve } from 'path'; @@ -6,58 +6,71 @@ function generateUUIDv7(): string { // Generate a UUID version 7 (time-ordered) // Format: TTTTTTTT-TTTT-7RRR-VRRR-RRRRRRRRRRRR // Where T = timestamp, R = random, V = variant bits - + const timestamp = Date.now(); const randomBits = randomBytes(10); - + // Convert timestamp to hex and pad to 12 characters (48 bits) const timestampHex = timestamp.toString(16).padStart(12, '0'); - + // Build the UUID const uuid = [ timestampHex.slice(0, 8), // 32 bits of timestamp - timestampHex.slice(8, 12), // 16 bits of timestamp + timestampHex.slice(8, 12), // 16 bits of timestamp '7' + randomBits[0].toString(16).padStart(3, '0'), // version 7 + 12 random bits ((randomBits[1] & 0x3F) | 0x80).toString(16) + // variant bits + 6 random bits - randomBits[2].toString(16).padStart(2, '0'), // + 8 random bits + randomBits[2].toString(16).padStart(2, '0'), // + 8 random bits randomBits.slice(3, 9).toString('hex') // 48 random bits ].join('-'); - + return uuid; } -export function claudeToOpikTraces(messages: ClaudeMessage[], startFromIndex: number = 0): OpikTrace[] { +type ClaudeToOpikTracesProps = { + messages: ClaudeMessage[]; + startFromIndex?: number; + projectName?: string; + provider?: string; +} + +export function claudeToOpikTraces({ + messages, + startFromIndex = 0, + projectName = "Claude Code", + provider = "anthropic", +}: ClaudeToOpikTracesProps): { + traces: OpikTrace[], + spans: OpikSpan[], +} { // If startFromIndex is provided, only process messages from that index onwards const messagesToProcess = startFromIndex > 0 ? messages.slice(startFromIndex) : messages; const groups = groupMessagesByUserInteraction(messagesToProcess); const traces: OpikTrace[] = []; - + const spans: OpikSpan[] = []; + for (const group of groups) { if (group.length === 0) continue; - + const userMessage = group[0]; const assistantMessages = group.slice(1); - + // Skip if no user message or user message is not text if (userMessage.type !== 'user' || typeof userMessage.message.content !== 'string') { continue; } - - // Use "Claude Code" as the project name - const projectName = "Claude Code"; - + // Extract project name from working directory const workingProjectName = extractProjectName(userMessage.cwd); - + // Build timeline-style output from assistant responses const output = buildTimelineOutput(assistantMessages); - + // Calculate total tokens const totalTokens = calculateTotalTokens(group); - + // Create project-specific tags const projectTags = ['claude-code', `project:${workingProjectName}`]; - + // Create trace const trace: OpikTrace = { id: generateUUIDv7(), @@ -80,11 +93,42 @@ export function claudeToOpikTraces(messages: ClaudeMessage[], startFromIndex: nu thread_id: userMessage.sessionId, tags: projectTags }; - + traces.push(trace); + + let lastSpanTime = userMessage.timestamp;; + + for (const message of assistantMessages) { + const inputTokens = message.message?.usage?.input_tokens ?? 0; + const outputTokens = message.message?.usage?.output_tokens ?? 0; + const output = buildTimelineOutput([message]); + const span: OpikSpan = { + id: generateUUIDv7(), + project_name: projectName, + name: truncateString(output, 100), + trace_id: trace.id, + start_time: lastSpanTime, + end_time: message.timestamp, + output: output, + usage: { + prompt_tokens: inputTokens, + completion_tokens: outputTokens, + total_tokens: inputTokens + outputTokens, + }, + type: (message.toolUseResult) ? 'tool' : 'llm', + provider, + model: message.message?.model, + }; + + spans.push(span); + + if (message.timestamp) { + lastSpanTime = message.timestamp; + } + } } - - return traces; + + return { traces, spans }; } function extractProjectName(cwd: string): string { @@ -97,41 +141,41 @@ function extractProjectName(cwd: string): string { function buildTimelineOutput(assistantMessages: ClaudeMessage[]): string { // First pass: collect all tool calls and their results - const toolCalls: Map = new Map(); - const timeline: Array<{type: 'text' | 'tool', content: string, toolId?: string}> = []; - + const toolCalls: Map = new Map(); + const timeline: Array<{ type: 'text' | 'tool', content: string, toolId?: string }> = []; + // Collect tool calls and results for (const message of assistantMessages) { if (!message.message) continue; - + if (message.type === 'assistant') { if (typeof message.message.content === 'string') { // Text response - timeline.push({type: 'text', content: message.message.content}); + timeline.push({ type: 'text', content: message.message.content }); } else if (Array.isArray(message.message.content)) { // Process array content (can contain both text and tool_use) for (const item of message.message.content) { if (item && item.type === 'text') { - timeline.push({type: 'text', content: item.text}); + timeline.push({ type: 'text', content: item.text }); } else if (item && item.type === 'tool_use') { // Tool call const toolDesc = formatToolDescription(item.name, item.input, message.cwd); const toolId = item.id; - + toolCalls.set(toolId, { call: `⏺ ${toolDesc}`, result: null, cwd: message.cwd }); - - timeline.push({type: 'tool', content: '', toolId}); + + timeline.push({ type: 'tool', content: '', toolId }); } } } } else if (message.type === 'user' && message.toolUseResult) { // Tool result - find the matching tool call ID const toolId = findToolCallId(message); - + if (toolId && toolCalls.has(toolId)) { const result = message.toolUseResult; if (result && typeof result === 'object') { @@ -143,10 +187,10 @@ function buildTimelineOutput(assistantMessages: ClaudeMessage[]): string { } } } - + // Second pass: build final timeline with grouped tool calls and results const finalTimeline: string[] = []; - + for (const item of timeline) { if (item.type === 'text') { finalTimeline.push(item.content); @@ -160,7 +204,7 @@ function buildTimelineOutput(assistantMessages: ClaudeMessage[]): string { } } } - + return finalTimeline.join('\n\n').replace(/\\n/g, '\n').replace(/\\"/g, '"'); } @@ -173,7 +217,7 @@ function findToolCallId(userMessage: ClaudeMessage): string | null { } } } - + return null; } @@ -183,21 +227,21 @@ function makeRelativePath(absolutePath: string, cwd: string): string { if (!absolutePath || typeof absolutePath !== 'string') { return absolutePath || ''; } - + // If the path is already relative or doesn't start with /, return as is if (!absolutePath.startsWith('/')) { return absolutePath; } - + // Convert absolute path to relative from the working directory const relativePath = relative(cwd, absolutePath); - + // If the relative path would go up directories, just use the filename if (relativePath.startsWith('../')) { const parts = absolutePath.split('/'); return parts[parts.length - 1]; } - + return relativePath; } catch { // If path conversion fails, just return the filename @@ -214,7 +258,7 @@ function formatToolDescription(toolName: string, input: any, cwd?: string): stri const getDisplayPath = (path: string) => { return cwd ? makeRelativePath(path, cwd) : path; }; - + switch (toolName) { case 'Read': return `Read(${getDisplayPath(input.file_path)})`; @@ -246,7 +290,7 @@ function formatEnhancedToolResultSummary(result: any, cwd?: string): string { if (result.success === false || result.error || result.stderr) { return ` ⎿ Failed`; } - + // Default to successful for all other cases return ` ⎿ Successful`; } @@ -255,16 +299,16 @@ function generateDiffOutput(oldContent: string, newContent: string, filePath?: s // Simple diff generation - in a real implementation you might want to use a proper diff library const oldLines = oldContent.split('\n'); const newLines = newContent.split('\n'); - + // For now, just show a simple line-by-line diff const diffLines: string[] = []; const maxLines = Math.max(oldLines.length, newLines.length); - + let lineNumber = 1; for (let i = 0; i < maxLines; i++) { const oldLine = oldLines[i]; const newLine = newLines[i]; - + if (oldLine !== undefined && newLine !== undefined) { if (oldLine !== newLine) { // Changed line @@ -281,16 +325,16 @@ function generateDiffOutput(oldContent: string, newContent: string, filePath?: s // Added line diffLines.push(`${lineNumber.toString().padStart(8)} + ${newLine}`); } - + lineNumber++; - + // Limit diff output to prevent overwhelming display if (diffLines.length > 20) { diffLines.push('... (diff truncated)'); break; } } - + return diffLines.length > 0 ? diffLines.join('\n') : ''; } @@ -299,17 +343,20 @@ function formatToolResultSummary(result: any): string { return formatEnhancedToolResultSummary(result).replace(/^ ⎿ /, ''); } -function calculateTotalTokens(messages: ClaudeMessage[]): any { +function calculateTotalTokens(messages: ClaudeMessage[]): { + input: number, + output: number, +} { let inputTokens = 0; let outputTokens = 0; - + for (const message of messages) { if (message.message?.usage) { inputTokens += message.message.usage.input_tokens || 0; outputTokens += message.message.usage.output_tokens || 0; } } - + return { input: inputTokens, output: outputTokens @@ -324,13 +371,13 @@ function truncateString(str: string, maxLength: number): string { export function groupMessagesByUserInteraction(messages: ClaudeMessage[]): ClaudeMessage[][] { const groups: ClaudeMessage[][] = []; let currentGroup: ClaudeMessage[] = []; - + for (const message of messages) { // Skip summary messages if ((message as any).type === 'summary') { continue; } - + // Start a new group on user text messages (not tool results) if (message.type === 'user' && typeof message.message.content === 'string') { // Save previous group if it exists @@ -344,11 +391,11 @@ export function groupMessagesByUserInteraction(messages: ClaudeMessage[]): Claud currentGroup.push(message); } } - + // Add the final group if it exists if (currentGroup.length > 0) { groups.push(currentGroup); } - + return groups; } diff --git a/src/sync.ts b/src/sync.ts index 4d9cbd8..12c4da6 100644 --- a/src/sync.ts +++ b/src/sync.ts @@ -1,4 +1,4 @@ -import { OpikConfig, OpikTrace, SyncOptions, ClaudeMessage } from './types'; +import { OpikConfig, OpikTrace, SyncOptions, ClaudeMessage, OpikSpan } from './types'; import { createOpikClient, OpikApiClient } from './api/opik-client'; import { SyncedGroup } from './state/sync-state'; import { createLogger } from './utils/logger'; @@ -14,6 +14,10 @@ export class OpikClient { return await this.apiClient.createTraces(traces); } + async createSpans(spans: OpikSpan[]): Promise { + return await this.apiClient.createSpans(spans); + } + async testConnection(): Promise { return await this.apiClient.testConnection(); } @@ -35,27 +39,27 @@ interface GroupAction { function analyzeGroupChanges(currentGroups: ClaudeMessage[][], syncedGroups: SyncedGroup[]): GroupAction[] { const actions: GroupAction[] = []; - + for (const group of currentGroups) { if (group.length === 0) continue; - + const userMessage = group[0]; const lastMessage = group[group.length - 1]; - + // Find if this group was previously synced const syncedGroup = syncedGroups.find(sg => sg.userMessageUuid === userMessage.uuid); - + if (!syncedGroup) { // New group - CREATE trace - actions.push({action: 'create', group}); - } else if (syncedGroup.lastMessageUuid !== lastMessage.uuid || - syncedGroup.messageCount !== group.length) { + actions.push({ action: 'create', group }); + } else if (syncedGroup.lastMessageUuid !== lastMessage.uuid || + syncedGroup.messageCount !== group.length) { // Group has new content - UPDATE trace - actions.push({action: 'update', group, traceId: syncedGroup.traceId}); + actions.push({ action: 'update', group, traceId: syncedGroup.traceId }); } // If unchanged, no action needed } - + return actions; } @@ -65,63 +69,64 @@ function generateUUIDv7(): string { return uuidv7(); } -async function convertGroupToTrace(group: ClaudeMessage[]): Promise { +async function convertGroupToTrace( + group: ClaudeMessage[], + config: OpikConfig, +): Promise<{ trace: OpikTrace, spans: OpikSpan[] } | null> { // Import the necessary functions const { claudeToOpikTraces } = await import('./parsers/opik'); - + // Create a temporary message array with just this group - const traces = claudeToOpikTraces(group); + const { traces, spans } = claudeToOpikTraces({ + messages: group, + projectName: config.project_name, + provider: config.provider, + }); if (traces.length === 0) return null; - - const trace = traces[0]; - const userMessage = group[0]; - - // Generate proper UUIDv7 trace ID - trace.id = generateUUIDv7(); - - return trace; + + return { trace: traces[0], spans }; } export async function syncSession(sessionId: string, options: SyncOptions = {}): Promise { const { findSessionFile, parseConversation } = await import('./parsers/claude'); - const { claudeToOpikTraces, groupMessagesByUserInteraction } = await import('./parsers/opik'); + const { groupMessagesByUserInteraction } = await import('./parsers/opik'); const { getOpikConfig } = await import('./config'); const { writeFileSync } = await import('fs'); const { join } = await import('path'); const { tmpdir } = await import('os'); const { syncStateManager } = await import('./state/sync-state'); - + const logger = createLogger({ verbose: options.verbose, dryRun: options.dryRun }); - + // Find the session file const sessionFile = findSessionFile(sessionId); if (!sessionFile) { throw new Error(`Session ${sessionId} not found`); } - + logger.debug(`Reading session from: ${sessionFile}`); - + // Parse the conversation and group by user interaction const messages = parseConversation(sessionFile); const currentGroups = groupMessagesByUserInteraction(messages); logger.debug(`Parsed ${messages.length} messages into ${currentGroups.length} conversation groups`); - + let existingSyncedGroups: SyncedGroup[] = []; let syncReason = ''; - + // Check if sync is needed (unless force flag is set) if (!options.force) { try { const syncCheck = await syncStateManager.needsSync(sessionId, sessionFile); if (!syncCheck.needsSync) { logger.dryRun(`Would skip session ${sessionId}`); - + // Return empty result to indicate no sync performed return ''; } - + syncReason = syncCheck.reason || 'Sync needed'; - + // Get existing synced groups const state = await syncStateManager.getSyncState(); const existingSession = state.sessions[sessionId]; @@ -142,17 +147,17 @@ export async function syncSession(sessionId: string, options: SyncOptions = {}): } else { syncReason = 'Force sync enabled'; } - + // Analyze what groups need to be created or updated const groupActions = analyzeGroupChanges(currentGroups, existingSyncedGroups); const createCount = groupActions.filter(a => a.action === 'create').length; const updateCount = groupActions.filter(a => a.action === 'update').length; - + if (groupActions.length === 0) { // Skip logging - no conversations need syncing return ''; } - + // If dry run, just show what would be done if (options.dryRun) { logger.dryRun(`Would process ${groupActions.length} conversation${groupActions.length === 1 ? '' : 's'}`); @@ -161,48 +166,53 @@ export async function syncSession(sessionId: string, options: SyncOptions = {}): } return ''; } - + // Process group actions const opikConfig = getOpikConfig(); const opikClient = new OpikClient(opikConfig); const updatedSyncedGroups: SyncedGroup[] = [...existingSyncedGroups]; let totalTraces = 0; - + // Write to tmp file for review const timestamp = new Date().toISOString().replace(/[:.]/g, '-'); const tmpFile = join(tmpdir(), `opik-traces-${sessionId}-${timestamp}.json`); const allTraces: OpikTrace[] = []; - + // Separate create and update actions const createActions = groupActions.filter(a => a.action === 'create'); const updateActions = groupActions.filter(a => a.action === 'update'); - + try { // Process create actions in batch if (createActions.length > 0) { const tracesToCreate: OpikTrace[] = []; + const spansToCreate: OpikSpan[] = []; const createGroups: { action: GroupAction; trace: OpikTrace }[] = []; - + for (const action of createActions) { - const trace = await convertGroupToTrace(action.group); - if (trace) { + const res = await convertGroupToTrace(action.group, opikConfig); + if (res) { + const { trace, spans } = res; + tracesToCreate.push(trace); createGroups.push({ action, trace }); allTraces.push(trace); + spansToCreate.push(...spans); } } - + if (tracesToCreate.length > 0) { await opikClient.createTraces(tracesToCreate); - + await opikClient.createSpans(spansToCreate); + // Process traces and update synced groups using deterministic IDs for (const { action, trace } of createGroups) { const userMessage = action.group[0]; const lastMessage = action.group[action.group.length - 1]; const traceId = trace.id!; // We set this in convertGroupToTrace - + logger.debug(`Created trace ${traceId} for conversation ${userMessage.uuid}`); - + // Update thread tags if we have thread_id and tags if (trace.thread_id && trace.tags) { try { @@ -212,7 +222,7 @@ export async function syncSession(sessionId: string, options: SyncOptions = {}): logger.warning(`Failed to update thread tags for ${trace.thread_id}: ${error instanceof Error ? error.message : error}`); } } - + // Add to synced groups updatedSyncedGroups.push({ traceId, @@ -220,28 +230,30 @@ export async function syncSession(sessionId: string, options: SyncOptions = {}): lastMessageUuid: lastMessage.uuid, messageCount: action.group.length }); - + totalTraces++; } } } - + // Process update actions individually for (const action of updateActions) { if (!action.traceId) continue; - + const userMessage = action.group[0]; const lastMessage = action.group[action.group.length - 1]; - - const trace = await convertGroupToTrace(action.group); - if (!trace) continue; - + + const res = await convertGroupToTrace(action.group, opikConfig); + if (!res) continue; + + const { trace } = res; + allTraces.push(trace); - + // Update existing trace await opikClient.updateTrace(action.traceId, trace); logger.debug(`Updated trace ${action.traceId} for conversation ${userMessage.uuid}`); - + // Update thread tags if we have thread_id and tags if (trace.thread_id && trace.tags) { try { @@ -251,7 +263,7 @@ export async function syncSession(sessionId: string, options: SyncOptions = {}): logger.warning(`Failed to update thread tags for ${trace.thread_id}: ${error instanceof Error ? error.message : error}`); } } - + // Update synced groups const existingIndex = updatedSyncedGroups.findIndex(sg => sg.traceId === action.traceId); if (existingIndex !== -1) { @@ -262,19 +274,19 @@ export async function syncSession(sessionId: string, options: SyncOptions = {}): messageCount: action.group.length }; } - + totalTraces++; } - + writeFileSync(tmpFile, JSON.stringify(allTraces, null, 2)); logger.debug(`Opik traces written to: ${tmpFile}`); - + const finalCreateCount = groupActions.filter(a => a.action === 'create').length; const finalUpdateCount = groupActions.filter(a => a.action === 'update').length; - + // Show sync summary logger.syncSummary(sessionId, syncReason, finalCreateCount, finalUpdateCount, totalTraces); - + // Update synced groups in state try { await syncStateManager.updateSyncedGroups(sessionId, sessionFile, updatedSyncedGroups); @@ -283,13 +295,13 @@ export async function syncSession(sessionId: string, options: SyncOptions = {}): logger.warning(`Failed to update sync state: ${stateError instanceof Error ? stateError.message : stateError}`); logger.warning(`Sync completed successfully but state update failed`); } - + } catch (error) { logger.error(`Failed to sync to Opik: ${error instanceof Error ? error.message : error}`); logger.info(`Traces are still available in: ${tmpFile}`); throw error; } - + return tmpFile; } @@ -297,21 +309,21 @@ export async function syncProject(projectPath: string | null, options: SyncOptio const { listSessions } = await import('./parsers/claude'); const { getClaudeDataDir } = await import('./config'); const logger = createLogger({ verbose: options.verbose, dryRun: options.dryRun }); - + const claudeDataDir = getClaudeDataDir(); const sessions = listSessions(projectPath || undefined, claudeDataDir); - + if (sessions.length === 0) { const message = projectPath ? `No sessions found for project: ${projectPath}` : 'No sessions found'; logger.info(message); return; } - + logger.info(`Found ${sessions.length} session${sessions.length === 1 ? '' : 's'} to sync`); - + let syncedCount = 0; let skippedCount = 0; - + for (const session of sessions) { try { const result = await syncSession(session.sessionId, options); @@ -326,6 +338,6 @@ export async function syncProject(projectPath: string | null, options: SyncOptio logger.error(`Failed to sync session ${session.sessionId}: ${error instanceof Error ? error.message : error}`); } } - + logger.info(`Sync completed: ${syncedCount} synced, ${skippedCount} skipped`); } diff --git a/src/types.ts b/src/types.ts index 3f0b04b..203c32c 100644 --- a/src/types.ts +++ b/src/types.ts @@ -3,6 +3,7 @@ export interface ClaudeMessage { isSidechain: boolean; userType: string; cwd: string; + slug: string; sessionId: string; version: string; type: 'user' | 'assistant'; @@ -39,12 +40,31 @@ export interface OpikTrace { thread_id?: string; } +export interface OpikSpan { + id?: string; + project_name?: string; + name?: string; + type?: 'general' | 'tool' | 'llm' | 'guardrail'; + start_time: string; + end_time?: string; + input?: any; + output?: any; + metadata?: any; + tags?: string[]; + error_info?: any; + trace_id?: string; + provider?: string; + model?: string; + usage?: any; +} + export interface OpikConfig { base_url: string; api_key?: string; project_name?: string; workspace?: string; is_local?: boolean; + provider?: string; } export interface SyncOptions { @@ -64,4 +84,4 @@ export interface SessionInfo { description: string; timestamp: string; messageCount: number; -} \ No newline at end of file +}