Skip to content

Commit

Permalink
Refactored presenceMap, removed unnecessary initial sync references
Browse files Browse the repository at this point in the history
  • Loading branch information
sacOO7 committed Sep 4, 2023
1 parent e69a0c5 commit 3714e94
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 102 deletions.
52 changes: 24 additions & 28 deletions src/IO.Ably.Shared/Realtime/Presence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ public sealed partial class Presence : IDisposable
internal Presence(IConnectionManager connection, RealtimeChannel channel, string clientId, ILogger logger)
{
Logger = logger;
Map = new PresenceMap(channel.Name, logger);
InternalMap = new InternalPresenceMap(channel.Name, logger); // RTP17h
MembersMap = new PresenceMap(channel.Name, logger);
InternalMembersMap = new InternalPresenceMap(channel.Name, logger);
PendingPresenceQueue = new ConcurrentQueue<QueuedPresenceMessage>();
_connection = connection;
_channel = channel;
Expand All @@ -41,21 +41,22 @@ internal Presence(IConnectionManager connection, RealtimeChannel channel, string
/// <summary>
/// Has the sync completed.
/// </summary>
public bool SyncComplete => Map.SyncCompleted;
public bool SyncComplete => MembersMap.SyncCompleted;

/// <summary>
/// Indicates whether there is currently a sync in progress.
/// </summary>
public bool IsSyncInProgress => Map.IsSyncInProgress;
public bool IsSyncInProgress => MembersMap.IsSyncInProgress;

internal bool InternalSyncComplete => !Map.IsSyncInProgress && SyncComplete;

internal PresenceMap Map { get; }
/// <summary>
/// Indicates all members present on the channel.
/// </summary>
internal PresenceMap MembersMap { get; } // RTP2

/// <summary>
/// Indicates members belonging to current connectionId.
/// </summary>
internal PresenceMap InternalMap { get; } // RTP17
internal PresenceMap InternalMembersMap { get; } // RTP17h

internal ConcurrentQueue<QueuedPresenceMessage> PendingPresenceQueue { get; }

Expand Down Expand Up @@ -101,7 +102,7 @@ public async Task<IEnumerable<PresenceMessage>> GetAsync(GetParams options)
_ = await WaitForSyncAsync();
}

var result = Map.Values.Where(x => (getOptions.ClientId.IsEmpty() || x.ClientId == getOptions.ClientId)
var result = MembersMap.Values.Where(x => (getOptions.ClientId.IsEmpty() || x.ClientId == getOptions.ClientId)
&& (getOptions.ConnectionId.IsEmpty() || x.ConnectionId == getOptions.ConnectionId));
return result;
}
Expand Down Expand Up @@ -152,7 +153,7 @@ private async Task<bool> WaitForSyncAsync()
// The InternalSync should be completed and the channels Attached or Attaching
void CheckAndSet()
{
if (InternalSyncComplete
if (SyncComplete
&& (_channel.State == ChannelState.Attached || _channel.State == ChannelState.Attaching))
{
tsc.TrySetResult(true);
Expand All @@ -171,15 +172,15 @@ void OnChannelStateChanged(object sender, ChannelStateChange args)
void OnSyncEvent(object sender, EventArgs args) => CheckAndSet();

_channel.StateChanged += OnChannelStateChanged;
Map.SyncNoLongerInProgress += OnSyncEvent;
SyncCompleted += OnSyncEvent;

// Do a manual check in case we are already in the desired state
CheckAndSet();
bool syncIsComplete = await tsc.Task;

// unsubscribe from events
_channel.StateChanged -= OnChannelStateChanged;
Map.SyncNoLongerInProgress -= OnSyncEvent;
SyncCompleted -= OnSyncEvent;

if (!syncIsComplete)
{
Expand Down Expand Up @@ -529,7 +530,7 @@ internal void OnSyncMessage(ProtocolMessage protocolMessage)
/* If a new sequence identifier is sent from Ably, then the client library
* must consider that to be the start of a new sync sequence
* and any previous in-flight sync should be discarded. (part of RTP18)*/
if (Map.IsSyncInProgress && _currentSyncChannelSerial != syncSequenceId)
if (MembersMap.IsSyncInProgress && _currentSyncChannelSerial != syncSequenceId)
{
EndSync();
}
Expand Down Expand Up @@ -570,18 +571,18 @@ internal void OnPresence(PresenceMessage[] messages)
case PresenceAction.Enter:
case PresenceAction.Update:
case PresenceAction.Present:
broadcast &= Map.Put(message);
broadcast &= MembersMap.Put(message);
if (updateInternalPresence)
{
InternalMap.Put(message);
InternalMembersMap.Put(message);
}

break;
case PresenceAction.Leave:
broadcast &= Map.Remove(message);
broadcast &= MembersMap.Remove(message);
if (updateInternalPresence && !message.IsSynthesized())
{
InternalMap.Remove(message);
InternalMembersMap.Remove(message);
}

break;
Expand Down Expand Up @@ -611,7 +612,7 @@ internal void StartSync()
{
if (!IsSyncInProgress)
{
Map.StartSync();
MembersMap.StartSync();
}
}

Expand All @@ -623,7 +624,7 @@ private void EndSync()
}

// RTP19
var localNonUpdatedMembersDuringSync = Map.EndSync();
var localNonUpdatedMembersDuringSync = MembersMap.EndSync();
foreach (var presenceMember in localNonUpdatedMembersDuringSync)
{
presenceMember.Action = PresenceAction.Leave;
Expand All @@ -639,7 +640,7 @@ private void EndSync()
private void EnterMembersFromInternalPresenceMap()
{
// RTP17g
foreach (var item in InternalMap.Values)
foreach (var item in InternalMembersMap.Values)
{
try
{
Expand Down Expand Up @@ -707,8 +708,8 @@ private void NotifySubscribers(PresenceMessage message)
internal void ChannelDetachedOrFailed(ErrorInfo error)
{
FailQueuedMessages(error);
Map.Clear();
InternalMap.Clear();
MembersMap.Clear();
InternalMembersMap.Clear();
}

// RTP5f
Expand Down Expand Up @@ -842,15 +843,10 @@ private void NotifySyncCompleted()
internal JToken GetState() => new JObject
{
["handlers"] = _handlers.GetState(),
["members"] = Map.GetState(),
["members"] = MembersMap.GetState(),
["pendingQueue"] = new JArray(PendingPresenceQueue.Select(x => JObject.FromObject(x.Message))),
};

private void OnInitialSyncCompleted()
{
InitialSyncCompleted?.Invoke(this, EventArgs.Empty);
}

/// <summary>
/// Dispose(bool disposing) executes in two distinct scenarios. If disposing equals true, the method has
/// been called directly or indirectly by a user's code. Managed and unmanaged resources can be disposed.
Expand Down
34 changes: 19 additions & 15 deletions src/IO.Ably.Shared/Realtime/PresenceMap.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ internal class PresenceMap

private ICollection<string> _beforeSyncMembers;
private bool _isSyncInProgress;
private bool _isSyncCompleted;

public PresenceMap(string channelName, ILogger logger)
{
Expand All @@ -28,8 +29,6 @@ internal virtual string GetKey(PresenceMessage presence)
return presence.MemberKey;
}

internal event EventHandler SyncNoLongerInProgress;

// Exposed internally to allow for testing.
internal ConcurrentDictionary<string, PresenceMessage> Members => _members;

Expand All @@ -47,19 +46,29 @@ private set
{
lock (_lock)
{
var previous = _isSyncInProgress;
_isSyncInProgress = value;

// if we have gone from true to false then fire SyncNoLongerInProgress
if (previous && !_isSyncInProgress)
{
OnSyncNoLongerInProgress();
}
}
}
}

public bool SyncCompleted { get; private set; }
public bool SyncCompleted
{
get
{
lock (_lock)
{
return _isSyncCompleted;
}
}

private set
{
lock (_lock)
{
_isSyncCompleted = value;
}
}
}

public PresenceMessage[] Values
{
Expand Down Expand Up @@ -222,11 +231,6 @@ internal JObject GetState()

return state;
}

private void OnSyncNoLongerInProgress()
{
SyncNoLongerInProgress?.Invoke(this, EventArgs.Empty);
}
}

// RTP17h
Expand Down
Loading

0 comments on commit 3714e94

Please sign in to comment.