diff --git a/examples/cifar10/train.py b/examples/cifar10/train.py index b2ab4af689..9b07991fb4 100644 --- a/examples/cifar10/train.py +++ b/examples/cifar10/train.py @@ -31,24 +31,25 @@ import os import argparse +# sys.path.append(os.path.join(os.path.dirname(__file__), '../../build/python')) from singa import utils from singa import optimizer from singa import device from singa import tensor +from singa.proto import core_pb2 from caffe import caffe_net import cnn import vgg import resnet +from datetime import datetime +import time def load_dataset(filepath): print('Loading data file %s' % filepath) with open(filepath, 'rb') as fd: - try: - cifar10 = pickle.load(fd, encoding='latin1') - except TypeError: - cifar10 = pickle.load(fd) + cifar10 = pickle.load(fd) image = cifar10['data'].astype(dtype=np.uint8) image = image.reshape((-1, 3, 32, 32)) label = np.asarray(cifar10['labels'], dtype=np.uint8) @@ -129,7 +130,7 @@ def train(data, net, max_epoch, get_lr, weight_decay, batch_size=100, dev = device.get_default_device() else: print('Using GPU') - dev = device.create_cuda_gpu() + dev = device.create_cuda_gpu_on(0) net.to_device(dev) opt = optimizer.SGD(momentum=0.9, weight_decay=weight_decay) @@ -137,16 +138,27 @@ def train(data, net, max_epoch, get_lr, weight_decay, batch_size=100, opt.register(p, specs) tx = tensor.Tensor((batch_size, 3, 32, 32), dev) - ty = tensor.Tensor((batch_size,), dev, tensor.int32) + ty = tensor.Tensor((batch_size,), dev, core_pb2.kInt) train_x, train_y, test_x, test_y = data num_train_batch = train_x.shape[0] // batch_size num_test_batch = test_x.shape[0] // batch_size idx = np.arange(train_x.shape[0], dtype=np.int32) - for epoch in range(max_epoch): + fileTimeLog = open("epochTimeLog.text", "a") + for epoch in range(1): np.random.shuffle(idx) loss, acc = 0.0, 0.0 print('Epoch %d' % epoch) - for b in range(num_train_batch): + print(datetime.now().timetz()) # milliseconds + print(int(round(time.time()*1000))) + fileTimeLog.write('Epoch %d: ' % epoch) + fileTimeLog.write(str(int(round(time.time()*1000)))) + fileTimeLog.write('\n') + for b in range(20): #num_train_batch): + print("start of iteration %d: " % b) + #time.sleep(1) + fileTimeLog.write('iteration %d: ' % b) + fileTimeLog.write(str(int(round(time.time()*1000)))) + fileTimeLog.write('\n') x = train_x[idx[b * batch_size: (b + 1) * batch_size]] y = train_y[idx[b * batch_size: (b + 1) * batch_size]] tx.copy_from_numpy(x) @@ -164,7 +176,7 @@ def train(data, net, max_epoch, get_lr, weight_decay, batch_size=100, print(info) loss, acc = 0.0, 0.0 - for b in range(num_test_batch): + for b in range(0): x = test_x[b * batch_size: (b + 1) * batch_size] y = test_y[b * batch_size: (b + 1) * batch_size] tx.copy_from_numpy(x) @@ -175,14 +187,16 @@ def train(data, net, max_epoch, get_lr, weight_decay, batch_size=100, print('test loss = %f, test accuracy = %f' % ((loss / num_test_batch), (acc / num_test_batch))) + fileTimeLog.close() net.save('model', 20) # save model params into checkpoint file if __name__ == '__main__': parser = argparse.ArgumentParser(description='Train dcnn for cifar10') - parser.add_argument('model', choices=['vgg', 'cnn', 'resnet', 'caffe'], - default='vgg') + parser.add_argument('model', choices=['vgg', 'alexnet', 'resnet', 'caffe'], + default='alexnet') parser.add_argument('data', default='cifar-10-batches-py') parser.add_argument('--use_cpu', action='store_true') + parser.add_argument('--batch_size', type=int, default=100) args = parser.parse_args() assert os.path.exists(args.data), \ 'Please download the
cifar10 dataset via "download_data.py"' @@ -194,22 +208,22 @@ def train(data, net, max_epoch, get_lr, weight_decay, batch_size=100, net = caffe_net.create_net(args.use_cpu) # for cifar10_full_train_test.prototxt train((train_x, train_y, test_x, test_y), net, 160, alexnet_lr, 0.004, - use_cpu=args.use_cpu) + use_cpu=args.use_cpu, batch_size=args.batch_size) # for cifar10_quick_train_test.prototxt # train((train_x, train_y, test_x, test_y), net, 18, caffe_lr, 0.004, # use_cpu=args.use_cpu) - elif args.model == 'cnn': + elif args.model == 'alexnet': train_x, test_x = normalize_for_alexnet(train_x, test_x) - net = cnn.create_net(args.use_cpu) + net = cnn.create_net(args.use_cpu) # the 'alexnet' choice maps to the cnn module imported above train((train_x, train_y, test_x, test_y), net, 2, alexnet_lr, 0.004, - use_cpu=args.use_cpu) + use_cpu=args.use_cpu, batch_size=args.batch_size) elif args.model == 'vgg': train_x, test_x = normalize_for_vgg(train_x, test_x) net = vgg.create_net(args.use_cpu) train((train_x, train_y, test_x, test_y), net, 250, vgg_lr, 0.0005, - use_cpu=args.use_cpu) + use_cpu=args.use_cpu, batch_size=args.batch_size) else: train_x, test_x = normalize_for_alexnet(train_x, test_x) net = resnet.create_net(args.use_cpu) train((train_x, train_y, test_x, test_y), net, 200, resnet_lr, 1e-4, - use_cpu=args.use_cpu) + use_cpu=args.use_cpu, batch_size=args.batch_size) \ No newline at end of file diff --git a/include/singa/core/common.h b/include/singa/core/common.h index 2c6d1d8668..ee6f07ce3d 100644 --- a/include/singa/core/common.h +++ b/include/singa/core/common.h @@ -24,7 +24,7 @@ #include #include #include "singa/utils/logging.h" - +#include #ifdef USE_CUDA #include #include @@ -52,24 +52,28 @@ typedef struct _Cuda { } Cuda; typedef struct _Opencl { } Opencl; } // namespace lang +class Device; +struct DeviceOptInfoToAppend; + + /// Block represent a chunk of memory (on device or host). class Block { public: - Block(void* ptr, size_t size, size_t offset = 0) - : data_(ptr), size_(size), offset_(offset) { + Block(void* ptr, size_t size, size_t offset = 0, Device* ptr_device = nullptr) + : data_(ptr), size_(size), offset_(offset), ptr_device_(ptr_device) { ref_count_ = 1; // std::make_shared>(1); } // Disabled as it is not used currently. // Block(void* ptr, size_t size, size_t offset, std::shared_ptr> // ref) : data_(ptr), size_(size), offset_(offset), ref_count_(ref) {} - void* mutable_data() { - initialized_ = true; - return static_cast(data_) + offset_; - } - const void* data() const { - CHECK(initialized_) << "Must initialize data before reading it"; - return static_cast(data_) + offset_; - } + void* mutable_data(); + + const void* data() const; + + void* get_data(); + + void update_data(void* data_new); + size_t size() const { return size_; } size_t offset() const { return offset_; } int IncRefCount() { @@ -89,12 +93,23 @@ class Block { void* data_ = nullptr; size_t size_ = 0; size_t offset_ = 0; + Device* ptr_device_; bool initialized_ = false; // Disabled as it is not used currently. // std::shared_ptr> ref_count_ = nullptr; std::atomic ref_count_; }; +// struct for Append purpose in device class; an illustrative sketch of how such a record is assembled follows below.
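/*
 * Illustrative sketch (editorial, not from the patch): how an operation record
 * like DeviceOptInfoToAppend (declared just below) is assembled. The block
 * pointer is serialized through a stringstream and the timestamp is taken from
 * std::chrono, mirroring what Block::data()/mutable_data() and
 * Device::FreeBlock() do in this patch. DemoOptInfo and MakeRecord are
 * hypothetical names used for illustration only.
 */
#include <chrono>
#include <iostream>
#include <sstream>
#include <string>

struct DemoOptInfo {              // stands in for DeviceOptInfoToAppend
  std::string operation_type;     // "Malloc", "Free", "Read" or "Mutable"
  std::string block_ptr;          // pointer rendered as text, e.g. "0x7f..."
  int size;
  long t;                         // time_since_epoch().count()
};

DemoOptInfo MakeRecord(const std::string& op, const void* ptr, int size) {
  std::stringstream strm;
  strm << ptr;                    // serialize the address, as the patch does
  return DemoOptInfo{op, strm.str(), size,
      static_cast<long>(std::chrono::system_clock::now().time_since_epoch().count())};
}

int main() {
  int dummy = 0;
  DemoOptInfo rec = MakeRecord("Malloc", &dummy, 1024);
  std::cout << rec.operation_type << ' ' << rec.block_ptr << ' '
            << rec.size << ' ' << rec.t << '\n';
  return 0;
}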
+struct DeviceOptInfoToAppend{ + string operation_type; + string block_ptr; + int size; + long t = (std::chrono::system_clock::now()).time_since_epoch().count(); + + DeviceOptInfoToAppend(string opt_type, string ptr,int s):operation_type(opt_type),block_ptr(ptr),size(s){} +}; + typedef struct _Context { std::mt19937 random_generator; #ifdef USE_CUDA @@ -114,4 +129,4 @@ typedef struct _Context { } Context; } // namespace singa -#endif // SINGA_CORE_COMMON_H_ +#endif // SINGA_CORE_COMMON_H_ \ No newline at end of file diff --git a/include/singa/core/device.h b/include/singa/core/device.h index 1a960d8ae7..7d9ed57757 100644 --- a/include/singa/core/device.h +++ b/include/singa/core/device.h @@ -24,6 +24,7 @@ #include #include #include +#include #include "singa/singa_config.h" #include "singa/core/common.h" @@ -64,6 +65,8 @@ class Device { /// Called by Tensor. void FreeBlock(Block* block); + + void* UpdateGpuPtrInfo(const Block* block_ptr); /// Return the size (bytes) of memory in use /// TODO(wangwei) override this function for all devices. @@ -102,6 +105,8 @@ class Device { int id() const { return id_; } + virtual void* UpdateGpuPtr(const Block* block_ptr) = 0; + virtual void Append(DeviceOptInfoToAppend dev_opt_info) = 0; private: Device() {}; @@ -117,6 +122,8 @@ class Device { /// Free device memory. virtual void Free(void* ptr) = 0; + virtual void AppendAfterMalloc(Block* block,void* data_ptr,int size) = 0; + protected: int id_ = 0; @@ -146,6 +153,7 @@ class CppCPU : public Device { std::shared_ptr host() const override { return defaultDevice;} void SetRandSeed(unsigned seed) override; + void Append(DeviceOptInfoToAppend dev_opt_info) override {} protected: void DoExec(function&& fn, int executor) override; @@ -158,6 +166,10 @@ class CppCPU : public Device { /// Free cpu memory. void Free(void* ptr) override; + void AppendAfterMalloc(Block* block,void* data_ptr,int size) override {} + + void* UpdateGpuPtr(const Block* block_ptr) override {} + }; @@ -176,6 +188,8 @@ class CudaGPU : public Device { void SetRandSeed(unsigned seed) override; size_t GetAllocatedMem() override; + void Append(DeviceOptInfoToAppend dev_opt_info) override {} + protected: void DoExec(function&& fn, int executor) override; @@ -188,16 +202,192 @@ class CudaGPU : public Device { /// Free cpu memory. void Free(void* ptr) override; + void AppendAfterMalloc(Block* block,void* data_ptr,int size) override {} + void* UpdateGpuPtr(const Block* block_ptr) override; private: void Setup(); private: - shared_ptr pool_; + shared_ptr pool_; }; /// CudaCPU which uses cudaMallocHost to allocate pinned memory for host. +///SwapGPU +struct DeviceOptInfo{ + /* + members: [ptr, size, operation_type, idx] + */ + string ptr; + size_t size; + int operation_type; + int idx; + double t; + DeviceOptInfo(string p, size_t s, int M, int i):ptr(p),size(s),operation_type(M),idx(i){} +}; + +struct BlockMeta{ + /* + meta of swapping memory blocks + */ + Block* block_ = nullptr; + void* data_ = nullptr; + void* cpu_ptr = nullptr; + size_t size = 0; + cudaEvent_t out_event; + cudaEvent_t in_event; + cudaStream_t out_stream; + cudaStream_t in_stream; +}; + +struct SwapBlock{ + /* + meta of candidate blocks + */ + string ptr; + string cat; //sub category of the candidate blocks, read-read, write-read, etc. 
+ int name; + size_t size; + //index of last read/write before swap out, and first read/write after swap in + int r_idx; //out idx + int d_idx; //in idx + //time of last read/write before swap out, and first read/write after swap in + double r_time; // out time + double d_time; //in time + double DOA; //Duration of Absence + double AOA; //Area of Absence + double DOA_origin; //t2-t1, DOA without taking out time spent + double WDOA = 0; //weighted DOA + double majority_voting = 0; + int r_idx_ready; //r_idx + buffer + + //below are index and time for scheduling + int idx_out_start = 0; + int idx_out_end = 0; + int idx_in_end = 0; + int idx_in_start = 0; + double t_out_start = 0; + double t_out_end = 0; + double t_in_end = 0; + double t_in_start = 0; + SwapBlock(string p, size_t s, int idx_out_start, int idx_in_end, double t_out_start, double t_in_end): + ptr(p), size(s), r_idx(idx_out_start),d_idx(idx_in_end),r_time(t_out_start), d_time(t_in_end) {} +}; +/// Device able to Swap memory between Nvidia GPU and CPU +class SwapGPU : public Device { + public: + ~SwapGPU(); + /// Construct the device using default mem pool setting. + SwapGPU(int id = 0); + /// Construct the device given the physical device ID and memory pool. + SwapGPU(int id, std::shared_ptr pool); + + void SetRandSeed(unsigned seed) override; + size_t GetAllocatedMem() override; + //Append at every index: free, read, mutable + void Append(DeviceOptInfoToAppend dev_opt_info) override; + + protected: + void DoExec(function&& fn, int executor) override; + + void CopyToFrom(void* dst, const void* src, size_t nBytes, + CopyDirection direction, Context* ctx) override; + + /// Allocate cpu memory. + void* Malloc(int size) override; + + /// Free cpu memory. + void Free(void* ptr) override; + + //append info after Malloc, as Block* is not available till Malloc() done. + void AppendAfterMalloc(Block* block,void* data_ptr,int size) override; + + //Detection and Plan + void DetectionPlan(); + + //test iteration, return GC + int Detection(vectorvec_block,int &iteration_length, int &location_of_2nd_iteration); + + //entire plan, from SelectBlock() to Scheduling(), BuildMetaTables() + void Plan(); + + //block selection algo + vector SelectBlock(vectorvec_swap,vector temp_load,double mem_limit,string mode); + + //schedule algo + void Scheduling(vector&vec_swap_selct, vector&vec_load_temp,double &overhead,double mem_limit,string mode); + + //make tables table_sched and table_meta + void BuildMetaTables(vectorvec_swap_selct); + + //update table_meta, during Append() + void UpdateMetaTables(Block* block_ptr); + + //swap/sync during Append() + void DeploySwap(); + + //exec DeploySwap + void DeploySwapExec(int relative_counter); + + //load profile as per synchronous swap. + vector GetIdealLoad(vectorvec_load,vector vec_swap_selct); + + //in case gpu ptr is wrong, update it after swap_in ad hoc + void* UpdateGpuPtr(const Block* block_ptr) override; + + //Swap Synchronous, for early iterations + void SwapOutSynchronous(const Block* block_ptr); + void SwapInSynchronous(const Block* block_ptr); + + //Swap asynchronous, for middle iterations + void SwapOut(const int idx); + void SwapIn(const int idx); + + private: + void Setup(); + + maptable_meta; + maptable_block_meta; //for measuring speed only. + maptable_not_at_device; //int refers to its r_idx of the block/meta + map>table_sched; // changed to with sync_r_idx + + //vec_block + vectorvec_block; //iterations for Detection, i.e. detect iterations.
+ vectorvec_block_fresh; //iterations that are used for Planning. + vectorvec_block_mf; //iterations used to construct pool + vectorglobal_load; // load from beginning + vectororigin_load; //3 iteration load, for planning. + vectorvec_run; + vectoroperation_sequence; //sequence of operations of one middle iteration + vectorsize_sequence; //size of all operations of one middle iteration + + int async_swap_flag = 0; //0 for sync, 1 for async. + int past_test_flag = 0; //0 means need to test, 1 means no need to test anymore. + int global_index = 0; //global counter, index, add 1 after each Malloc/Free/read/write. + int global_index_threshold = -1; + int iteration_length = 0; + int location_of_2nd_iteration = 0; //index of start of 2nd iteration + int location_of_5th_iteration = 0; //index of start of 5th iteration + int three_more_iteration_global_index_threshold = -1; + + //design specs + float mem_limit_ratio = 0.70; + size_t smallest_block = 1<<20; //1 MB + int data_buffer = 4; // used to control readyIdx + int mutable_data_buffer = 6; + double max_load; + int max_idx; + double total_swap_in_time = 0; + double total_swap_out_time = 0; + double temp_time = 0; + double temp_time_baseline; //vec_run[0] time + int iteration_length_threshold = 1000; + + private: + shared_ptr pool_; +}; + #endif // USE_CUDA #ifdef USE_OPENCL @@ -218,7 +408,7 @@ class OpenclDevice : public singa::Device { virtual void CopyDataToFrom(Block* dst, Block* src, size_t nBytes, CopyDirection direction, int dst_offset = 0, int src_offset = 0) override; - + void Append(DeviceOptInfoToAppend dev_opt_info) override {} protected: /// The OpenCL device that this object represents. /// Each OpenclDevice contains exactly one cl::Device for the lifetime of the @@ -248,6 +438,10 @@ class OpenclDevice : public singa::Device { /// Converts the void pointer into a Buffer object, then deletes the object. /// This has the effect of freeing up device memory. void Free(void* ptr) override; + void AppendAfterMalloc(Block* block,void* data_ptr,int size) override {} + + void* UpdateGpuPtr(const Block* block_ptr) override { return nullptr; } + private: @@ -338,4 +532,4 @@ class Platform { } // namespace singa -#endif // SINGA_CORE_DEVICE_H_ +#endif // SINGA_CORE_DEVICE_H_ \ No newline at end of file diff --git a/include/singa/core/memory.h b/include/singa/core/memory.h index f664f95ced..b3dfd672ec 100644 --- a/include/singa/core/memory.h +++ b/include/singa/core/memory.h @@ -23,6 +23,17 @@ #include #include "singa/proto/core.pb.h" #include "singa/singa_config.h" +//for SmartMemPool +#include +#include +#include +#include +#include +#include +#include +#include /* malloc, free, rand */ +#include +using namespace std; #ifdef USE_CUDA #include "cnmem.h" @@ -38,7 +49,10 @@ class DeviceMemPool { public: virtual void Malloc(void** ptr, const size_t size) = 0; virtual void Free(void* ptr) = 0; + virtual void Append(string blockInfo) = 0; + virtual void PoolOpt(vector &vec_mf) = 0; + /// Return a pair for free and total memory managed by this pool.
virtual std::pair GetMemUsage() { return std::make_pair(0u, 0u); @@ -60,7 +74,10 @@ class CnMemPool : public DeviceMemPool { void Malloc(void** ptr, const size_t size); void Free(void* ptr); + void Append(string blockInfo){} + void PoolOpt(vector &vec_mf) override {} + std::pair GetMemUsage() override; // release all memory and set cnmem manager to uninitialized @@ -85,7 +102,122 @@ class CudaMemPool : public DeviceMemPool { public: void Malloc(void** ptr, const size_t size) override; void Free(void* ptr) override; + void Append(string blockInfo){} + + void PoolOpt(vector &vec_mf) override {} + }; + +//for SmartMemPool and SwapPool +struct PoolBlockMeta{ + /* + for memory pool Malloc look-up table. + */ + int r_idx; + int d_idx; + size_t size; + size_t offset; + void* ptr; + int occupied; //0 is free, 1 is occupied. + int cross_iteration; + int occupied_backup; +}; + +///struct Vertex +struct Vertex{ + int name; + size_t size; + int r; //arrive + int d; //depart + int cross_iteration =0; + pair color_range; + vector> vec_color_preoccupied; + Vertex(int n, size_t s, int r1, int d1):name(n),size(s),r(r1),d(d1){} + +}; + + +///SmartMemPool +class SmartMemPool: public DeviceMemPool { +public: + SmartMemPool(const MemPoolConf &conf); //constructor + void Malloc(void** ptr, const size_t size); + void Free(void* ptr); + ~SmartMemPool(); + std::pair GetMemUsage() override; + void GetMaxLoad(void); + void Append(string blockInfo); + vector Plan(vectorvec, int &idx_range, size_t &offset, size_t &offset_cross_iteration,string color_method); + int Detection(vectorvec_string_test, int &iteration_length, int &location_2nd_iteration); + + void PoolOpt(vector &vec_mf) override {} + +protected: + void Init(); +private: + MemPoolConf conf_; + // whether the (global) memory pool has been initialized + bool initialized_ = false; + // lock on the initialized variable + std::mutex mtx_; + + string color_method; + int malloc_flag = 0; //0 for cudaMalloc, 1 for coloringMalloc + int global_index = 0; //global counter each time Malloc/Free, add 1. + int global_index_threshold = -1; + int load_flag = 1; //record load at 1 + void* ptr_pool = NULL; + int idx_range = 0; + size_t offset = 0; + size_t offset_cross_iteration = 0; //cross iteration offset. + int iteration_length = 0; + int location_2nd_iteration = 0; + vector vec; + vector vec_block_rw; //read write only opt info + vector vec_block_rw_mf; //read write, malloc, free opt info + maptable_ridx_to_didx; //table match from r_idx to d_idx + maptable_didx_to_ridx; //table match from d_idx to r_idx + + vector>vec_block_meta; //vec of block meta, index in the vector referring to the r_idx + map>table_load; //global_index, + maptable_ptr_to_size; //for tracking load in Free. add when allocate, delete when deallocate. + maptable_ptr_to_ridx; //ptr for arrival idx, for look up Table during free + int check_point = 300; //to reduce the number of tests. + size_t max_total_load; + size_t max_mem_usage; +}; + + +///SwapPool +class SwapPool : public DeviceMemPool { +public: + SwapPool(const MemPoolConf &conf); //constructor + void Malloc(void** ptr, const size_t size); + void Free(void* ptr); + ~SwapPool(); + std::pair GetMemUsage() override; + void Append(string blockInfo); + + //PoolOpt() construct pool based on MF info after Swap constructed.
+ void PoolOpt(vector &vec_mf); +protected: + void Init(); +private: + MemPoolConf conf_; + // whether the (global) memory pool has been initialized + bool initialized_ = false; + // lock on the initialized variable + std::mutex mtx_; + + vector vec_block; + int pool_flag = 0; + int pool_index = 0; //like global counter in device class + int iteration_length_mf = 0; //max length of malloc free operation sequences. + void* ptr_pool = nullptr; + maptable_ptr_to_ridx; //map ptr to arrival idx, for look up Table during free + maptable_pool_meta; //table of pool block meta, key with r_idx +}; + #endif } // namespace singa -#endif // SINGA_CORE_MEMORY_H_ +#endif // SINGA_CORE_MEMORY_H_ \ No newline at end of file diff --git a/python/singa/tensor.py b/python/singa/tensor.py index 441431fc25..ae9ba5ba83 100644 --- a/python/singa/tensor.py +++ b/python/singa/tensor.py @@ -472,9 +472,9 @@ def __idiv__(self, x): x (float or Tensor): ''' if isinstance(x, Tensor): - self.data /= x.data + self.data *= 1/x.data else: - self.data /= float(x) + self.data *= 1/float(x) return self ''' diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 7dd9bf7751..aa8d41f05b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -30,6 +30,7 @@ AUX_SOURCE_DIRECTORY(io io_source) AUX_SOURCE_DIRECTORY(io/network io_source) LIST(APPEND singa_sources ${io_source}) +AUX_SOURCE_DIRECTORY(core/common core_source) AUX_SOURCE_DIRECTORY(core/device core_source) AUX_SOURCE_DIRECTORY(core/memory core_source) AUX_SOURCE_DIRECTORY(core/scheduler core_source) diff --git a/src/core/common/common.cc b/src/core/common/common.cc new file mode 100644 index 0000000000..692c1c451f --- /dev/null +++ b/src/core/common/common.cc @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef DISABLE_WARNINGS + +#include "singa/core/common.h" +#include "singa/core/device.h" +#include +#include +#include + +namespace singa { + +void* Block::mutable_data() { + initialized_ = true; + + //Append block info: opt_type, ptr, time_stamp + if (ptr_device_!=nullptr){ + stringstream strm; + strm<Append(dev_opt_info); + } + + //update ptr after swap in done, if variable is not swapped back yet as expected. + if (data_ == nullptr) { + auto tempData_ = ptr_device_->UpdateGpuPtrInfo(this); + return static_cast(tempData_) + offset_; + } + + return static_cast(data_) + offset_; + } + + +const void* Block::data() const { + CHECK(initialized_) << "Must initialize data before reading it"; + + //Append block info: opt_type, ptr, time_stamp + if (ptr_device_!=nullptr){ + stringstream strm; + strm<Append(dev_opt_info); + } + + //update ptr after swap in done, if variable is not swapped back yet as expected. 
+ if (data_ == nullptr) { + auto tempData_ = ptr_device_->UpdateGpuPtrInfo(this); + return static_cast(tempData_) + offset_; + } + + return static_cast(data_) + offset_; + } + +void* Block::get_data() { + //get data without calling data(), to avoid append block info. + return data_; +} + +void Block::update_data(void* data_new) { + //update data_, after the swap in completes. + data_ = data_new; +} + + +} // namespace singa +#endif \ No newline at end of file diff --git a/src/core/device/cuda_gpu.cc b/src/core/device/cuda_gpu.cc index f6603d3632..523986f4f7 100644 --- a/src/core/device/cuda_gpu.cc +++ b/src/core/device/cuda_gpu.cc @@ -23,6 +23,7 @@ #include #include #include +#include #include "singa/core/device.h" #include "singa/utils/cuda_utils.h" namespace singa { @@ -122,5 +123,10 @@ void CudaGPU::Free(void* ptr) { } } +void* CudaGPU::UpdateGpuPtr(const Block* block_){ + return nullptr; +} + + } // namespace singa -#endif // USE_CUDA +#endif // USE_CUDA \ No newline at end of file diff --git a/src/core/device/device.cc b/src/core/device/device.cc index cda1b9f942..5a1ac270ac 100644 --- a/src/core/device/device.cc +++ b/src/core/device/device.cc @@ -17,6 +17,9 @@ */ #include "singa/core/device.h" +#include +#include +#include namespace singa { Device::Device(int id, int num_executors) @@ -37,7 +40,9 @@ Block* Device::NewBlock(int size) { << "from size_t to int. In that case, the size is too large."; if (size > 0) { void* ptr = Malloc(size); - return new Block(ptr, size); + Block* block_ = new Block(ptr, size,0,this); + AppendAfterMalloc(block_,ptr,size); // make table and append vec_block. + return block_; } else { return nullptr; } @@ -46,11 +51,28 @@ Block* Device::NewBlock(int size) { // TODO(wangwei) return Block to the memory manager void Device::FreeBlock(Block* block) { if (block != nullptr) { - Free(block->mutable_data()); + auto tempPtr = block->mutable_data(); + Free(tempPtr); + + //append block info for free operation. 
+ stringstream strm; + strm<size()); + auto t = (std::chrono::system_clock::now()).time_since_epoch().count(); + dev_opt_info.t = t; + Append(dev_opt_info); + delete block; } } + +void* Device::UpdateGpuPtrInfo(const Block* block_){ + return UpdateGpuPtr(block_); +} + + void Device::CopyDataToFrom(Block* dst, Block* src, size_t nBytes, CopyDirection direct, int dst_offset, int src_offset) { @@ -73,4 +95,4 @@ void Device::CopyDataFromHostPtr(Block* dst, const void* src, size_t nBytes, {}, {dst}); } void Device::Sync() {} -} // namespace singa +} // namespace singa \ No newline at end of file diff --git a/src/core/device/platform.cc b/src/core/device/platform.cc index 8ae15f8604..48b2a94520 100644 --- a/src/core/device/platform.cc +++ b/src/core/device/platform.cc @@ -128,11 +128,11 @@ Platform::CreateCudaGPUsOn(const vector &devices, size_t init_size) { conf.add_device(device); CHECK_LE(bytes, Platform::GetGPUMemSize(device).first); } - auto pool = std::make_shared(conf); + auto pool = std::make_shared(conf); vector > ret; for (auto device : devices) { - auto dev = std::make_shared(device, pool); + auto dev = std::make_shared(device, pool); ret.push_back(dev); } return ret; @@ -170,14 +170,12 @@ Platform::CreateOpenclDevices(const size_t num_devices) { } return (int)total_num_devices; } - static const std::vector> Platform::CreateOpenclDevices(const std::vector &id) { - } */ #endif // USE_OPENCL } // namespace singa -#endif +#endif \ No newline at end of file diff --git a/src/core/device/swap_gpu.cc b/src/core/device/swap_gpu.cc new file mode 100644 index 0000000000..4228f6e16a --- /dev/null +++ b/src/core/device/swap_gpu.cc @@ -0,0 +1,1160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "singa/singa_config.h" +#ifdef USE_CUDA +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include // std::tuple, std::get, std::tie, std::ignore +#include "singa/core/device.h" +#include "singa/utils/cuda_utils.h" + + +using namespace std; +namespace singa { + +const cudaMemcpyKind copyKind[] = {cudaMemcpyHostToHost, cudaMemcpyHostToDevice, + cudaMemcpyDeviceToHost, + cudaMemcpyDeviceToDevice}; + +struct sort_by_ptr_idx_ascending{ + /* + sort DeviceOptInfo by ptr and then idx. 
+ */ + inline bool operator() (const DeviceOptInfo& struct1, const DeviceOptInfo& struct2) + { + return ((struct1.ptr SplitOptString(string s, string delimiter) { + // string delimiter + size_t pos_start = 0, pos_end, delim_len = delimiter.length(); + string token; + vector res; + while ((pos_end = s.find(delimiter, pos_start)) != string::npos) { + token = s.substr(pos_start, pos_end - pos_start); + pos_start = pos_end + delim_len; + res.push_back(token); + } + res.push_back(s.substr(pos_start)); + + return res; +} + + +vector DeviceOptSeqStrToStruct(vector vec, int &idx_range){ + /* + convert vector of string into vector of DeviceOptInfo, sorted by ptr + and then idx, and update idx_range to pieceMsgVec size. + format of DeviceOptInfo [ptr, size/-1, flag, idx, timestamp] + flag: 1 for malloc, -1 for free, 2 for read, 3 for layer,4 for mutable + */ + vectorvec_opt_info; + + for (int i=0;i v = SplitOptString(vec[i], " "); + int operation_type; + if (v[0]=="Malloc"){ + operation_type = 1; + }else if (v[0]=="Free"){ + operation_type = -1; + }else if (v[0]=="Mutable"){ + operation_type = 4; + }else if (v[0]=="Read"){ + operation_type = 2; + }else if (v[0]=="Layer"){ + operation_type = 3; + } + //DeviceOptInfo(string p, size_t s, int M, int i):ptr(p),size(s),operation_type(M),idx(i){} + size_t result; + stringstream convert(v[2]); + if (!(convert>>result)){ + result =-1; + cout<<"error for converting size from str to int."<>temp_time; + itm.t =temp_time; + vec_opt_info.push_back(itm); + } + + sort(vec_opt_info.begin(),vec_opt_info.end(),sort_by_ptr_idx_ascending()); + idx_range = static_cast(vec_opt_info.size()); + + return vec_opt_info; +} + + +vector DeviceOptSeqRepeatableTestPreProcess(vectorvec_opt_info){ + /* + preprocess Device Operation Sequence Struct info for repeatable test, + return a vector of int for fast detection. + */ + vectorvec_opt_simplified_info; + string temp_str; + int temp_idx=0; + for (int i=0;ivec_rep; // vector of size_delta, name it as vec_rep for simplicity. + for (int i =0; irep, int &iteration_length, int &location_of_2nd_iteration, int iteration_length_threshold, int global_index ){ + /* + repeatable test, input vector of int, + in-place update iteration_length (length of iteration) + and location_of_2nd_iteration (where 2nd iteration starts) + */ + int idx_range = (int)rep.size(); + int threshold = iteration_length_threshold; + vector>iteration_length_location_of_2nd_iteration; + + for (int i=0; ithreshold){ + break; + } + for (int len=1; len<(idx_range-i);len++){ + if (iteration_length>threshold){ + break; + } + if((equal(rep.begin()+i,rep.begin()+i-1+len,rep.begin()+i+len))&&(iteration_lengthstruct2.DOA_origin); + } +}; + +struct sort_by_WDOA_descending{ + /* + sort SwapBlock by weighted DOA_origin, descending + */ + inline bool operator() (const SwapBlock& struct1, const SwapBlock& struct2) + { + return (struct1.WDOA>struct2.WDOA); + } +}; + +struct sort_by_AOA_descending{ + /* + sort SwapBlock by pri, descending + */ + inline bool operator() (const SwapBlock& struct1, const SwapBlock& struct2) + { + return (struct1.AOA>struct2.AOA); + } +}; + +struct sort_by_idx_ascending_swap{ + /* + sort DeviceOptInfo_Swap by idx.
+ */ + inline bool operator() (const SwapBlock& struct1, const SwapBlock& struct2) + { + return (struct1.r_idxstruct2.d_idx); + } +}; + +struct sort_by_majority_voting_ascending{ + /* + sort majority voting, ascending + */ + inline bool operator() (const SwapBlock& struct1, const SwapBlock& struct2) + { + return (struct1.majority_voting GetOptIdxAboveLoadLimit(vectorvec_load, size_t mem_limit, int start_idx, int end_idx,int iteration_length){ + /* + get operation index (range) that is above the load limit. + input: vec_load, mem_limit, range [start_idx, end_idx) + return range overlimit [first_over_limit, first_below_limit) + */ + int first_over_limit = start_idx; + int first_below_limit = end_idx; + + for (int i = start_idx+iteration_length; i < end_idx+iteration_length; i++){ + if (vec_load[i] > mem_limit){ + first_over_limit = i-iteration_length; + break; + } + } + + for (int i = end_idx+iteration_length; i > first_over_limit+iteration_length; i--){ + if (vec_load[i] > mem_limit){ + first_below_limit = i-1-iteration_length; + break; + } + } + + if (first_over_limit == start_idx) first_over_limit = -1; + + if (first_below_limit == end_idx) first_below_limit = -1; + + return std::make_pair(first_over_limit, first_below_limit); +} + + +pair GetLoadPeak(vectorvec_load_test,int iteration_length){ + /* + return value and index of load peak + */ + double max_load_test = 0; + int max_idx_test = 0; + for (int i = iteration_length; i < iteration_length*2; i++){ + if (max_load_test < vec_load_test[i]){ + max_load_test = vec_load_test[i]; + max_idx_test = i - iteration_length; + } + } + return std::make_pair(max_load_test,max_idx_test); +} + +void UpdateLoad(vector& vec_load,int start_idx, int end_idx, int plus_minus, size_t size,int iteration_length){ + /* + update load [start_idx, end_idx) by plus_minus*size + */ + for (int i = start_idx+iteration_length; i(size) * plus_minus; + } +} + + +///define SwapGPU member functions +vector SwapGPU::SelectBlock(vectorvec_swap,vector temp_load,double mem_limit,string mode){ + vectorvec_swap_selct; + /* + select swapping blocks based on a certain priority score or BO score; + with load updated + */ + if (mode == "DOA_origin"){ + sort(vec_swap.begin(),vec_swap.end(),sort_by_DOA_origin_descending()); + } + + if (mode == "AOA"){ + sort(vec_swap.begin(),vec_swap.end(),sort_by_AOA_descending()); + } + + if (mode == "WDOA"){ + for (int i = 0; i < vec_swap.size(); i++){ + auto itm = vec_swap[i]; + for (int j = itm.r_idx; j < itm.d_idx; j++){ + itm.WDOA += origin_load[i+iteration_length] - mem_limit; + } + } + sort(vec_swap.begin(),vec_swap.end(),sort_by_WDOA_descending()); + } + + if (mode == "majority_voting"){ + //add order for DOA + sort(vec_swap.begin(),vec_swap.end(),sort_by_DOA_origin_descending()); + for (int i = 0; i < vec_swap.size();i++){ + vec_swap[i].majority_voting+=i; + } + //add order for AOA + sort(vec_swap.begin(),vec_swap.end(),sort_by_AOA_descending()); + for (int i = 0; i < vec_swap.size();i++){ + vec_swap[i].majority_voting+=i; + } + //add order for WDOA + for (int i = 0; i < vec_swap.size(); i++){ + auto itm = vec_swap[i]; + for (int j = itm.r_idx; j < itm.d_idx; j++){ + itm.WDOA += origin_load[i+iteration_length] - mem_limit; + } + } + sort(vec_swap.begin(),vec_swap.end(),sort_by_WDOA_descending()); + for (int i = 0; i < vec_swap.size();i++){ + vec_swap[i].majority_voting+=i; + } + sort(vec_swap.begin(),vec_swap.end(),sort_by_majority_voting_ascending()); + } + + + + //select block one by one till updated peak load is no larger than limit.
+ for (int i=0; i SwapGPU::GetIdealLoad(vectorvec_load,vector vec_swap_selct){ + /* + get load_ideal, which is equivalent to load by synchronous swapping. + */ + auto vec_load_return = vec_load; + for (int i =0; i&vec_swap_selct, vector&vec_load_temp,double &overhead,double mem_limit,string mode){ + /* + Swap Scheduling algo + update idx_out_end, idx_in_start + compute overhead time + mode selection: no overhead or stick to limit. + */ + + overhead = 0; + + /// mode that stick to the mem_limit + if (mode == "stick-to-limit"){ + sort(vec_swap_selct.begin(),vec_swap_selct.end(),sort_by_idx_ascending_swap()); + for (int i = 0; i 0){ + ready_idx = std::max(ready_idx,vec_swap_selct[i-1].idx_out_end); + } + + itm.idx_out_start = ready_idx; + itm.t_out_start = vec_run[ready_idx+iteration_length].t; + itm.t_out_end = itm.t_out_start + SwapOutTime(itm.size); + total_swap_out_time+=SwapOutTime(itm.size); + while (itm.t_out_end > vec_run[ready_idx+iteration_length].t){ + //ready means when able to finish swapOut, w/ or w/o overhead. + ready_idx++; + } + + //get min compare with max_idx and ready_idx. + ready_idx = std::min(max_idx,ready_idx); + UpdateLoad(vec_load_temp,ready_idx+1,itm.d_idx,-1,itm.size,iteration_length); + auto temp_over_limit_ = GetOptIdxAboveLoadLimit(vec_load_temp,mem_limit,0,iteration_length,iteration_length); + if ((temp_over_limit_.first != -1) && (temp_over_limit_.first <= ready_idx)) { + UpdateLoad(vec_load_temp,temp_over_limit_.first-1,ready_idx+1,-1,itm.size,iteration_length); + ready_idx = temp_over_limit_.first - 1; + overhead+=(itm.t_out_end-vec_run[ready_idx+iteration_length].t); + } + itm.idx_out_end = ready_idx; + vec_swap_selct[i] = itm; + } + + sort(vec_swap_selct.begin(),vec_swap_selct.end(),sort_by_idx_descending_swap()); + for (int i =0; i 0){ need_idx = std::min(need_idx,vec_swap_selct[i-1].idx_in_start); } + itm.idx_in_end = need_idx; + double prepareTime = vec_run[need_idx+iteration_length].t - SwapInTime(itm.size); + total_swap_in_time+=SwapInTime(itm.size); + while (prepareTime < vec_run[need_idx+iteration_length].t){ + need_idx--; + } + need_idx = std::max(need_idx,max_idx+1); + itm.idx_in_start = need_idx; + itm.t_in_start = prepareTime; + UpdateLoad(vec_load_temp,itm.idx_in_start,itm.d_idx,1,itm.size,iteration_length); + auto temp_over_limit_3 = GetOptIdxAboveLoadLimit(vec_load_temp,mem_limit,0,iteration_length,iteration_length); + + if ((temp_over_limit_3.second != -1) && (vec_run[temp_over_limit_3.second+iteration_length].t > itm.t_in_start)) { + overhead+=(vec_run[temp_over_limit_3.second+iteration_length].t - itm.t_in_start); + UpdateLoad(vec_load_temp,itm.idx_in_start,temp_over_limit_3.second+1,-1,itm.size,iteration_length); + itm.idx_in_start = temp_over_limit_3.second+1; + auto temp_over_limit_4 = GetOptIdxAboveLoadLimit(vec_load_temp,mem_limit,0,iteration_length,iteration_length); + } + vec_swap_selct[i] = itm; + } + }///end of first mode. 
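/*
 * Illustrative sketch (editorial, not from the patch): both scheduling modes
 * in Scheduling() hinge on the same "ready index" walk. After a block's last
 * access, advance through the timestamped operation sequence until the modeled
 * asynchronous copy has finished. ModelSwapOutTime() below is an assumed
 * placeholder, not the patch's measured SwapOutTime().
 */
#include <cstddef>
#include <iostream>
#include <vector>

double ModelSwapOutTime(size_t size) { return 1e4 + 0.3 * size; }  // assumed ns cost

// Advance from the last-access index until the asynchronous swap-out, started
// at op_time[last_access_idx], is modeled to have completed.
int ReadyIndex(const std::vector<double>& op_time, int last_access_idx, size_t size) {
  int idx = last_access_idx;
  double t_out_end = op_time[idx] + ModelSwapOutTime(size);
  while (idx + 1 < static_cast<int>(op_time.size()) && t_out_end > op_time[idx]) ++idx;
  return idx;  // first op index at which the block is safely out of device memory
}

int main() {
  std::vector<double> op_time;
  for (int i = 0; i < 100; ++i) op_time.push_back(i * 5e4);  // one op every 50 us
  std::cout << "swap-out of 1 MB issued at op 10 completes by op "
            << ReadyIndex(op_time, 10, 1 << 20) << '\n';
  return 0;
}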
+ ///mode that incurs zero overhead + if (mode == "no-overhead"){ + //update idx_out_end + //sort by r_idx for idx_out_end update + sort(vec_swap_selct.begin(),vec_swap_selct.end(),sort_by_idx_ascending_swap()); + for (int i = 0; i 0){ + ready_idx = std::max(ready_idx,vec_swap_selct[i-1].idx_out_end); + } + itm.idx_out_start = ready_idx; + itm.t_out_start = vec_run[ready_idx].t; + itm.t_out_end = itm.t_out_start + SwapOutTime(itm.size); + while (itm.t_out_end > vec_run[ready_idx].t){ + ready_idx++; + } + itm.idx_out_end = ready_idx; + vec_swap_selct[i] = itm; + } + //update idx_in_start + sort(vec_swap_selct.begin(),vec_swap_selct.end(),sort_by_idx_descending_swap()); + for (int i =0; i 0){ need_idx = std::min(need_idx,vec_swap_selct[i-1].idx_in_start); } + itm.idx_in_end = need_idx; + double prepareTime = vec_run[need_idx].t - SwapInTime(itm.size); + while (prepareTime < vec_run[need_idx].t){ + need_idx--; + } + itm.idx_in_start = need_idx; + itm.t_in_start = prepareTime; + vec_swap_selct[i] = itm; + UpdateLoad(vec_load_temp,itm.idx_out_end,itm.idx_in_start+1,-1,itm.size,iteration_length); + } + + } + +} + + +void SwapGPU::BuildMetaTables(vectorvec_swap_selct){ + /* + construct tables: table_sched and table_meta + */ + cudaStream_t stream1; + cudaStream_t stream2; + sort(vec_swap_selct.begin(),vec_swap_selct.end(),sort_by_idx_ascending_swap()); + //for each swap select, make table_sched and table_meta + // for (int i = static_cast(vec_swap_selct.size()-1);i>=0; i--){ + for (int i =0; i(table_sched.find(itm.idx_out_start)->second) = itm.r_idx; + std::get<1>(table_sched.find(itm.idx_out_start)->second) = 0; + } + //idx_in_start swap + if (table_sched.find(itm.idx_in_start) == table_sched.end()){ + table_sched[itm.idx_in_start] = std::make_tuple(itm.r_idx,1,-1,-1); + } else { + std::get<0>(table_sched.find(itm.idx_in_start)->second) = itm.r_idx; + std::get<1>(table_sched.find(itm.idx_in_start)->second) = 1; + } + // idx_out_end sync + if (table_sched.find(itm.idx_out_end) == table_sched.end()){ + table_sched[itm.idx_out_end] = std::make_tuple(-1,-1,itm.r_idx,0); + } else { + std::get<2>(table_sched.find(itm.idx_out_end)->second) = itm.r_idx; + std::get<3>(table_sched.find(itm.idx_out_end)->second) = 0; + } + //idx_in_end sync + if (table_sched.find(itm.idx_in_end) == table_sched.end()){ + table_sched[itm.idx_in_end] = std::make_tuple(-1,-1,itm.r_idx,1); + } else { + std::get<2>(table_sched.find(itm.idx_in_end)->second) = itm.r_idx; + std::get<3>(table_sched.find(itm.idx_in_end)->second) = 1; + } + + ///Make table_meta + void* temp_ptr = nullptr; + cudaMallocHost(&temp_ptr,itm.size); //pinned memory. + BlockMeta meta; + meta.size = itm.size; + meta.cpu_ptr = temp_ptr; + meta.out_stream = stream1; + meta.in_stream = stream2; + table_meta[itm.r_idx] = meta; + } + +} + +void SwapGPU::UpdateMetaTables(Block* block_ptr){ + /* + update table_meta's block_ and data_; update once after swap test is passed. + enable to update negative r_idx. + it's safe in below procedure, as r_global_index and relative_counter should never be the same.
+ */ + + if (past_test_flag == 1) { + //update positive r_idx + int r_global_index = (global_index-location_of_2nd_iteration)%iteration_length; + if (!(table_meta.find(r_global_index)==table_meta.end())){ + table_meta.find(r_global_index)->second.block_ = block_ptr; + table_meta.find(r_global_index)->second.data_ = block_ptr->get_data(); + } + + //update negative r_idx + int relative_counter = r_global_index - iteration_length; + if (!(table_meta.find(relative_counter)==table_meta.end())){ + table_meta.find(relative_counter)->second.block_ = block_ptr; + table_meta.find(relative_counter)->second.data_ = block_ptr->get_data(); + } + } + +} + +int SwapGPU::Detection(vectorvec_block,int &iteration_length, int &location_of_2nd_iteration){ + /* + test repeatability, detect iteration, and return global_index_threshold. + */ + + ///vec_str (vec_block) to vec_opt_info, sort by ptr and idx. + int idx_range = 0; + vector vec_opt_info = DeviceOptSeqStrToStruct(vec_block,idx_range); + + ///rep test + vector vec_rep = DeviceOptSeqRepeatableTestPreProcess(vec_opt_info); + RepeatableTest(vec_rep,iteration_length,location_of_2nd_iteration,iteration_length_threshold,global_index); + + //Note here location_of_2nd_iteration not exactly start of one iteration, + //adjust to nearly start of one by restricting "Malloc" + int shift_counter = 0; + for (int i=0;i v = SplitOptString(vec_block[location_of_2nd_iteration+i], " "); + if (v[0]=="Malloc"){ + shift_counter = i; + break; + } + } + location_of_2nd_iteration =location_of_2nd_iteration+shift_counter; + + if (iteration_length vec_opt_info = DeviceOptSeqStrToStruct(vec_block,idx_range); + sort(vec_opt_info.begin(),vec_opt_info.end(),sort_by_idx_ascending()); + + // scale down idx, to middle iteration. + temp_time_baseline = vec_opt_info[location_of_5th_iteration].t; + for (int i=0; ione_itr(&vec_opt_info[location_of_2nd_iteration+4*iteration_length],&vec_opt_info[location_of_2nd_iteration+5*iteration_length]); + for (int i =0; itemp_vec_run(&vec_opt_info[location_of_2nd_iteration+3*iteration_length],&vec_opt_info[location_of_2nd_iteration+6*iteration_length]); + vec_run = temp_vec_run; + + vectortemp_vec_run2(&vec_opt_info[location_of_2nd_iteration],&vec_opt_info[location_of_2nd_iteration+3*iteration_length]); + auto vec_run2 = temp_vec_run2; + + + vectorvec_load(&global_load[location_of_2nd_iteration],&global_load[location_of_2nd_iteration+3*iteration_length]); + origin_load = vec_load; + + auto max_current = GetLoadPeak(vec_load,iteration_length); + max_load = max_current.first; + max_idx = max_current.second; + + //sort by ptr & idx, sorting the duplicate + auto vec_run_dup = vec_run; + sort(vec_run_dup.begin(),vec_run_dup.end(),sort_by_ptr_idx_ascending()); + + ///formulate swappable items. 
+ vectorvec_swap; + + for (int i =1; i= smallest_block) && (vec_run_dup[i-1].idxmax_idx) + && (vec_run_dup[i-1].ptr ==vec_run_dup[i].ptr) + && ((vec_run_dup[i-1].operation_type==3) or (vec_run_dup[i-1].operation_type==2) or (vec_run_dup[i-1].operation_type==4))) + { + SwapBlock itm(vec_run_dup[i].ptr, vec_run_dup[i].size, vec_run_dup[i-1].idx, vec_run_dup[i].idx, vec_run_dup[i-1].t, vec_run_dup[i].t); + itm.DOA_origin = itm.d_time-itm.r_time; + itm.DOA = itm.d_time-itm.r_time-SwapOutTime(itm.size)-SwapOutTime(itm.size); + if (itm.DOA>=0){ + itm.AOA = itm.DOA * itm.size; + } else { + itm.AOA = itm.DOA * 1/itm.size; + } + //cat A + if (vec_run_dup[i-1].operation_type == 3){ itm.cat = "A1"; itm.r_idx_ready = itm.r_idx; } + if (vec_run_dup[i-1].operation_type == 2){ itm.cat = "A2"; itm.r_idx_ready = itm.r_idx + data_buffer;} + if (vec_run_dup[i-1].operation_type == 4){ itm.cat = "A3"; itm.r_idx_ready = itm.r_idx + mutable_data_buffer;} + + vec_swap.push_back(itm); + } + } + + ///load ideal, swap all vec_swap, least possible memory by one-swap, for data collection only. + auto vec_load_ideal = GetIdealLoad(vec_load,vec_swap); + fstream file_load_ideal("load_ideal.csv", ios::in|ios::out|ios::app); + for (int i=iteration_length; i(conf); + Setup(); + +} + +SwapGPU::SwapGPU(int id, std::shared_ptr pool) + : Device(id, kNumCudaStream) { + CHECK(pool != nullptr); + pool_ = pool; + Setup(); +} + +void SwapGPU::Setup() { + lang_ = kCuda; + ctx_.stream = NULL; // use the default sync stream + // TODO(wangwei) create one handle for each stream? + CUDA_CHECK(cudaSetDevice(id_)); + // use curandCreateGeneratorHost for CudaHost device + CURAND_CHECK( + curandCreateGenerator(&ctx_.curand_generator, CURAND_RNG_PSEUDO_DEFAULT)); + auto seed = std::chrono::system_clock::now().time_since_epoch().count(); + SetRandSeed(seed); + // TODO(wangwei) if one generator per stream, then need diff offset per gen? + CURAND_CHECK(curandSetGeneratorOffset(ctx_.curand_generator, 0)); + CUBLAS_CHECK(cublasCreate(&(ctx_.cublas_handle))); + +#ifdef USE_CUDNN + // TODO(wangwei) create one handle for each stream? + auto status = cudnnCreate(&ctx_.cudnn_handle); + CHECK_EQ(status, CUDNN_STATUS_SUCCESS) << cudnnGetErrorString(status); +#endif // USE_CUDNN +} + +void SwapGPU::SetRandSeed(unsigned seed) { + CHECK(ctx_.curand_generator); + CURAND_CHECK(curandSetPseudoRandomGeneratorSeed(ctx_.curand_generator, seed)); +} + +void SwapGPU::DoExec(function&& fn, int executor) { fn(&ctx_); } + +void SwapGPU::CopyToFrom(void* dst, const void* src, size_t nBytes, + CopyDirection direction, Context* ctx) { + cudaMemcpy(dst, src, nBytes, copyKind[direction]); + // TODO(wangwei) use async copy + // cudaMemcpyAsync(dst, src, nBytes,cudaMemcpyDefault, ctx_.stream); +} + +size_t SwapGPU::GetAllocatedMem() { + if (pool_ != nullptr) { + auto ret = pool_->GetMemUsage(); + return ret.second - ret.first; + } + LOG(ERROR) << "The memory pool is not set"; + return 0u; +} + +/// Allocate gpu memory.
+void* SwapGPU::Malloc(int size) { + + void* ptr = nullptr; + if (size > 0) { + CUDA_CHECK(cudaSetDevice(id_)); + pool_->Malloc((void**)&ptr, size); + + ///append vec_block_mf:for swap & pool + if ((async_swap_flag == 1) && ((global_index - 4*iteration_length) < three_more_iteration_global_index_threshold) + && ((global_index - iteration_length) >= three_more_iteration_global_index_threshold)){ + string temp_str1 ="Malloc "; + stringstream strm2; + strm2<Free(ptr); + ///append vec_block_mf: for swap & pool + if ((async_swap_flag == 1) && ((global_index - 4*iteration_length) < three_more_iteration_global_index_threshold) + && ((global_index - iteration_length) >= three_more_iteration_global_index_threshold)){ + string temp_str1 ="Free "; + stringstream strm2; + strm2< iteration_length_threshold) { + past_test_flag = 1; + three_more_iteration_global_index_threshold = global_index_threshold + 3*iteration_length; + location_of_5th_iteration = location_of_2nd_iteration + 3*iteration_length; + } + } + ///switch flag; next idx + if ((global_index+1) == three_more_iteration_global_index_threshold){ + Plan(); + async_swap_flag = 1; + } +} + +void SwapGPU::AppendAfterMalloc(Block* block_ptr,void* data_ptr,int size){ + /* + Append info right after Malloc; make block_ptr - data_ptr pair wise table. + as Block* is not available till Malloc() done. + */ + + //append info + stringstream strm; + strm<= three_more_iteration_global_index_threshold + iteration_length) && (!(table_sched.find(r_global_index_n) == table_sched.end()))) { + DeploySwapExec(r_global_index_n); + } + if ((global_index >= three_more_iteration_global_index_threshold + iteration_length) && (!(table_sched.find(r_global_index) == table_sched.end()))) { + DeploySwapExec(r_global_index); + } + } +} + + +void SwapGPU::DeploySwapExec(int r_global_index){ + //execute DeploySwap + auto swap_idx = std::get<0>(table_sched.find(r_global_index)->second); + auto swap_dir = std::get<1>(table_sched.find(r_global_index)->second); + auto sync_idx = std::get<2>(table_sched.find(r_global_index)->second); + auto sync_dir = std::get<3>(table_sched.find(r_global_index)->second); + if (swap_dir == 0){ + SwapOut(swap_idx); + } + if (swap_dir == 1){ + SwapIn(swap_idx); + } + if (sync_dir == 0){ + ///sync swap-out, including sync, update block's data_ to nullptr, free data_, update meta. + auto last_meta = table_meta.find(sync_idx)->second; + auto t1 = (std::chrono::system_clock::now()).time_since_epoch().count(); + cudaEventSynchronize(last_meta.in_event); + auto t2 = (std::chrono::system_clock::now()).time_since_epoch().count(); + + table_not_at_device[last_meta.block_] = sync_idx; + + last_meta.block_->update_data(nullptr); + pool_->Free(last_meta.data_); + ///append vec_block_mf + if ((async_swap_flag == 1) && ((global_index - 4*iteration_length) < three_more_iteration_global_index_threshold) + && ((global_index - iteration_length) >= three_more_iteration_global_index_threshold)){ + string temp_str1 ="Free "; + stringstream strm2; + strm2<second = last_meta; + } + if (sync_dir == 1){ + ///sync swap-in, including sync, update block's data_ to new gpu address, update meta. 
+ auto last_meta = table_meta.find(sync_idx)->second; + auto t1 = (std::chrono::system_clock::now()).time_since_epoch().count(); + cudaEventSynchronize(last_meta.out_event); + auto t2 = (std::chrono::system_clock::now()).time_since_epoch().count(); + table_not_at_device.erase(last_meta.block_); + last_meta.block_->update_data(last_meta.data_); + table_meta.find(sync_idx)->second = last_meta; + } +} + +void SwapGPU::Append(DeviceOptInfoToAppend dev_opt_info){ + + //convert block_ptr from string to Block* + void* temp_ptr; + stringstream convert(dev_opt_info.block_ptr); + convert>>temp_ptr; + auto block_ptr = static_cast(temp_ptr); + + // update global load + if (iteration_length < iteration_length_threshold){ + if (dev_opt_info.operation_type == "Malloc"){ + if (global_load.size()>0){ + global_load.push_back(global_load[global_load.size()-1]+block_ptr->size()); + } else { + global_load.push_back(block_ptr->size()); + } + } else if (dev_opt_info.operation_type == "Free"){ + global_load.push_back(global_load[global_load.size()-1]-block_ptr->size()); + } else { + global_load.push_back(global_load[global_load.size()-1]); + } + } + + //append into vec_block + stringstream strm1; + strm1<size() != size_sequence[r_global_index]){ + async_swap_flag = 0; + cout<<"!!!! async_swap_flag changed back to 0"<PoolOpt(vec_block_mf); + } + +} + + + +void* SwapGPU::UpdateGpuPtr(const Block* block_ptr){ + /* + in case that block is not at device memory, swapIn ad hoc. + used in block class to update ptr after swap in done, if variable is not swapped back yet as expected. + */ + auto r_idx = table_not_at_device.find(block_ptr)->second; + cudaError_t err; + BlockMeta meta = table_meta.find(r_idx)->second; + cudaEventCreate (&meta.in_event); + void* ptr = nullptr; + pool_->Malloc((void**)&ptr, meta.size); + meta.data_ = ptr; + err = cudaMemcpyAsync(meta.data_,meta.cpu_ptr,meta.size,cudaMemcpyHostToDevice,meta.in_stream); + cudaEventRecord(meta.in_event,meta.in_stream); + cudaEventSynchronize(meta.out_event); + table_meta.find(r_idx)->second = meta; + + return ptr; +} + +void SwapGPU::SwapOut(const int idx){ + /* + memory copy asynchronously GPU -> CPU, and update meta. + */ + cudaError_t err; + BlockMeta meta = table_meta.find(idx)->second; + cudaEventCreate (&meta.out_event); + err = cudaMemcpyAsync(meta.cpu_ptr,meta.data_,meta.size,cudaMemcpyDeviceToHost,meta.out_stream); + cudaEventRecord(meta.out_event,meta.out_stream); + table_meta.find(idx)->second = meta; +} + +void SwapGPU::SwapIn(const int idx){ + /* + memory copy asynchronously CPU -> GPU, and update meta. + */ + + cudaError_t err; + BlockMeta meta = table_meta.find(idx)->second; + cudaEventCreate (&meta.in_event); + void* ptr = nullptr; + pool_->Malloc((void**)&ptr, meta.size); + + ///append vec_block_mf + if ((async_swap_flag == 1) && ((global_index - 4*iteration_length) < three_more_iteration_global_index_threshold) + && ((global_index - iteration_length) >= three_more_iteration_global_index_threshold)){ + string temp_str1 ="Malloc "; + stringstream strm2; + strm2<second = meta; +} + +void SwapGPU::SwapOutSynchronous(const Block* block_ptr){ + /* + for synchronous swap, collect speed info + */ + if (global_index < 1000 && block_ptr->size() > 1<<20) { + fstream file_block5("speed.csv", ios::in|ios::out|ios::app); + BlockMeta meta; + meta.data_ = meta.block_->get_data(); + void* temp_ptr = nullptr; + cudaMallocHost(&temp_ptr,block_ptr->size()); //pinned memory. 
+ meta.cpu_ptr = temp_ptr; + table_block_meta[block_ptr] = meta; + auto t1 = (std::chrono::system_clock::now()).time_since_epoch().count(); + cudaError_t err; + err = cudaMemcpy(meta.cpu_ptr, meta.data_,block_ptr->size(),cudaMemcpyDeviceToHost); + auto t2 = (std::chrono::system_clock::now()).time_since_epoch().count(); + file_block5<<"Out "<size()<<' '<size() > 1<<20) { + fstream file_block5("speed.csv", ios::in|ios::out|ios::app); + BlockMeta meta = table_block_meta.find(block_ptr)->second; + auto t1 = (std::chrono::system_clock::now()).time_since_epoch().count(); + cudaError_t err; + err = cudaMemcpy(meta.data_, meta.cpu_ptr,block_ptr->size(),cudaMemcpyHostToDevice); + auto t2 = (std::chrono::system_clock::now()).time_since_epoch().count(); + file_block5<<"In "<size()<<' '< +#include //a. +#include +//for SmartMemoryPool +using namespace std; #ifdef USE_CUDA @@ -94,9 +98,12 @@ void CnMemPool::Malloc(void **ptr, const size_t size) { void CnMemPool::Free(void *ptr) { CHECK(initialized_) << "Cannot free the memory as the pool is not initialzied"; + // cout<<"to free ptr "<struct2.size); + } +}; + +struct sort_by_size_r_idx_descending{ + /* + sort PoolBlockLifeTime by descending size and r_idx + */ + inline bool operator() (const PoolBlockLifeTime& struct1, const PoolBlockLifeTime& struct2) + { + return ((struct1.size>struct2.size)||((struct1.size==struct2.size)&&(struct1.r_idx SplitString(string s, string delimiter) { + /// string delimiter + size_t pos_start = 0, pos_end, delim_len = delimiter.length(); + string token; + vector res; + while ((pos_end = s.find(delimiter, pos_start)) != string::npos) { + token = s.substr(pos_start, pos_end - pos_start); + pos_start = pos_end + delim_len; + res.push_back(token); + } + res.push_back(s.substr(pos_start)); + return res; +} + + +vector PoolOptSeqStrToStruct(vector vec, int &idx_range){ + /* + convert vector of string into vector of PoolOptInfo, + sorted by ptr and then idx, and update idx_range to pieceMsgVec size. + */ + vectorvec_pool_opt_info; + for (int i=0;i v = SplitString(vec[i], " "); + if (v[0]=="Malloc"){ + //convert v[2] from str to size_t + size_t result; + stringstream convert(v[2]); + if (!(convert>>result)){ + result =-1; + cout<<"error for converting size from str to int."<(vec_pool_opt_info.size()); + + return vec_pool_opt_info; +} + + +pair,vector> PoolOptInfoToBlockLifeTime(vectorvec_pool_opt_info, int idx_range){ + /* + convert vector of opt info into vector of block life time + return a pair of vectors: 1. normal blocks 2. cross-iteration blocks. + */ + vectorvec_block_life_time1; + vectorvec_block_life_time2; + int i=0; + + //while loop processes a pair at each time, if got a pair. + while (i<(vec_pool_opt_info.size()-1)){ + //condition: start with free. do nothing. + if (vec_pool_opt_info[i].operation_type==-1){ + i+=1; + } + //condition: start with Malloc, next item same ptr and is free. + if ((vec_pool_opt_info[i].operation_type==1)&& (vec_pool_opt_info[i+1].operation_type==-1)&&((vec_pool_opt_info[i].ptr==vec_pool_opt_info[i+1].ptr))){ + PoolBlockLifeTime temp_block_life_time(vec_pool_opt_info[i].idx,vec_pool_opt_info[i].size,vec_pool_opt_info[i].idx,vec_pool_opt_info[i+1].idx); + vec_block_life_time1.push_back(temp_block_life_time); + i+=2; + } + // condition: start with Malloc, no free. 
+ if ((vec_pool_opt_info[i].operation_type==1)&&(vec_pool_opt_info[i].ptr!=vec_pool_opt_info[i+1].ptr)){ + PoolBlockLifeTime temp_block_life_time(vec_pool_opt_info[i].idx,vec_pool_opt_info[i].size,vec_pool_opt_info[i].idx,idx_range); + vec_block_life_time2.push_back(temp_block_life_time); + i+=1; + } + }//end of while + //condition: if still left with the last item + if ((i,vector>pair_vec_block_life_time(vec_block_life_time1,vec_block_life_time2); + + return pair_vec_block_life_time; +} + +///Section implementing coloring algorithm. +vector> MergeColoredSegments(vector> vec_color_preoccupied){ + /* + merge consecutive/overlapping segments of vec_color_preoccupied + input:the collection of color ranges that is once occupied by some block during a block's life time. + output: merged segments in ascending order. + time complexity: O(n) for run, O(n^2) for verify section(optional), where n is size of vec_color_preoccupied. + */ + sort(vec_color_preoccupied.begin(), vec_color_preoccupied.end()); + + if(vec_color_preoccupied.size()<=1){ + return vec_color_preoccupied; + } + + int m = 0; + while (m<(vec_color_preoccupied.size()-1)){ + if ((vec_color_preoccupied[m].second +2)> vec_color_preoccupied[m+1].first){ + pairtempItem(vec_color_preoccupied[m].first,max(vec_color_preoccupied[m].second,vec_color_preoccupied[m+1].second)); + //remove m+1 and m + vec_color_preoccupied.erase(vec_color_preoccupied.begin()+m+1); + vec_color_preoccupied.erase(vec_color_preoccupied.begin()+m); + //insert the combined range + vec_color_preoccupied.insert(vec_color_preoccupied.begin()+m,tempItem); + }else{ + m+=1; + } + }//end of while loop + + return vec_color_preoccupied; +} + + +pair FirstFitAllocation(vector> vec_color_merged,size_t size, size_t local_offset){ + /* + First Fit weighted coloring + return a pair standing for color_range. + local_offset shifts the returned color_range, allowing multiple Plan(). + local_offset not changable, whereas offset is changable. + */ + // condition: if no occupied, put after the local_offset + if (vec_color_merged.size()==0){ + return pair(0+local_offset,size-1+local_offset); + } + + // condition: able to fit before first block, after the local_offset + if ((size+local_offset)<(vec_color_merged[0].first+1)){ + return pair(0+local_offset,size-1+local_offset); + } + + size_t y_location= -1; + if (vec_color_merged.size()>1) { + int n = 0; + while (n<(vec_color_merged.size()-1)){ + // condition: able to fit in between middle blocks. + if ((vec_color_merged[n+1].first-vec_color_merged[n].second-1)>=size){ + y_location = vec_color_merged[n].second+1; + break; + } + n+=1; + }//end of while loop. + // condition: allocate after the last block. + if (y_location == -1){ + y_location = vec_color_merged[vec_color_merged.size()-1].second+1; + } + }// end of if loop, conditon C and D. + + // condition: colorMeger len =1, allocate after the last block. 
+pair<size_t,size_t> BestFitAllocation(vector<pair<size_t,size_t>> vec_color_merged, size_t size, size_t local_offset){
+  /*
+  Best Fit allocation; input and output are the same as for FirstFitAllocation.
+  */
+  // condition: nothing is occupied, put the block right after local_offset
+  if (vec_color_merged.size()==0){
+    return pair<size_t,size_t>(0+local_offset,size-1+local_offset);
+  }
+  //condition: exactly one occupied block, and able to fit before it
+  if ((vec_color_merged.size()==1)&&((size+local_offset)<(vec_color_merged[0].first+1))){
+    return pair<size_t,size_t>(0+local_offset,size-1+local_offset);
+  }
+
+  //condition: exactly one occupied block, unable to fit before it
+  if ((vec_color_merged.size()==1)&&((size+local_offset)>=(vec_color_merged[0].first+1))){
+    return pair<size_t,size_t>(vec_color_merged[0].second+1,vec_color_merged[0].second+size);
+  }
+
+  size_t y_location = -1;
+  pair<int,size_t> temp_hole(-1,-1); // index n, and the hole size between blocks n and n+1
+  if (vec_color_merged.size()>1) {
+    int n = 0;
+    while (n<(vec_color_merged.size()-1)){
+      // condition: able to fit in between middle blocks; track the smallest adequate hole.
+      if (((vec_color_merged[n+1].first-vec_color_merged[n].second-1)>=size)&&((vec_color_merged[n+1].first-vec_color_merged[n].second-1)<temp_hole.second)){
+        temp_hole.first = n;
+        temp_hole.second = vec_color_merged[n+1].first-vec_color_merged[n].second-1;
+      }
+      n+=1;
+    }//end of while loop.
+    if (temp_hole.first != -1){
+      // condition: allocate into the smallest adequate hole.
+      y_location = vec_color_merged[temp_hole.first].second+1;
+    }else{
+      // condition: no adequate hole, allocate after the last block.
+      y_location = vec_color_merged[vec_color_merged.size()-1].second+1;
+    }
+  }
+
+  if (y_location==-1){
+    cout<<"error in BestFitAllocation!!!"<<endl;
+  }
+
+  return pair<size_t,size_t>(y_location,y_location+size-1);
+}
+
+vector<Vertex> AssignColorToVertices(vector<PoolBlockLifeTime> vec_block_life_time, size_t &offset, string color_method){
+  /*
+  color one set of vertices (called separately for normal and cross-iteration blocks)
+  using MergeColoredSegments() and FirstFitAllocation()/BestFitAllocation(), updating offset.
+  time complexity: O(n^2).
+  */
+  size_t local_offset = offset; //fed into FirstFitAllocation; shall never change.
+  int m = static_cast<int>(vec_block_life_time.size());
+  //init all vertices
+  vector<Vertex> vertices;
+  for (int i=0; i<m; i++){
+    Vertex temp_vertex(vec_block_life_time[i].name,vec_block_life_time[i].size,vec_block_life_time[i].r_idx,vec_block_life_time[i].d_idx);
+    vertices.push_back(temp_vertex);
+  }
+
+  for (int i=0; i<m; i++){
+    vector<pair<size_t,size_t>> vec_color_merged = MergeColoredSegments(vertices[i].vec_color_preoccupied);
+
+    if(color_method=="FF"){
+      vertices[i].color_range = FirstFitAllocation(vec_color_merged,vertices[i].size, local_offset);
+    }else{ //BF
+      vertices[i].color_range = BestFitAllocation(vec_color_merged,vertices[i].size, local_offset);
+    }
+
+    //propagate the assigned range to the not-yet-colored vertices whose life time overlaps.
+    for (int j=i+1; j<m; j++){
+      if (max(vertices[i].r,vertices[j].r)<=min(vertices[i].d,vertices[j].d)){
+        vertices[j].vec_color_preoccupied.push_back(vertices[i].color_range);
+      }
+    }
+
+    //update offset, i.e. the largest memory footprint so far.
+    if (vertices[i].color_range.second >= offset){
+      offset = vertices[i].color_range.second+1;
+    }
+  }//end of for loop.
+
+  return vertices;
+}
+
+pair<map<int,int>,map<int,int>> GetCrossIterationBlocks(vector<string> vec_double, int location_2nd_iteration, int iteration_length, int &double_range){
+  ///get the duration info of cross-iteration blocks from a doubled (2-iteration) window.
+  vector<PoolOptInfo> vec_pool_opt_info2 = PoolOptSeqStrToStruct(vec_double,double_range);
+  pair<vector<PoolBlockLifeTime>,vector<PoolBlockLifeTime>> pair_vec_block_life_time2 = PoolOptInfoToBlockLifeTime(vec_pool_opt_info2,double_range);
+
+  map<int,int> table_ridx_to_didx; //full duration info, including cross-iteration durations.
+  map<int,int> table_didx_to_ridx;
+  for (int i=0; i<pair_vec_block_life_time2.first.size(); i++){
+    //blocks that cross the iteration boundary show up as paired blocks in the doubled window.
+    int r_idx = pair_vec_block_life_time2.first[i].r_idx%iteration_length;
+    int d_idx = pair_vec_block_life_time2.first[i].d_idx;
+    table_ridx_to_didx[r_idx] = d_idx;
+    table_didx_to_ridx[d_idx] = r_idx;
+  }
+
+  return pair<map<int,int>,map<int,int>>(table_ridx_to_didx,table_didx_to_ridx);
+}
+
+///Section of test functions.
+vector<size_t> PoolOptSeqRepeatableTestPreProcess(pair<vector<PoolBlockLifeTime>,vector<PoolBlockLifeTime>> pair_vec_block_life_time){
+  /*
+  pre-process the pair of block life time vectors, for ease of the repeatable test.
+  */
+  vector<PoolBlockLifeTime> vec_block_life_time1 = pair_vec_block_life_time.first;
+  vector<PoolBlockLifeTime> vec_block_life_time2 = pair_vec_block_life_time.second;
+  vector<PoolOptSimplifiedInfo> vec_pool_opt_simplified_info;
+
+  //process Malloc and Free pairs, i.e. normal blocks
+  for (int i=0; i<vec_block_life_time1.size(); i++){
+    PoolOptSimplifiedInfo tempIterM(vec_block_life_time1[i].size,1,vec_block_life_time1[i].r_idx);
+    vec_pool_opt_simplified_info.push_back(tempIterM);
+    size_t temp_s_d = static_cast<size_t>(vec_block_life_time1[i].d_idx-vec_block_life_time1[i].r_idx);
+    PoolOptSimplifiedInfo tempIterF(temp_s_d,-1,vec_block_life_time1[i].d_idx);
+    vec_pool_opt_simplified_info.push_back(tempIterF);
+  }
+
+  //process Malloc-only blocks, i.e. cross-iteration blocks
+  for (int i=0; i<vec_block_life_time2.size(); i++){
+    PoolOptSimplifiedInfo tempIterM(vec_block_life_time2[i].size,1,vec_block_life_time2[i].r_idx);
+    vec_pool_opt_simplified_info.push_back(tempIterM);
+  }
+
+  //sort by idx and compress: a Malloc becomes its size, a Free becomes its life-time length,
+  //so the sequence is comparable across iterations even though the raw ptr values differ.
+  sort(vec_pool_opt_simplified_info.begin(),vec_pool_opt_simplified_info.end(),
+       [](const PoolOptSimplifiedInfo& a, const PoolOptSimplifiedInfo& b){ return a.idx<b.idx; });
+  vector<size_t> vec_rep; // vector of size_delta; named vec_rep for simplicity.
+  for (int i=0; i<vec_pool_opt_simplified_info.size(); i++){
+    vec_rep.push_back(vec_pool_opt_simplified_info[i].size_delta);
+  }
+  return vec_rep;
+}
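The point of the size_delta encoding above is that raw pointer values differ from one iteration to the next while sizes and life-time lengths repeat. A toy illustration of just that encoding step (assuming the Malloc/Free indices are already known; no patch identifiers are used):

#include <iostream>
#include <vector>
using namespace std;

int main() {
  // one block: Malloc of 1024 bytes at trace index 2, Free at index 5.
  int r_idx = 2, d_idx = 5;
  size_t size = 1024;

  // a Malloc contributes its size, a Free contributes its block's
  // life-time length (d_idx - r_idx) -- both repeat across iterations,
  // unlike the pointer value printed in the raw trace.
  vector<size_t> rep(6, 0);
  rep[r_idx] = size;                      // Malloc -> size
  rep[d_idx] = (size_t)(d_idx - r_idx);   // Free   -> life-time length

  for (size_t v : rep) cout << v << ' ';  // prints: 0 0 1024 0 0 3
  cout << endl;
}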
+vector<size_t> PoolRepeatableTest(vector<size_t> rep, int idx_range, int &iteration_length, int &location_2nd_iteration){
+  /*
+  get the maximal repeated non-overlapping segment of a vector; return the repeated segment,
+  updating iteration_length and location_2nd_iteration (where the segment starts to repeat).
+  brute force method using equal()
+  time complexity O(n^2)
+  */
+  for (int i=0; i<idx_range; i++){
+    //try every start location and every non-overlapping segment length.
+    for (int len=1; len<=(idx_range-i)/2; len++){
+      if (equal(rep.begin()+i,rep.begin()+i+len,rep.begin()+i+len) && (iteration_length<len)){
+        iteration_length = len;
+        location_2nd_iteration = i;
+      }
+    }
+  }
+
+  vector<size_t> sub_sequence(&rep[location_2nd_iteration],&rep[location_2nd_iteration+iteration_length]);
+  if(!(equal(rep.begin()+location_2nd_iteration,rep.begin()+iteration_length-1+location_2nd_iteration,sub_sequence.begin()) && equal(rep.begin()+location_2nd_iteration+iteration_length,rep.begin()+2*iteration_length-1+location_2nd_iteration,sub_sequence.begin()))){
+    cout<<"error in getting the max repeated segment"<<endl;
+  }
+  return sub_sequence;
+}
+
+void VerifyRepeatableTest(vector<size_t> sub_sequence, int &iteration_length, int &location_2nd_iteration){
+  /*
+  cut further, in case the repeated segment returned by PoolRepeatableTest itself contains multiple iterations.
+  */
+  int temp_iteration_length = 0;
+  int temp_location_2nd_iteration = 0;
+  int temp_idx_range = iteration_length;
+
+  //verify by testing the subsequence again
+  vector<size_t> tempsub_sequence = PoolRepeatableTest(sub_sequence,temp_idx_range,temp_iteration_length,temp_location_2nd_iteration);
+
+  //tunable threshold.
+  int threshold = 50;
+
+  if (temp_iteration_length>threshold){
+    iteration_length = temp_iteration_length;
+    location_2nd_iteration += temp_location_2nd_iteration;
+  }
+}
+
+///verify that the coloring has no overlap
+void OverlapVerification(vector<Vertex> vertices){
+  size_t s = vertices.size();
+  int i,j;
+  for (i=0; i<s; i++){
+    for (j=i+1; j<s; j++){
+      //an error occurs if both the life times and the color ranges intersect.
+      if ((max(vertices[i].r,vertices[j].r)<=min(vertices[i].d,vertices[j].d))
+          && (max(vertices[i].color_range.first,vertices[j].color_range.first)<=min(vertices[i].color_range.second,vertices[j].color_range.second))){
+        cout<<"error: the coloring overlaps"<<endl;
+      }
+    }
+  }
+}
+
+int SmartMemPool::Detection(vector<string> vec_string_test, int &iteration_length, int &location_2nd_iteration){
+  /*
+  Test repeatability from the raw operation sequence.
+  returns global_index_threshold, which is when the flag shall be switched;
+  updates iteration_length and location_2nd_iteration (where the repeated segment starts).
+  */
+  int idx_range_test = 0;
+  vector<PoolOptInfo> vec_pool_opt_info3 = PoolOptSeqStrToStruct(vec_string_test,idx_range_test);
+  pair<vector<PoolBlockLifeTime>,vector<PoolBlockLifeTime>> pair_vec_block_life_time = PoolOptInfoToBlockLifeTime(vec_pool_opt_info3,idx_range_test);
+  vector<size_t> vec_rep = PoolOptSeqRepeatableTestPreProcess(pair_vec_block_life_time);
+
+  //repeatable test with verification
+  vector<size_t> sub_sequence = PoolRepeatableTest(vec_rep,idx_range_test,iteration_length,location_2nd_iteration);
+  VerifyRepeatableTest(sub_sequence, iteration_length, location_2nd_iteration);
+
+  //update global_index_threshold if the test passed, i.e. iteration_length exceeds the threshold
+  if (iteration_length>100){ //tunable threshold.
+    global_index_threshold = idx_range_test+iteration_length-(idx_range_test-location_2nd_iteration)%iteration_length;
+  }
+  return global_index_threshold;
+}
+
+/// main planning function
+vector<Vertex> SmartMemPool::Plan(vector<string> vec, int &idx_range, size_t &offset, size_t &offset_cross_iteration, string color_method){
+  /*
+  Planning, i.e. assign colors to the vertices built from the raw operation sequence.
+  input: a vector of strings; returns the colored vertices;
+  updates idx_range and offset.
+  time complexity: O(n^2) where n is iteration_length.
+  */
+  vector<PoolOptInfo> vec_pool_opt_info = PoolOptSeqStrToStruct(vec,idx_range);
+  pair<vector<PoolBlockLifeTime>,vector<PoolBlockLifeTime>> pair_vec_block_life_time = PoolOptInfoToBlockLifeTime(vec_pool_opt_info,idx_range);
+
+  //color normal blocks and cross-iteration blocks separately; they cannot be mismatched.
+  vector<PoolBlockLifeTime> vec_block_life_time1 = pair_vec_block_life_time.first;
+  vector<PoolBlockLifeTime> vec_block_life_time2 = pair_vec_block_life_time.second;
+
+  //color the cross-iteration blocks
+  vector<Vertex> vertices_2 = AssignColorToVertices(vec_block_life_time2,offset,color_method);
+
+  for (int i=0; i<vertices_2.size(); i++){
+    vertices_2[i].cross_iteration = 1;
+  }
+  //reserve a backup region of the same size, so each cross-iteration block has two copies.
+  offset_cross_iteration = offset;
+  offset = offset*2;
+
+  //color the normal blocks
+  vector<Vertex> vertices = AssignColorToVertices(vec_block_life_time1,offset,color_method);
+
+  //merge after coloring
+  vertices.insert(vertices.end(),vertices_2.begin(),vertices_2.end());
+
+  return vertices;
+}
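Detection's closing formula aligns the flag-switch point with an iteration boundary. A quick standalone check of that arithmetic with made-up numbers (the variable roles match Detection; the values are invented):

#include <iostream>
using namespace std;

int main() {
  // made-up values: 1000 ops seen so far, a 300-op iteration that
  // starts repeating at op 120.
  int idx_range_test = 1000, iteration_length = 300, location_2nd_iteration = 120;

  int global_index_threshold = idx_range_test + iteration_length
      - (idx_range_test - location_2nd_iteration) % iteration_length;

  cout << global_index_threshold << endl;   // 1020
  // the switch lands a whole number of iterations after the repetition
  // starts, so the per-iteration lookup indices line up:
  cout << (global_index_threshold - location_2nd_iteration) % iteration_length << endl;  // 0
}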
+///Malloc
+void SmartMemPool::Malloc(void** ptr, const size_t size){
+  /*
+  1. switch the flag when global_index == global_index_threshold; construct the lookup table and malloc the whole pool.
+  2. if flag==0, malloc/cudaMalloc directly and collect the opt info strings.
+  3. if flag==1, look up the table; malloc/cudaMalloc if the index is not in the table.
+  4. test for a repeated sequence every 300 mallocs, updating global_index_threshold.
+  */
+
+  if (!initialized_){
+    Init();
+  }
+
+  void* allocated_ptr = NULL; //ptr to be returned
+
+  /// 1. switch the flag when global_index == global_index_threshold; construct the lookup table and malloc the whole pool.
+  if (global_index == global_index_threshold){
+
+    malloc_flag = 1;
+    vector<string> vec_raw_opt_info(&vec[location_2nd_iteration],&vec[location_2nd_iteration+iteration_length]);
+
+    //color the vertices
+    vector<Vertex> vertices = Plan(vec_raw_opt_info,idx_range,offset,offset_cross_iteration,color_method);
+
+    //verify that the coloring has no overlap; for verification purposes only.
+    //OverlapVerification(vertices);
+
+    //obtain the cross-iteration duration info
+    int double_range = 0;
+    vector<string> vec_double(&vec[location_2nd_iteration],&vec[location_2nd_iteration+2*iteration_length]);
+    pair<map<int,int>,map<int,int>> pairs = GetCrossIterationBlocks(vec_double,location_2nd_iteration,iteration_length,double_range);
+    table_ridx_to_didx = pairs.first;
+    table_didx_to_ridx = pairs.second;
+
+    //make the pool
+    cudaMalloc(&ptr_pool,offset); //the pool size is the memory footprint, i.e. offset.
+
+    //build vec_block_meta for lookup after the pool is constructed
+    for (int i=0; i<vertices.size(); i++){
+      BlockMeta temp;
+      temp.d_idx = table_ridx_to_didx.find(vertices[i].r)->second;
+      temp.size = vertices[i].size;
+      temp.offset = vertices[i].color_range.first;
+      temp.ptr = (void*)((char*)ptr_pool+temp.offset*sizeof(char));
+      temp.occupied = 0;
+      temp.cross_iteration = vertices[i].cross_iteration;
+      temp.occupied_backup = 0;
+      //build the lookup table.
+      vec_block_meta[vertices[i].r].first = vertices[i].size;
+      vec_block_meta[vertices[i].r].second = temp;
+    }
+  }
+  /// 2. if flag==0, malloc/cudaMalloc and accumulate the opt info during the beginning iterations.
+  if(malloc_flag == 0){
+    cudaMalloc(ptr, size);
+    allocated_ptr = *ptr;
+    //update load
+    if(load_flag==1){
+      if (global_index>0){
+        table_load[global_index]=make_pair(table_load.find(global_index-1)->second.first+size,table_load.find(global_index-1)->second.second);
+      }else{ //the very first block
+        table_load[global_index]=make_pair(size,0);
+      }
+    }
+    //push_back the opt info string for later test and run.
+    string temp_str1 = "Malloc ";
+    stringstream strm2;
+    strm2<<allocated_ptr;
+    string temp_str2 = strm2.str();
+    stringstream strm3;
+    strm3<<size;
+    string temp_str3 = strm3.str();
+    string temp = temp_str1+temp_str2+" "+temp_str3;
+    vec.push_back(temp);
+  }else{
+    /// 3. if flag==1, look up the table; the key is the position within one iteration.
+    int lookup_idx = (global_index-location_2nd_iteration)%iteration_length;
+    if ((size==vec_block_meta[lookup_idx].first)&&((vec_block_meta[lookup_idx].second.occupied==0)||(vec_block_meta[lookup_idx].second.occupied_backup==0))){
+      if (vec_block_meta[lookup_idx].second.occupied==0){
+        //condition: normal block, or the cross-iteration block's first copy
+        allocated_ptr = vec_block_meta[lookup_idx].second.ptr;
+        vec_block_meta[lookup_idx].second.occupied = 1;
+        table_ptr_to_ridx[allocated_ptr] = lookup_idx;
+        //update load
+        if(load_flag==1){
+          table_load[global_index]=make_pair(table_load.find(global_index-1)->second.first,table_load.find(global_index-1)->second.second+size);
+        }
+      }else if ((vec_block_meta[lookup_idx].second.cross_iteration==1) && (vec_block_meta[lookup_idx].second.occupied==1) && (vec_block_meta[lookup_idx].second.occupied_backup==0)) {
+        //condition: the cross-iteration block's backup copy
+        allocated_ptr = (void*)((char*)vec_block_meta[lookup_idx].second.ptr+offset_cross_iteration*sizeof(char));
+        vec_block_meta[lookup_idx].second.occupied_backup = 1;
+        table_ptr_to_ridx[allocated_ptr] = lookup_idx;
+        //update load
+        if(load_flag==1){
+          table_load[global_index]=make_pair(table_load.find(global_index-1)->second.first,table_load.find(global_index-1)->second.second+size);
+        }
+      }
+    }else{
+      //condition: size not proper or both copies occupied.
+      cudaMalloc(ptr, size);
+      allocated_ptr = *ptr;
+      //update load
+      if(load_flag==1){
+        table_load[global_index]=make_pair(table_load.find(global_index-1)->second.first+size,table_load.find(global_index-1)->second.second);
+      }
+    }
+  } //end of the flag==1 branch
+
+  ///4. test for a repeated sequence every 300 indices, updating global_index_threshold.
+  if (((global_index+1)%300==0) && (malloc_flag==0) && (global_index_threshold==-1) && (global_index+2>check_point)){
+    global_index_threshold = Detection(vec,iteration_length,location_2nd_iteration);
+    check_point = check_point*2;
+  }
+
+  ///get the load info once global_index reaches global_index_threshold+2*iteration_length
+  if (global_index==(global_index_threshold+2*iteration_length) && (global_index_threshold>0)){
+    GetMaxLoad();
+    load_flag = 0;
+  }
+
+  global_index++;
+  //record the size for load tracking purposes.
+  table_ptr_to_size[allocated_ptr] = size;
+
+  //update *ptr
+  *ptr = allocated_ptr;
+
+  ///update block_RWMF
+  string temp_str1 = "Malloc ";
+  stringstream strm2;
+  strm2<<allocated_ptr;
+  string temp_str2 = strm2.str();
+  stringstream strm3;
+  strm3<<size;
+  string temp_str3 = strm3.str();
+  auto tTemp = (std::chrono::system_clock::now().time_since_epoch()).count();
+  stringstream strm4;
+  strm4<<tTemp;
+  string temp_str4 = strm4.str();
+  string temp = temp_str1+temp_str2+" "+temp_str3+" "+temp_str4;
+  vec_block_rw_mf.push_back(temp);
+}
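The fast path of Malloc is essentially an array lookup keyed by the position inside an iteration. A stripped-down simulation of that state machine (two-copy handling, frees, and load tracking omitted; Slot and the toy sizes are mine, not the patch's BlockMeta):

#include <cstdio>
#include <vector>

struct Slot { size_t size; bool occupied; };

int main() {
  const int iteration_length = 4;
  // planned slots for one iteration, e.g. as produced by a planning pass
  std::vector<Slot> table = {{512, false}, {1024, false}, {512, false}, {256, false}};

  size_t requests[] = {512, 1024, 512, 256, 512, 1024};  // repeating workload
  int global_index = 0;
  for (size_t size : requests) {
    int lookup_idx = global_index % iteration_length;
    if (table[lookup_idx].size == size && !table[lookup_idx].occupied) {
      table[lookup_idx].occupied = true;          // serve from the pool
      std::printf("idx %d: pool slot %d\n", global_index, lookup_idx);
    } else {
      // size mismatch or slot busy (no frees in this toy) -> fall back
      std::printf("idx %d: fallback cudaMalloc\n", global_index);
    }
    global_index++;
  }
}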
+ string temp_str1 ="Malloc "; + stringstream strm2; + strm2<second.first,table_load.find(global_index-1)->second.second+size); + } + }else if ((vec_block_meta[lookup_idx].second.cross_iteration==1) && (vec_block_meta[lookup_idx].second.occupied==1) && (vec_block_meta[lookup_idx].second.occupied_backup ==0)) { + //condition: cross_iteration's backup + allocated_ptr = (void*)((char*)vec_block_meta[lookup_idx].second.ptr+offset_cross_iteration*sizeof(char)); + vec_block_meta[lookup_idx].second.occupied_backup=1; + table_ptr_to_ridx[allocated_ptr]=lookup_idx; + //update load + if(load_flag==1){ + table_load[global_index]=make_pair(table_load.find(global_index-1)->second.first,table_load.find(global_index-1)->second.second+size); + } + } + }else{ + //condition: size not proper or both occupied. + cudaMalloc(ptr, size); + allocated_ptr = *ptr; + //update load + if(load_flag==1){ + table_load[global_index]=make_pair(table_load.find(global_index-1)->second.first+size,table_load.find(global_index-1)->second.second); + } + } + } //end of loop for flag=1 + + ///4. test repeated sequence every 300 index, update global_index_threshold. + if (((global_index+1)%300==0) && (malloc_flag ==0) && (global_index_threshold==-1)&&(global_index+2>check_point)){ + global_index_threshold = Detection(vec,iteration_length,location_2nd_iteration); + check_point=check_point*2; + } + + ///get load info, when global_index == global_index+2iteration_length + if (global_index==(global_index_threshold+2*iteration_length)&& (global_index_threshold>0)){ + GetMaxLoad(); + load_flag=0; + } + + global_index++; + //update it for load tracking purpose. + table_ptr_to_size[allocated_ptr]=size; + + //update *ptr + *ptr = allocated_ptr; + + ///update block_RWMF + string temp_str1 ="Malloc "; + stringstream strm2; + strm2<(std::chrono::system_clock::now().time_since_epoch()).count(); + stringstream strm4; + strm4<second; + + /// at the begining iterations, via cudaFree, accumulate opt info. + if ((global_index_threshold==-1)||(global_indexsecond.first-deallocatedSize,table_load.find(global_index-1)->second.second); + } + // before flag switch, for sure all free shall be done by free() + cudaFree(ptr); + }else{ + /// cases that no need accumulating opt info + + /// free a ptr that is in the memory pool + if (!(table_ptr_to_ridx.find(ptr)==table_ptr_to_ridx.end())){ + int resp_rIdx = table_ptr_to_ridx.find(ptr)->second; + table_ptr_to_ridx.erase(ptr); + + if (ptr == vec_block_meta[resp_rIdx].second.ptr){ + vec_block_meta[resp_rIdx].second.occupied =0; //freed, able to allocate again. 
+///get the maximum loads, i.e. the peak memory footprints.
+void SmartMemPool::GetMaxLoad(){
+
+  vector<size_t> vec_load_log;
+  for (int i=0; i<global_index; i++){
+    vec_load_log.push_back(table_load.find(i)->second.first);
+  }
+  size_t max_cuda_load = *max_element(vec_load_log.begin(),vec_load_log.end());
+  int idx_max_cuda_load = static_cast<int>(distance(vec_load_log.begin(),max_element(vec_load_log.begin(),vec_load_log.end())));
+
+  vector<size_t> vec_color_load;
+  for (int i=0; i<global_index; i++){
+    vec_color_load.push_back(table_load.find(i)->second.second);
+  }
+  size_t max_color_load = *max_element(vec_color_load.begin(),vec_color_load.end());
+  int idx_max_color_load = static_cast<int>(distance(vec_color_load.begin(),max_element(vec_color_load.begin(),vec_color_load.end())));
+  size_t offset_color_load = table_load.find(idx_max_color_load)->second.first;
+
+  max_total_load = max(max_cuda_load,max_color_load+offset_color_load);
+  max_mem_usage = max(max_cuda_load,offset+offset_color_load);
+}
+
+std::pair<size_t, size_t> SmartMemPool::GetMemUsage() {
+  //note: the meaning of this pair differs from that of CnMemPool.
+  return std::make_pair(max_mem_usage, max_total_load);
+}
+
+void SmartMemPool::Append(string blockInfo) {
+  vec_block_rw.push_back(blockInfo);
+  vec_block_rw_mf.push_back(blockInfo);
+}
+
+///SwapPool
+SwapPool::SwapPool(const MemPoolConf &conf){
+  conf_ = conf;
+}
+
+void SwapPool::Init(){
+  mtx_.lock();
+  if(!initialized_){
+    initialized_ = true;
+  }
+  mtx_.unlock();
+}
+
+void SwapPool::PoolOpt(vector<string> &vec_mf) {
+
+  vector<PoolOptInfo> vec_pool_opt_info;
+  iteration_length_mf = vec_mf.size()/3; //the input vec_mf covers 3 iterations
+
+  //convert the raw opt info into structs (PoolOptInfo); indices are shifted down by
+  //one iteration length, so the middle iteration sits at [0, iteration_length_mf).
+  for (int i = 0; i < vec_mf.size(); i++){
+    vector<string> v = SplitString(vec_mf[i], " ");
+    if (v[0]=="Malloc"){
+      //convert v[2] from str to size_t
+      size_t result;
+      stringstream convert(v[2]);
+      if (!(convert>>result)){
+        result = -1;
+        cout<<"error for converting size from str to int."<<endl;
+      }
+      PoolOptInfo temp(v[1],result,1,i-iteration_length_mf);
+      vec_pool_opt_info.push_back(temp);
+    }else if (v[0]=="Free"){
+      PoolOptInfo temp(v[1],-1,-1,i-iteration_length_mf);
+      vec_pool_opt_info.push_back(temp);
+    }
+  }
+  //sort by ptr and then idx
+  sort(vec_pool_opt_info.begin(),vec_pool_opt_info.end(),
+       [](const PoolOptInfo& a, const PoolOptInfo& b){
+         return (a.ptr<b.ptr)||((a.ptr==b.ptr)&&(a.idx<b.idx));
+       });
+
+  //pair up Malloc and Free items to obtain block life times
+  vector<PoolBlockLifeTime> vec_block_life_time;
+  int i = 0;
+
+  while (i<(vec_pool_opt_info.size()-1)){
+
+    if (vec_pool_opt_info[i].operation_type==-1){
+      //condition: starts with a Free. do nothing.
+      i+=1;
+    } else {
+      if ((vec_pool_opt_info[i].operation_type==1) && (vec_pool_opt_info[i+1].operation_type==-1)
+          &&((vec_pool_opt_info[i].ptr==vec_pool_opt_info[i+1].ptr))){
+        //condition: starts with a Malloc, and the next item is a Free of the same ptr.
+        //collect only blocks whose whole life time falls into the middle iteration.
+        if ((vec_pool_opt_info[i].idx>=0 && vec_pool_opt_info[i].idx<iteration_length_mf)
+            && (vec_pool_opt_info[i+1].idx>=0 && vec_pool_opt_info[i+1].idx<iteration_length_mf)){
+          PoolBlockLifeTime temp_block_life_time(vec_pool_opt_info[i].idx,vec_pool_opt_info[i].size,vec_pool_opt_info[i].idx,vec_pool_opt_info[i+1].idx);
+          vec_block_life_time.push_back(temp_block_life_time);
+        }
+        i+=2;
+      }else{
+        i+=1;
+      }
+    }
+  }
+
+  //color the blocks of the middle iteration
+  size_t offset = 0;
+  int m = static_cast<int>(vec_block_life_time.size());
+  vector<Vertex> vertices;
+  for (int i=0; i<m; i++){
+    Vertex temp_vertex(vec_block_life_time[i].name,vec_block_life_time[i].size,vec_block_life_time[i].r_idx,vec_block_life_time[i].d_idx);
+    vertices.push_back(temp_vertex);
+  }
+
+  for (int i=0; i<m; i++){
+    vector<pair<size_t,size_t>> vec_color_merged = MergeColoredSegments(vertices[i].vec_color_preoccupied);
+
+    // vertices[i].color_range = FirstFitAllocation(vec_color_merged,vertices[i].size, local_offset);
+    vertices[i].color_range = BestFitAllocation(vec_color_merged,vertices[i].size, offset);
+
+    //propagate the assigned range to the not-yet-colored vertices whose life time overlaps.
+    for (int j=i+1; j<m; j++){
+      if (max(vertices[i].r,vertices[j].r)<=min(vertices[i].d,vertices[j].d)){
+        vertices[j].vec_color_preoccupied.push_back(vertices[i].color_range);
+      }
+    }
+
+    //update offset, i.e. the largest memory footprint so far.
+    if (vertices[i].color_range.second >= offset){
+      offset = vertices[i].color_range.second+1;
+    }
+  }//end of for loop.
+
+  //delete the adjacency info, i.e. the edges
+  for (int i=0; i<m; i++){
+    vertices[i].vec_color_preoccupied.clear();
+  }
+
+  //make the pool
+  cudaError_t status = cudaMalloc(&ptr_pool, offset);
+  CHECK_EQ(status, cudaError_t::cudaSuccess);
+
+  //build table_pool_meta for lookup; each block is also reachable via its
+  //negative index, which serves the first iteration right after the switch.
+  for (int i=0; i<m; i++){
+    BlockMeta temp_meta;
+    temp_meta.size = vertices[i].size;
+    temp_meta.offset = vertices[i].color_range.first;
+    temp_meta.ptr = (void*)((char*)ptr_pool+temp_meta.offset*sizeof(char));
+    table_pool_meta[vertices[i].name] = temp_meta;
+    table_pool_meta[vertices[i].name-iteration_length_mf] = temp_meta;
+  }
+
+  pool_flag = 1;
+  pool_index = 0;
+}
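PoolOpt shifts every trace index down by one iteration length, so the middle iteration of the 3-iteration window sits at [0, iteration_length_mf) and blocks touching the neighbouring iterations fall out of that range. A small standalone check of that indexing (toy lengths only):

#include <cstdio>

int main() {
  int iteration_length_mf = 5;             // the window holds 3 such iterations
  // raw trace indices 0..14; shifted = i - iteration_length_mf
  for (int i = 0; i < 15; i++) {
    int shifted = i - iteration_length_mf;
    bool middle = (shifted >= 0 && shifted < iteration_length_mf);
    std::printf("raw %2d -> shifted %3d %s\n", i, shifted,
                middle ? "(middle iteration)" : "");
  }
}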
+void SwapPool::Malloc(void **ptr, const size_t size) {
+  if (!initialized_){
+    Init();
+  }
+
+  void* allocated_ptr = NULL;
+  if (pool_flag == 0){
+    //before the pool is constructed, malloc directly.
+    cudaError_t status = cudaMalloc(ptr, size);
+    CHECK_EQ(status, cudaError_t::cudaSuccess);
+  } else{
+    if (pool_index < iteration_length_mf){
+      //the first iteration right after the pool is constructed
+      if ((table_pool_meta.find(pool_index - iteration_length_mf) == table_pool_meta.end()) || (!(size == table_pool_meta.find(pool_index - iteration_length_mf)->second.size))){
+        //not in the table of negative r_idx
+        cudaError_t status = cudaMalloc(ptr, size);
+        CHECK_EQ(status, cudaError_t::cudaSuccess);
+      } else{
+        //in the table of negative r_idx
+        auto temp_meta = table_pool_meta.find(pool_index - iteration_length_mf)->second;
+        allocated_ptr = temp_meta.ptr;
+        *ptr = allocated_ptr;
+        table_ptr_to_ridx[allocated_ptr] = pool_index - iteration_length_mf;
+      }
+    } else{
+      //the 8th, 9th, 10th iterations and beyond
+      int r_pool_index = pool_index%iteration_length_mf;
+      if ((table_pool_meta.find(r_pool_index) == table_pool_meta.end()) || (!(size == table_pool_meta.find(r_pool_index)->second.size))){
+        //not in the table; this should be abnormal
+        cudaError_t status = cudaMalloc(ptr, size);
+        CHECK_EQ(status, cudaError_t::cudaSuccess);
+      } else{
+        //in the table
+        auto temp_meta = table_pool_meta.find(r_pool_index)->second;
+        allocated_ptr = temp_meta.ptr;
+        *ptr = allocated_ptr;
+        table_ptr_to_ridx[allocated_ptr] = r_pool_index;
+      }
+    }
+  }
+
+  pool_index++;
+}
+
+void SwapPool::Free(void *ptr) {
+  if (pool_flag == 0){
+    cudaError_t status = cudaFree(ptr);
+    CHECK_EQ(status, cudaError_t::cudaSuccess);
+  } else{
+    //only pointers that are not managed by the pool are actually freed.
+    if (table_ptr_to_ridx.find(ptr)==table_ptr_to_ridx.end()){
+      cudaError_t status = cudaFree(ptr);
+      CHECK_EQ(status, cudaError_t::cudaSuccess);
+    }
+  }
+}
+
+void SwapPool::Append(string blockInfo) {
+  //NA
+}
+
+void SwapPool::GetMaxLoad(){
+  //empty
+}
+
+std::pair<size_t, size_t> SwapPool::GetMemUsage() {
+  //empty
+  return std::make_pair(0, 0);
+}
+
+SwapPool::~SwapPool(){
+  //NA
+}
+
 }
-#endif
 #endif
+
+#endif
\ No newline at end of file
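Malloc's replay logic above reduces to a lookup key: during the first replayed iteration pool_index lags the planned indices by one iteration length (hence the negative keys), and afterwards a plain modulo suffices. A toy run of just that arithmetic, assuming a table keyed exactly as in the PoolOpt sketch above (the double-keyed map is my reconstruction, not confirmed by the patch):

#include <cstdio>
#include <map>

int main() {
  const int L = 3;                        // stand-in for iteration_length_mf
  std::map<int, size_t> table_pool_meta;  // key: r_idx, plus a negative alias
  for (int r = 0; r < L; r++) {
    table_pool_meta[r] = 100 * (r + 1);   // toy sizes 100, 200, 300
    table_pool_meta[r - L] = 100 * (r + 1);
  }

  for (int pool_index = 0; pool_index < 2 * L; pool_index++) {
    int key = (pool_index < L) ? pool_index - L    // first replayed iteration
                               : pool_index % L;   // steady state
    auto it = table_pool_meta.find(key);
    std::printf("pool_index %d -> key %d size %zu\n", pool_index, key,
                it == table_pool_meta.end() ? (size_t)0 : it->second);
  }
}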