Skip to content

Commit

Permalink
No public description
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 595322747
  • Loading branch information
gribozavr authored and copybara-github committed Jan 3, 2024
1 parent b34da54 commit 6dc68f2
Show file tree
Hide file tree
Showing 23 changed files with 190 additions and 180 deletions.
10 changes: 5 additions & 5 deletions mediapipe/framework/profiler/circular_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ char CircularBuffer<T>::MaxLap(char u, char v) {

template <typename T>
class CircularBuffer<T>::iterator
: public std::iterator<std::random_access_iterator_tag, T, int64> {
: public std::iterator<std::random_access_iterator_tag, T, int64_t> {
public:
explicit iterator(const CircularBuffer* buffer, size_t index)
: buffer_(buffer), index_(index) {}
Expand All @@ -177,10 +177,10 @@ class CircularBuffer<T>::iterator
T operator*() const { return buffer_->GetAbsolute(index_); }
T* operator->() const { &buffer_->GetAbsolute(index_); }
iterator& operator++() { return (*this) += 1; }
iterator& operator+=(const int64& num) { return index_ += num, *this; }
int64 operator-(const iterator& it) const { return index_ - it.index_; }
iterator& operator+(const int64& num) { return iterator(*this) += num; }
iterator& operator-(const int64& num) { return iterator(*this) += -num; }
iterator& operator+=(const int64_t& num) { return index_ += num, *this; }
int64_t operator-(const iterator& it) const { return index_ - it.index_; }
iterator& operator+(const int64_t& num) { return iterator(*this) += num; }
iterator& operator-(const int64_t& num) { return iterator(*this) += -num; }

private:
const CircularBuffer* buffer_;
Expand Down
64 changes: 33 additions & 31 deletions mediapipe/framework/profiler/graph_profiler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ bool IsTraceIntervalEnabled(const ProfilerConfig& profiler_config,
}

using PacketInfoMap =
ShardedMap<std::string, std::list<std::pair<int64, PacketInfo>>>;
ShardedMap<std::string, std::list<std::pair<int64_t, PacketInfo>>>;

// Inserts a PacketInfo into a PacketInfoMap.
void InsertPacketInfo(PacketInfoMap* map, const PacketId& packet_id,
Expand Down Expand Up @@ -162,9 +162,9 @@ void GraphProfiler::Initialize(
ABSL_CHECK(!is_initialized_)
<< "Cannot initialize the profiler for the same graph multiple times.";
profiler_config_ = validated_graph_config.Config().profiler_config();
int64 interval_size_usec = profiler_config_.histogram_interval_size_usec();
int64_t interval_size_usec = profiler_config_.histogram_interval_size_usec();
interval_size_usec = interval_size_usec ? interval_size_usec : 1000000;
int64 num_intervals = profiler_config_.num_histogram_intervals();
int64_t num_intervals = profiler_config_.num_histogram_intervals();
num_intervals = num_intervals ? num_intervals : 1;
if (IsTracerEnabled(profiler_config_)) {
packet_tracer_ = absl::make_unique<GraphTracer>(profiler_config_);
Expand Down Expand Up @@ -323,7 +323,7 @@ void GraphProfiler::AddPacketInfo(const TraceEvent& packet_info) {
return;
}

int64 production_time_usec =
int64_t production_time_usec =
profiler_config_.use_packet_timestamp_for_added_packet()
? packet_timestamp.Value()
: TimeNowUsec();
Expand All @@ -342,8 +342,8 @@ absl::Status GraphProfiler::GetCalculatorProfiles(
return absl::OkStatus();
}

void GraphProfiler::InitializeTimeHistogram(int64 interval_size_usec,
int64 num_intervals,
void GraphProfiler::InitializeTimeHistogram(int64_t interval_size_usec,
int64_t num_intervals,
TimeHistogram* histogram) {
histogram->set_interval_size_usec(interval_size_usec);
histogram->set_num_intervals(num_intervals);
Expand All @@ -355,8 +355,8 @@ void GraphProfiler::InitializeOutputStreams(
const CalculatorGraphConfig::Node& node_config) {}

void GraphProfiler::InitializeInputStreams(
const CalculatorGraphConfig::Node& node_config, int64 interval_size_usec,
int64 num_intervals, CalculatorProfile* calculator_profile) {
const CalculatorGraphConfig::Node& node_config, int64_t interval_size_usec,
int64_t num_intervals, CalculatorProfile* calculator_profile) {
std::shared_ptr<tool::TagMap> input_tag_map =
TagMap::Create(node_config.input_stream()).value();
std::set<int> back_edge_ids = GetBackEdgeIds(node_config, *input_tag_map);
Expand Down Expand Up @@ -405,15 +405,15 @@ void GraphProfiler::ResetTimeHistogram(TimeHistogram* histogram) {
}

void GraphProfiler::AddPacketInfoInternal(const PacketId& packet_id,
int64 production_time_usec,
int64 source_process_start_usec) {
int64_t production_time_usec,
int64_t source_process_start_usec) {
PacketInfo packet_info = {0, production_time_usec, source_process_start_usec};
InsertPacketInfo(&packets_info_, packet_id, packet_info);
}

void GraphProfiler::AddPacketInfoForOutputPackets(
const OutputStreamShardSet& output_stream_shard_set,
int64 production_time_usec, int64 source_process_start_usec) {
int64_t production_time_usec, int64_t source_process_start_usec) {
for (const OutputStreamShard& output_stream_shard : output_stream_shard_set) {
for (const Packet& output_packet : *output_stream_shard.OutputQueue()) {
AddPacketInfoInternal(PacketId({output_stream_shard.Name(),
Expand All @@ -423,11 +423,11 @@ void GraphProfiler::AddPacketInfoForOutputPackets(
}
}

int64 GraphProfiler::AddStreamLatencies(
const CalculatorContext& calculator_context, int64 start_time_usec,
int64 end_time_usec, CalculatorProfile* calculator_profile) {
int64_t GraphProfiler::AddStreamLatencies(
const CalculatorContext& calculator_context, int64_t start_time_usec,
int64_t end_time_usec, CalculatorProfile* calculator_profile) {
// Update input streams profiles.
int64 min_source_process_start_usec = AddInputStreamTimeSamples(
int64_t min_source_process_start_usec = AddInputStreamTimeSamples(
calculator_context, start_time_usec, calculator_profile);

// Update output production times.
Expand All @@ -437,14 +437,15 @@ int64 GraphProfiler::AddStreamLatencies(
}

void GraphProfiler::SetOpenRuntime(const CalculatorContext& calculator_context,
int64 start_time_usec, int64 end_time_usec) {
int64_t start_time_usec,
int64_t end_time_usec) {
absl::ReaderMutexLock lock(&profiler_mutex_);
if (!is_profiling_) {
return;
}

const std::string& node_name = calculator_context.NodeName();
int64 time_usec = end_time_usec - start_time_usec;
int64_t time_usec = end_time_usec - start_time_usec;
auto profile_iter = calculator_profiles_.find(node_name);
ABSL_CHECK(profile_iter != calculator_profiles_.end()) << absl::Substitute(
"Calculator \"$0\" has not been added during initialization.",
Expand All @@ -459,14 +460,14 @@ void GraphProfiler::SetOpenRuntime(const CalculatorContext& calculator_context,
}

void GraphProfiler::SetCloseRuntime(const CalculatorContext& calculator_context,
int64 start_time_usec,
int64 end_time_usec) {
int64_t start_time_usec,
int64_t end_time_usec) {
absl::ReaderMutexLock lock(&profiler_mutex_);
if (!is_profiling_) {
return;
}
const std::string& node_name = calculator_context.NodeName();
int64 time_usec = end_time_usec - start_time_usec;
int64_t time_usec = end_time_usec - start_time_usec;
auto profile_iter = calculator_profiles_.find(node_name);
ABSL_CHECK(profile_iter != calculator_profiles_.end()) << absl::Substitute(
"Calculator \"$0\" has not been added during initialization.",
Expand All @@ -480,7 +481,8 @@ void GraphProfiler::SetCloseRuntime(const CalculatorContext& calculator_context,
}
}

void GraphProfiler::AddTimeSample(int64 start_time_usec, int64 end_time_usec,
void GraphProfiler::AddTimeSample(int64_t start_time_usec,
int64_t end_time_usec,
TimeHistogram* histogram) {
if (end_time_usec < start_time_usec) {
ABSL_LOG(ERROR) << absl::Substitute(
Expand All @@ -489,21 +491,21 @@ void GraphProfiler::AddTimeSample(int64 start_time_usec, int64 end_time_usec,
return;
}

int64 time_usec = end_time_usec - start_time_usec;
int64_t time_usec = end_time_usec - start_time_usec;
histogram->set_total(histogram->total() + time_usec);
int64 interval_index = time_usec / histogram->interval_size_usec();
int64_t interval_index = time_usec / histogram->interval_size_usec();
if (interval_index > histogram->num_intervals() - 1) {
interval_index = histogram->num_intervals() - 1;
}
histogram->set_count(interval_index, histogram->count(interval_index) + 1);
}

int64 GraphProfiler::AddInputStreamTimeSamples(
const CalculatorContext& calculator_context, int64 start_time_usec,
int64_t GraphProfiler::AddInputStreamTimeSamples(
const CalculatorContext& calculator_context, int64_t start_time_usec,
CalculatorProfile* calculator_profile) {
int64 input_timestamp_usec = calculator_context.InputTimestamp().Value();
int64 min_source_process_start_usec = start_time_usec;
int64 input_stream_counter = -1;
int64_t input_timestamp_usec = calculator_context.InputTimestamp().Value();
int64_t min_source_process_start_usec = start_time_usec;
int64_t input_stream_counter = -1;
for (CollectionItemId id = calculator_context.Inputs().BeginId();
id < calculator_context.Inputs().EndId(); ++id) {
++input_stream_counter;
Expand Down Expand Up @@ -537,8 +539,8 @@ int64 GraphProfiler::AddInputStreamTimeSamples(
}

void GraphProfiler::AddProcessSample(
const CalculatorContext& calculator_context, int64 start_time_usec,
int64 end_time_usec) {
const CalculatorContext& calculator_context, int64_t start_time_usec,
int64_t end_time_usec) {
absl::ReaderMutexLock lock(&profiler_mutex_);
if (!is_profiling_) {
return;
Expand All @@ -556,7 +558,7 @@ void GraphProfiler::AddProcessSample(
calculator_profile->mutable_process_runtime());

if (profiler_config_.enable_stream_latency()) {
int64 min_source_process_start_usec = AddStreamLatencies(
int64_t min_source_process_start_usec = AddStreamLatencies(
calculator_context, start_time_usec, end_time_usec, calculator_profile);
// Update input and output trace latencies.
AddTimeSample(min_source_process_start_usec, start_time_usec,
Expand Down
50 changes: 25 additions & 25 deletions mediapipe/framework/profiler/graph_profiler.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct PacketId {
// Stream name, excluding TAG if available.
std::string stream_name;
// Timestamp of the packet.
int64 timestamp_usec;
int64_t timestamp_usec;

bool operator==(const PacketId& other) const {
return (stream_name == other.stream_name) &&
Expand All @@ -53,12 +53,12 @@ struct PacketId {
struct PacketInfo {
// Number of remained consumer of this packet.
// This is used to decide if this PacketInfo should be discarded.
int64 remaining_consumer_count;
int64_t remaining_consumer_count;
// Packet production time based on profiler's clock.
int64 production_time_usec;
int64_t production_time_usec;
// The time when the Process(), that generated the corresponding source
// packet, was started.
int64 source_process_start_usec;
int64_t source_process_start_usec;

// For testing.
bool operator==(const PacketInfo& other) const {
Expand Down Expand Up @@ -191,12 +191,12 @@ class GraphProfiler : public std::enable_shared_from_this<ProfilingContext> {
}

inline ~Scope() {
int64 end_time_usec;
int64_t end_time_usec;
if (profiler_->is_profiling_ || profiler_->is_tracing_) {
end_time_usec = profiler_->TimeNowUsec();
}
if (profiler_->is_profiling_) {
int64 end_time_usec = profiler_->TimeNowUsec();
int64_t end_time_usec = profiler_->TimeNowUsec();
switch (calculator_method_) {
case GraphTrace::OPEN:
profiler_->SetOpenRuntime(calculator_context_, start_time_usec_,
Expand Down Expand Up @@ -227,7 +227,7 @@ class GraphProfiler : public std::enable_shared_from_this<ProfilingContext> {
const GraphTrace::EventType calculator_method_;
const CalculatorContext& calculator_context_;
GraphProfiler* profiler_;
int64 start_time_usec_;
int64_t start_time_usec_;
};

const ProfilerConfig& profiler_config() { return profiler_config_; }
Expand All @@ -249,12 +249,12 @@ class GraphProfiler : public std::enable_shared_from_this<ProfilingContext> {
// is valid for profiling.
void AddPacketInfo(const TraceEvent& packet_info)
ABSL_LOCKS_EXCLUDED(profiler_mutex_);
static void InitializeTimeHistogram(int64 interval_size_usec,
int64 num_intervals,
static void InitializeTimeHistogram(int64_t interval_size_usec,
int64_t num_intervals,
TimeHistogram* histogram);
static void ResetTimeHistogram(TimeHistogram* histogram);
// Add a sample to a time histogram.
static void AddTimeSample(int64 start_time_usec, int64 end_time_usec,
static void AddTimeSample(int64_t start_time_usec, int64_t end_time_usec,
TimeHistogram* histogram);

// Add output streams to the stream consumer count map.
Expand All @@ -266,43 +266,43 @@ class GraphProfiler : public std::enable_shared_from_this<ProfilingContext> {
// not add them to |stream_consumer_counts_| to avoid using them for updating
// |source_process_start_usec| and garbage collection while profiling.
void InitializeInputStreams(const CalculatorGraphConfig::Node& node_config,
int64 interval_size_usec, int64 num_intervals,
int64_t interval_size_usec, int64_t num_intervals,
CalculatorProfile* calculator_profile);
// Returns the input stream back edges for a calculator.
std::set<int> GetBackEdgeIds(const CalculatorGraphConfig::Node& node_config,
const tool::TagMap& input_tag_map);

void AddPacketInfoInternal(const PacketId& packet_id,
int64 production_time_usec,
int64 source_process_start_usec);
int64_t production_time_usec,
int64_t source_process_start_usec);
// Adds packet info for non-empty output packets.
void AddPacketInfoForOutputPackets(
const OutputStreamShardSet& output_stream_shard_set,
int64 production_time_usec, int64 source_process_start_usec);
int64_t production_time_usec, int64_t source_process_start_usec);

// Updates the production time for outputs and the stream profile for inputs.
int64 AddStreamLatencies(const CalculatorContext& calculator_context,
int64 start_time_usec, int64 end_time_usec,
CalculatorProfile* calculator_profile);
int64_t AddStreamLatencies(const CalculatorContext& calculator_context,
int64_t start_time_usec, int64_t end_time_usec,
CalculatorProfile* calculator_profile);

void SetOpenRuntime(const CalculatorContext& calculator_context,
int64 start_time_usec, int64 end_time_usec)
int64_t start_time_usec, int64_t end_time_usec)
ABSL_LOCKS_EXCLUDED(profiler_mutex_);
void SetCloseRuntime(const CalculatorContext& calculator_context,
int64 start_time_usec, int64 end_time_usec)
int64_t start_time_usec, int64_t end_time_usec)
ABSL_LOCKS_EXCLUDED(profiler_mutex_);

// Updates the input streams profiles for the calculator and returns the
// minimum |source_process_start_usec| of all input packets, excluding empty
// packets and back-edge packets. Returns -1 if there is no input packets.
int64 AddInputStreamTimeSamples(const CalculatorContext& calculator_context,
int64 start_time_usec,
CalculatorProfile* calculator_profile);
int64_t AddInputStreamTimeSamples(const CalculatorContext& calculator_context,
int64_t start_time_usec,
CalculatorProfile* calculator_profile);

// Updates the Process() data for calculator.
// Requires ReaderLock for is_profiling_.
void AddProcessSample(const CalculatorContext& calculator_context,
int64 start_time_usec, int64 end_time_usec)
int64_t start_time_usec, int64_t end_time_usec)
ABSL_LOCKS_EXCLUDED(profiler_mutex_);

// Helper method to get trace_log_path. If the trace_log_path is empty and
Expand All @@ -311,7 +311,7 @@ class GraphProfiler : public std::enable_shared_from_this<ProfilingContext> {
absl::StatusOr<std::string> GetTraceLogPath();

// Helper method to get the clock time in microsecond.
int64 TimeNowUsec() { return ToUnixMicros(clock_->TimeNow()); }
int64_t TimeNowUsec() { return ToUnixMicros(clock_->TimeNow()); }

private:
// The settings for this tracer.
Expand All @@ -332,7 +332,7 @@ class GraphProfiler : public std::enable_shared_from_this<ProfilingContext> {
CalculatorProfileMap calculator_profiles_;
// Stores the production time of a packet, based on profiler's clock.
using PacketInfoMap =
ShardedMap<std::string, std::list<std::pair<int64, PacketInfo>>>;
ShardedMap<std::string, std::list<std::pair<int64_t, PacketInfo>>>;
PacketInfoMap packets_info_;

// Global mutex for the profiler.
Expand Down
4 changes: 3 additions & 1 deletion mediapipe/framework/profiler/graph_profiler_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#ifndef MEDIAPIPE_FRAMEWORK_PROFILER_MEDIAPIPE_PROFILER_STUB_H_
#define MEDIAPIPE_FRAMEWORK_PROFILER_MEDIAPIPE_PROFILER_STUB_H_

#include <cstdint>

#include "mediapipe/framework/port/status.h"
#include "mediapipe/framework/timestamp.h"

Expand Down Expand Up @@ -71,7 +73,7 @@ class TraceEvent {
inline TraceEvent& set_packet_data_id(const Packet* packet) { return *this; }
inline TraceEvent& set_thread_id(int thread_id) { return *this; }
inline TraceEvent& set_is_finish(bool is_finish) { return *this; }
inline TraceEvent& set_event_data(int64 data) { return *this; }
inline TraceEvent& set_event_data(int64_t data) { return *this; }
};

// GraphProfiler::CaptureProfile option, see the method for details.
Expand Down
Loading

0 comments on commit 6dc68f2

Please sign in to comment.