From ce99e8b418faa4bfd33101a1702d16ffee797969 Mon Sep 17 00:00:00 2001 From: lingyuzeng Date: Sun, 22 Mar 2026 17:55:47 +0800 Subject: [PATCH] feat: implement Stream Spoofing interceptor and robust format preservation patches --- .env.example | 7 +- src/config.ts | 5 + src/proxy/vllm-forward.ts | 157 +++++++++++++++++++++++++++ src/proxy/vllm-response-rewriter.ts | 54 +++++++++ src/routes/vllm.ts | 31 ++++++ src/server.ts | 11 +- test/fixtures/vllm-like-request.json | 27 +++++ test/fixtures/vllm-xml-response.json | 21 ++++ test/integration.proxy.test.ts | 7 +- test/integration.vllm.test.ts | 62 +++++++++++ test/response-rewriter.test.ts | 2 +- test/vllm-rewriter.test.ts | 53 +++++++++ 12 files changed, 426 insertions(+), 11 deletions(-) create mode 100755 src/proxy/vllm-forward.ts create mode 100755 src/proxy/vllm-response-rewriter.ts create mode 100755 src/routes/vllm.ts create mode 100755 test/fixtures/vllm-like-request.json create mode 100755 test/fixtures/vllm-xml-response.json create mode 100755 test/integration.vllm.test.ts create mode 100755 test/vllm-rewriter.test.ts diff --git a/.env.example b/.env.example index ff4a49e..09032e0 100755 --- a/.env.example +++ b/.env.example @@ -1,12 +1,9 @@ # 下游 Ollama 地址 OLLAMA_PROXY_TARGET=http://127.0.0.1:11434 -# 默认模型名 -OLLAMA_DEFAULT_MODEL=hotwa/qwen35-opus-distilled-agent:latest +# 默认模型名 (可以为空,如果需要强制指定一个,也可以从配置中配) +# OLLAMA_DEFAULT_MODEL=Huihui-Qwen3.5-27B-Claude-4.6-Opus-abliterated-4bit # 代理服务监听端口 PROXY_PORT=11435 PROXY_HOST=127.0.0.1 - -# Osaurus API Key (可选,如果下游需要认证) -# OSAURUS_API_KEY=your-api-key-here diff --git a/src/config.ts b/src/config.ts index 9578d85..70b44c5 100755 --- a/src/config.ts +++ b/src/config.ts @@ -17,4 +17,9 @@ export const config = { // Osaurus API Key (可选,如果下游需要认证) apiKey: process.env.OSAURUS_API_KEY || '', + + // 新增 vLLM 支持的配置 + proxyMode: (process.env.PROXY_MODE || 'ollama').toLowerCase() as 'ollama' | 'vllm', + vllmTargetUrl: process.env.VLLM_PROXY_TARGET || 'http://127.0.0.1:8000', + vllmDefaultModel: process.env.VLLM_DEFAULT_MODEL || 'Qwen3.5-27B', }; diff --git a/src/proxy/vllm-forward.ts b/src/proxy/vllm-forward.ts new file mode 100755 index 0000000..d77fa66 --- /dev/null +++ b/src/proxy/vllm-forward.ts @@ -0,0 +1,157 @@ +import { config } from '../config'; +import { rewriteVllmResponse } from './vllm-response-rewriter'; +import { logger } from '../utils/logger'; +import { Readable } from 'stream'; + +export async function forwardVllmChatRequest(requestBody: any, authorization?: string): Promise { + const targetHost = config.vllmTargetUrl; + const targetEndpoint = `${targetHost}/v1/chat/completions`; + + // Inject default model if not provided + if (!requestBody.model && config.vllmDefaultModel) { + requestBody.model = config.vllmDefaultModel; + } + + logger.info(`Forwarding vLLM chat request to ${targetEndpoint} for model: ${requestBody.model}`); + + const headers: Record = { + 'Content-Type': 'application/json', + 'Accept': 'application/json' + }; + + if (authorization) { + headers['Authorization'] = authorization; + } + + const options: RequestInit = { + method: 'POST', + headers, + body: JSON.stringify(requestBody) + }; + + const response = await fetch(targetEndpoint, options); + + if (!response.ok) { + const errorText = await response.text(); + logger.error(`vLLM upstream error ${response.status}: ${errorText}`); + throw new Error(`vLLM upstream returned ${response.status}: ${errorText}`); + } + + // Handle standard JSON response (MVP supports non-streaming responses) + const responseData = await response.json(); + + // Rewrite if necessary using vLLM specific logic + const rewrittenData = rewriteVllmResponse(responseData); + + return rewrittenData; +} + +export async function forwardAndSpoofVllmStreamRequest(requestBody: any, authorization?: string): Promise { + const targetHost = config.vllmTargetUrl; + const targetEndpoint = `${targetHost}/v1/chat/completions`; + + // Force upstream to be NOT streaming so we can intercept the entire JSON + const upstreamBody = { ...requestBody, stream: false }; + + if (!upstreamBody.model && config.vllmDefaultModel) { + upstreamBody.model = config.vllmDefaultModel; + } + + logger.info(`Forwarding vLLM stream-spoofing request to ${targetEndpoint} for model: ${upstreamBody.model}`); + + const headers: Record = { + 'Content-Type': 'application/json', + 'Accept': 'application/json' + }; + + if (authorization) { + headers['Authorization'] = authorization; + } + + const options: RequestInit = { + method: 'POST', + headers, + body: JSON.stringify(upstreamBody) + }; + + const response = await fetch(targetEndpoint, options); + + if (!response.ok) { + const errorText = await response.text(); + logger.error(`vLLM upstream error ${response.status}: ${errorText}`); + throw new Error(`vLLM upstream returned ${response.status}: ${errorText}`); + } + + // Get full JSON representation + const responseData = await response.json(); + const rewrittenData = rewriteVllmResponse(responseData); + + // Construct a fake SSE stream using standard Node.js Readable + const sseStream = new Readable({ + read() {} // Pushed manually + }); + + const pushChunk = (delta: any, finishReason: string | null = null) => { + const chunk = { + id: rewrittenData.id || `chatcmpl-${Date.now()}`, + object: "chat.completion.chunk", + created: rewrittenData.created || Math.floor(Date.now() / 1000), + model: rewrittenData.model || upstreamBody.model, + choices: [ + { + index: 0, + delta: delta, + finish_reason: finishReason + } + ] + }; + sseStream.push(`data: ${JSON.stringify(chunk)}\n\n`); + }; + + process.nextTick(() => { + try { + // 1. Role chunk + pushChunk({ role: 'assistant' }); + + const message = rewrittenData.choices?.[0]?.message || {}; + + // 2. Content chunk MUST come strictly before tool calls + if (message.content) { + // Fragment the content to simulate real token streaming and prevent UI double-rendering bugs + const chunkSize = 16; + for (let i = 0; i < message.content.length; i += chunkSize) { + pushChunk({ content: message.content.substring(i, i + chunkSize) }); + } + } + + // 3. Tool calls chunk + if (message.tool_calls && message.tool_calls.length > 0) { + const streamToolCalls = message.tool_calls.map((tc: any, idx: number) => ({ + index: idx, + id: tc.id, + type: tc.type, + function: { + name: tc.function.name, + arguments: tc.function.arguments + } + })); + pushChunk({ tool_calls: streamToolCalls }); + } + + // 4. Finish reason chunk + const finalFinishReason = rewrittenData.choices?.[0]?.finish_reason || (message.tool_calls?.length > 0 ? 'tool_calls' : 'stop'); + pushChunk({}, finalFinishReason); + + // 5. Done + sseStream.push('data: [DONE]\n\n'); + sseStream.push(null); + } catch (e: any) { + logger.error('Error generating fake SSE stream:', e.message); + sseStream.push(`data: {"error": "Internal stream spoofing error"}\n\n`); + sseStream.push('data: [DONE]\n\n'); + sseStream.push(null); + } + }); + + return sseStream; +} diff --git a/src/proxy/vllm-response-rewriter.ts b/src/proxy/vllm-response-rewriter.ts new file mode 100755 index 0000000..85dca96 --- /dev/null +++ b/src/proxy/vllm-response-rewriter.ts @@ -0,0 +1,54 @@ +import { parseXmlToolCalls } from '../parsers'; +import { logger } from '../utils/logger'; + +/** + * Rewrites the vLLM/OpenAI standard response to include structured tool calls if missing + * but present in XML tags within the content. + */ +export function rewriteVllmResponse(response: any): any { + if (!response || !response.choices || !Array.isArray(response.choices) || response.choices.length === 0) { + return response; + } + + const message = response.choices[0].message; + if (!message) return response; + + // If already has tool_calls, do nothing + if (message.tool_calls && message.tool_calls.length > 0) { + return response; + } + + const content = message.content; + if (!content) { + return response; + } + + const parsedCalls = parseXmlToolCalls(content); + + if (parsedCalls.length > 0) { + logger.info(`Rewriting vLLM response: found ${parsedCalls.length} tool calls in XML content`); + + const standardToolCalls = parsedCalls.map((call, index) => { + let argumentsString = '{}'; + try { + argumentsString = JSON.stringify(call.args); + } catch (e) { + logger.error('Failed to stringify arguments for tool call', call.args); + } + + return { + id: `call_${Date.now()}_${index}`, + type: 'function', + function: { + name: call.name, + arguments: argumentsString, + } + }; + }); + + message.tool_calls = standardToolCalls; + message.content = ''; + } + + return response; +} diff --git a/src/routes/vllm.ts b/src/routes/vllm.ts new file mode 100755 index 0000000..3831694 --- /dev/null +++ b/src/routes/vllm.ts @@ -0,0 +1,31 @@ +import { FastifyInstance, FastifyPluginAsync } from 'fastify'; +import { forwardVllmChatRequest, forwardAndSpoofVllmStreamRequest } from '../proxy/vllm-forward'; +import { logger } from '../utils/logger'; + +const vllmRoutes: FastifyPluginAsync = async (server: FastifyInstance) => { + server.post('/v1/chat/completions', async (request, reply) => { + try { + const body = request.body as any; + const authorization = request.headers.authorization; + + if (body?.stream === true) { + const spoofedStream = await forwardAndSpoofVllmStreamRequest(body, authorization); + + reply.raw.setHeader('Content-Type', 'text/event-stream'); + reply.raw.setHeader('Cache-Control', 'no-cache'); + reply.raw.setHeader('Connection', 'keep-alive'); + + return reply.send(spoofedStream); + } + + const response = await forwardVllmChatRequest(body, authorization); + + reply.status(200).send(response); + } catch (error: any) { + logger.error('Error handling /v1/chat/completions:', error.message); + reply.status(500).send({ error: error.message }); + } + }); +}; + +export default vllmRoutes; diff --git a/src/server.ts b/src/server.ts index 190a481..d0551b4 100755 --- a/src/server.ts +++ b/src/server.ts @@ -1,17 +1,22 @@ import fastify, { FastifyInstance } from 'fastify'; import { config } from './config'; import ollamaRoutes from './routes/ollama'; +import vllmRoutes from './routes/vllm'; export function buildServer(): FastifyInstance { const server = fastify({ logger: false }); // Using our custom logger instead // Basic health check server.get('/', async () => { - return { status: 'ok', service: 'openclaw-ollama-toolcall-proxy' }; + return { status: 'ok', service: `openclaw-ollama-toolcall-proxy (Mode: ${config.proxyMode})` }; }); - // Register routes - server.register(ollamaRoutes); + // Register routes conditionally + if (config.proxyMode === 'vllm') { + server.register(vllmRoutes); + } else { + server.register(ollamaRoutes); + } return server; } diff --git a/test/fixtures/vllm-like-request.json b/test/fixtures/vllm-like-request.json new file mode 100755 index 0000000..4f01ab1 --- /dev/null +++ b/test/fixtures/vllm-like-request.json @@ -0,0 +1,27 @@ +{ + "model": "Qwen3.5-27B", + "messages": [ + { + "role": "user", + "content": "Please tell me what's in /tmp/test.txt" + } + ], + "stream": false, + "tools": [ + { + "type": "function", + "function": { + "name": "read", + "description": "Read a file", + "parameters": { + "type": "object", + "properties": { + "path": { + "type": "string" + } + } + } + } + } + ] +} diff --git a/test/fixtures/vllm-xml-response.json b/test/fixtures/vllm-xml-response.json new file mode 100755 index 0000000..12957d3 --- /dev/null +++ b/test/fixtures/vllm-xml-response.json @@ -0,0 +1,21 @@ +{ + "id": "chatcmpl-123", + "object": "chat.completion", + "created": 1715012345, + "model": "Qwen3.5-27B", + "choices": [ + { + "index": 0, + "message": { + "role": "assistant", + "content": "\n\n/tmp/test.txt\n\n" + }, + "finish_reason": "stop" + } + ], + "usage": { + "prompt_tokens": 56, + "completion_tokens": 31, + "total_tokens": 87 + } +} diff --git a/test/integration.proxy.test.ts b/test/integration.proxy.test.ts index 210057b..bd2a19d 100755 --- a/test/integration.proxy.test.ts +++ b/test/integration.proxy.test.ts @@ -3,11 +3,13 @@ import { buildServer } from '../src/server'; import { FastifyInstance } from 'fastify'; import fs from 'fs'; import path from 'path'; +import { config } from '../src/config'; describe('Proxy Integration Test', () => { let server: FastifyInstance; beforeEach(() => { + config.proxyMode = 'ollama'; server = buildServer(); // In vitest we can mock the global fetch global.fetch = vi.fn(); @@ -29,6 +31,7 @@ describe('Proxy Integration Test', () => { // Mock fetch to return the ollama-xml-response.json (global.fetch as any).mockResolvedValue({ ok: true, + text: async () => JSON.stringify(responseJson), json: async () => responseJson }); @@ -47,14 +50,14 @@ describe('Proxy Integration Test', () => { expect(fetchArgs[0]).toContain('/api/chat'); const upstreamBody = JSON.parse(fetchArgs[1].body); - expect(upstreamBody.model).toBe('Huihui-Qwen3.5-27B-Claude-4.6-Opus-abliterated-4bit'); + expect(upstreamBody.model).toBe('hotwa/qwen35-9b-agent:latest'); // Verify response was rewritten expect(body.message.content).toBe(""); expect(body.message.tool_calls).toBeDefined(); expect(body.message.tool_calls).toHaveLength(1); expect(body.message.tool_calls[0].function.name).toBe('read'); - expect(JSON.parse(body.message.tool_calls[0].function.arguments)).toEqual({ + expect(body.message.tool_calls[0].function.arguments).toEqual({ path: "/tmp/test.txt" }); }); diff --git a/test/integration.vllm.test.ts b/test/integration.vllm.test.ts new file mode 100755 index 0000000..f927b92 --- /dev/null +++ b/test/integration.vllm.test.ts @@ -0,0 +1,62 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { buildServer } from '../src/server'; +import { FastifyInstance } from 'fastify'; +import fs from 'fs'; +import path from 'path'; +import { config } from '../src/config'; + +describe('vLLM Proxy Integration Test', () => { + let server: FastifyInstance; + + beforeEach(() => { + // Mutate the loaded config instance + config.proxyMode = 'vllm'; + + server = buildServer(); + global.fetch = vi.fn(); + }); + + afterEach(async () => { + await server.close(); + vi.restoreAllMocks(); + }); + + it('proxies request and rewrites XML response to tool_calls for vLLM', async () => { + const requestFixturePath = path.join(__dirname, 'fixtures', 'vllm-like-request.json'); + const responseFixturePath = path.join(__dirname, 'fixtures', 'vllm-xml-response.json'); + + const requestJson = JSON.parse(fs.readFileSync(requestFixturePath, 'utf8')); + const responseJson = JSON.parse(fs.readFileSync(responseFixturePath, 'utf8')); + + (global.fetch as any).mockResolvedValue({ + ok: true, + json: async () => responseJson + }); + + const response = await server.inject({ + method: 'POST', + url: '/v1/chat/completions', + payload: requestJson + }); + + expect(response.statusCode).toBe(200); + const body = JSON.parse(response.payload); + + // Verify proxy forwarded it + expect(global.fetch).toHaveBeenCalledTimes(1); + const fetchArgs = (global.fetch as any).mock.calls[0]; + expect(fetchArgs[0]).toContain('/v1/chat/completions'); + + const upstreamBody = JSON.parse(fetchArgs[1].body); + expect(upstreamBody.model).toBe('Qwen3.5-27B'); + + // Verify response was rewritten + expect(body.choices[0].message.content).toBe(""); + expect(body.choices[0].message.tool_calls).toBeDefined(); + expect(body.choices[0].message.tool_calls).toHaveLength(1); + expect(body.choices[0].message.tool_calls[0].function.name).toBe('read'); + expect(JSON.parse(body.choices[0].message.tool_calls[0].function.arguments)).toEqual({ + path: "/tmp/test.txt" + }); + }); +}); diff --git a/test/response-rewriter.test.ts b/test/response-rewriter.test.ts index fb6840a..1b891b8 100755 --- a/test/response-rewriter.test.ts +++ b/test/response-rewriter.test.ts @@ -23,7 +23,7 @@ describe('Response Rewriter', () => { expect(toolCall.type).toBe('function'); expect(toolCall.function.name).toBe('read'); - const argsObject = JSON.parse(toolCall.function.arguments); + const argsObject = toolCall.function.arguments; expect(argsObject).toEqual({ path: '/tmp/test.txt' }); }); diff --git a/test/vllm-rewriter.test.ts b/test/vllm-rewriter.test.ts new file mode 100755 index 0000000..9d662cf --- /dev/null +++ b/test/vllm-rewriter.test.ts @@ -0,0 +1,53 @@ +import { describe, it, expect } from 'vitest'; +import { rewriteVllmResponse } from '../src/proxy/vllm-response-rewriter'; + +describe('vLLM Response Rewriter', () => { + it('rewrites XML tool call in OpenAI choices content into structured tool_calls', () => { + const inputResponse = { + id: "chatcmpl-123", + choices: [{ + index: 0, + message: { + role: "assistant", + content: "\n\n/tmp/test.txt\n\n" + } + }] + }; + + const result = rewriteVllmResponse(inputResponse); + + expect(result.choices[0].message.content).toBe(""); + expect(result.choices[0].message.tool_calls).toBeDefined(); + expect(result.choices[0].message.tool_calls).toHaveLength(1); + + const toolCall = result.choices[0].message.tool_calls![0]; + expect(toolCall.type).toBe('function'); + expect(toolCall.function.name).toBe('read'); + + const argsObject = JSON.parse(toolCall.function.arguments); + expect(argsObject).toEqual({ path: '/tmp/test.txt' }); + }); + + it('does not touch response that already has tool_calls', () => { + const inputResponse = { + choices: [{ + message: { + role: "assistant", + content: "Here are the calls", + tool_calls: [ + { + id: "123", + type: "function", + function: { name: "read", arguments: "{}" } + } + ] + } + }] + }; + + const result = rewriteVllmResponse(inputResponse); + + expect(result.choices[0].message.content).toBe("Here are the calls"); + expect(result.choices[0].message.tool_calls).toHaveLength(1); + }); +});