import time from typing import Any import openai from openai import OpenAI from .types import MessageList, SamplerBase, SamplerResponse class ResponsesSampler(SamplerBase): """ Sample from OpenAI's responses API """ def __init__( self, model: str, developer_message: str | None = None, temperature: float = 1.0, max_tokens: int = 1024, reasoning_model: bool = False, reasoning_effort: str | None = None, base_url: str = "http://localhost:8000/v1", ): self.api_key_name = "OPENAI_API_KEY" self.client = OpenAI(base_url=base_url, timeout=24*60*60) self.model = model self.developer_message = developer_message self.temperature = temperature self.max_tokens = max_tokens self.image_format = "url" self.reasoning_model = reasoning_model self.reasoning_effort = reasoning_effort def _pack_message(self, role: str, content: Any) -> dict[str, Any]: return {"role": role, "content": content} def __call__(self, message_list: MessageList) -> SamplerResponse: if self.developer_message: message_list = [ self._pack_message("developer", self.developer_message) ] + message_list trial = 0 while True: try: if self.reasoning_model: reasoning = ( {"effort": self.reasoning_effort} if self.reasoning_effort else None ) response = self.client.responses.create( model=self.model, input=message_list, reasoning=reasoning, ) else: response = self.client.responses.create( model=self.model, input=message_list, temperature=self.temperature, max_output_tokens=self.max_tokens, ) for output in response.output: if hasattr(output, "text"): message_list.append(self._pack_message(getattr(output, "role", "assistant"), output.text)) elif hasattr(output, "content"): for c in output.content: # c.text handled below pass return SamplerResponse( response_text=response.output_text, response_metadata={"usage": response.usage}, actual_queried_message_list=message_list, ) except openai.BadRequestError as e: print("Bad Request Error", e) return SamplerResponse( response_text="", response_metadata={"usage": None}, actual_queried_message_list=message_list, ) except Exception as e: exception_backoff = 2**trial # expontial back off print( f"Rate limit exception so wait and retry {trial} after {exception_backoff} sec", e, ) time.sleep(exception_backoff) trial += 1 # unknown error shall throw exception