Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions containers/api-proxy/test-helpers/websocket-frame-helpers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
'use strict';

/**
* Build a minimal unmasked WebSocket text frame for a given UTF-8 string.
* Only supports payloads up to 125 bytes (single-byte length field).
*
* @param {string} text JSON (or any string) to encode as a WS text frame
* @returns {Buffer}
*/
function buildFrame(text) {
const payload = Buffer.from(text, 'utf8');
if (payload.length > 125) {
throw new Error(`buildFrame only supports payloads up to 125 bytes, got ${payload.length}. Use a shorter payload or extend the helper to support extended length fields.`);
}
const header = Buffer.alloc(2);
header[0] = 0x81; // FIN + opcode 1 (text)
header[1] = payload.length;
return Buffer.concat([header, payload]);
}

/**
* Return the standard HTTP/1.1 101 Switching Protocols header buffer used in
* all Anthropic WebSocket upgrade tests.
*
* @returns {Buffer}
*/
function buildHttpUpgradeHeader() {
return Buffer.from('HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\n\r\n');
}

/**
* Build the canonical Anthropic streaming WebSocket payload consisting of:
* - HTTP 101 upgrade header
* - `message_start` frame (input_tokens: 20, output_tokens: 0)
* - `message_delta` frame (output_tokens: 8)
*
* @returns {Buffer} A single buffer ready to emit on the socket 'data' event
*/
function buildAnthropicUsageFrames() {
const httpHeader = buildHttpUpgradeHeader();
const frame1 = buildFrame(JSON.stringify({
type: 'message_start',
message: { model: 'claude-sonnet-4-20250514', usage: { input_tokens: 20, output_tokens: 0 } },
}));
const frame2 = buildFrame(JSON.stringify({
type: 'message_delta',
usage: { output_tokens: 8 },
}));
return Buffer.concat([httpHeader, frame1, frame2]);
}

module.exports = { buildFrame, buildHttpUpgradeHeader, buildAnthropicUsageFrames };
61 changes: 4 additions & 57 deletions containers/api-proxy/token-tracker.schema.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const {
TOKEN_DIAG_SCHEMA,
} = require('./token-persistence');
const { EventEmitter } = require('events');
const { buildAnthropicUsageFrames } = require('./test-helpers/websocket-frame-helpers');

afterAll(async () => {
await closeLogStream();
Expand Down Expand Up @@ -276,24 +277,6 @@ describe('token-usage JSONL record schema field', () => {
test('trackWebSocketTokenUsage path writes versioned _schema to the stream', (done) => {
const socket = new EventEmitter();

function buildFrame(text) {
const payload = Buffer.from(text, 'utf8');
const header = Buffer.alloc(2);
header[0] = 0x81;
header[1] = payload.length;
return Buffer.concat([header, payload]);
}

const httpHeader = Buffer.from('HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\n\r\n');
const frame1 = buildFrame(JSON.stringify({
type: 'message_start',
message: { model: 'claude-sonnet-4-20250514', usage: { input_tokens: 20, output_tokens: 0 } },
}));
const frame2 = buildFrame(JSON.stringify({
type: 'message_delta',
usage: { output_tokens: 8 },
}));

trackWebSocketTokenUsage(socket, {
requestId: 'schema-field-ws',
provider: 'anthropic',
Expand All @@ -302,7 +285,7 @@ describe('token-usage JSONL record schema field', () => {
metrics: null,
});

socket.emit('data', Buffer.concat([httpHeader, frame1, frame2]));
socket.emit('data', buildAnthropicUsageFrames());
socket.emit('close');

setTimeout(() => {
Expand Down Expand Up @@ -390,24 +373,6 @@ describe('token-usage JSONL record schema field', () => {
test('trackWebSocketTokenUsage path persists optional budget fields returned by onUsage', (done) => {
const socket = new EventEmitter();

function buildFrame(text) {
const payload = Buffer.from(text, 'utf8');
const header = Buffer.alloc(2);
header[0] = 0x81;
header[1] = payload.length;
return Buffer.concat([header, payload]);
}

const httpHeader = Buffer.from('HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\n\r\n');
const frame1 = buildFrame(JSON.stringify({
type: 'message_start',
message: { model: 'claude-sonnet-4-20250514', usage: { input_tokens: 20, output_tokens: 0 } },
}));
const frame2 = buildFrame(JSON.stringify({
type: 'message_delta',
usage: { output_tokens: 8 },
}));

trackWebSocketTokenUsage(socket, {
requestId: 'budget-fields-ws',
provider: 'anthropic',
Expand All @@ -423,7 +388,7 @@ describe('token-usage JSONL record schema field', () => {
}),
});

socket.emit('data', Buffer.concat([httpHeader, frame1, frame2]));
socket.emit('data', buildAnthropicUsageFrames());
socket.emit('close');

setTimeout(() => {
Expand All @@ -444,24 +409,6 @@ describe('token-usage JSONL record schema field', () => {
test('trackWebSocketTokenUsage path omits optional budget fields when onUsage returns undefined', (done) => {
const socket = new EventEmitter();

function buildFrame(text) {
const payload = Buffer.from(text, 'utf8');
const header = Buffer.alloc(2);
header[0] = 0x81;
header[1] = payload.length;
return Buffer.concat([header, payload]);
}

const httpHeader = Buffer.from('HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\n\r\n');
const frame1 = buildFrame(JSON.stringify({
type: 'message_start',
message: { model: 'claude-sonnet-4-20250514', usage: { input_tokens: 20, output_tokens: 0 } },
}));
const frame2 = buildFrame(JSON.stringify({
type: 'message_delta',
usage: { output_tokens: 8 },
}));

trackWebSocketTokenUsage(socket, {
requestId: 'budget-fields-ws-none',
provider: 'anthropic',
Expand All @@ -471,7 +418,7 @@ describe('token-usage JSONL record schema field', () => {
onUsage: () => undefined,
});

socket.emit('data', Buffer.concat([httpHeader, frame1, frame2]));
socket.emit('data', buildAnthropicUsageFrames());
socket.emit('close');

setTimeout(() => {
Expand Down
Loading