$\newcommand{\O}{\mathrm{O}}$ My Algorithm : kopricky アルゴリズムライブラリ

kopricky アルゴリズムライブラリ

Parallel Quick Sort

コードについての説明(個人的メモ)

クイックソートの並列計算アルゴリズム.
(簡単版) より (高速版) の方が 30 %ほど高速.

(関数)
parallel_quick_sort$(first, last, comp)$ : 通常の std::sort のように呼び出す(comp は optional で default は std::less).
※ thread 関連のライブラリを用いているのでコンパイルオプションとして -pthread を指定する.

コード(簡単版)

template<typename BidirectionalIterator, class Compare>
class ParallelQuickSortSolver {
private:
    const BidirectionalIterator first, last;
    const Compare comp;
    std::mt19937 mt;
    static const unsigned long THRESHOLD = (1u << 14);
    void parallel_partial_sort(const BidirectionalIterator _first, const BidirectionalIterator _last);

public:
    ParallelQuickSortSolver(const BidirectionalIterator _first, const BidirectionalIterator _last, const Compare _comp)
        : first(_first), last(_last), comp(_comp), mt(std::random_device{}()){}
    void operator()()
    {
        parallel_partial_sort(first, last);
    }
};


template<typename BidirectionalIterator, class Compare>
void ParallelQuickSortSolver<BidirectionalIterator, Compare>::parallel_partial_sort
    (const BidirectionalIterator _first, const BidirectionalIterator _last)
{
    const unsigned long length = std::distance(_first, _last);
    if(length <= THRESHOLD)
    {
        if(length >= 2)
            std::sort(_first, _last);
        return;
    }
    std::uniform_int_distribution<> uid(0, length-1);
    const auto pivot_itr = std::next(_first, uid(mt));
    const auto pivot = *pivot_itr;
    std::iter_swap(_first, pivot_itr);
    const BidirectionalIterator middle = std::partition(_first + 1, _last, std::bind(comp, std::placeholders::_1, std::cref(pivot)));
    std::iter_swap(_first, middle - 1);
    auto fut = std::async(std::launch::async, &ParallelQuickSortSolver<BidirectionalIterator, Compare>::parallel_partial_sort,
                            this, _first, middle - 1);
    parallel_partial_sort(middle, _last);
    fut.wait();
}

template<typename BidirectionalIterator, class Compare=std::less<typename BidirectionalIterator::value_type> >
void parallel_quick_sort(const BidirectionalIterator first, const BidirectionalIterator last, const Compare comp=Compare())
{
    ParallelQuickSortSolver<BidirectionalIterator, Compare> solver(first, last, comp);
    solver();
}

コード(高速版)

template<typename RandomAccessIterator, class UnaryPredicate>
class ParallelPartitionSolver
{
private:
    const RandomAccessIterator first, last;
    const UnaryPredicate func;
    const unsigned length;
    const unsigned THRESHOLD;

    class BlockCounter
    {
    private:
        const unsigned leftBlock = 1 << 16;
        const unsigned rightBlock = 1;
        const unsigned mask = leftBlock - 1;
        const unsigned min_blockSize = 1024;
        const unsigned max_blockCount = (1 << 15) - 1;

    public:
        const unsigned block_size, block_count;
        std::atomic<unsigned> counter;
        BlockCounter(unsigned length)
            : block_size(std::max((length - 1) / max_blockCount + 1, min_blockSize)),
                block_count(length / block_size), counter(0){}
        bool get_left_block(int& left)
        {
            int ignore;
            return get_block(leftBlock, left, ignore);
        }
        bool get_right_block(int& right)
        {
            int ignore;
            return get_block(rightBlock, ignore, right);
        }
        bool get_block(const unsigned block, int& left, int& right)
        {
            const unsigned value = counter.fetch_add(block, std::memory_order_acq_rel) + block;
            left = (value >> 16);
            right = (value & mask);
            return left-- + right-- <= (int)block_count;
        }
    };

    class ChunkResult
    {
    public:
        int left_remain, right_remain, leftMost_blocks, rightMost_blocks;
        ChunkResult() : left_remain(std::numeric_limits<int>::max()), right_remain(std::numeric_limits<int>::max())
            , leftMost_blocks(-1), rightMost_blocks(-1){}
    };

