From 649958677e15a683a59d99ee118df78bd1b3ad84 Mon Sep 17 00:00:00 2001
From: lingyuzeng <lingyuzeng@example.com>
Date: Sun, 22 Mar 2026 20:15:30 +0800
Subject: [PATCH] fix: spoof ollama streaming responses for tool calls

---
 src/proxy/forward.ts           | 73 ++++++++++++++++++++++++++++++++++
 src/routes/ollama.ts           | 18 ++++-----
 test/integration.proxy.test.ts | 26 +++++++++---
 3 files changed, 103 insertions(+), 14 deletions(-)

diff --git a/src/proxy/forward.ts b/src/proxy/forward.ts
index 21b0cc9..5f0292c 100755
--- a/src/proxy/forward.ts
+++ b/src/proxy/forward.ts
@@ -2,6 +2,7 @@ import { config } from '../config';
 import { rewriteResponse } from './response-rewriter';
 import { normalizeRequest } from './request-normalizer';
 import { logger } from '../utils/logger';
+import { Readable } from 'stream';
 
 export async function forwardChatRequest(requestBody: any, authorization?: string): Promise<any> {
   const targetHost = config.targetUrl;
@@ -108,3 +109,75 @@ export async function forwardChatRequest(requestBody: any, authorization?: strin
 
   return rewrittenData;
 }
+
+export async function forwardAndSpoofOllamaStreamRequest(requestBody: any, authorization?: string): Promise<Readable> {
+  const upstreamBody = { ...requestBody, stream: false };
+  const rewrittenData = await forwardChatRequest(upstreamBody, authorization);
+
+  const sseStream = new Readable({
+    read() {}
+  });
+
+  const pushChunk = (delta: any, finishReason: string | null = null) => {
+    const chunk = {
+      id: rewrittenData.id || `chatcmpl-${Date.now()}`,
+      object: 'chat.completion.chunk',
+      created: Math.floor(Date.now() / 1000),
+      model: rewrittenData.model || upstreamBody.model,
+      choices: [
+        {
+          index: 0,
+          delta,
+          finish_reason: finishReason
+        }
+      ]
+    };
+
+    sseStream.push(`data: ${JSON.stringify(chunk)}\n\n`);
+  };
+
+  process.nextTick(() => {
+    try {
+      pushChunk({ role: 'assistant' });
+
+      const message = rewrittenData.message || {};
+
+      if (message.content) {
+        const chunkSize = 16;
+        for (let i = 0; i < message.content.length; i += chunkSize) {
+          pushChunk({ content: message.content.substring(i, i + chunkSize) });
+        }
+      }
+
+      if (message.tool_calls && message.tool_calls.length > 0) {
+        const streamToolCalls = message.tool_calls.map((tc: any, idx: number) => ({
+          index: idx,
+          id: tc.id,
+          type: tc.type,
+          function: {
+            name: tc.function.name,
+            arguments: JSON.stringify(tc.function.arguments ?? {})
+          }
+        }));
+
+        pushChunk({ tool_calls: streamToolCalls });
+      }
+
+      const finalFinishReason =
+        message.tool_calls?.length > 0
+          ? 'tool_calls'
+          : (rewrittenData.done_reason || 'stop');
+
+      pushChunk({}, finalFinishReason);
+      sseStream.push('data: [DONE]\n\n');
+      sseStream.push(null);
+    } catch (e: any) {
+      logger.error('Error generating Ollama SSE stream:', e.message);
+      sseStream.push('data: {"error":"Internal stream spoofing error"}\n\n');
+      sseStream.push('data: [DONE]\n\n');
+      sseStream.push(null);
+    }
+  });
+
+  return sseStream;
+}
diff --git a/src/routes/ollama.ts b/src/routes/ollama.ts
index 9cf3797..6c663fd 100755
--- a/src/routes/ollama.ts
+++ b/src/routes/ollama.ts
@@ -1,20 +1,20 @@
 import { FastifyInstance, FastifyPluginAsync } from 'fastify';
-import { forwardChatRequest } from '../proxy/forward';
+import { forwardAndSpoofOllamaStreamRequest, forwardChatRequest } from '../proxy/forward';
 import { logger } from '../utils/logger';
 
 const ollamaRoutes: FastifyPluginAsync = async (server: FastifyInstance) => {
   server.post('/api/chat', async (request, reply) => {
     try {
       const body = request.body as any;
-
-      // Currently only supporting non-streaming requests in this proxy MVP
+
       if (body?.stream === true) {
-        // As per requirements: return clear error or pass through without rewriting
-        // We'll return a clear error for now, because stream parsing is out of scope for MVP
-        reply.status(400).send({
-          error: "Streaming is not supported by this proxy MVP. Please set stream=false."
-        });
-        return;
+        const spoofedStream = await forwardAndSpoofOllamaStreamRequest(body, request.headers.authorization);
+
+        reply.raw.setHeader('Content-Type', 'text/event-stream');
+        reply.raw.setHeader('Cache-Control', 'no-cache');
+        reply.raw.setHeader('Connection', 'keep-alive');
+
+        return reply.send(spoofedStream);
       }
 
       const response = await forwardChatRequest(body, request.headers.authorization);
diff --git a/test/integration.proxy.test.ts b/test/integration.proxy.test.ts
index bd2a19d..f186cd3 100755
--- a/test/integration.proxy.test.ts
+++ b/test/integration.proxy.test.ts
@@ -62,20 +62,36 @@
     });
   });
 
-  it('rejects streaming requests cleanly', async () => {
+  it('spoofs streaming responses for stream=true requests', async () => {
     const requestFixturePath = path.join(__dirname, 'fixtures', 'openclaw-like-request.json');
+    const responseFixturePath = path.join(__dirname, 'fixtures', 'ollama-xml-response.json');
     const requestJson = JSON.parse(fs.readFileSync(requestFixturePath, 'utf8'));
+    const responseJson = JSON.parse(fs.readFileSync(responseFixturePath, 'utf8'));
     requestJson.stream = true;
 
+    (global.fetch as any).mockResolvedValue({
+      ok: true,
+      text: async () => JSON.stringify(responseJson),
+      json: async () => responseJson
+    });
+
     const response = await server.inject({
       method: 'POST',
       url: '/api/chat',
       payload: requestJson
     });
 
-    expect(response.statusCode).toBe(400);
-    const body = JSON.parse(response.payload);
-    expect(body.error).toContain('Streaming is not supported');
-    expect(global.fetch).not.toHaveBeenCalled();
+    expect(response.statusCode).toBe(200);
+    expect(response.headers['content-type']).toContain('text/event-stream');
+    expect(global.fetch).toHaveBeenCalledTimes(1);
+
+    const fetchArgs = (global.fetch as any).mock.calls[0];
+    const upstreamBody = JSON.parse(fetchArgs[1].body);
+    expect(upstreamBody.stream).toBe(false);
+
+    expect(response.payload).toContain('"role":"assistant"');
+    expect(response.payload).toContain('"tool_calls"');
+    expect(response.payload).toContain('"finish_reason":"tool_calls"');
+    expect(response.payload).toContain('data: [DONE]');
   });
 });