fix: spoof ollama streaming responses for tool calls
This commit is contained in:
@@ -2,6 +2,7 @@ import { config } from '../config';
|
|||||||
import { rewriteResponse } from './response-rewriter';
|
import { rewriteResponse } from './response-rewriter';
|
||||||
import { normalizeRequest } from './request-normalizer';
|
import { normalizeRequest } from './request-normalizer';
|
||||||
import { logger } from '../utils/logger';
|
import { logger } from '../utils/logger';
|
||||||
|
import { Readable } from 'stream';
|
||||||
|
|
||||||
export async function forwardChatRequest(requestBody: any, authorization?: string): Promise<any> {
|
export async function forwardChatRequest(requestBody: any, authorization?: string): Promise<any> {
|
||||||
const targetHost = config.targetUrl;
|
const targetHost = config.targetUrl;
|
||||||
@@ -108,3 +109,75 @@ export async function forwardChatRequest(requestBody: any, authorization?: strin
|
|||||||
|
|
||||||
return rewrittenData;
|
return rewrittenData;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function forwardAndSpoofOllamaStreamRequest(requestBody: any, authorization?: string): Promise<Readable> {
|
||||||
|
const upstreamBody = { ...requestBody, stream: false };
|
||||||
|
const rewrittenData = await forwardChatRequest(upstreamBody, authorization);
|
||||||
|
|
||||||
|
const sseStream = new Readable({
|
||||||
|
read() {}
|
||||||
|
});
|
||||||
|
|
||||||
|
const pushChunk = (delta: any, finishReason: string | null = null) => {
|
||||||
|
const chunk = {
|
||||||
|
id: rewrittenData.id || `chatcmpl-${Date.now()}`,
|
||||||
|
object: 'chat.completion.chunk',
|
||||||
|
created: Math.floor(Date.now() / 1000),
|
||||||
|
model: rewrittenData.model || upstreamBody.model,
|
||||||
|
choices: [
|
||||||
|
{
|
||||||
|
index: 0,
|
||||||
|
delta,
|
||||||
|
finish_reason: finishReason
|
||||||
|
}
|
||||||
|
]
|
||||||
|
};
|
||||||
|
|
||||||
|
sseStream.push(`data: ${JSON.stringify(chunk)}\n\n`);
|
||||||
|
};
|
||||||
|
|
||||||
|
process.nextTick(() => {
|
||||||
|
try {
|
||||||
|
pushChunk({ role: 'assistant' });
|
||||||
|
|
||||||
|
const message = rewrittenData.message || {};
|
||||||
|
|
||||||
|
if (message.content) {
|
||||||
|
const chunkSize = 16;
|
||||||
|
for (let i = 0; i < message.content.length; i += chunkSize) {
|
||||||
|
pushChunk({ content: message.content.substring(i, i + chunkSize) });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (message.tool_calls && message.tool_calls.length > 0) {
|
||||||
|
const streamToolCalls = message.tool_calls.map((tc: any, idx: number) => ({
|
||||||
|
index: idx,
|
||||||
|
id: tc.id,
|
||||||
|
type: tc.type,
|
||||||
|
function: {
|
||||||
|
name: tc.function.name,
|
||||||
|
arguments: JSON.stringify(tc.function.arguments ?? {})
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
pushChunk({ tool_calls: streamToolCalls });
|
||||||
|
}
|
||||||
|
|
||||||
|
const finalFinishReason =
|
||||||
|
message.tool_calls?.length > 0
|
||||||
|
? 'tool_calls'
|
||||||
|
: (rewrittenData.done_reason || 'stop');
|
||||||
|
|
||||||
|
pushChunk({}, finalFinishReason);
|
||||||
|
sseStream.push('data: [DONE]\n\n');
|
||||||
|
sseStream.push(null);
|
||||||
|
} catch (e: any) {
|
||||||
|
logger.error('Error generating Ollama SSE stream:', e.message);
|
||||||
|
sseStream.push('data: {"error":"Internal stream spoofing error"}\n\n');
|
||||||
|
sseStream.push('data: [DONE]\n\n');
|
||||||
|
sseStream.push(null);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return sseStream;
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
import { FastifyInstance, FastifyPluginAsync } from 'fastify';
|
import { FastifyInstance, FastifyPluginAsync } from 'fastify';
|
||||||
import { forwardChatRequest } from '../proxy/forward';
|
import { forwardAndSpoofOllamaStreamRequest, forwardChatRequest } from '../proxy/forward';
|
||||||
import { logger } from '../utils/logger';
|
import { logger } from '../utils/logger';
|
||||||
|
|
||||||
const ollamaRoutes: FastifyPluginAsync = async (server: FastifyInstance) => {
|
const ollamaRoutes: FastifyPluginAsync = async (server: FastifyInstance) => {
|
||||||
@@ -7,14 +7,14 @@ const ollamaRoutes: FastifyPluginAsync = async (server: FastifyInstance) => {
|
|||||||
try {
|
try {
|
||||||
const body = request.body as any;
|
const body = request.body as any;
|
||||||
|
|
||||||
// Currently only supporting non-streaming requests in this proxy MVP
|
|
||||||
if (body?.stream === true) {
|
if (body?.stream === true) {
|
||||||
// As per requirements: return clear error or pass through without rewriting
|
const spoofedStream = await forwardAndSpoofOllamaStreamRequest(body, request.headers.authorization);
|
||||||
// We'll return a clear error for now, because stream parsing is out of scope for MVP
|
|
||||||
reply.status(400).send({
|
reply.raw.setHeader('Content-Type', 'text/event-stream');
|
||||||
error: "Streaming is not supported by this proxy MVP. Please set stream=false."
|
reply.raw.setHeader('Cache-Control', 'no-cache');
|
||||||
});
|
reply.raw.setHeader('Connection', 'keep-alive');
|
||||||
return;
|
|
||||||
|
return reply.send(spoofedStream);
|
||||||
}
|
}
|
||||||
|
|
||||||
const response = await forwardChatRequest(body, request.headers.authorization);
|
const response = await forwardChatRequest(body, request.headers.authorization);
|
||||||
|
|||||||
@@ -62,20 +62,36 @@ describe('Proxy Integration Test', () => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
it('rejects streaming requests cleanly', async () => {
|
it('spoofs streaming responses for stream=true requests', async () => {
|
||||||
const requestFixturePath = path.join(__dirname, 'fixtures', 'openclaw-like-request.json');
|
const requestFixturePath = path.join(__dirname, 'fixtures', 'openclaw-like-request.json');
|
||||||
|
const responseFixturePath = path.join(__dirname, 'fixtures', 'ollama-xml-response.json');
|
||||||
const requestJson = JSON.parse(fs.readFileSync(requestFixturePath, 'utf8'));
|
const requestJson = JSON.parse(fs.readFileSync(requestFixturePath, 'utf8'));
|
||||||
|
const responseJson = JSON.parse(fs.readFileSync(responseFixturePath, 'utf8'));
|
||||||
requestJson.stream = true;
|
requestJson.stream = true;
|
||||||
|
|
||||||
|
(global.fetch as any).mockResolvedValue({
|
||||||
|
ok: true,
|
||||||
|
text: async () => JSON.stringify(responseJson),
|
||||||
|
json: async () => responseJson
|
||||||
|
});
|
||||||
|
|
||||||
const response = await server.inject({
|
const response = await server.inject({
|
||||||
method: 'POST',
|
method: 'POST',
|
||||||
url: '/api/chat',
|
url: '/api/chat',
|
||||||
payload: requestJson
|
payload: requestJson
|
||||||
});
|
});
|
||||||
|
|
||||||
expect(response.statusCode).toBe(400);
|
expect(response.statusCode).toBe(200);
|
||||||
const body = JSON.parse(response.payload);
|
expect(response.headers['content-type']).toContain('text/event-stream');
|
||||||
expect(body.error).toContain('Streaming is not supported');
|
expect(global.fetch).toHaveBeenCalledTimes(1);
|
||||||
expect(global.fetch).not.toHaveBeenCalled();
|
|
||||||
|
const fetchArgs = (global.fetch as any).mock.calls[0];
|
||||||
|
const upstreamBody = JSON.parse(fetchArgs[1].body);
|
||||||
|
expect(upstreamBody.stream).toBe(false);
|
||||||
|
|
||||||
|
expect(response.payload).toContain('"role":"assistant"');
|
||||||
|
expect(response.payload).toContain('"tool_calls"');
|
||||||
|
expect(response.payload).toContain('"finish_reason":"tool_calls"');
|
||||||
|
expect(response.payload).toContain('data: [DONE]');
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user