    BlockCounter counter;
    std::vector<ChunkResult> remain;

    int arrange_blocks
        (RandomAccessIterator& leftFrom, const RandomAccessIterator leftTo, RandomAccessIterator& rightFrom, const RandomAccessIterator rightTo);
    auto arrange_chunk(const unsigned index);
    inline void swapBlocks(RandomAccessIterator from, RandomAccessIterator to, const int blockSize);
    const int arrange_leftBlocks();
    const int arrange_rightBlocks();

public:
    ParallelPartitionSolver
        (const RandomAccessIterator _first, const RandomAccessIterator _last, const UnaryPredicate _func, const unsigned _THRESHOLD)
        : first(_first), last(_last), func(_func), length(std::distance(_first, _last)), THRESHOLD(_THRESHOLD), counter(length){}
    RandomAccessIterator operator()();
};

template<typename RandomAccessIterator, class UnaryPredicate>
int ParallelPartitionSolver<RandomAccessIterator, UnaryPredicate>::arrange_blocks
    (RandomAccessIterator& leftFrom, const RandomAccessIterator leftTo, RandomAccessIterator& rightFrom, const RandomAccessIterator rightTo)
{
    while(leftFrom < leftTo && rightFrom < rightTo)
    {
        while(func(*leftFrom) && ++leftFrom < leftTo);
        while(!func(*rightFrom) && ++rightFrom < rightTo);
        if(leftFrom == leftTo || rightFrom == rightTo)
            break;
        std::iter_swap(leftFrom, rightFrom);
        ++leftFrom, ++rightFrom;
    }
    if(leftFrom == leftTo && rightFrom == rightTo)
        return 0;
    if(leftFrom == leftTo)
        return -1;
    return 1;
}

template<typename RandomAccessIterator, class UnaryPredicate>
auto ParallelPartitionSolver<RandomAccessIterator, UnaryPredicate>::arrange_chunk(const unsigned index)
{
    int localLeftMost = -1, localRightMost = -1;
    int leftBlock = localLeftMost, rightBlock = localRightMost;
    RandomAccessIterator leftFrom, rightFrom;
    RandomAccessIterator leftTo = leftFrom, rightTo = rightFrom;
    unsigned block_size = counter.block_size;
    int result = 0;
    while(true)
    {
        if(result <= 0)
        {
            localLeftMost = leftBlock;
            if(!counter.get_left_block(leftBlock))
                break;
            leftFrom = first + leftBlock * block_size;
            leftTo = leftFrom + block_size;
        }
        if(result >= 0)
        {
            localRightMost = rightBlock;
            if(!counter.get_right_block(rightBlock))
                break;
            rightTo = last - rightBlock * block_size;
            rightFrom = rightTo - block_size;
        }
        result = arrange_blocks(leftFrom, leftTo, rightFrom, rightTo);
    }
    ChunkResult info;
    if(leftFrom != leftTo)
        info.left_remain = leftBlock;
    if(rightFrom != rightTo)
        info.right_remain = rightBlock;
    info.leftMost_blocks = localLeftMost, info.rightMost_blocks = localRightMost;
    return info;
}

template<typename RandomAccessIterator, class UnaryPredicate>
inline void ParallelPartitionSolver<RandomAccessIterator, UnaryPredicate>::swapBlocks
    (RandomAccessIterator from, RandomAccessIterator to, const int blockSize)
{
    for(int i = 0; i < blockSize; ++i, ++from, ++to)
    {
        std::iter_swap(from, to);
    }
}

template<typename RandomAccessIterator, class UnaryPredicate>
const int ParallelPartitionSolver<RandomAccessIterator, UnaryPredicate>::arrange_leftBlocks()
{
    const int blockSize = counter.block_size;
    int j = -1;
    int leftmostBlock = std::max_element(remain.begin(), remain.end(), [](const ChunkResult& arg1, const ChunkResult& arg2)
    {
        return arg1.leftMost_blocks < arg2.leftMost_blocks;
    })->leftMost_blocks;
    std::sort(remain.begin(), remain.end(), [](const ChunkResult& arg1, const ChunkResult& arg2)
    {
        return arg1.left_remain < arg2.left_remain;
    });
    for(unsigned i = 0; i < remain.size(); ++i)
        if(remain[i].left_remain < leftmostBlock)
            j = i;
    for(unsigned i = 0; i < remain.size() && remain[i].left_remain <= leftmostBlock;){
        if(remain[j].left_remain == leftmostBlock)
        {
            --j;
        }
        else
        {
            swapBlocks(first + remain[i].left_remain * blockSize, first + leftmostBlock * blockSize, blockSize);
            ++i;
        }
        --leftmostBlock;
    }
    return leftmostBlock;
}

