This project is under development.
Python client for Liftbridge, a system that provides lightweight, fault-tolerant message streams for NATS.
Liftbridge provides the following high-level features:
- Log-based API for NATS
- Replicated for fault-tolerance
- Horizontally scalable
- Wildcard subscription support
- At-least-once delivery support and message replay
- Message key-value support
- Log compaction by key
$ pip install python-liftbridge
from python_liftbridge import Lift, Message, Stream, ErrStreamExists
# Create a Liftbridge client.
client = Lift(ip_address='localhost:9292', timeout=5)
# Create a Liftbridge stream with name "foo-stream"
try:
    # create a stream with 5 partitions. default value is 1
    client.create_stream(Stream(subject='foo', name='foo-stream', partitions=5))
except ErrStreamExists:
    print('This stream already exists!')
# Publish a message to the stream with the name "foo-stream".
client.publish(Message(value='hello', stream='foo-stream'))
# Subscribe to the stream starting from the beginning.
for message in client.subscribe(
    Stream(
        subject='foo',
        name='foo-stream',
        subscribe_to_partition=0,  # subscribe to specific partition of stream. default value is 0
    ).start_at_earliest_received(),
):
    print("Received: '{}'".format(message.value))
A stream is a durable message log attached to a NATS subject. It records messages published to the subject for consumption.
Streams have a few key properties: a subject, which is the corresponding NATS subject; a name, which is a human-readable identifier for the stream; and a replication factor, which is the number of nodes the stream should be replicated to for redundancy. You can also specify the number of partitions. Optionally, a stream can have a group, which is the name of a load-balance group for the stream to join. When there are multiple streams in the same group, messages are balanced among them.
"""
Create a stream attached to the NATS subject "foo.*" that is replicated to
all the brokers in the cluster. ErrStreamExists is raised if a stream with
the given name already exists for the subject.
"""
client.create_stream(Stream(subject='foo.*', name='my-stream', max_replication=True, partitions=3))
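The subject "foo.*" above uses a NATS wildcard. As a rough illustration of which subjects such a stream would record, here is a minimal sketch of NATS-style subject matching, where `*` matches exactly one token and `>` matches one or more trailing tokens. The function `subject_matches` is hypothetical and written only for this example; it is not part of python-liftbridge or of NATS itself.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if a NATS-style subject pattern matches a concrete subject.

    Hypothetical helper for illustration only; the real matching is done
    by the NATS server.
    """
    pattern_tokens = pattern.split('.')
    subject_tokens = subject.split('.')
    for i, token in enumerate(pattern_tokens):
        if token == '>':
            # '>' is only valid as the last token and needs at least one
            # remaining subject token to match.
            return i == len(pattern_tokens) - 1 and len(subject_tokens) > i
        if i >= len(subject_tokens):
            # The pattern has more tokens than the subject.
            return False
        if token != '*' and token != subject_tokens[i]:
            # A literal token must match exactly; '*' matches any one token.
            return False
    # Without '>', both sides must have the same number of tokens.
    return len(pattern_tokens) == len(subject_tokens)


print(subject_matches('foo.*', 'foo.bar'))      # True
print(subject_matches('foo.*', 'foo.bar.baz'))  # False
print(subject_matches('foo.>', 'foo.bar.baz'))  # True
```

Under these rules, a stream on "foo.*" would see messages published to "foo.bar" but not to "foo" or "foo.bar.baz".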
Subscriptions are how Liftbridge streams are consumed. Clients can choose where to start consuming messages from in a stream. This is controlled by the start-position method chained on the Stream passed to subscribe.
from datetime import datetime, timedelta

# Subscribe starting with new messages only.
client.subscribe(
    Stream(subject='foo', name='foo-stream')
)
# Subscribe starting with the most recently published value.
client.subscribe(
    Stream(subject='foo', name='foo-stream').start_at_latest_received()
)
# Subscribe starting with the oldest published value.
client.subscribe(
    Stream(subject='foo', name='foo-stream').start_at_earliest_received()
)
# Subscribe starting at a specific offset.
client.subscribe(
    Stream(subject='foo', name='foo-stream').start_at_offset(4)
)
# Subscribe starting at a specific time.
client.subscribe(
    Stream(subject='foo', name='foo-stream').start_at_time(datetime.now())
)
# Subscribe starting at a specific amount of time in the past.
client.subscribe(
    Stream(subject='foo', name='foo-stream').start_at_time_delta(timedelta(days=1))
)
# Subscribe to a specific partition.
client.subscribe(
    Stream(subject='foo', name='foo-stream', subscribe_to_partition=2).start_at_latest_received()
)
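To make the start positions above more concrete, here is a toy model of how each one could map to an offset in a log. It assumes a stream partition is an ordered list of message timestamps whose index is the offset. The function `resolve_start_offset` and the position names are hypothetical and exist only for this sketch; the real resolution happens in the Liftbridge server and may differ in detail.

```python
from bisect import bisect_left
from datetime import datetime, timedelta


def resolve_start_offset(timestamps, position, offset=None, start_time=None):
    """Map a start position to the first offset a subscriber would read.

    `timestamps` is a sorted list of message timestamps; the list index
    plays the role of the message offset. Illustrative only.
    """
    if position == 'new_only':
        # Nothing already in the log is delivered.
        return len(timestamps)
    if position == 'earliest':
        return 0
    if position == 'latest':
        # Start at the last message already in the log.
        return max(len(timestamps) - 1, 0)
    if position == 'offset':
        return offset
    if position == 'time':
        # First message whose timestamp is at or after start_time.
        return bisect_left(timestamps, start_time)
    raise ValueError('unknown start position: {}'.format(position))


# Five messages, one per minute, at offsets 0 through 4.
base = datetime(2024, 1, 1, 12, 0, 0)
timestamps = [base + timedelta(minutes=i) for i in range(5)]

print(resolve_start_offset(timestamps, 'new_only'))   # 5
print(resolve_start_offset(timestamps, 'earliest'))   # 0
print(resolve_start_offset(timestamps, 'latest'))     # 4
print(resolve_start_offset(timestamps, 'offset', offset=4))  # 4

# A time delta is just "now minus delta" turned into a start time.
now = base + timedelta(minutes=4)
two_minutes_ago = now - timedelta(minutes=2)
print(resolve_start_offset(timestamps, 'time', start_time=two_minutes_ago))  # 2
```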
A publish API is provided to make it easy to write messages to streams. This includes a number of options for decorating messages with metadata like a message key.
Keys are used by Liftbridge's log compaction. When enabled, Liftbridge streams will retain only the last message for a given key.
# Publish a message with a key
client.publish(Message(stream='foo-stream', value='Hello', key='key'))
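As a rough picture of what "retain only the last message for a given key" means, the sketch below compacts an in-memory list of (key, value) pairs. The function `compact` is hypothetical and only models the idea; it does not reflect how or when the Liftbridge server actually performs compaction.

```python
def compact(log):
    """Keep only the last message for each key, preserving offset order.

    `log` is a list of (key, value) pairs whose index is the offset.
    Returns a list of (offset, key, value) tuples. Illustrative only.
    """
    # Record the highest offset seen for every key.
    last_offset_for_key = {}
    for offset, (key, _value) in enumerate(log):
        last_offset_for_key[key] = offset
    # Keep a message only if it is the newest one for its key.
    return [
        (offset, key, value)
        for offset, (key, value) in enumerate(log)
        if last_offset_for_key[key] == offset
    ]


log = [
    ('user-1', 'a'),
    ('user-2', 'b'),
    ('user-1', 'c'),
    ('user-3', 'd'),
    ('user-2', 'e'),
]
print(compact(log))
# [(2, 'user-1', 'c'), (3, 'user-3', 'd'), (4, 'user-2', 'e')]
```

Surviving messages keep their original offsets, so a subscriber reading a compacted log in this model would see gaps in the offset sequence.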
It is also possible to publish a message directly to a NATS subject (and, in turn, to any streams that match the subject).
# Publish a message to the NATS subject
client.publish_to_subject(Message(subject='foo', value='Hello foo'))
You can also publish a message to a specific stream partition.
# Publish a message to a specific partition
client.publish_to_subject(Message(subject='foo', value='Hello foo', partition=3))
Since Liftbridge is an extension of NATS, a NATS client can also be used to publish messages. This means existing NATS publishers do not need any changes for messages to be consumed in Liftbridge.
- Check for open issues or open a fresh issue to start a discussion around a feature idea or a bug.
- Fork the repository on GitHub to start making your changes to the master branch (or branch off of it).
- Write a test which shows that the bug was fixed or that the feature works as expected.
- Send a pull request and bug me until it gets merged and published.
Some things on the backlog:
- Add documentation (Sphinx)
- Add CI (CircleCI or TravisCI)
- Add tests
- Add code coverage
- Add TLS support for gRPC
- Add message headers support
- Add message ACK support (scaffolding is already done)
- Add method to close connection
- Add async client
- Add gRPC connection pool
- Add logging (and remove all the random prints)
- Add proper docstrings
- Add version file
- Add Contributing.md and an explanation of the workflow (pyenv, tox, make, pre-commit...)
- Improve fetch metadata
- Improve error handling
- Add a run-liftbridge target to the Makefile that runs Liftbridge in a Docker container
- Better instrumentation/observability (OpenCensus support?)