class AnthropicServingMessages(OpenAIServingChat):
    """Handler for Anthropic Messages API requests.

    Incoming Anthropic requests are converted to OpenAI chat-completion
    requests, executed through the inherited ``create_chat_completion`` and
    the result (full response or SSE stream) is converted back to the
    Anthropic response / stream-event format.
    """

    def __init__(
        self,
        engine_client: EngineClient,
        models: OpenAIServingModels,
        response_role: str,
        *,
        request_logger: RequestLogger | None,
        chat_template: str | None,
        chat_template_content_format: ChatTemplateContentFormatOption,
        return_tokens_as_token_ids: bool = False,
        reasoning_parser: str = "",
        enable_auto_tools: bool = False,
        tool_parser: str | None = None,
        enable_prompt_tokens_details: bool = False,
        enable_force_include_usage: bool = False,
    ):
        """Initialize the handler.

        Every argument is forwarded unchanged to the parent initializer.
        """
        super().__init__(
            engine_client=engine_client,
            models=models,
            response_role=response_role,
            request_logger=request_logger,
            chat_template=chat_template,
            chat_template_content_format=chat_template_content_format,
            return_tokens_as_token_ids=return_tokens_as_token_ids,
            reasoning_parser=reasoning_parser,
            enable_auto_tools=enable_auto_tools,
            tool_parser=tool_parser,
            enable_prompt_tokens_details=enable_prompt_tokens_details,
            enable_force_include_usage=enable_force_include_usage,
        )
        # OpenAI ``finish_reason`` -> Anthropic ``stop_reason``.
        self.stop_reason_map = {
            "stop": "end_turn",
            "length": "max_tokens",
            "tool_calls": "tool_use",
        }

    @staticmethod
    def _image_source_to_url(source: dict[str, Any]) -> str:
        """Turn an Anthropic image ``source`` dict into an ``image_url`` value.

        ``url`` sources yield their ``url``; ``base64`` sources are wrapped
        in a ``data:<media_type>;base64,<data>`` URL because the bare base64
        payload is not a URL. Any other shape falls back to the raw ``data``
        entry (empty string when absent).
        """
        source_type = source.get("type")
        if source_type == "url":
            return source.get("url", "")
        data = source.get("data", "")
        if (
            source_type == "base64"
            and isinstance(data, str)
            and data
            and not data.startswith("data:")
        ):
            # NOTE(review): "image/jpeg" is only a fallback for a missing
            # media_type; Anthropic clients are expected to always send one.
            media_type = source.get("media_type") or "image/jpeg"
            return f"data:{media_type};base64,{data}"
        return data

    @staticmethod
    def _format_event(event_type: str, **fields: Any) -> str:
        """Build an ``AnthropicStreamEvent`` and wrap it as one SSE event.

        Only explicitly provided fields are serialized (``exclude_unset``).
        """
        event = AnthropicStreamEvent(type=event_type, **fields)
        return wrap_data_with_event(
            event.model_dump_json(exclude_unset=True), event_type
        )

    def _convert_anthropic_to_openai_request(
        self, anthropic_request: AnthropicMessagesRequest
    ) -> ChatCompletionRequest:
        """Convert an Anthropic messages request to an OpenAI chat request.

        Args:
            anthropic_request: The incoming Anthropic-format request.

        Returns:
            A ``ChatCompletionRequest`` carrying the converted messages,
            sampling parameters, streaming options, tool choice and tools.
        """
        openai_messages: list[dict[str, Any]] = []

        # The system prompt is either a plain string or a list of blocks
        # whose non-empty text parts are concatenated.
        system = anthropic_request.system
        if system:
            if isinstance(system, str):
                system_prompt = system
            else:
                system_prompt = "".join(
                    block.text
                    for block in system
                    if block.type == "text" and block.text
                )
            openai_messages.append({"role": "system", "content": system_prompt})

        for msg in anthropic_request.messages:
            openai_msg: dict[str, Any] = {"role": msg.role}  # type: ignore
            if isinstance(msg.content, str):
                openai_msg["content"] = msg.content
            else:
                # Handle complex content blocks
                content_parts: list[dict[str, Any]] = []
                tool_calls: list[dict[str, Any]] = []

                for block in msg.content:
                    if block.type == "text" and block.text:
                        content_parts.append({"type": "text", "text": block.text})
                    elif block.type == "image" and block.source:
                        content_parts.append(
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": self._image_source_to_url(block.source)
                                },
                            }
                        )
                    elif block.type == "tool_use":
                        # Convert tool use to function call format
                        tool_calls.append(
                            {
                                "id": block.id or f"call_{int(time.time())}",
                                "type": "function",
                                "function": {
                                    "name": block.name or "",
                                    "arguments": json.dumps(block.input or {}),
                                },
                            }
                        )
                    elif block.type == "tool_result":
                        if msg.role == "user":
                            # Anthropic tool_result blocks reference the call
                            # via ``tool_use_id``; fall back to ``id`` when the
                            # block model does not expose that field.
                            tool_call_id = (
                                getattr(block, "tool_use_id", None)
                                or block.id
                                or ""
                            )
                            # Emitted immediately, i.e. before the enclosing
                            # user message is appended below.
                            openai_messages.append(
                                {
                                    "role": "tool",
                                    "tool_call_id": tool_call_id,
                                    "content": str(block.content)
                                    if block.content
                                    else "",
                                }
                            )
                        else:
                            # Assistant tool result becomes regular text
                            tool_result_text = (
                                str(block.content) if block.content else ""
                            )
                            content_parts.append(
                                {
                                    "type": "text",
                                    "text": f"Tool result: {tool_result_text}",
                                }
                            )

                # Add tool calls to the message if any
                if tool_calls:
                    openai_msg["tool_calls"] = tool_calls  # type: ignore

                # Add content parts if any; a lone text part is flattened
                # to a plain string.
                if content_parts:
                    if len(content_parts) == 1 and content_parts[0]["type"] == "text":
                        openai_msg["content"] = content_parts[0]["text"]
                    else:
                        openai_msg["content"] = content_parts  # type: ignore
                elif not tool_calls:
                    # Nothing left to send for this message (e.g. it only
                    # contained tool results that were emitted above).
                    continue

            openai_messages.append(openai_msg)

        req = ChatCompletionRequest(
            model=anthropic_request.model,
            messages=openai_messages,
            max_tokens=anthropic_request.max_tokens,
            max_completion_tokens=anthropic_request.max_tokens,
            stop=anthropic_request.stop_sequences,
            temperature=anthropic_request.temperature,
            top_p=anthropic_request.top_p,
            top_k=anthropic_request.top_k,
        )

        if anthropic_request.stream:
            req.stream = anthropic_request.stream
            # The stream converter relies on the trailing usage-only chunk.
            req.stream_options = StreamOptions.model_validate(
                {"include_usage": True}
            )

        tool_choice = anthropic_request.tool_choice
        if tool_choice is None:
            req.tool_choice = None
        elif tool_choice.type == "auto":
            req.tool_choice = "auto"
        elif tool_choice.type == "any":
            req.tool_choice = "required"
        elif tool_choice.type == "tool":
            req.tool_choice = ChatCompletionNamedToolChoiceParam.model_validate(
                {
                    "type": "function",
                    "function": {"name": tool_choice.name},
                }
            )

        if anthropic_request.tools is None:
            return req

        tools = [
            ChatCompletionToolsParam.model_validate(
                {
                    "type": "function",
                    "function": {
                        "name": tool.name,
                        "description": tool.description,
                        "parameters": tool.input_schema,
                    },
                }
            )
            for tool in anthropic_request.tools
        ]
        # Default to automatic tool selection once at least one tool exists
        # and the client did not choose explicitly.
        if tools and req.tool_choice is None:
            req.tool_choice = "auto"
        req.tools = tools
        return req

    async def create_messages(
        self,
        request: AnthropicMessagesRequest,
        raw_request: Request | None = None,
    ) -> AsyncGenerator[str, None] | AnthropicMessagesResponse | ErrorResponse:
        """
        Messages API similar to Anthropic's API.

        See https://docs.anthropic.com/en/api/messages
        for the API specification. This API mimics the Anthropic messages API.

        Returns the ``ErrorResponse`` from the chat-completion call as is, an
        ``AnthropicMessagesResponse`` for a full completion, or an async
        generator of SSE strings otherwise.
        """
        logger.debug("Received messages request %s", request.model_dump_json())
        chat_req = self._convert_anthropic_to_openai_request(request)
        # Log the converted request, not the incoming one.
        logger.debug("Convert to OpenAI request %s", chat_req.model_dump_json())

        generator = await self.create_chat_completion(chat_req, raw_request)

        if isinstance(generator, ErrorResponse):
            return generator
        elif isinstance(generator, ChatCompletionResponse):
            return self.messages_full_converter(generator)

        return self.message_stream_converter(generator)

    def messages_full_converter(
        self,
        generator: ChatCompletionResponse,
    ) -> AnthropicMessagesResponse:
        """Convert a full chat-completion response to an Anthropic response.

        Only the first choice is used. The content always starts with a text
        block (empty when the message has no text), followed by one
        ``tool_use`` block per tool call.
        """
        result = AnthropicMessagesResponse(
            id=generator.id,
            content=[],
            model=generator.model,
            usage=AnthropicUsage(
                input_tokens=generator.usage.prompt_tokens,
                output_tokens=generator.usage.completion_tokens,
            ),
        )
        choice = generator.choices[0]

        # Leave the default stop_reason untouched for unmapped finish reasons.
        stop_reason = self.stop_reason_map.get(choice.finish_reason)
        if stop_reason is not None:
            result.stop_reason = stop_reason

        content: list[AnthropicContentBlock] = [
            AnthropicContentBlock(type="text", text=choice.message.content or "")
        ]

        for tool_call in choice.message.tool_calls or []:
            arguments = tool_call.function.arguments
            content.append(
                AnthropicContentBlock(
                    type="tool_use",
                    id=tool_call.id,
                    name=tool_call.function.name,
                    # An empty argument string is not valid JSON; treat it as
                    # a call without arguments.
                    input=json.loads(arguments) if arguments else {},
                )
            )

        result.content = content
        return result

    async def message_stream_converter(
        self,
        generator: AsyncGenerator[str, None],
    ) -> AsyncGenerator[str, None]:
        """Convert a chat-completion SSE stream to Anthropic stream events.

        Args:
            generator: Async generator of ``data: ...`` SSE strings produced
                by the chat-completion call.

        Yields:
            SSE strings: ``message_start``, ``content_block_start`` /
            ``content_block_delta`` / ``content_block_stop``,
            ``message_delta`` and ``message_stop`` events. Any exception is
            logged and reported as an ``error`` event followed by
            ``data: [DONE]``.
        """
        try:
            first_item = True
            finish_reason = None
            content_block_index = 0
            content_block_started = False

            async for item in generator:
                if not item.startswith("data:"):
                    yield self._format_event(
                        "error",
                        error=AnthropicError(
                            type="internal_error",
                            message="Invalid data format received",
                        ),
                    )
                    yield "data: [DONE]\n\n"
                    continue

                data_str = item[5:].strip()
                if data_str == "[DONE]":
                    yield self._format_event("message_stop")
                    yield "data: [DONE]\n\n"
                    continue

                origin_chunk = ChatCompletionStreamResponse.model_validate_json(
                    data_str
                )

                # The first chunk only supplies id/model for message_start.
                if first_item:
                    first_item = False
                    yield self._format_event(
                        "message_start",
                        message=AnthropicMessagesResponse(
                            id=origin_chunk.id,
                            content=[],
                            model=origin_chunk.model,
                        ),
                    )
                    continue

                # last chunk including usage info
                if not origin_chunk.choices:
                    if content_block_started:
                        yield self._format_event(
                            "content_block_stop", index=content_block_index
                        )
                    usage = origin_chunk.usage
                    yield self._format_event(
                        "message_delta",
                        delta=AnthropicDelta(
                            stop_reason=self.stop_reason_map.get(
                                finish_reason or "stop"
                            )
                        ),
                        usage=AnthropicUsage(
                            input_tokens=usage.prompt_tokens if usage else 0,
                            output_tokens=usage.completion_tokens if usage else 0,
                        ),
                    )
                    continue

                choice = origin_chunk.choices[0]
                if choice.finish_reason is not None:
                    # Remembered for the message_delta sent with the usage chunk.
                    finish_reason = choice.finish_reason
                    continue

                delta = choice.delta

                # content
                if delta.content is not None:
                    if not content_block_started:
                        yield self._format_event(
                            "content_block_start",
                            index=content_block_index,
                            content_block=AnthropicContentBlock(
                                type="text", text=""
                            ),
                        )
                        content_block_started = True

                    if delta.content == "":
                        continue
                    yield self._format_event(
                        "content_block_delta",
                        index=content_block_index,
                        delta=AnthropicDelta(
                            type="text_delta",
                            text=delta.content,
                        ),
                    )
                    continue

                # tool calls (only the first entry of the delta is handled)
                if delta.tool_calls:
                    tool_call = delta.tool_calls[0]
                    function = tool_call.function
                    if tool_call.id is not None:
                        # An id marks a new tool call: close the open block
                        # (if any) and start a tool_use block.
                        if content_block_started:
                            yield self._format_event(
                                "content_block_stop", index=content_block_index
                            )
                            content_block_started = False
                            content_block_index += 1

                        yield self._format_event(
                            "content_block_start",
                            index=content_block_index,
                            content_block=AnthropicContentBlock(
                                type="tool_use",
                                id=tool_call.id,
                                name=function.name if function else None,
                                input={},
                            ),
                        )
                        content_block_started = True

                        # Arguments delivered together with the id would
                        # otherwise be lost; forward them as the first delta.
                        if function and function.arguments:
                            yield self._format_event(
                                "content_block_delta",
                                index=content_block_index,
                                delta=AnthropicDelta(
                                    type="input_json_delta",
                                    partial_json=function.arguments,
                                ),
                            )
                    else:
                        yield self._format_event(
                            "content_block_delta",
                            index=content_block_index,
                            delta=AnthropicDelta(
                                type="input_json_delta",
                                partial_json=function.arguments
                                if function
                                else None,
                            ),
                        )
                    continue

        except Exception as e:
            logger.exception("Error in message stream converter.")
            yield self._format_event(
                "error",
                error=AnthropicError(type="internal_error", message=str(e)),
            )
            yield "data: [DONE]\n\n"