template<typename RandomAccessIterator, class UnaryPredicate>
const int ParallelPartitionSolver<RandomAccessIterator, UnaryPredicate>::arrange_rightBlocks()
{
    const int blockSize = counter.block_size;
    int j = -1;
    int rightmostBlock = std::max_element(remain.begin(), remain.end(), [](const ChunkResult& arg1, const ChunkResult& arg2)
    {
        return arg1.rightMost_blocks < arg2.rightMost_blocks;
    })->rightMost_blocks;
    std::sort(remain.begin(), remain.end(), [](const ChunkResult& arg1, const ChunkResult& arg2)
    {
        return arg1.right_remain < arg2.right_remain;
    });
    for(unsigned i = 0; i < remain.size(); ++i)
        if(remain[i].right_remain < rightmostBlock)
            j = i;
    for(unsigned i = 0; i < remain.size() && remain[i].right_remain <= rightmostBlock;)
    {
        if(remain[j].right_remain == rightmostBlock)
        {
            --j;
        }
        else
        {
            swapBlocks(last - (remain[i].right_remain + 1) * blockSize, last - (rightmostBlock + 1) * blockSize, blockSize);
            ++i;
        }
        --rightmostBlock;
    }
    return rightmostBlock;
}

template<typename RandomAccessIterator, class UnaryPredicate>
RandomAccessIterator ParallelPartitionSolver<RandomAccessIterator, UnaryPredicate>::operator()()
{
    if(length <= THRESHOLD)
        return std::partition(first, last, func);
    const unsigned block_count = counter.block_count;
    const unsigned block_size = counter.block_size;
    const unsigned workerCount = std::min(std::thread::hardware_concurrency(), block_count / 2);
    std::vector<std::future<ChunkResult> > fut;
    for(unsigned i = 0; i < workerCount; ++i)
    {
        fut.push_back(std::async(&ParallelPartitionSolver::arrange_chunk, this, i));
    }
    for(auto& entry : fut)
    {
        remain.push_back(entry.get());
    }
    const int leftMostBlock = arrange_leftBlocks();
    const int rightMostBlock = arrange_rightBlocks();
    return std::partition(first + (leftMostBlock + 1) * block_size,
                            last - (rightMostBlock + 1) * block_size, func);
}

template<typename RandomAccessIterator, class UnaryPredicate>
RandomAccessIterator parallel_partition
    (const RandomAccessIterator first, const RandomAccessIterator last, const UnaryPredicate func, const unsigned THRESHOLD)
{
    ParallelPartitionSolver<RandomAccessIterator, UnaryPredicate> solver(first, last, func, THRESHOLD);
    return solver();
}

template<typename T>
class ThreadsafeQueue
{
private:
    class Node
    {
    public:
        std::shared_ptr<T> data;
        std::unique_ptr<Node> next;
    };
    std::mutex head_mutex;
    std::unique_ptr<Node> head;
    std::mutex tail_mutex;
    Node* tail;
    std::condition_variable data_cond;
    std::atomic<bool> complete;

