anthropic

The official Python library for the anthropic API

  • 0.39.0
  • PyPI

Maintainers
7

Anthropic Python API library

The Anthropic Python library provides convenient access to the Anthropic REST API from any Python 3.8+ application. It includes type definitions for all request params and response fields, and offers both synchronous and asynchronous clients powered by httpx.

Documentation

The REST API documentation can be found on docs.anthropic.com. The full API of this library can be found in api.md.

Installation

# install from PyPI
pip install anthropic

Usage

The full API of this library can be found in api.md.

import os
from anthropic import Anthropic

client = Anthropic(
    # This is the default and can be omitted
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
)

message = client.messages.create(
    max_tokens=1024,
    messages=[
        {
            "role": "user",
            "content": "Hello, Claude",
        }
    ],
    model="claude-3-opus-20240229",
)
print(message.content)

While you can provide an api_key keyword argument, we recommend using python-dotenv to add ANTHROPIC_API_KEY="my-anthropic-api-key" to your .env file so that your API key is not stored in source control.
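
As a minimal sketch of the environment-variable approach (standard library only), the helper below reads ANTHROPIC_API_KEY and fails early with a clear message when it is absent. The name load_api_key is hypothetical and not part of the SDK; the client already reads this variable on its own, so a check like this is only useful if you want an explicit error at startup.

```python
import os
from typing import Mapping, Optional


def load_api_key(env: Optional[Mapping[str, str]] = None) -> str:
    """Return ANTHROPIC_API_KEY from `env` (default: os.environ) or raise."""
    source = os.environ if env is None else env
    key = source.get("ANTHROPIC_API_KEY", "").strip()
    if not key:
        raise RuntimeError(
            "ANTHROPIC_API_KEY is not set; add it to your .env file or shell environment."
        )
    return key
```

The returned value could then be passed as api_key=load_api_key() when constructing the client.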

Async usage

Simply import AsyncAnthropic instead of Anthropic and use await with each API call:

import os
import asyncio
from anthropic import AsyncAnthropic

client = AsyncAnthropic(
    # This is the default and can be omitted
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
)


async def main() -> None:
    message = await client.messages.create(
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": "Hello, Claude",
            }
        ],
        model="claude-3-opus-20240229",
    )
    print(message.content)


asyncio.run(main())

Functionality between the synchronous and asynchronous clients is otherwise identical.

Streaming responses

We provide support for streaming responses using Server-Sent Events (SSE).

from anthropic import Anthropic

client = Anthropic()

stream = client.messages.create(
    max_tokens=1024,
    messages=[
        {
            "role": "user",
            "content": "Hello, Claude",
        }
    ],
    model="claude-3-opus-20240229",
    stream=True,
)
for event in stream:
    print(event.type)

The async client uses the exact same interface.

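import asyncio
from anthropic import AsyncAnthropic

client = AsyncAnthropic()


async def main() -> None:
    # `await` is only valid inside a coroutine, so the call is wrapped in main().
    stream = await client.messages.create(
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": "Hello, Claude",
            }
        ],
        model="claude-3-opus-20240229",
        stream=True,
    )
    async for event in stream:
        print(event.type)


asyncio.run(main())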
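
To make the wire format more concrete, here is a rough sketch of how SSE lines could be grouped into events. It is not the SDK's parser: iter_sse_events is a hypothetical helper, the sample lines are hand-written, and only the event: and data: fields are handled.

```python
import json
from typing import Any, Dict, Iterable, Iterator, List, Tuple


def iter_sse_events(lines: Iterable[str]) -> Iterator[Tuple[str, Dict[str, Any]]]:
    """Yield (event_name, parsed_json_data) for each blank-line-terminated event."""
    event_name = "message"  # default event type when no `event:` field is sent
    data_parts: List[str] = []
    for raw in lines:
        line = raw.rstrip("\n")
        if line == "":
            # A blank line ends the current event.
            if data_parts:
                yield event_name, json.loads("\n".join(data_parts))
            event_name, data_parts = "message", []
        elif line.startswith("event:"):
            event_name = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_parts.append(line[len("data:"):].strip())
        # Comment lines (starting with ":") and other fields are ignored here.


# Hand-written sample input, for illustration only.
sample = [
    "event: message_start",
    'data: {"type": "message_start"}',
    "",
    "event: content_block_delta",
    'data: {"type": "content_block_delta", "delta": {"type": "text_delta", "text": "Hello"}}',
    "",
]
events = list(iter_sse_events(sample))
for name, payload in events:
    print(name, payload["type"])
```

In practice the client does this decoding for you and yields typed event objects, so a parser like this is only useful for understanding or debugging the raw stream.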

Streaming Helpers

This library provides several conveniences for streaming messages, for example:

import asyncio
from anthropic import AsyncAnthropic

client = AsyncAnthropic()

async def main() -> None:
    async with client.messages.stream(
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": "Say hello there!",
            }
        ],
        model="claude-3-opus-20240229",
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)
        print()

    message = await stream.get_final_message()
    print(message.to_json())

asyncio.run(main())

Streaming with client.messages.stream(...) exposes various helpers for your convenience, including accumulation and SDK-specific events.

Alternatively, you can use client.messages.create(..., stream=True), which returns only an iterable (an async iterable in the async client) of the events in the stream and thus uses less memory, because it does not build up a final message object for you.
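
If you iterate raw events yourself, accumulating the text is left to you. The sketch below shows one way to do that, assuming text arrives in content_block_delta events whose delta has type text_delta. The accumulate_text helper is hypothetical, and the SimpleNamespace objects are stand-ins for the SDK's event objects.

```python
from types import SimpleNamespace
from typing import Any, Iterable, List


def accumulate_text(events: Iterable[Any]) -> str:
    """Concatenate the text carried by `content_block_delta` events."""
    parts: List[str] = []
    for event in events:
        if event.type != "content_block_delta":
            continue  # e.g. message_start, content_block_stop, message_stop
        delta = event.delta
        if getattr(delta, "type", None) == "text_delta":
            parts.append(delta.text)
    return "".join(parts)


# Stand-in events for illustration; a real stream yields SDK event objects.
fake_events = [
    SimpleNamespace(type="message_start"),
    SimpleNamespace(
        type="content_block_delta",
        delta=SimpleNamespace(type="text_delta", text="Hello"),
    ),
    SimpleNamespace(
        type="content_block_delta",
        delta=SimpleNamespace(type="text_delta", text=" there!"),
    ),
    SimpleNamespace(type="message_stop"),
]
print(accumulate_text(fake_events))  # Hello there!
```

With the async client the same loop would use async for over the stream instead of a plain for.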

Token counting

To get the token count for a message without creating it you can use the client.beta.messages.count_tokens() method. This takes the same messages list as the .create() method.

count = client.beta.messages.count_tokens(
    model="claude-3-5-sonnet-20241022",
    messages=[
        {"role": "user", "content": "Hello, world"}
    ]
)
count.input_tokens  # 10

You can also see the exact usage for a given request through the usage response property, e.g.

message = client.messages.create(...)
message.usage
# Usage(input_tokens=25, output_tokens=13)

Message Batches

This SDK provides beta support for the Message Batches API under the client.beta.messages.batches namespace.

Creating a batch

Message Batches take the exact same request params as the standard Messages API:

await client.beta.messages.batches.create(
    requests=[
        {
            "custom_id": "my-first-request",
            "params": {
                "model": "claude-3-5-sonnet-20240620",
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": "Hello, world"}],
            },
        },
        {
            "custom_id": "my-second-request",
            "params": {
                "model": "claude-3-5-sonnet-20240620",
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": "Hi again, friend"}],
            },
        },
    ]
)

Getting results from a batch

Once a Message Batch has been processed, indicated by .processing_status == "ended", you can access the results with .batches.results():

result_stream = await client.beta.messages.batches.results(batch_id)
async for entry in result_stream:
    if entry.result.type == "succeeded":
        print(entry.result.message.content)
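
Since results are only available once processing_status is "ended", callers usually poll before fetching them. The following is a minimal polling sketch: wait_for_batch, its parameters, and the default intervals are all assumptions for illustration, and the function takes the retrieval call as an argument rather than depending on the SDK directly.

```python
import time
from typing import Any, Callable


def wait_for_batch(
    retrieve: Callable[[str], Any],
    batch_id: str,
    poll_interval: float = 30.0,
    max_polls: int = 120,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Call `retrieve(batch_id)` until the batch reports processing_status == "ended"."""
    for _ in range(max_polls):
        batch = retrieve(batch_id)
        if batch.processing_status == "ended":
            return batch
        sleep(poll_interval)  # wait before asking again
    raise TimeoutError(f"batch {batch_id} did not end after {max_polls} polls")
```

With the synchronous client this might be called as wait_for_batch(client.beta.messages.batches.retrieve, batch_id); an async variant would await the call and use asyncio.sleep instead.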

Tool use

This SDK provides support for tool use, aka function calling. More details can be found in the documentation.

AWS Bedrock

This library also provides support for the Anthropic Bedrock API if you install this library with the bedrock extra, e.g. pip install -U anthropic[bedrock].

You can then import and instantiate a separate AnthropicBedrock class; the rest of the API is the same.

from anthropic import AnthropicBedrock

client = AnthropicBedrock()

message = client.messages.create(
    max_tokens=1024,
    messages=[
        {
            "role": "user",
            "content": "Hello!",
        }
    ],
    model="anthropic.claude-3-sonnet-20240229-v1:0",
)
print(message)

The Bedrock client supports the following arguments for authentication:

AnthropicBedrock(
  aws_profile='...',
  aws_region='us-east',
  aws_secret_key='...',
  aws_access_key='...',
  aws_session_token='...',
)

For a more fully fledged example see examples/bedrock.py.

Google Vertex

This library also provides support for the Anthropic Vertex API if you install this library with the vertex extra, e.g. pip install -U anthropic[vertex].

You can then import and instantiate a separate AnthropicVertex/AsyncAnthropicVertex class, which has the same API as the base Anthropic/AsyncAnthropic class.

from anthropic import AnthropicVertex

client = AnthropicVertex()

message = client.messages.create(
    model="claude-3-sonnet@20240229",
    max_tokens=100,
    messages=[
        {
            "role": "user",
            "content": "Hello!",
        }
    ],
)
print(message)

For a more complete example see examples/vertex.py.

Using types

Nested request parameters are TypedDicts. Responses are Pydantic models which also provide helper methods for things like:

  • Serializing back into JSON, model.to_json()
  • Converting to a dictionary, model.to_dict()

Typed requests and responses provide autocomplete and documentation within your editor. If you would like to see type errors in VS Code to help catch bugs earlier, set python.analysis.typeCheckingMode to basic.

Pagination

List methods in the Anthropic API are paginated.

This library provides auto-paginating iterators with each list response, so you do not have to request successive pages manually:

from anthropic import Anthropic

client = Anthropic()

all_batches = []
# Automatically fetches more pages as needed.
for batch in client.beta.messages.batches.list(
    limit=20,
):
    # Do something with batch here
    all_batches.append(batch)
print(all_batches)

Or, asynchronously:

import asyncio
from anthropic import AsyncAnthropic

client = AsyncAnthropic()


async def main() -> None:
    all_batches = []
    # Iterate through items across all pages, issuing requests as needed.
    async for batch in client.beta.messages.batches.list(
        limit=20,
    ):
        all_batches.append(batch)
    print(all_batches)


asyncio.run(main())

Alternatively, you can use the .has_next_page(), .next_page_info(), or .get_next_page() methods for more granular control working with pages:

first_page = await client.beta.messages.batches.list(
    limit=20,
)
if first_page.has_next_page():
    print(f"will fetch next page using these details: {first_page.next_page_info()}")
    next_page = await first_page.get_next_page()
    print(f"number of items we just fetched: {len(next_page.data)}")

# Remove `await` for non-async usage.

Or just work directly with the returned data:

first_page = await client.beta.messages.batches.list(
    limit=20,
)

print(f"next page cursor: {first_page.last_id}")  # => "next page cursor: ..."
for batch in first_page.data:
    print(batch.id)

# Remove `await` for non-async usage.
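
The page-by-page methods above can be combined into a simple loop. The sketch below assumes only that a page exposes .data, .has_next_page(), and .get_next_page() as described; iter_items is a hypothetical helper and FakePage is a stand-in used so the example runs without network access.

```python
from typing import Any, Iterator, List, Optional


def iter_items(first_page: Any) -> Iterator[Any]:
    """Yield every item from `first_page` and all pages that follow it."""
    page = first_page
    while True:
        yield from page.data
        if not page.has_next_page():
            return
        page = page.get_next_page()


class FakePage:
    """Hypothetical stand-in for an SDK page object, for illustration only."""

    def __init__(self, data: List[str], next_page: Optional["FakePage"] = None) -> None:
        self.data = data
        self._next = next_page

    def has_next_page(self) -> bool:
        return self._next is not None

    def get_next_page(self) -> "FakePage":
        assert self._next is not None
        return self._next


pages = FakePage(["batch_a", "batch_b"], FakePage(["batch_c"]))
print(list(iter_items(pages)))  # ['batch_a', 'batch_b', 'batch_c']
```

This is roughly what the auto-paginating iterators save you from writing; the manual form is mainly useful when you need to stop early or inspect each page.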

Handling errors

When the library is unable to connect to the API (for example, due to network connection problems or a timeout), a subclass of anthropic.APIConnectionError is raised.

When the API returns a non-success status code (that is, 4xx or 5xx response), a subclass of anthropic.APIStatusError is raised, containing status_code and response properties.

All errors inherit from anthropic.APIError.

import anthropic
from anthropic import Anthropic

client = Anthropic()

try:
    client.messages.create(
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": "Hello, Claude",
            }
        ],
        model="claude-3-opus-20240229",
    )
except anthropic.APIConnectionError as e:
    print("The server could not be reached")
    print(e.__cause__)  # an underlying Exception, likely raised within httpx.
except anthropic.RateLimitError as e:
    print("A 429 status code was received; we should back off a bit.")
except anthropic.APIStatusError as e:
    print("Another non-200-range status code was received")
    print(e.status_code)
    print(e.response)

Error codes are as follows:

Status Code   Error Type
400           BadRequestError
401           AuthenticationError
403           PermissionDeniedError
404           NotFoundError
422           UnprocessableEntityError
429           RateLimitError
>=500         InternalServerError
N/A           APIConnectionError
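
The status-code table can also be read as a lookup. The sketch below encodes it as a plain function returning the error class name. error_type_for_status is a hypothetical helper, and falling back to APIStatusError for unlisted 4xx codes is an assumption based on the base class named earlier in this section.

```python
def error_type_for_status(status_code: int) -> str:
    """Return the error class name the table associates with an HTTP status code."""
    if status_code < 400:
        raise ValueError("only 4xx and 5xx responses map to an error type")
    named = {
        400: "BadRequestError",
        401: "AuthenticationError",
        403: "PermissionDeniedError",
        404: "NotFoundError",
        422: "UnprocessableEntityError",
        429: "RateLimitError",
    }
    if status_code in named:
        return named[status_code]
    if status_code >= 500:
        return "InternalServerError"
    # Assumption: codes not listed in the table surface as the base status error.
    return "APIStatusError"


print(error_type_for_status(429))  # RateLimitError
print(error_type_for_status(503))  # InternalServerError
```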

Retries

Certain errors are automatically retried 2 times by default, with a short exponential backoff. Connection errors (for example, due to a network connectivity problem), 408 Request Timeout, 409 Conflict, 429 Rate Limit, and >=500 Internal errors are all retried by default.

You can use the max_retries option to configure or disable retry settings:

from anthropic import Anthropic

# Configure the default for all requests:
client = Anthropic(
    # default is 2
    max_retries=0,
)

# Or, configure per-request:
client.with_options(max_retries=5).messages.create(
    max_tokens=1024,
    messages=[
        {
            "role": "user",
            "content": "Hello, Claude",
        }
    ],
    model="claude-3-opus-20240229",
)
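
To illustrate the retry policy described above, here is a small sketch of the decision and the delay schedule. The retryable status codes come from the text; the helper names, the 0.5 second initial delay, and the 8 second cap are illustrative assumptions rather than the library's actual constants, and real implementations typically add jitter.

```python
from typing import List, Optional

# Status codes the text lists as retried by default (besides >=500).
RETRYABLE_STATUS_CODES = {408, 409, 429}


def should_retry(status_code: Optional[int]) -> bool:
    """Return True if a failure is retryable; None stands for a connection error."""
    if status_code is None:
        return True
    return status_code in RETRYABLE_STATUS_CODES or status_code >= 500


def backoff_delays(max_retries: int = 2, initial: float = 0.5, cap: float = 8.0) -> List[float]:
    """Delay in seconds before each retry: doubles each time, bounded by `cap`."""
    return [min(initial * 2 ** attempt, cap) for attempt in range(max_retries)]


print(backoff_delays())   # [0.5, 1.0]
print(should_retry(429))  # True
print(should_retry(400))  # False
```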

Timeouts

By default requests time out after 10 minutes. You can configure this with a timeout option, which accepts a float or an httpx.Timeout object:

import httpx
from anthropic import Anthropic

# Configure the default for all requests:
client = Anthropic(
    # 20 seconds (default is 10 minutes)
    timeout=20.0,
)

# More granular control:
client = Anthropic(
    timeout=httpx.Timeout(60.0, read=5.0, write=10.0, connect=2.0),
)

# Override per-request:
client.with_options(timeout=5.0).messages.create(
    max_tokens=1024,
    messages=[
        {
            "role": "user",
            "content": "Hello, Claude",
        }
    ],
    model="claude-3-opus-20240229",
)

On timeout, an APITimeoutError is raised.

Note that requests that time out are retried twice by default.

Default Headers

We automatically send the anthropic-version header set to 2023-06-01.

If you need to, you can override it by setting default headers per-request or on the client object.

Be aware that doing so may result in incorrect types and other unexpected or undefined behavior in the SDK.

from anthropic import Anthropic

client = Anthropic(
    default_headers={"anthropic-version": "My-Custom-Value"},
)

Advanced

Logging

We use the standard library logging module.

You can enable logging by setting the environment variable ANTHROPIC_LOG to debug.

$ export ANTHROPIC_LOG=debug

How to tell whether None means null or missing

In an API response, a field may be explicitly null, or missing entirely; in either case, its value is None in this library. You can differentiate the two cases with .model_fields_set:

if response.my_field is None:
  if 'my_field' not in response.model_fields_set:
    print('Got json like {}, without a "my_field" key present at all.')
  else:
    print('Got json like {"my_field": null}.')
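
The same distinction exists at the raw JSON level, which may help when reasoning about what the API actually sent. The sketch below uses only the standard library and does not involve .model_fields_set; describe_field is a hypothetical helper.

```python
import json


def describe_field(raw_json: str, field: str) -> str:
    """Classify `field` in a JSON object as 'missing', 'null', or 'present'."""
    payload = json.loads(raw_json)
    if field not in payload:
        return "missing"
    if payload[field] is None:
        return "null"
    return "present"


print(describe_field("{}", "my_field"))                  # missing
print(describe_field('{"my_field": null}', "my_field"))  # null
```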

Accessing raw response data (e.g. headers)

The "raw" Response object can be accessed by prefixing .with_raw_response. to any HTTP method call, e.g.,

from anthropic import Anthropic

client = Anthropic()
response = client.messages.with_raw_response.create(
    max_tokens=1024,
    messages=[{
        "role": "user",
        "content": "Hello, Claude",
    }],
    model="claude-3-opus-20240229",
)
print(response.headers.get('X-My-Header'))

message = response.parse()  # get the object that `messages.create()` would have returned
print(message.content)

These methods return a LegacyAPIResponse object. This is a legacy class, as we're changing it slightly in the next major version.

For the sync client, this will mostly be the same, except that content and text will be methods instead of properties. In the async client, all methods will be async.

A migration script will be provided, and the migration in general should be smooth.

.with_streaming_response

The above interface eagerly reads the full response body when you make the request, which may not always be what you want.

To stream the response body, use .with_streaming_response instead, which requires a context manager and only reads the response body once you call .read(), .text(), .json(), .iter_bytes(), .iter_text(), .iter_lines() or .parse(). In the async client, these are async methods.

As such, .with_streaming_response methods return a different APIResponse object, and the async client returns an AsyncAPIResponse object.

with client.messages.with_streaming_response.create(
    max_tokens=1024,
    messages=[
        {
            "role": "user",
            "content": "Hello, Claude",
        }
    ],
    model="claude-3-opus-20240229",
) as response:
    print(response.headers.get("X-My-Header"))

    for line in response.iter_lines():
        print(line)

The context manager is required so that the response will reliably be closed.

Making custom/undocumented requests

This library is typed for convenient access to the documented API.

If you need to access undocumented endpoints, params, or response properties, the library can still be used.

Undocumented endpoints

To make requests to undocumented endpoints, you can use client.get, client.post, and other HTTP verbs. Options on the client (such as retries) will be respected when making these requests.

import httpx

response = client.post(
    "/foo",
    cast_to=httpx.Response,
    body={"my_param": True},
)

print(response.headers.get("x-foo"))

Undocumented request params

If you want to explicitly send an extra param, you can do so with the extra_query, extra_body, and extra_headers request options.

Undocumented response properties

To access undocumented response properties, you can access the extra fields like response.unknown_prop. You can also get all the extra fields on the Pydantic model as a dict with response.model_extra.

Configuring the HTTP client

You can directly override the httpx client to customize it for your use case, including:

  • Support for proxies
  • Custom transports
  • Additional advanced functionality

import httpx
from anthropic import Anthropic, DefaultHttpxClient

client = Anthropic(
    # Or use the `ANTHROPIC_BASE_URL` env var
    base_url="http://my.test.server.example.com:8083",
    http_client=DefaultHttpxClient(
        proxies="http://my.test.proxy.example.com",
        transport=httpx.HTTPTransport(local_address="0.0.0.0"),
    ),
)

You can also customize the client on a per-request basis by using with_options():

client.with_options(http_client=DefaultHttpxClient(...))

Managing HTTP resources

By default the library closes underlying HTTP connections whenever the client is garbage collected. You can manually close the client using the .close() method if desired, or with a context manager that closes when exiting.

Versioning

This package generally follows SemVer conventions, though certain backwards-incompatible changes may be released as minor versions:

  1. Changes that only affect static types, without breaking runtime behavior.
  2. Changes to library internals which are technically public but not intended or documented for external use. (Please open a GitHub issue to let us know if you are relying on such internals).
  3. Changes that we do not expect to impact the vast majority of users in practice.

We take backwards-compatibility seriously and work hard to ensure you can rely on a smooth upgrade experience.

We are keen for your feedback; please open an issue with questions, bugs, or suggestions.

Determining the installed version

If you've upgraded to the latest version but aren't seeing the new features you were expecting, then your Python environment is likely still using an older version.

You can determine the version that is being used at runtime with:

import anthropic
print(anthropic.__version__)
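
If the package might not be importable at all in the current environment, the installed version can also be looked up without importing it. This is a sketch using the standard library's importlib.metadata (available from Python 3.8); installed_version is a hypothetical helper name.

```python
import sys
from importlib import metadata
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of `package`, or None if it is not installed."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


# Printing the interpreter path helps confirm which environment is being inspected.
print(sys.executable)
print(installed_version("anthropic"))
```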

Requirements

Python 3.8 or higher.

Contributing

See the contributing documentation.
