Rewrite LogCollector with file-based markers for crash resilience

Replace in-memory array storage with persistent file-based logging:
- Logs written to /tmp/ollama37-session-{timestamp}.log
- Text markers: ===MARKER:START:{ID}:{TIMESTAMP}=== for test boundaries
- Extract test-specific logs via sed to /tmp/test-{ID}-logs.txt
- Write queue prevents race conditions between log data and markers
- Line buffering prevents marker injection mid-line
- Auto-cleanup of session files older than 24 hours

Benefits:
- Crash resilient: logs persist even if test process dies
- Bounded memory: no array growth, only file I/O
- Precise boundaries: text markers unaffected by buffering delays

API unchanged - all existing integrations (executor, cli, test cases) work without modification.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Shang Chieh Tseng
2025-12-17 20:02:14 +08:00
parent e80f226507
commit 6c97e5cd61
2 changed files with 340 additions and 89 deletions

View File

@@ -54,7 +54,76 @@ A critical problem with container log analysis is temporal precision. Using `doc
- Long-running tests may exceed the time window
- Fast tests include irrelevant historical logs
The LogCollector solves this by running `docker compose logs --follow` as a background process throughout test execution. It maintains markers for each test's start and end, then extracts precisely the logs generated during that specific test. Each test step receives only its own logs for analysis.
The LogCollector solves this using a file-based architecture with embedded text markers.
#### Architecture
```
docker compose logs --follow --timestamps
/tmp/ollama37-session-{timestamp}.log (persistent file)
===MARKER:START:TC-001:2024-01-15T10:30:05Z===
[container logs during test]
===MARKER:END:TC-001:2024-01-15T10:30:15Z===
▼ sed extraction
/tmp/test-TC-001-logs.txt (test-specific logs)
```
#### Session File Format
The session file contains all docker logs with embedded markers:
```
===SESSION:START:2024-01-15T10:30:00.000Z===
ollama37 | 2024-01-15T10:30:01Z msg="starting server"
===MARKER:START:TC-RUNTIME-001:2024-01-15T10:30:05.123Z===
ollama37 | 2024-01-15T10:30:06Z msg="inference compute" library=CUDA
ollama37 | 2024-01-15T10:30:07Z msg="loaded model" layers=28
===MARKER:END:TC-RUNTIME-001:2024-01-15T10:30:15.789Z===
===MARKER:START:TC-RUNTIME-002:2024-01-15T10:30:16.000Z===
[more logs...]
===MARKER:END:TC-RUNTIME-002:2024-01-15T10:30:45.000Z===
===SESSION:END:2024-01-15T10:31:00.000Z===
```
**Marker Format**: `===MARKER:{TYPE}:{TEST_ID}:{ISO_TIMESTAMP}===`
- TYPE: `START`, `END`, or `SESSION`
- TEST_ID: Test case identifier (e.g., `TC-RUNTIME-001`)
- ISO_TIMESTAMP: When the marker was written
#### Log Extraction
Test-specific logs are extracted using sed:
```bash
# Extract logs for TC-RUNTIME-001 (excluding marker lines)
sed -n '/===MARKER:START:TC-RUNTIME-001:/,/===MARKER:END:TC-RUNTIME-001:/{/===MARKER:/d;p}' \
/tmp/ollama37-session-*.log
```
For tests still running (no END marker yet), extraction continues to EOF:
```bash
sed -n '/===MARKER:START:TC-RUNTIME-001:/,${/===MARKER:/d;p}' /tmp/ollama37-session-*.log
```
#### Design Benefits
1. **Crash Resilience**: The session file persists at `/tmp/ollama37-session-{timestamp}.log` even if the test process crashes. This enables post-mortem log analysis.
2. **Bounded Memory**: No in-memory array growth. All logs are written directly to disk.
3. **Precise Boundaries**: Text markers provide exact test boundaries regardless of docker log buffering delays.
4. **Race Condition Prevention**: All writes (both docker log data and markers) go through a serialized write queue with line buffering, ensuring markers never interleave with log lines.
#### Cleanup
- Old session files (> 24 hours) are automatically cleaned up at LogCollector startup
- Stale test log files are removed when a new test with the same ID starts
### Test Execution Flow

View File

@@ -1,73 +1,96 @@
import { spawn, ChildProcess } from "child_process";
import { writeFileSync } from "fs";
import { spawn, ChildProcess, execSync } from "child_process";
import {
createWriteStream,
WriteStream,
writeFileSync,
readFileSync,
existsSync,
unlinkSync,
readdirSync,
fsyncSync,
} from "fs";
import path from "path";
interface LogEntry {
timestamp: Date;
line: string;
}
interface TestMarker {
start: number;
end?: number;
interface TestMarkerState {
startWritten: boolean;
endWritten: boolean;
}
/**
* LogCollector runs `docker compose logs --follow` as a background process
* and captures logs with precise test boundaries.
* LogCollector pipes `docker compose logs --follow` to a persistent file
* with embedded text markers for precise test boundary extraction.
*
* This solves the log overlap problem where `docker compose logs --since=5m`
* could include logs from previous tests or miss logs if a test exceeds 5 minutes.
* Design improvements over in-memory version:
* - Crash resilient: logs persist on disk at /tmp/ollama37-session-{timestamp}.log
* - Bounded memory: no array growth, only file I/O
* - Precise markers: text-based extraction with sed
*
* Marker format: ===MARKER:{TYPE}:{TEST_ID}:{ISO_TIMESTAMP}===
* - TYPE: START, END, or SESSION
* - TEST_ID: Test case identifier (e.g., TC-RUNTIME-001)
* - ISO_TIMESTAMP: When the marker was written
*
* Log extraction uses sed to extract logs between START and END markers,
* writing to /tmp/test-{testId}-logs.txt for test steps to access.
*/
export class LogCollector {
private process: ChildProcess | null = null;
private logs: LogEntry[] = [];
private testMarkers: Map<string, TestMarker> = new Map();
private sessionFile: string;
private logFileStream: WriteStream | null = null;
private workingDir: string;
private isRunning: boolean = false;
private testMarkers: Map<string, TestMarkerState> = new Map();
private writeQueue: Promise<void> = Promise.resolve();
private lineBuffer: string = "";
constructor(workingDir: string) {
// workingDir should be the project root, docker-compose.yml is in docker/
this.workingDir = path.join(workingDir, "docker");
// Generate unique session file with timestamp
this.sessionFile = `/tmp/ollama37-session-${Date.now()}.log`;
}
/**
* Start the log collector background process
* Start the log collector background process.
* Creates session file and spawns docker compose logs --follow.
*/
async start(): Promise<void> {
if (this.isRunning) {
return;
}
// Clean up old session files (> 24 hours)
this.cleanupOldSessions();
return new Promise((resolve, reject) => {
// Create file stream first (synchronously to guarantee it exists)
this.logFileStream = createWriteStream(this.sessionFile, { flags: "a" });
// Write session header
this.logFileStream.write(
`===SESSION:START:${new Date().toISOString()}===\n`
);
// Spawn docker compose logs --follow
this.process = spawn("docker", ["compose", "logs", "--follow", "--timestamps"], {
cwd: this.workingDir,
stdio: ["ignore", "pipe", "pipe"],
});
this.process = spawn(
"docker",
["compose", "logs", "--follow", "--timestamps"],
{
cwd: this.workingDir,
stdio: ["ignore", "pipe", "pipe"],
}
);
this.isRunning = true;
// Handle stdout (main log output)
// Pipe stdout to file with line buffering
this.process.stdout?.on("data", (data: Buffer) => {
const lines = data.toString().split("\n").filter((l) => l.trim());
for (const line of lines) {
this.logs.push({
timestamp: new Date(),
line: line,
});
}
this.processLogData(data, false);
});
// Handle stderr (docker compose messages)
// Pipe stderr (docker compose messages) to file with prefix
this.process.stderr?.on("data", (data: Buffer) => {
const lines = data.toString().split("\n").filter((l) => l.trim());
for (const line of lines) {
this.logs.push({
timestamp: new Date(),
line: `[stderr] ${line}`,
});
}
this.processLogData(data, true);
});
this.process.on("error", (err) => {
@@ -75,11 +98,11 @@ export class LogCollector {
reject(err);
});
this.process.on("close", (code) => {
this.process.on("close", () => {
this.isRunning = false;
});
// Give it a moment to start
// Give docker compose time to start
setTimeout(() => {
if (this.isRunning) {
resolve();
@@ -91,25 +114,40 @@ export class LogCollector {
}
/**
* Stop the log collector background process
* Stop the log collector background process.
* Writes session end marker and closes file stream.
*/
async stop(): Promise<void> {
if (!this.process || !this.isRunning) {
return;
}
// Flush any remaining buffered data
if (this.lineBuffer) {
this.queueWrite(this.lineBuffer + "\n");
this.lineBuffer = "";
}
return new Promise((resolve) => {
let resolved = false;
const doResolve = () => {
if (!resolved) {
resolved = true;
this.isRunning = false;
// Write session end marker and close stream
if (this.logFileStream && !this.logFileStream.destroyed) {
this.logFileStream.write(
`===SESSION:END:${new Date().toISOString()}===\n`
);
this.logFileStream.end();
}
resolve();
}
};
this.process!.on("close", doResolve);
this.process!.kill("SIGTERM");
// Force kill after 5 seconds
@@ -123,84 +161,228 @@ export class LogCollector {
}
/**
* Mark the start of a test - records current log index
* Mark the start of a test - writes START marker to session file.
*/
markTestStart(testId: string): void {
this.testMarkers.set(testId, {
start: this.logs.length,
});
// Clean up any stale test log file from previous runs
const testLogPath = `/tmp/test-${testId}-logs.txt`;
if (existsSync(testLogPath)) {
try {
unlinkSync(testLogPath);
} catch {
// Ignore cleanup errors
}
}
const timestamp = new Date().toISOString();
this.testMarkers.set(testId, { startWritten: true, endWritten: false });
this.queueWrite(`===MARKER:START:${testId}:${timestamp}===\n`);
}
/**
* Mark the end of a test - records current log index and writes logs to file
* Mark the end of a test - writes END marker to session file.
*/
markTestEnd(testId: string): void {
const marker = this.testMarkers.get(testId);
if (marker) {
marker.end = this.logs.length;
const timestamp = new Date().toISOString();
const state = this.testMarkers.get(testId);
if (state) {
state.endWritten = true;
}
this.queueWrite(`===MARKER:END:${testId}:${timestamp}===\n`);
}
// Write logs to temp file for test steps to access
const logs = this.getLogsForTest(testId);
const filePath = `/tmp/test-${testId}-logs.txt`;
writeFileSync(filePath, logs);
/**
* Write current test logs to file (call during test execution).
* Extracts logs between START and END markers using sed.
* This allows test steps to access logs accumulated so far.
*/
writeCurrentLogs(testId: string): void {
const outputPath = `/tmp/test-${testId}-logs.txt`;
// Ensure pending writes are flushed to disk
this.syncFlush();
const markerState = this.testMarkers.get(testId);
if (!markerState?.startWritten) {
// No start marker yet, write empty file
writeFileSync(outputPath, "");
return;
}
try {
// Escape testId for use in sed pattern (handle special regex chars)
const escapedTestId = this.escapeRegex(testId);
let sedCmd: string;
if (markerState.endWritten) {
// Extract between START and END markers (excluding marker lines)
sedCmd = `sed -n '/===MARKER:START:${escapedTestId}:/,/===MARKER:END:${escapedTestId}:/{/===MARKER:/d;p}' "${this.sessionFile}"`;
} else {
// Extract from START to EOF (test still running, no END marker yet)
sedCmd = `sed -n '/===MARKER:START:${escapedTestId}:/,\\${/===MARKER:/d;p}' "${this.sessionFile}"`;
}
const result = execSync(sedCmd, {
encoding: "utf-8",
maxBuffer: 10 * 1024 * 1024,
});
writeFileSync(outputPath, result);
} catch {
// If extraction fails, write empty file (test steps have fallback)
writeFileSync(outputPath, "");
}
}
/**
* Get logs for a specific test (between start and end markers)
* Get logs for a specific test (between start and end markers).
*/
getLogsForTest(testId: string): string {
const marker = this.testMarkers.get(testId);
if (!marker) {
this.writeCurrentLogs(testId);
const outputPath = `/tmp/test-${testId}-logs.txt`;
try {
return readFileSync(outputPath, "utf-8");
} catch {
return "";
}
const endIndex = marker.end ?? this.logs.length;
const testLogs = this.logs.slice(marker.start, endIndex);
return testLogs.map((entry) => entry.line).join("\n");
}
/**
* Get all logs collected so far
* Get all logs collected so far (entire session file).
*/
getAllLogs(): string {
return this.logs.map((entry) => entry.line).join("\n");
this.syncFlush();
try {
return readFileSync(this.sessionFile, "utf-8");
} catch {
return "";
}
}
/**
* Get logs since a specific index
*/
getLogsSince(startIndex: number): string {
return this.logs.slice(startIndex).map((entry) => entry.line).join("\n");
}
/**
* Get current log count (useful for tracking)
*/
getLogCount(): number {
return this.logs.length;
}
/**
* Check if the collector is running
* Check if the collector is running.
*/
isActive(): boolean {
return this.isRunning;
}
/**
* Write current test logs to file (call during test execution)
* This allows test steps to access logs accumulated so far
* Get the session file path (useful for debugging).
*/
writeCurrentLogs(testId: string): void {
const marker = this.testMarkers.get(testId);
if (!marker) {
return;
}
getSessionFilePath(): string {
return this.sessionFile;
}
const testLogs = this.logs.slice(marker.start);
const filePath = `/tmp/test-${testId}-logs.txt`;
writeFileSync(filePath, testLogs.map((entry) => entry.line).join("\n"));
// ============================================
// Deprecated methods for backwards compatibility
// ============================================
/**
* @deprecated No longer meaningful with file-based approach.
* Returns all logs instead.
*/
getLogsSince(_startIndex: number): string {
return this.getAllLogs();
}
/**
* @deprecated No longer meaningful with file-based approach.
* Returns 0.
*/
getLogCount(): number {
return 0;
}
// ============================================
// Private methods
// ============================================
/**
* Process incoming log data with line buffering.
* Ensures complete lines are written and markers don't split lines.
*/
private processLogData(data: Buffer, isStderr: boolean): void {
const text = this.lineBuffer + data.toString();
const lines = text.split("\n");
// Keep incomplete last line in buffer
this.lineBuffer = lines.pop() || "";
// Write complete lines
if (lines.length > 0) {
const prefix = isStderr ? "[stderr] " : "";
const formatted = lines.map((l) => prefix + l).join("\n") + "\n";
this.queueWrite(formatted);
}
}
/**
* Queue a write to the log file.
* All writes go through this queue to prevent race conditions
* between log data and markers.
*/
private queueWrite(data: string): void {
this.writeQueue = this.writeQueue.then(() => {
return new Promise<void>((resolve) => {
if (this.logFileStream && !this.logFileStream.destroyed) {
this.logFileStream.write(data, () => resolve());
} else {
resolve();
}
});
});
}
/**
* Synchronously flush pending writes to disk.
* Uses fsync to ensure kernel buffers are written.
*/
private syncFlush(): void {
if (this.logFileStream && !this.logFileStream.destroyed) {
// Access the underlying file descriptor
const fd = (this.logFileStream as any).fd;
if (typeof fd === "number") {
try {
fsyncSync(fd);
} catch {
// Ignore fsync errors (fd might be invalid)
}
}
}
}
/**
* Escape special regex characters in test ID for sed patterns.
*/
private escapeRegex(str: string): string {
return str.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
}
/**
* Clean up old session files (older than 24 hours).
*/
private cleanupOldSessions(): void {
const oneDay = 24 * 60 * 60 * 1000;
const now = Date.now();
try {
const files = readdirSync("/tmp").filter((f) =>
f.startsWith("ollama37-session-")
);
for (const file of files) {
const match = file.match(/ollama37-session-(\d+)\.log/);
if (match) {
const timestamp = parseInt(match[1]);
if (now - timestamp > oneDay) {
try {
unlinkSync(`/tmp/${file}`);
} catch {
// Ignore individual file cleanup errors
}
}
}
}
} catch {
// Ignore cleanup errors
}
}
}