    Node* get_tail()
    {
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        return tail;
    }
    void pop_head()
    {
        std::unique_ptr<Node> old_head = std::move(head);
        head = std::move(old_head->next);
    }

public:
    ThreadsafeQueue() : head(new Node), tail(head.get()), complete(false){}
    void push(const T new_value)
    {
        std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));
        std::unique_ptr<Node> p(new Node);
        {
            std::lock_guard<std::mutex> tail_lock(tail_mutex);
            tail->data = new_data;
            Node* const new_tail = p.get();
            tail->next = std::move(p);
            tail = new_tail;
        }
        data_cond.notify_one();
    }
    bool wait_and_pop(T& value, std::atomic<unsigned>& num)
    {
        std::unique_lock<std::mutex> head_lock(head_mutex);
        if(locked_empty() && num.load(std::memory_order_acquire) == 0)
        {
            finish();
            return false;
        }
        bool check;
        data_cond.wait(head_lock, [&]
        {
            return (check = !locked_empty()) || (num.load(std::memory_order_acquire) == 0);
        });
        if(!check)
        {
            finish();
            return false;
        }
        num.fetch_add(1, std::memory_order_acq_rel);
        value = std::move(*head->data);
        pop_head();
        return true;
    }
    void finish()
    {
        if(!complete.load(std::memory_order_acquire))
        {
            complete.store(true, std::memory_order_release);
            data_cond.notify_all();
        }
    }
    bool locked_empty()
    {
        return (head.get() == get_tail());
    }
};

template<typename RandomAccessIterator, class Compare>
class ParallelQuickSortSolver {
private:
    const RandomAccessIterator first, last;
    const Compare comp;
    ThreadsafeQueue<std::pair<RandomAccessIterator, RandomAccessIterator> > task_queue;
    std::mt19937 mt;
    std::atomic<unsigned> actthread_num;
    const unsigned total_length;
    const unsigned THRESHOLD;

    void getTask_and_run();
    void parallel_partial_sort(const RandomAccessIterator _first, const RandomAccessIterator _last);

public:
    ParallelQuickSortSolver(const RandomAccessIterator _first, const RandomAccessIterator _last, const Compare _comp)
        : first(_first), last(_last), comp(_comp), task_queue(), mt(std::random_device{}()),
            actthread_num(0), total_length(std::distance(first, last)), THRESHOLD(std::max(total_length / 200, (1u << 14)))
    {
        task_queue.push(std::make_pair(first, last));
    }
    void operator()();
};

template<typename RandomAccessIterator, class Compare>
void ParallelQuickSortSolver<RandomAccessIterator, Compare>::getTask_and_run()
{
    std::pair<RandomAccessIterator, RandomAccessIterator> task;
    while(task_queue.wait_and_pop(task, actthread_num))
    {
        parallel_partial_sort(task.first, task.second);
    }
}

template<typename RandomAccessIterator, class Compare>
void ParallelQuickSortSolver<RandomAccessIterator, Compare>::parallel_partial_sort(const RandomAccessIterator _first, const RandomAccessIterator _last)
{
    const unsigned length = std::distance(_first, _last);
    if(length <= THRESHOLD)
    {
        if(length >= 2)
            std::sort(_first, _last);
        actthread_num.fetch_sub(1, std::memory_order_acq_rel);
        return;
    }
    std::uniform_int_distribution<> uid(0, length-1);
    const RandomAccessIterator pivot_itr = std::next(_first, uid(mt));
    const auto pivot = *pivot_itr;
    std::iter_swap(_first, pivot_itr);
    const RandomAccessIterator middle
        = parallel_partition(_first + 1, _last, std::bind(comp, std::placeholders::_1, std::cref(pivot)), std::max(total_length / 4, (1u << 17)));
    std::iter_swap(_first, middle - 1);
    task_queue.push(std::make_pair(_first, middle - 1));
    parallel_partial_sort(middle, _last);
}

template<typename RandomAccessIterator, class Compare>
void ParallelQuickSortSolver<RandomAccessIterator, Compare>::operator()()
{
    const unsigned hardware_threads = std::thread::hardware_concurrency();
    std::vector<std::future<void> > fut;
    for(unsigned i = 0; i < hardware_threads; ++i)
    {
        fut.push_back(std::async(&ParallelQuickSortSolver<RandomAccessIterator, Compare>::getTask_and_run, this));
    }
    for(auto& entry : fut)
    {
        entry.get();
    }
}

template<typename RandomAccessIterator, class Compare=std::less<typename RandomAccessIterator::value_type> >
void parallel_quick_sort(const RandomAccessIterator first, const RandomAccessIterator last, const Compare comp=Compare())
{
    ParallelQuickSortSolver<RandomAccessIterator, Compare> solver(first, last, comp);
    solver();
}

verify 用の問題

verify していません(verify 問題を知らない)
上記の 2 つのコードは手元でかなりテストはした.