Skip to content

Commit

Permalink
Merge pull request #111 from xmtp/daniel-ephemera
Browse files Browse the repository at this point in the history
feat: add support for ephemeral messages
  • Loading branch information
dmccartney authored Jan 24, 2024
2 parents f10f5d0 + 7da9d89 commit 634597a
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 11 deletions.
13 changes: 13 additions & 0 deletions lib/src/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,15 @@ class Client implements Codec<DecodedContent> {
Stream<DecodedMessage> streamMessages(Conversation conversation) =>
_conversations.streamMessages([conversation]);

/// This exposes a stream of ephemeral messages sent to the [conversation].
Stream<DecodedMessage> streamEphemeralMessages(Conversation conversation) =>
_conversations.streamEphemeralMessages([conversation]);

/// This exposes a stream of ephemeral messages sent to any of [conversations].
Stream<DecodedMessage> streamBatchEphemeralMessages(
Iterable<Conversation> conversations) =>
_conversations.streamEphemeralMessages(conversations);

/// This exposes a stream of new messages sent to any of the [conversations].
Stream<DecodedMessage> streamBatchMessages(
Iterable<Conversation> conversations) =>
Expand All @@ -316,10 +325,13 @@ class Client implements Codec<DecodedContent> {
/// It returns the [DecodedMessage] to simplify optimistic local updates.
/// e.g. you can display the [DecodedMessage] immediately
/// without having to wait for it to come back down the stream.
/// When [isEphemeral] the message is only sent to [streamEphemeralMessages].
/// e.g. so you can send "typing..." or other live indicators
Future<DecodedMessage> sendMessage(
Conversation conversation,
Object content, {
xmtp.ContentTypeId? contentType,
bool isEphemeral = false,
// TODO: support fallback and compression
}) async {
// Sending a message implies allowing an unknown contact.
Expand All @@ -330,6 +342,7 @@ class Client implements Codec<DecodedContent> {
conversation,
content,
contentType: contentType,
isEphemeral: isEphemeral,
);
}

Expand Down
17 changes: 16 additions & 1 deletion lib/src/common/topic.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import 'dart:typed_data';

import 'package:quiver/check.dart';
import 'package:web3dart/credentials.dart';
import 'package:xmtp_bindings_flutter/xmtp_bindings_flutter.dart';

Expand All @@ -10,7 +11,9 @@ import 'package:xmtp_bindings_flutter/xmtp_bindings_flutter.dart';
/// See [EIP 55](https://github.com/ethereum/EIPs/blob/master/EIPS/eip-55.md)
class Topic {
/// All topics are in this format.
static String _content(String name) => '/xmtp/0/$name/proto';
static String _content(String name) => '$_versionPrefix/$name/proto';

static const String _versionPrefix = "/xmtp/0";

/// This represents direct message conversation between `sender` and `recipient`.
/// NOTE: the addresses are normalized (EIP-55) and then sorted.
Expand All @@ -23,6 +26,18 @@ class Topic {
return _content('dm-${addresses.join('-')}');
}

/// This contains ephemeral messages belonging to the `conversationTopic`.
/// It knows how to create the ephemeral topic for both v1 and v2.
static String ephemeralMessage(String conversationTopic) {
checkArgument(
conversationTopic.startsWith('$_versionPrefix/dm-') ||
conversationTopic.startsWith('$_versionPrefix/m-'),
message: 'invalid conversation topic');
return conversationTopic
.replaceFirst('$_versionPrefix/dm-', '$_versionPrefix/dmE-')
.replaceFirst('$_versionPrefix/m-', '$_versionPrefix/mE-');
}

/// This represents a message conversation.
static String messageV2(String randomString) => _content('m-$randomString');

Expand Down
6 changes: 6 additions & 0 deletions lib/src/conversation/conversation.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ class Conversation {
/// NOTE: this is a good identifier for local caching purposes.
final String topic;

/// This is the ephemeral message topic for this conversation.
final String ephemeralTopic;

/// This distinctly identifies between two addresses.
/// Note: this will be empty for older v1 conversations.
final String conversationId;
Expand Down Expand Up @@ -43,6 +46,8 @@ class Conversation {
required this.peer,
}) : version = xmtp.Message_Version.v1,
topic = Topic.directMessageV1(me.hex, peer.hex),
ephemeralTopic =
Topic.ephemeralMessage(Topic.directMessageV1(me.hex, peer.hex)),
conversationId = "",
metadata = <String, String>{},
invite = xmtp.InvitationV1();
Expand All @@ -54,6 +59,7 @@ class Conversation {
required this.peer,
}) : version = xmtp.Message_Version.v2,
topic = invite.topic,
ephemeralTopic = Topic.ephemeralMessage(invite.topic),
conversationId = invite.context.conversationId,
metadata = invite.context.metadata;

Expand Down
22 changes: 20 additions & 2 deletions lib/src/conversation/conversation_v1.dart
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,21 @@ class ConversationManagerV1 {
.map((msg) => msg!);
}

Stream<DecodedMessage> streamEphemeralMessages(
Iterable<Conversation> conversations) {
if (conversations.isEmpty) {
return const Stream.empty();
}
return _api.client
.subscribe(xmtp.SubscribeRequest(
contentTopics: conversations.map((c) => c.ephemeralTopic)))
.map((e) => xmtp.Message.fromBuffer(e.message))
.asyncMap((msg) => _decodedFromMessage(msg))
// Remove nulls (which are discarded bad envelopes).
.where((msg) => msg != null)
.map((msg) => msg!);
}

/// This decrypts and decodes the [xmtp.Message].
///
/// It returns `null` when the message could not be decoded.
Expand Down Expand Up @@ -193,16 +208,18 @@ class ConversationManagerV1 {
Conversation conversation,
Object content, {
xmtp.ContentTypeId? contentType,
bool isEphemeral = false,
}) async {
contentType ??= contentTypeText;
var encoded = await _codecs.encode(DecodedContent(contentType, content));
var sent = await sendMessageEncoded(conversation, encoded);
var sent = await sendMessageEncoded(conversation, encoded, isEphemeral);
return sent!;
}

Future<DecodedMessage?> sendMessageEncoded(
Conversation conversation,
xmtp.EncodedContent encoded,
bool isEphemeral,
) async {
var peerContact = await _contacts.getUserContactV1(conversation.peer.hex);
var encrypted = await encryptMessageV1(
Expand All @@ -217,7 +234,8 @@ class ConversationManagerV1 {
}
await _api.client.publish(xmtp.PublishRequest(envelopes: [
xmtp.Envelope(
contentTopic: conversation.topic,
contentTopic:
isEphemeral ? conversation.ephemeralTopic : conversation.topic,
timestampNs: nowNs(),
message: msg.writeToBuffer(),
),
Expand Down
26 changes: 24 additions & 2 deletions lib/src/conversation/conversation_v2.dart
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,11 @@ class ConversationManagerV2 {
Conversation conversation,
Object content, {
xmtp.ContentTypeId? contentType,
bool isEphemeral = false,
}) async {
contentType ??= contentTypeText;
var encoded = await _codecs.encode(DecodedContent(contentType, content));
var sent = await sendMessageEncoded(conversation, encoded);
var sent = await sendMessageEncoded(conversation, encoded, isEphemeral);
return sent!;
}

Expand All @@ -182,6 +183,7 @@ class ConversationManagerV2 {
Future<DecodedMessage?> sendMessageEncoded(
Conversation conversation,
xmtp.EncodedContent encoded,
bool isEphemeral,
) async {
var now = nowNs();
var header = xmtp.MessageHeaderV2(
Expand All @@ -193,7 +195,8 @@ class ConversationManagerV2 {
var dm = xmtp.Message(v2: encrypted);
await api.client.publish(xmtp.PublishRequest(envelopes: [
xmtp.Envelope(
contentTopic: conversation.topic,
contentTopic:
isEphemeral ? conversation.ephemeralTopic : conversation.topic,
timestampNs: now,
message: dm.writeToBuffer(),
),
Expand Down Expand Up @@ -261,6 +264,25 @@ class ConversationManagerV2 {
.map((msg) => msg!);
}

/// This exposes the stream of ephemeral messages in the [conversations].
Stream<DecodedMessage> streamEphemeralMessages(
Iterable<Conversation> conversations) {
if (conversations.isEmpty) {
return const Stream.empty();
}
var convoByTopic = {for (var c in conversations) c.ephemeralTopic: c};
return api.client
.subscribe(xmtp.SubscribeRequest(contentTopics: convoByTopic.keys))
.where((e) => convoByTopic.containsKey(e.contentTopic))
.asyncMap((e) => _decodedFromMessage(
convoByTopic[e.contentTopic]!,
xmtp.Message.fromBuffer(e.message),
))
// Remove nulls (which are discarded bad envelopes).
.where((msg) => msg != null)
.map((msg) => msg!);
}

/// This decrypts and decodes the [xmtp.Message].
///
/// It returns `null` when the message could not be decoded.
Expand Down
27 changes: 21 additions & 6 deletions lib/src/conversation/manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -150,23 +150,38 @@ class ConversationManager {
conversations.where((c) => c.version == xmtp.Message_Version.v2)),
]);

/// This exposes a stream of ephemeral messages in [conversations].
Stream<DecodedMessage> streamEphemeralMessages(
Iterable<Conversation> conversations,
) =>
StreamGroup.merge([
_v1.streamEphemeralMessages(
conversations.where((c) => c.version == xmtp.Message_Version.v1)),
_v2.streamEphemeralMessages(
conversations.where((c) => c.version == xmtp.Message_Version.v2)),
]);

/// This sends [content] as a message to [conversation].
Future<DecodedMessage> sendMessage(
Conversation conversation,
Object content, {
xmtp.ContentTypeId? contentType,
bool isEphemeral = false,
}) =>
conversation.version == xmtp.Message_Version.v1
? _v1.sendMessage(conversation, content, contentType: contentType)
: _v2.sendMessage(conversation, content, contentType: contentType);
? _v1.sendMessage(conversation, content,
contentType: contentType, isEphemeral: isEphemeral)
: _v2.sendMessage(conversation, content,
contentType: contentType, isEphemeral: isEphemeral);

/// This sends the [encoded] message to the [conversation].
/// If it cannot be decoded then it still sends but this returns `null`.
Future<DecodedMessage?> sendMessageEncoded(
Conversation conversation,
xmtp.EncodedContent encoded,
) =>
xmtp.EncodedContent encoded, {
bool isEphemeral = false,
}) =>
conversation.version == xmtp.Message_Version.v1
? _v1.sendMessageEncoded(conversation, encoded)
: _v2.sendMessageEncoded(conversation, encoded);
? _v1.sendMessageEncoded(conversation, encoded, isEphemeral)
: _v2.sendMessageEncoded(conversation, encoded, isEphemeral);
}
52 changes: 52 additions & 0 deletions test/client_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,58 @@ void main() {
},
);

test(
skip: skipUnlessTestServerEnabled,
"codecs: sending and streaming ephemeral messages",
() async {
var aliceWallet = EthPrivateKey.createRandom(Random.secure()).asSigner();
var bobWallet = EthPrivateKey.createRandom(Random.secure()).asSigner();
var aliceApi = createTestServerApi();
var bobApi = createTestServerApi();
var alice = await Client.createFromWallet(aliceApi, aliceWallet);
var bob = await Client.createFromWallet(bobApi, bobWallet);
await delayToPropagate();

// Alice starts a conversation w/ Bob
var convo = await alice.newConversation(bob.address.hex);
await delayToPropagate();
await alice.sendMessage(convo, "Hello");
await delayToPropagate();

// Bob should see that first message.
expect((await bob.listMessages(convo)).map((m) => m.content).toList(), [
"Hello",
]);

// Bob starts listening to the stream of ephemera.
var ephemera = [];
var bobListening = bob
.streamEphemeralMessages(convo)
.listen((msg) => ephemera.add('${msg.sender.hex}> ${msg.content}'));
await delayToPropagate();

// Alice sends an ephemeral "typing..." indicator (text for now)
await alice.sendMessage(convo, "typing...", isEphemeral: true);
await delayToPropagate();

// Bob should see the ephemeral indicator
expect(ephemera, [
"${alice.address.hex}> typing...",
]);

// Then Alice sends the actual message she typed.
await alice.sendMessage(convo, "I'm hungry, let's get lunch");
await delayToPropagate();

// And Bob should now see those two messages (w/o the ephemera)
expect((await bob.listMessages(convo)).map((m) => m.content).toList(), [
"I'm hungry, let's get lunch",
"Hello",
]);
await bobListening.cancel();
},
);

// This uses a custom codec to send integers between two people.
test(
skip: skipUnlessTestServerEnabled,
Expand Down
28 changes: 28 additions & 0 deletions test/common/topic_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,32 @@ void main() {
var identifier = await generateUserPreferencesIdentifier(key);
expect(identifier, "EBsHSM9lLmELuUVCMJ-tPE0kDcok1io9IwUO6WPC-cM");
});

test('ephemeralMessage v1', () {
// V1 ephemeral topic should be `dmE-` instead of `dm-`.
var addressA = "0x7E75Ee2a9f7D65E49cd8619b24B5731EbFa8064C"; // random
var addressB = "0xFE4464Ea091FE9A8bE25BF7B413616b087F1D896"; // random
var conversationTopic = Topic.directMessageV1(addressA, addressB);
expect(
conversationTopic,
'/xmtp/0/dm-$addressA-$addressB/proto',
);
expect(
Topic.ephemeralMessage(conversationTopic),
'/xmtp/0/dmE-$addressA-$addressB/proto',
);
});

test('ephemeralMessage v2', () {
var randomId = "abc123";
var conversationTopic = Topic.messageV2(randomId);
expect(
conversationTopic,
'/xmtp/0/m-$randomId/proto',
);
expect(
Topic.ephemeralMessage(conversationTopic),
'/xmtp/0/mE-$randomId/proto',
);
});
}

0 comments on commit 634597a

Please sign in to comment.