Merge pull request #16 from swombat/main
Implemented streaming and stream preprocessing
alexrudall authored Jun 10, 2024
2 parents 5487a84 + 143c124 commit faf9a01
Showing 15 changed files with 1,423 additions and 67 deletions.
1 change: 1 addition & 0 deletions Gemfile
@@ -5,6 +5,7 @@ gemspec

gem "byebug", "~> 11.1.3"
gem "dotenv", "~> 2.8.1"
gem "racc", "~> 1.7.3"
gem "rake", "~> 13.0"
gem "rspec", "~> 3.12"
gem "rubocop", "~> 1.50.2"
4 changes: 4 additions & 0 deletions Gemfile.lock
@@ -2,6 +2,7 @@ PATH
remote: .
specs:
anthropic (0.2.0)
event_stream_parser (>= 0.3.0, < 2.0.0)
faraday (>= 1)
faraday-multipart (>= 1)

@@ -16,6 +17,7 @@ GEM
rexml
diff-lcs (1.5.0)
dotenv (2.8.1)
event_stream_parser (1.0.0)
faraday (2.7.10)
faraday-net_http (>= 2.0, < 3.1)
ruby2_keywords (>= 0.0.4)
@@ -29,6 +31,7 @@ GEM
parser (3.2.2.0)
ast (~> 2.4.1)
public_suffix (5.0.1)
racc (1.7.3)
rainbow (3.1.1)
rake (13.0.6)
regexp_parser (2.8.0)
@@ -74,6 +77,7 @@ DEPENDENCIES
anthropic!
byebug (~> 11.1.3)
dotenv (~> 2.8.1)
racc (~> 1.7.3)
rake (~> 13.0)
rspec (~> 3.12)
rubocop (~> 1.50.2)
110 changes: 110 additions & 0 deletions README.md
@@ -133,6 +133,116 @@ response = client.messages(
# => }
```

#### Additional parameters

You can add other parameters to the parameters hash, such as `temperature` and even `top_k` or `top_p`. They will simply be passed through to the Anthropic API. You
can read more about the supported parameters [here](https://docs.anthropic.com/claude/reference/messages_post).

There are two special parameters, though, to do with... streaming. Keep reading to find out more.

#### JSON

If you want the output to be JSON, it is recommended to prefill an assistant message with the desired format, like this:

```ruby
[{ role: "user", content: "Give me the heights of the 3 tallest mountains. Answer in the provided JSON format. Only include JSON." },
{ role: "assistant", content: '[{"name": "Mountain Name", "height": "height in km"}]' }]
```

Then Claude v3, even Haiku, might respond with:

```ruby
[{"name"=>"Mount Everest", "height"=>"8.85 km"}, {"name"=>"K2", "height"=>"8.61 km"}, {"name"=>"Kangchenjunga", "height"=>"8.58 km"}]
```
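The model still returns that JSON as a plain string inside the response's `content` array, so you parse it yourself. A minimal sketch with a hand-built response hash (the values are illustrative, not real API output):

```ruby
require "json"

# A response shaped like the Messages API output, with illustrative values.
response = {
  "content" => [
    { "type" => "text",
      "text" => '[{"name": "Mount Everest", "height": "8.85 km"}]' }
  ]
}

# Parse the text block back into a plain Ruby array of hashes.
mountains = JSON.parse(response["content"].first["text"])
puts mountains.first["name"]
```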

### Streaming

There are two modes of streaming: raw and preprocessed. The default is raw. You can call it like this:

```ruby
client.messages(
parameters: {
model: "claude-3-haiku-20240307",
messages: [{ role: "user", content: "How high is the sky?" }],
max_tokens: 50,
stream: Proc.new { |chunk| print chunk }
}
)
```

This still returns a regular response at the end, but also gives you direct access to every single chunk returned by Anthropic as it comes in. Even if you don't need the
streaming output itself, you may find this mode useful for avoiding timeouts, which can happen if you send Opus a large input context and expect a long response: compiling
the full response has been known to take several minutes, longer than the default 120-second timeout. When streaming, the connection does not time out.

Here is an example of a stream you might get back:

```ruby
{"type"=>"message_start", "message"=>{"id"=>"msg_01WMWvcZq5JEMLf6Jja4Bven", "type"=>"message", "role"=>"assistant", "model"=>"claude-3-haiku-20240307", "stop_sequence"=>nil, "usage"=>{"input_tokens"=>13, "output_tokens"=>1}, "content"=>[], "stop_reason"=>nil}}
{"type"=>"content_block_delta", "index"=>0, "delta"=>{"type"=>"text_delta", "text"=>"There"}}
{"type"=>"content_block_delta", "index"=>0, "delta"=>{"type"=>"text_delta", "text"=>" is"}}
{"type"=>"content_block_delta", "index"=>0, "delta"=>{"type"=>"text_delta", "text"=>" no"}}
{"type"=>"content_block_delta", "index"=>0, "delta"=>{"type"=>"text_delta", "text"=>" single"}}
{"type"=>"content_block_delta", "index"=>0, "delta"=>{"type"=>"text_delta", "text"=>" defin"}}
{"type"=>"content_block_delta", "index"=>0, "delta"=>{"type"=>"text_delta", "text"=>"itive"}}
{"type"=>"content_block_delta", "index"=>0, "delta"=>{"type"=>"text_delta", "text"=>" \""}}
...
{"type"=>"content_block_delta", "index"=>0, "delta"=>{"type"=>"text_delta", "text"=>"'s"}}
{"type"=>"content_block_delta", "index"=>0, "delta"=>{"type"=>"text_delta", "text"=>" atmosphere"}}
{"type"=>"content_block_delta", "index"=>0, "delta"=>{"type"=>"text_delta", "text"=>" extends"}}
{"type"=>"content_block_delta", "index"=>0, "delta"=>{"type"=>"text_delta", "text"=>" up"}}
{"type"=>"content_block_delta", "index"=>0, "delta"=>{"type"=>"text_delta", "text"=>" to"}}
{"type"=>"content_block_delta", "index"=>0, "delta"=>{"type"=>"text_delta", "text"=>" about"}}
{"type"=>"content_block_stop", "index"=>0}
{"type"=>"message_delta", "delta"=>{"stop_reason"=>"max_tokens", "stop_sequence"=>nil}, "usage"=>{"output_tokens"=>50}}
{"type"=>"message_stop"}
```

Now, you may find this somewhat less than practical. Surely, the vast majority of developers will not want to deal with so much
boilerplate JSON.

Luckily, you can ask the `anthropic` gem to preprocess things for you!

First, if you expect simple text output, you can receive it delta by delta:

```ruby
client.messages(
parameters: {
model: "claude-3-haiku-20240307",
messages: [{ role: "user", content: "How high is the sky?" }],
max_tokens: 50,
stream: Proc.new { |incremental_response, delta| process_your(incremental_response, delta) },
preprocess_stream: :text
}
)
```

The first block argument, `incremental_response`, accrues everything returned so far, so you don't have to. If you only want the latest fragment,
use the second argument, `delta`.
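To illustrate the two arguments, here is a local simulation of how such a Proc gets invoked as text deltas arrive (this imitates the calling convention described above, not the gem's internal code):

```ruby
# Handler with the same signature as the :text preprocess_stream Proc.
handler = Proc.new { |incremental_response, delta| print delta }

deltas = ["The", " sky", " has", " no", " fixed", " height."]
accrued = ""
deltas.each do |delta|
  accrued += delta             # the gem accrues this for you
  handler.call(accrued, delta) # each call gets the total so far plus the delta
end
```

Printing `delta` on each call reproduces the text in order, while `accrued` always holds the full response so far.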

But what if you want to stream JSON?

Partial JSON is not very useful. But it is common enough to request a collection of JSON objects as a response, as in our earlier example of asking for the heights of the 3 tallest mountains.

If you ask it to, this gem will also do its best to sort this out for you:

```ruby
client.messages(
parameters: {
model: "claude-3-haiku-20240307",
messages: [{ role: "user", content: "How high is the sky?" }],
max_tokens: 50,
stream: Proc.new { |json_object| process_your(json_object) },
preprocess_stream: :json
}
)
```

Each time a `}` is reached in the stream, the preprocessor takes what it has in its preprocessing stack, picks out whatever lies between the widest `{` and `}`, and tries to parse it into a JSON object.
If it succeeds, it passes you the JSON object, resets its preprocessing stack, and carries on.

If parsing fails despite reaching a `}`, the preprocessor currently catches the error, logs it to `$stdout`, discards the malformed object, resets the preprocessing stack, and carries on. This means that
if the model emits malformed JSON (which can happen, albeit rarely), some objects may be lost.
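The behaviour described above can be sketched roughly as follows (an illustration of the idea, not the gem's actual implementation):

```ruby
require "json"

stack = ""   # accrued stream text
objects = [] # successfully parsed JSON objects

# Illustrative chunks: one complete object split across chunks, then a partial one.
chunks = ['[{"name": "Mount ', 'Everest", "height"', ': "8.85 km"}', ', {"name": "K2"']
chunks.each do |chunk|
  stack += chunk
  next unless chunk.include?("}")

  # Take the widest {...} span and try to parse it.
  candidate = stack[stack.index("{")..stack.rindex("}")]
  begin
    objects << JSON.parse(candidate)
  rescue JSON::ParserError => e
    puts e.message # malformed object: log it and drop it
  end
  stack = "" # reset the stack either way and carry on
end
# objects now holds the one complete hash; the partial K2 object is still pending.
```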

## Development

After checking out the repo, run `bin/setup` to install dependencies. You can run `bin/console` for an interactive prompt that will allow you to experiment.
1 change: 1 addition & 0 deletions anthropic.gemspec
@@ -25,6 +25,7 @@ Gem::Specification.new do |spec|
spec.executables = spec.files.grep(%r{^exe/}) { |f| File.basename(f) }
spec.require_paths = ["lib"]

spec.add_dependency "event_stream_parser", ">= 0.3.0", "< 2.0.0"
spec.add_dependency "faraday", ">= 1"
spec.add_dependency "faraday-multipart", ">= 1"
end
20 changes: 18 additions & 2 deletions lib/anthropic.rb
@@ -9,9 +9,25 @@ module Anthropic
class Error < StandardError; end
class ConfigurationError < Error; end

class MiddlewareErrors < Faraday::Middleware
def call(env)
@app.call(env)
rescue Faraday::Error => e
raise e unless e.response.is_a?(Hash)

logger = Logger.new($stdout)
logger.formatter = proc do |_severity, _datetime, _progname, msg|
"\033[31mAnthropic HTTP Error (spotted in ruby-anthropic #{VERSION}): #{msg}\n\033[0m"
end
logger.error(e.response[:body])

raise e
end
end

class Configuration
attr_writer :access_token
attr_accessor :anthropic_version, :api_version, :extra_headers, :organization_id,
attr_accessor :anthropic_version, :api_version, :extra_headers,
:request_timeout, :uri_base

DEFAULT_API_VERSION = "v1".freeze
@@ -23,9 +39,9 @@ def initialize
@access_token = nil
@api_version = DEFAULT_API_VERSION
@anthropic_version = DEFAULT_ANTHROPIC_VERSION
@organization_id = nil
@uri_base = DEFAULT_URI_BASE
@request_timeout = DEFAULT_REQUEST_TIMEOUT
@extra_headers = {}
end

def access_token
68 changes: 58 additions & 10 deletions lib/anthropic/client.rb
@@ -1,27 +1,75 @@
module Anthropic
class Client
extend Anthropic::HTTP
include Anthropic::HTTP

def initialize(access_token: nil, organization_id: nil, uri_base: nil, request_timeout: nil,
extra_headers: {})
Anthropic.configuration.access_token = access_token if access_token
Anthropic.configuration.organization_id = organization_id if organization_id
Anthropic.configuration.uri_base = uri_base if uri_base
Anthropic.configuration.request_timeout = request_timeout if request_timeout
Anthropic.configuration.extra_headers = extra_headers
CONFIG_KEYS = %i[
access_token
anthropic_version
api_version
uri_base
request_timeout
extra_headers
].freeze

def initialize(config = {}, &faraday_middleware)
CONFIG_KEYS.each do |key|
# Set instance variables like api_type & access_token. Fall back to global config
# if not present.
instance_variable_set(
"@#{key}",
config[key].nil? ? Anthropic.configuration.send(key) : config[key]
)
end
@faraday_middleware = faraday_middleware
end

# @deprecated (but still works while Anthropic API responds to it)
def complete(parameters: {})
parameters[:prompt] = wrap_prompt(prompt: parameters[:prompt])
Anthropic::Client.json_post(path: "/complete", parameters: parameters)
json_post(path: "/complete", parameters: parameters)
end

# Anthropic API Parameters as of 2024-05-07:
# @see https://docs.anthropic.com/claude/reference/messages_post
#
# @param [Hash] parameters
# @option parameters [Array] :messages - Required. An array of messages to send to the API. Each
# message should have a role and content. Single message example:
# +[{ role: "user", content: "Hello, Claude!" }]+
# @option parameters [String] :model - see https://docs.anthropic.com/claude/docs/models-overview
# @option parameters [Integer] :max_tokens - Required, must be less than 4096 - @see https://docs.anthropic.com/claude/docs/models-overview
# @option parameters [String] :system - Optional but recommended. @see https://docs.anthropic.com/claude/docs/system-prompts
# @option parameters [Float] :temperature - Optional, defaults to 1.0
# @option parameters [Proc] :stream - Optional, if present, must be a Proc that will receive the
# content fragments as they come in
# @option parameters [String] :preprocess_stream - If true, the streaming Proc will be pre-
# processed. Specifically, instead of being passed a raw Hash like:
# {"type"=>"content_block_delta", "index"=>0, "delta"=>{"type"=>"text_delta", "text"=>" of"}}
# the Proc will instead be passed something nicer. If +preprocess_stream+ is set to +"json"+
# or +:json+, then the Proc will only receive full json objects, one at a time.
# If +preprocess_stream+ is set to +"text"+ or +:text+ then the Proc will receive two
# arguments: the first will be the text accrued so far, and the second will be the delta
# just received in the current chunk.
#
# @returns [Hash] the response from the API (after the streaming is done, if streaming)
# @example:
# {
# "id" => "msg_013xVudG9xjSvLGwPKMeVXzG",
# "type" => "message",
# "role" => "assistant",
# "content" => [{"type" => "text", "text" => "The sky has no distinct"}],
# "model" => "claude-2.1",
# "stop_reason" => "max_tokens",
# "stop_sequence" => nil,
# "usage" => {"input_tokens" => 15, "output_tokens" => 5}
# }
def messages(parameters: {})
Anthropic::Client.json_post(path: "/messages", parameters: parameters)
json_post(path: "/messages", parameters: parameters)
end

private

# Used only by @deprecated +complete+ method
def wrap_prompt(prompt:, prefix: "\n\nHuman: ", suffix: "\n\nAssistant:")
return if prompt.nil?
