Thread-safe lock-free min where both operands can change c++

53 views Asked by At

I'm doing the parallel version of Bellman-Ford algorithm in c++ using std::atomic This is my main function executed in multiple threads

void calculateDistances(size_t start, size_t end, const Graph& graph, std::vector<std::atomic<double>>& distances, bool& haveChange)
{
    for (size_t source = start; source < end; ++source) {
        for (const auto& connection : graph[source]) {
            const size_t& destination = connection.destination;
            const double& distance = connection.distance;

            double oldDistance = distances[destination];
            while (distances[source] + distance < oldDistance)
            {
                if (distances[destination].compare_exchange_strong(oldDistance, distances[source] + distance)) {
                    haveChange = true;
                    break;
                }
            }
        }
    }
}

Here I'm trying to update distances[destination] with distances[source] + distance if second is smaller. However both: distances[destination] and distances[source] can be changed in other thread during this operation, so I'm using compare_exchange_strong here. But even when using this code - data race is still present and some of the iterations are skipped, resulting in failure of algorithm on some input data. Why is this going on and how I can fix this? The main function for context

std::vector<std::atomic<double>> BellmanFordAsync(const Graph& graph, size_t tasks, size_t start)
{
    size_t nodesCount = graph.getNodesCount();
    std::vector<std::atomic<double>> distances(nodesCount);

    for (auto& atomic : distances)
        atomic = std::numeric_limits<double>::infinity();

    distances[start] = 0;

    size_t chunkSize = (nodesCount + tasks - 1) / tasks;

    std::vector<std::future<void>> return_results;
    bool haveChange = true;

    for (size_t i = 1; i < nodesCount; ++i)
    {
        haveChange = false;

        for (size_t i = 0; i < tasks; ++i) {
            size_t start = i * chunkSize;
            size_t end = std::min((i + 1) * chunkSize, nodesCount);

            std::future<void> future = std::async(calculateDistances,
                start, end, std::cref(graph), std::ref(distances), std::ref(haveChange));

            return_results.push_back(std::move(future));
        }

        for (auto& result : return_results)
            result.wait();

        return_results.clear();
        if (!haveChange) break;
    }

    bool haveNegativeCycles = false;

    for (size_t i = 0; i < tasks; ++i) {
        size_t start = i * chunkSize;
        size_t end = std::min((i + 1) * chunkSize, nodesCount);

        std::future<void> future = std::async(checkNegativeCycles<std::atomic<double>>, start, end, std::cref(graph), std::cref(distances), std::ref(haveNegativeCycles));

        return_results.push_back(std::move(future));
    }

    for (auto& result : return_results) {
        result.wait();
        if (haveNegativeCycles) return std::vector<double>();
    }

    return distances;
}
0

There are 0 answers