$\newcommand{\O}{\mathrm{O}}$
クイックソートの並列計算アルゴリズム.
(簡単版) より (高速版) の方が 30 %ほど高速.
(関数)
parallel_quick_sort$(first, last, comp)$ : 通常の std::sort のように呼び出す(comp は optional で default は std::less).
※ thread 関連のライブラリを用いているのでコンパイルオプションとして -pthread を指定する.
template<typename BidirectionalIterator, class Compare> class ParallelQuickSortSolver { private: const BidirectionalIterator first, last; const Compare comp; std::mt19937 mt; static const unsigned long THRESHOLD = (1u << 14); void parallel_partial_sort(const BidirectionalIterator _first, const BidirectionalIterator _last); public: ParallelQuickSortSolver(const BidirectionalIterator _first, const BidirectionalIterator _last, const Compare _comp) : first(_first), last(_last), comp(_comp), mt(std::random_device{}()){} void operator()() { parallel_partial_sort(first, last); } }; template<typename BidirectionalIterator, class Compare> void ParallelQuickSortSolver<BidirectionalIterator, Compare>::parallel_partial_sort (const BidirectionalIterator _first, const BidirectionalIterator _last) { const unsigned long length = std::distance(_first, _last); if(length <= THRESHOLD) { if(length >= 2) std::sort(_first, _last); return; } std::uniform_int_distribution<> uid(0, length-1); const auto pivot_itr = std::next(_first, uid(mt)); const auto pivot = *pivot_itr; std::iter_swap(_first, pivot_itr); const BidirectionalIterator middle = std::partition(_first + 1, _last, std::bind(comp, std::placeholders::_1, std::cref(pivot))); std::iter_swap(_first, middle - 1); auto fut = std::async(std::launch::async, &ParallelQuickSortSolver<BidirectionalIterator, Compare>::parallel_partial_sort, this, _first, middle - 1); parallel_partial_sort(middle, _last); fut.wait(); } template<typename BidirectionalIterator, class Compare=std::less<typename BidirectionalIterator::value_type> > void parallel_quick_sort(const BidirectionalIterator first, const BidirectionalIterator last, const Compare comp=Compare()) { ParallelQuickSortSolver<BidirectionalIterator, Compare> solver(first, last, comp); solver(); }
template<typename RandomAccessIterator, class UnaryPredicate> class ParallelPartitionSolver { private: const RandomAccessIterator first, last; const UnaryPredicate func; const unsigned length; const unsigned THRESHOLD; class BlockCounter { private: const unsigned leftBlock = 1 << 16; const unsigned rightBlock = 1; const unsigned mask = leftBlock - 1; const unsigned min_blockSize = 1024; const unsigned max_blockCount = (1 << 15) - 1; public: const unsigned block_size, block_count; std::atomic<unsigned> counter; BlockCounter(unsigned length) : block_size(std::max((length - 1) / max_blockCount + 1, min_blockSize)), block_count(length / block_size), counter(0){} bool get_left_block(int& left) { int ignore; return get_block(leftBlock, left, ignore); } bool get_right_block(int& right) { int ignore; return get_block(rightBlock, ignore, right); } bool get_block(const unsigned block, int& left, int& right) { const unsigned value = counter.fetch_add(block, std::memory_order_acq_rel) + block; left = (value >> 16); right = (value & mask); return left-- + right-- <= (int)block_count; } }; class ChunkResult { public: int left_remain, right_remain, leftMost_blocks, rightMost_blocks; ChunkResult() : left_remain(std::numeric_limits<int>::max()), right_remain(std::numeric_limits<int>::max()) , leftMost_blocks(-1), rightMost_blocks(-1){} }; BlockCounter counter; std::vector<ChunkResult> remain; int arrange_blocks (RandomAccessIterator& leftFrom, const RandomAccessIterator leftTo, RandomAccessIterator& rightFrom, const RandomAccessIterator rightTo); auto arrange_chunk(const unsigned index); inline void swapBlocks(RandomAccessIterator from, RandomAccessIterator to, const int blockSize); const int arrange_leftBlocks(); const int arrange_rightBlocks(); public: ParallelPartitionSolver (const RandomAccessIterator _first, const RandomAccessIterator _last, const UnaryPredicate _func, const unsigned _THRESHOLD) : first(_first), last(_last), func(_func), length(std::distance(_first, _last)), THRESHOLD(_THRESHOLD), counter(length){} RandomAccessIterator operator()(); }; template<typename RandomAccessIterator, class UnaryPredicate> int ParallelPartitionSolver<RandomAccessIterator, UnaryPredicate>::arrange_blocks (RandomAccessIterator& leftFrom, const RandomAccessIterator leftTo, RandomAccessIterator& rightFrom, const RandomAccessIterator rightTo) { while(leftFrom < leftTo && rightFrom < rightTo) { while(func(*leftFrom) && ++leftFrom < leftTo); while(!func(*rightFrom) && ++rightFrom < rightTo); if(leftFrom == leftTo || rightFrom == rightTo) break; std::iter_swap(leftFrom, rightFrom); ++leftFrom, ++rightFrom; } if(leftFrom == leftTo && rightFrom == rightTo) return 0; if(leftFrom == leftTo) return -1; return 1; } template<typename RandomAccessIterator, class UnaryPredicate> auto ParallelPartitionSolver<RandomAccessIterator, UnaryPredicate>::arrange_chunk(const unsigned index) { int localLeftMost = -1, localRightMost = -1; int leftBlock = localLeftMost, rightBlock = localRightMost; RandomAccessIterator leftFrom, rightFrom; RandomAccessIterator leftTo = leftFrom, rightTo = rightFrom; unsigned block_size = counter.block_size; int result = 0; while(true) { if(result <= 0) { localLeftMost = leftBlock; if(!counter.get_left_block(leftBlock)) break; leftFrom = first + leftBlock * block_size; leftTo = leftFrom + block_size; } if(result >= 0) { localRightMost = rightBlock; if(!counter.get_right_block(rightBlock)) break; rightTo = last - rightBlock * block_size; rightFrom = rightTo - block_size; } result = arrange_blocks(leftFrom, leftTo, rightFrom, rightTo); } ChunkResult info; if(leftFrom != leftTo) info.left_remain = leftBlock; if(rightFrom != rightTo) info.right_remain = rightBlock; info.leftMost_blocks = localLeftMost, info.rightMost_blocks = localRightMost; return info; } template<typename RandomAccessIterator, class UnaryPredicate> inline void ParallelPartitionSolver<RandomAccessIterator, UnaryPredicate>::swapBlocks (RandomAccessIterator from, RandomAccessIterator to, const int blockSize) { for(int i = 0; i < blockSize; ++i, ++from, ++to) { std::iter_swap(from, to); } } template<typename RandomAccessIterator, class UnaryPredicate> const int ParallelPartitionSolver<RandomAccessIterator, UnaryPredicate>::arrange_leftBlocks() { const int blockSize = counter.block_size; int j = -1; int leftmostBlock = std::max_element(remain.begin(), remain.end(), [](const ChunkResult& arg1, const ChunkResult& arg2) { return arg1.leftMost_blocks < arg2.leftMost_blocks; })->leftMost_blocks; std::sort(remain.begin(), remain.end(), [](const ChunkResult& arg1, const ChunkResult& arg2) { return arg1.left_remain < arg2.left_remain; }); for(unsigned i = 0; i < remain.size(); ++i) if(remain[i].left_remain < leftmostBlock) j = i; for(unsigned i = 0; i < remain.size() && remain[i].left_remain <= leftmostBlock;){ if(remain[j].left_remain == leftmostBlock) { --j; } else { swapBlocks(first + remain[i].left_remain * blockSize, first + leftmostBlock * blockSize, blockSize); ++i; } --leftmostBlock; } return leftmostBlock; } template<typename RandomAccessIterator, class UnaryPredicate> const int ParallelPartitionSolver<RandomAccessIterator, UnaryPredicate>::arrange_rightBlocks() { const int blockSize = counter.block_size; int j = -1; int rightmostBlock = std::max_element(remain.begin(), remain.end(), [](const ChunkResult& arg1, const ChunkResult& arg2) { return arg1.rightMost_blocks < arg2.rightMost_blocks; })->rightMost_blocks; std::sort(remain.begin(), remain.end(), [](const ChunkResult& arg1, const ChunkResult& arg2) { return arg1.right_remain < arg2.right_remain; }); for(unsigned i = 0; i < remain.size(); ++i) if(remain[i].right_remain < rightmostBlock) j = i; for(unsigned i = 0; i < remain.size() && remain[i].right_remain <= rightmostBlock;) { if(remain[j].right_remain == rightmostBlock) { --j; } else { swapBlocks(last - (remain[i].right_remain + 1) * blockSize, last - (rightmostBlock + 1) * blockSize, blockSize); ++i; } --rightmostBlock; } return rightmostBlock; } template<typename RandomAccessIterator, class UnaryPredicate> RandomAccessIterator ParallelPartitionSolver<RandomAccessIterator, UnaryPredicate>::operator()() { if(length <= THRESHOLD) return std::partition(first, last, func); const unsigned block_count = counter.block_count; const unsigned block_size = counter.block_size; const unsigned workerCount = std::min(std::thread::hardware_concurrency(), block_count / 2); std::vector<std::future<ChunkResult> > fut; for(unsigned i = 0; i < workerCount; ++i) { fut.push_back(std::async(&ParallelPartitionSolver::arrange_chunk, this, i)); } for(auto& entry : fut) { remain.push_back(entry.get()); } const int leftMostBlock = arrange_leftBlocks(); const int rightMostBlock = arrange_rightBlocks(); return std::partition(first + (leftMostBlock + 1) * block_size, last - (rightMostBlock + 1) * block_size, func); } template<typename RandomAccessIterator, class UnaryPredicate> RandomAccessIterator parallel_partition (const RandomAccessIterator first, const RandomAccessIterator last, const UnaryPredicate func, const unsigned THRESHOLD) { ParallelPartitionSolver<RandomAccessIterator, UnaryPredicate> solver(first, last, func, THRESHOLD); return solver(); } template<typename T> class ThreadsafeQueue { private: class Node { public: std::shared_ptr<T> data; std::unique_ptr<Node> next; }; std::mutex head_mutex; std::unique_ptr<Node> head; std::mutex tail_mutex; Node* tail; std::condition_variable data_cond; std::atomic<bool> complete; Node* get_tail() { std::lock_guard<std::mutex> tail_lock(tail_mutex); return tail; } void pop_head() { std::unique_ptr<Node> old_head = std::move(head); head = std::move(old_head->next); } public: ThreadsafeQueue() : head(new Node), tail(head.get()), complete(false){} void push(const T new_value) { std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value))); std::unique_ptr<Node> p(new Node); { std::lock_guard<std::mutex> tail_lock(tail_mutex); tail->data = new_data; Node* const new_tail = p.get(); tail->next = std::move(p); tail = new_tail; } data_cond.notify_one(); } bool wait_and_pop(T& value, std::atomic<unsigned>& num) { std::unique_lock<std::mutex> head_lock(head_mutex); if(locked_empty() && num.load(std::memory_order_acquire) == 0) { finish(); return false; } bool check; data_cond.wait(head_lock, [&] { return (check = !locked_empty()) || (num.load(std::memory_order_acquire) == 0); }); if(!check) { finish(); return false; } num.fetch_add(1, std::memory_order_acq_rel); value = std::move(*head->data); pop_head(); return true; } void finish() { if(!complete.load(std::memory_order_acquire)) { complete.store(true, std::memory_order_release); data_cond.notify_all(); } } bool locked_empty() { return (head.get() == get_tail()); } }; template<typename RandomAccessIterator, class Compare> class ParallelQuickSortSolver { private: const RandomAccessIterator first, last; const Compare comp; ThreadsafeQueue<std::pair<RandomAccessIterator, RandomAccessIterator> > task_queue; std::mt19937 mt; std::atomic<unsigned> actthread_num; const unsigned total_length; const unsigned THRESHOLD; void getTask_and_run(); void parallel_partial_sort(const RandomAccessIterator _first, const RandomAccessIterator _last); public: ParallelQuickSortSolver(const RandomAccessIterator _first, const RandomAccessIterator _last, const Compare _comp) : first(_first), last(_last), comp(_comp), task_queue(), mt(std::random_device{}()), actthread_num(0), total_length(std::distance(first, last)), THRESHOLD(std::max(total_length / 200, (1u << 14))) { task_queue.push(std::make_pair(first, last)); } void operator()(); }; template<typename RandomAccessIterator, class Compare> void ParallelQuickSortSolver<RandomAccessIterator, Compare>::getTask_and_run() { std::pair<RandomAccessIterator, RandomAccessIterator> task; while(task_queue.wait_and_pop(task, actthread_num)) { parallel_partial_sort(task.first, task.second); } } template<typename RandomAccessIterator, class Compare> void ParallelQuickSortSolver<RandomAccessIterator, Compare>::parallel_partial_sort(const RandomAccessIterator _first, const RandomAccessIterator _last) { const unsigned length = std::distance(_first, _last); if(length <= THRESHOLD) { if(length >= 2) std::sort(_first, _last); actthread_num.fetch_sub(1, std::memory_order_acq_rel); return; } std::uniform_int_distribution<> uid(0, length-1); const RandomAccessIterator pivot_itr = std::next(_first, uid(mt)); const auto pivot = *pivot_itr; std::iter_swap(_first, pivot_itr); const RandomAccessIterator middle = parallel_partition(_first + 1, _last, std::bind(comp, std::placeholders::_1, std::cref(pivot)), std::max(total_length / 4, (1u << 17))); std::iter_swap(_first, middle - 1); task_queue.push(std::make_pair(_first, middle - 1)); parallel_partial_sort(middle, _last); } template<typename RandomAccessIterator, class Compare> void ParallelQuickSortSolver<RandomAccessIterator, Compare>::operator()() { const unsigned hardware_threads = std::thread::hardware_concurrency(); std::vector<std::future<void> > fut; for(unsigned i = 0; i < hardware_threads; ++i) { fut.push_back(std::async(&ParallelQuickSortSolver<RandomAccessIterator, Compare>::getTask_and_run, this)); } for(auto& entry : fut) { entry.get(); } } template<typename RandomAccessIterator, class Compare=std::less<typename RandomAccessIterator::value_type> > void parallel_quick_sort(const RandomAccessIterator first, const RandomAccessIterator last, const Compare comp=Compare()) { ParallelQuickSortSolver<RandomAccessIterator, Compare> solver(first, last, comp); solver(); }
verify していません(verify 問題を知らない)
上記の 2 つのコードは手元でかなりテストはした.