feat: implement stream-spoofing interceptor and format-preservation patches

Author: lingyuzeng
Date: 2026-03-22 17:55:47 +08:00
parent ebd73e1c69
commit ce99e8b418
12 changed files with 426 additions and 11 deletions


@@ -1,12 +1,9 @@
# Downstream Ollama address
OLLAMA_PROXY_TARGET=http://127.0.0.1:11434
# Default model name
OLLAMA_DEFAULT_MODEL=hotwa/qwen35-opus-distilled-agent:latest
# Default model name (may be left empty; to force a specific one, it can also be set in the config)
# OLLAMA_DEFAULT_MODEL=Huihui-Qwen3.5-27B-Claude-4.6-Opus-abliterated-4bit
# Port the proxy service listens on
PROXY_PORT=11435
PROXY_HOST=127.0.0.1
# Osaurus API Key (optional; only needed if the downstream requires auth)
# OSAURUS_API_KEY=your-api-key-here

src/config.ts

@@ -17,4 +17,9 @@ export const config = {
// Osaurus API Key (optional; only needed if the downstream requires auth)
apiKey: process.env.OSAURUS_API_KEY || '',
// New configuration for vLLM support
proxyMode: (process.env.PROXY_MODE || 'ollama').toLowerCase() as 'ollama' | 'vllm',
vllmTargetUrl: process.env.VLLM_PROXY_TARGET || 'http://127.0.0.1:8000',
vllmDefaultModel: process.env.VLLM_DEFAULT_MODEL || 'Qwen3.5-27B',
};
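For reference, enabling the new vLLM mode from the environment would look roughly like this (a minimal sketch; the values shown just echo the fallbacks above):

# Switch the proxy from the default 'ollama' mode to 'vllm'
PROXY_MODE=vllm
# Downstream vLLM (OpenAI-compatible) server
VLLM_PROXY_TARGET=http://127.0.0.1:8000
# Model name injected when a client request omits one
VLLM_DEFAULT_MODEL=Qwen3.5-27B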

src/proxy/vllm-forward.ts Executable file

@@ -0,0 +1,157 @@
import { config } from '../config';
import { rewriteVllmResponse } from './vllm-response-rewriter';
import { logger } from '../utils/logger';
import { Readable } from 'stream';
export async function forwardVllmChatRequest(requestBody: any, authorization?: string): Promise<any> {
const targetHost = config.vllmTargetUrl;
const targetEndpoint = `${targetHost}/v1/chat/completions`;
// Inject default model if not provided
if (!requestBody.model && config.vllmDefaultModel) {
requestBody.model = config.vllmDefaultModel;
}
logger.info(`Forwarding vLLM chat request to ${targetEndpoint} for model: ${requestBody.model}`);
const headers: Record<string, string> = {
'Content-Type': 'application/json',
'Accept': 'application/json'
};
if (authorization) {
headers['Authorization'] = authorization;
}
const options: RequestInit = {
method: 'POST',
headers,
body: JSON.stringify(requestBody)
};
const response = await fetch(targetEndpoint, options);
if (!response.ok) {
const errorText = await response.text();
logger.error(`vLLM upstream error ${response.status}: ${errorText}`);
throw new Error(`vLLM upstream returned ${response.status}: ${errorText}`);
}
// Handle standard JSON response (MVP supports non-streaming responses)
const responseData = await response.json();
// Rewrite if necessary using vLLM specific logic
const rewrittenData = rewriteVllmResponse(responseData);
return rewrittenData;
}
export async function forwardAndSpoofVllmStreamRequest(requestBody: any, authorization?: string): Promise<Readable> {
const targetHost = config.vllmTargetUrl;
const targetEndpoint = `${targetHost}/v1/chat/completions`;
// Force upstream to be NOT streaming so we can intercept the entire JSON
const upstreamBody = { ...requestBody, stream: false };
if (!upstreamBody.model && config.vllmDefaultModel) {
upstreamBody.model = config.vllmDefaultModel;
}
logger.info(`Forwarding vLLM stream-spoofing request to ${targetEndpoint} for model: ${upstreamBody.model}`);
const headers: Record<string, string> = {
'Content-Type': 'application/json',
'Accept': 'application/json'
};
if (authorization) {
headers['Authorization'] = authorization;
}
const options: RequestInit = {
method: 'POST',
headers,
body: JSON.stringify(upstreamBody)
};
const response = await fetch(targetEndpoint, options);
if (!response.ok) {
const errorText = await response.text();
logger.error(`vLLM upstream error ${response.status}: ${errorText}`);
throw new Error(`vLLM upstream returned ${response.status}: ${errorText}`);
}
// Get full JSON representation
const responseData = await response.json();
const rewrittenData = rewriteVllmResponse(responseData);
// Construct a fake SSE stream using standard Node.js Readable
const sseStream = new Readable({
read() {} // Pushed manually
});
const pushChunk = (delta: any, finishReason: string | null = null) => {
const chunk = {
id: rewrittenData.id || `chatcmpl-${Date.now()}`,
object: "chat.completion.chunk",
created: rewrittenData.created || Math.floor(Date.now() / 1000),
model: rewrittenData.model || upstreamBody.model,
choices: [
{
index: 0,
delta: delta,
finish_reason: finishReason
}
]
};
sseStream.push(`data: ${JSON.stringify(chunk)}\n\n`);
};
process.nextTick(() => {
try {
// 1. Role chunk
pushChunk({ role: 'assistant' });
const message = rewrittenData.choices?.[0]?.message || {};
// 2. Content chunk MUST come strictly before tool calls
if (message.content) {
// Fragment the content to simulate real token streaming and prevent UI double-rendering bugs
const chunkSize = 16;
for (let i = 0; i < message.content.length; i += chunkSize) {
pushChunk({ content: message.content.substring(i, i + chunkSize) });
}
}
// 3. Tool calls chunk
if (message.tool_calls && message.tool_calls.length > 0) {
const streamToolCalls = message.tool_calls.map((tc: any, idx: number) => ({
index: idx,
id: tc.id,
type: tc.type,
function: {
name: tc.function.name,
arguments: tc.function.arguments
}
}));
pushChunk({ tool_calls: streamToolCalls });
}
// 4. Finish reason chunk
const finalFinishReason = rewrittenData.choices?.[0]?.finish_reason || (message.tool_calls?.length > 0 ? 'tool_calls' : 'stop');
pushChunk({}, finalFinishReason);
// 5. Done
sseStream.push('data: [DONE]\n\n');
sseStream.push(null);
} catch (e: any) {
logger.error('Error generating fake SSE stream:', e.message);
sseStream.push(`data: {"error": "Internal stream spoofing error"}\n\n`);
sseStream.push('data: [DONE]\n\n');
sseStream.push(null);
}
});
return sseStream;
}
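End to end, for an upstream response that the rewriter turns into a single read tool call (content becomes empty, so no content chunks are emitted), the synthesized stream would look roughly like this on the wire; the id, created timestamp, and time-based call id shown are illustrative:

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1715012345,"model":"Qwen3.5-27B","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1715012345,"model":"Qwen3.5-27B","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"id":"call_1715012345000_0","type":"function","function":{"name":"read","arguments":"{\"path\":\"/tmp/test.txt\"}"}}]},"finish_reason":null}]}

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1715012345,"model":"Qwen3.5-27B","choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}]}

data: [DONE]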

src/proxy/vllm-response-rewriter.ts

@@ -0,0 +1,54 @@
import { parseXmlToolCalls } from '../parsers';
import { logger } from '../utils/logger';
/**
* Rewrites the vLLM/OpenAI standard response to include structured tool calls if missing
* but present in XML tags within the content.
*/
export function rewriteVllmResponse(response: any): any {
if (!response || !response.choices || !Array.isArray(response.choices) || response.choices.length === 0) {
return response;
}
const message = response.choices[0].message;
if (!message) return response;
// If already has tool_calls, do nothing
if (message.tool_calls && message.tool_calls.length > 0) {
return response;
}
const content = message.content;
if (!content) {
return response;
}
const parsedCalls = parseXmlToolCalls(content);
if (parsedCalls.length > 0) {
logger.info(`Rewriting vLLM response: found ${parsedCalls.length} tool calls in XML content`);
const standardToolCalls = parsedCalls.map((call, index) => {
let argumentsString = '{}';
try {
argumentsString = JSON.stringify(call.args);
} catch (e) {
logger.error('Failed to stringify arguments for tool call', call.args);
}
return {
id: `call_${Date.now()}_${index}`,
type: 'function',
function: {
name: call.name,
arguments: argumentsString,
}
};
});
message.tool_calls = standardToolCalls;
message.content = '';
}
return response;
}
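Concretely — mirroring the test fixtures below — the rewrite turns a message like

{ "role": "assistant", "content": "<function=read>\n<parameter=path>\n/tmp/test.txt\n</parameter>\n</function>" }

into (the time-based call id shown is illustrative and differs per run)

{ "role": "assistant", "content": "", "tool_calls": [ { "id": "call_1715012345000_0", "type": "function", "function": { "name": "read", "arguments": "{\"path\":\"/tmp/test.txt\"}" } } ] }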

src/routes/vllm.ts Executable file

@@ -0,0 +1,31 @@
import { FastifyInstance, FastifyPluginAsync } from 'fastify';
import { forwardVllmChatRequest, forwardAndSpoofVllmStreamRequest } from '../proxy/vllm-forward';
import { logger } from '../utils/logger';
const vllmRoutes: FastifyPluginAsync = async (server: FastifyInstance) => {
server.post('/v1/chat/completions', async (request, reply) => {
try {
const body = request.body as any;
const authorization = request.headers.authorization;
if (body?.stream === true) {
const spoofedStream = await forwardAndSpoofVllmStreamRequest(body, authorization);
reply.raw.setHeader('Content-Type', 'text/event-stream');
reply.raw.setHeader('Cache-Control', 'no-cache');
reply.raw.setHeader('Connection', 'keep-alive');
return reply.send(spoofedStream);
}
const response = await forwardVllmChatRequest(body, authorization);
reply.status(200).send(response);
} catch (error: any) {
logger.error('Error handling /v1/chat/completions:', error.message);
reply.status(500).send({ error: error.message });
}
});
};
export default vllmRoutes;
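As a usage illustration, a minimal client consuming the spoofed stream might look like the sketch below; it assumes the proxy is running in vLLM mode on 127.0.0.1:11435 (the host/port from the .env example above) and simply prints the raw SSE frames:

// client-sketch.ts — illustrative consumer of the spoofed SSE endpoint (Node 18+).
async function main() {
  const res = await fetch('http://127.0.0.1:11435/v1/chat/completions', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      model: 'Qwen3.5-27B',
      messages: [{ role: 'user', content: "Please tell me what's in /tmp/test.txt" }],
      stream: true, // routes to forwardAndSpoofVllmStreamRequest on the proxy side
    }),
  });
  const decoder = new TextDecoder();
  // res.body is a web ReadableStream; Node makes it async-iterable.
  for await (const chunk of res.body as any) {
    process.stdout.write(decoder.decode(chunk, { stream: true })); // raw "data: {...}" frames
  }
}
main().catch(console.error);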

src/server.ts

@@ -1,17 +1,22 @@
import fastify, { FastifyInstance } from 'fastify';
import { config } from './config';
import ollamaRoutes from './routes/ollama';
import vllmRoutes from './routes/vllm';
export function buildServer(): FastifyInstance {
const server = fastify({ logger: false }); // Using our custom logger instead
// Basic health check
server.get('/', async () => {
return { status: 'ok', service: 'openclaw-ollama-toolcall-proxy' };
return { status: 'ok', service: `openclaw-ollama-toolcall-proxy (Mode: ${config.proxyMode})` };
});
// Register routes
server.register(ollamaRoutes);
// Register routes conditionally
if (config.proxyMode === 'vllm') {
server.register(vllmRoutes);
} else {
server.register(ollamaRoutes);
}
return server;
}
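Note that the proxy mode is now fixed when the process starts: with PROXY_MODE=vllm only vllmRoutes (serving /v1/chat/completions) is mounted, while the default 'ollama' mode keeps the existing ollamaRoutes; the two route sets are never registered together.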

test/fixtures/vllm-like-request.json vendored Executable file

@@ -0,0 +1,27 @@
{
"model": "Qwen3.5-27B",
"messages": [
{
"role": "user",
"content": "Please tell me what's in /tmp/test.txt"
}
],
"stream": false,
"tools": [
{
"type": "function",
"function": {
"name": "read",
"description": "Read a file",
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string"
}
}
}
}
}
]
}

test/fixtures/vllm-xml-response.json vendored Executable file

@@ -0,0 +1,21 @@
{
"id": "chatcmpl-123",
"object": "chat.completion",
"created": 1715012345,
"model": "Qwen3.5-27B",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "<function=read>\n<parameter=path>\n/tmp/test.txt\n</parameter>\n</function>"
},
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": 56,
"completion_tokens": 31,
"total_tokens": 87
}
}


@@ -3,11 +3,13 @@ import { buildServer } from '../src/server';
import { FastifyInstance } from 'fastify';
import fs from 'fs';
import path from 'path';
import { config } from '../src/config';
describe('Proxy Integration Test', () => {
let server: FastifyInstance;
beforeEach(() => {
config.proxyMode = 'ollama';
server = buildServer();
// In vitest we can mock the global fetch
global.fetch = vi.fn();
@@ -29,6 +31,7 @@ describe('Proxy Integration Test', () => {
// Mock fetch to return the ollama-xml-response.json
(global.fetch as any).mockResolvedValue({
ok: true,
text: async () => JSON.stringify(responseJson),
json: async () => responseJson
});
@@ -47,14 +50,14 @@ describe('Proxy Integration Test', () => {
expect(fetchArgs[0]).toContain('/api/chat');
const upstreamBody = JSON.parse(fetchArgs[1].body);
expect(upstreamBody.model).toBe('Huihui-Qwen3.5-27B-Claude-4.6-Opus-abliterated-4bit');
expect(upstreamBody.model).toBe('hotwa/qwen35-9b-agent:latest');
// Verify response was rewritten
expect(body.message.content).toBe("");
expect(body.message.tool_calls).toBeDefined();
expect(body.message.tool_calls).toHaveLength(1);
expect(body.message.tool_calls[0].function.name).toBe('read');
expect(JSON.parse(body.message.tool_calls[0].function.arguments)).toEqual({
expect(body.message.tool_calls[0].function.arguments).toEqual({
path: "/tmp/test.txt"
});
});

test/integration.vllm.test.ts Executable file

@@ -0,0 +1,62 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { buildServer } from '../src/server';
import { FastifyInstance } from 'fastify';
import fs from 'fs';
import path from 'path';
import { config } from '../src/config';
describe('vLLM Proxy Integration Test', () => {
let server: FastifyInstance;
beforeEach(() => {
// Mutate the loaded config instance
config.proxyMode = 'vllm';
server = buildServer();
global.fetch = vi.fn();
});
afterEach(async () => {
await server.close();
vi.restoreAllMocks();
});
it('proxies request and rewrites XML response to tool_calls for vLLM', async () => {
const requestFixturePath = path.join(__dirname, 'fixtures', 'vllm-like-request.json');
const responseFixturePath = path.join(__dirname, 'fixtures', 'vllm-xml-response.json');
const requestJson = JSON.parse(fs.readFileSync(requestFixturePath, 'utf8'));
const responseJson = JSON.parse(fs.readFileSync(responseFixturePath, 'utf8'));
(global.fetch as any).mockResolvedValue({
ok: true,
json: async () => responseJson
});
const response = await server.inject({
method: 'POST',
url: '/v1/chat/completions',
payload: requestJson
});
expect(response.statusCode).toBe(200);
const body = JSON.parse(response.payload);
// Verify proxy forwarded it
expect(global.fetch).toHaveBeenCalledTimes(1);
const fetchArgs = (global.fetch as any).mock.calls[0];
expect(fetchArgs[0]).toContain('/v1/chat/completions');
const upstreamBody = JSON.parse(fetchArgs[1].body);
expect(upstreamBody.model).toBe('Qwen3.5-27B');
// Verify response was rewritten
expect(body.choices[0].message.content).toBe("");
expect(body.choices[0].message.tool_calls).toBeDefined();
expect(body.choices[0].message.tool_calls).toHaveLength(1);
expect(body.choices[0].message.tool_calls[0].function.name).toBe('read');
expect(JSON.parse(body.choices[0].message.tool_calls[0].function.arguments)).toEqual({
path: "/tmp/test.txt"
});
});
});


@@ -23,7 +23,7 @@ describe('Response Rewriter', () => {
expect(toolCall.type).toBe('function');
expect(toolCall.function.name).toBe('read');
const argsObject = JSON.parse(toolCall.function.arguments);
const argsObject = toolCall.function.arguments;
expect(argsObject).toEqual({ path: '/tmp/test.txt' });
});

test/vllm-rewriter.test.ts Executable file

@@ -0,0 +1,53 @@
import { describe, it, expect } from 'vitest';
import { rewriteVllmResponse } from '../src/proxy/vllm-response-rewriter';
describe('vLLM Response Rewriter', () => {
it('rewrites XML tool call in OpenAI choices content into structured tool_calls', () => {
const inputResponse = {
id: "chatcmpl-123",
choices: [{
index: 0,
message: {
role: "assistant",
content: "<function=read>\n<parameter=path>\n/tmp/test.txt\n</parameter>\n</function>"
}
}]
};
const result = rewriteVllmResponse(inputResponse);
expect(result.choices[0].message.content).toBe("");
expect(result.choices[0].message.tool_calls).toBeDefined();
expect(result.choices[0].message.tool_calls).toHaveLength(1);
const toolCall = result.choices[0].message.tool_calls![0];
expect(toolCall.type).toBe('function');
expect(toolCall.function.name).toBe('read');
const argsObject = JSON.parse(toolCall.function.arguments);
expect(argsObject).toEqual({ path: '/tmp/test.txt' });
});
it('does not touch response that already has tool_calls', () => {
const inputResponse = {
choices: [{
message: {
role: "assistant",
content: "Here are the calls",
tool_calls: [
{
id: "123",
type: "function",
function: { name: "read", arguments: "{}" }
}
]
}
}]
};
const result = rewriteVllmResponse(inputResponse);
expect(result.choices[0].message.content).toBe("Here are the calls");
expect(result.choices[0].message.tool_calls).toHaveLength(1);
});
});