Skip to content

Cannot read europe file #5

@ghost

Description

I am experiencing an issue when reading the 17 GB Europe extract. The application always crashes on line 186 of parsehelpers.h while executing:
blobsRead += dbufs.size();
Somehow the variables inFile, processor, mtx, blobsRead, doProcessing, threadPrivateProcessor and maxBlobsToRead cannot be read by the thread; at least, that is what I see when I break on the exception in the debugger. Please try the code I am running to check whether you experience the same issue. The parser does work with the 1.4 GB Africa extract, though. Both the Europe and the Africa extracts were downloaded from http://download.geofabrik.de/.

My example counter application, written according to your examples:

//leak detector
#include <vld.h>

////parser
#include <osmpbf/parsehelpers.h>
#include <osmpbf/inode.h>
#include <osmpbf/iway.h>
#include <osmpbf/irelation.h>
#include <osmpbf/filter.h>
#include <google/protobuf/stubs/common.h>


//cassandra driver - connecting to ScyllaDB
#include <cassandra.h>
#include <stdio.h>

//filtering functions
#include <highway/HighwayTagChecker.h>
#include <poi/PoiTagChecker.h>

//serialization to DB/disk
#include <serialization/disk/DiskSerializer.h>

//additional includes
#include <chrono>
#include <tuple>
#include <iostream>

using namespace std::chrono;
/// Numeric punctuation facet that formats integers with a comma between
/// groups of three digits (for example 1234567 is written as 1,234,567).
/// Install it by constructing a std::locale that owns an instance of it.
class commaPunct : public std::numpunct<char>
{
protected:
    /// Character inserted between digit groups.
    char do_thousands_sep() const override
    {
        return ',';
    }

    /// Grouping rule: one entry with the value 3, so every group holds
    /// three digits.
    std::string do_grouping() const override
    {
        return std::string(1, '\3');
    }
};


/// Maps an osmpbf primitive type to a short lowercase name.
/// @param t primitive type to describe
/// @return "node", "way" or "relation"; "invalid" for any other value
inline std::string primitiveTypeToString(osmpbf::PrimitiveType t) {
    if (t == osmpbf::PrimitiveType::NodePrimitive) {
        return "node";
    }
    if (t == osmpbf::PrimitiveType::WayPrimitive) {
        return "way";
    }
    if (t == osmpbf::PrimitiveType::RelationPrimitive) {
        return "relation";
    }
    return "invalid";
}


/// Totals shared by all MyCounter instances. MyCounter holds `lock` while it
/// adds its per-block counts to the fields below.
struct SharedState
{
    std::mutex lock;

    // number of elements
    uint64_t nodeCount = 0;
    uint64_t wayCount = 0;
    uint64_t relationCount = 0;

    // number of highways / number of edges added to the graph
    uint64_t highway_count = 0;
    uint64_t edge_count = 0;

    // number of points of interest
    uint64_t nodePoiCount = 0;
    uint64_t wayPoiCount = 0;

    /// Every counter starts at zero through the default member initializers above.
    SharedState() {}
};


/// Block processor passed to osmpbf::parseFileCPPThreads().
///
/// Each call to operator() counts the nodes, ways, relations, highways and
/// points of interest found in one primitive block and then adds those counts
/// to the SharedState it points to while holding SharedState::lock.
struct MyCounter 
{
    SharedState * state;         ///< destination of the totals; not owned
    // Counts for the block currently being processed. They are reset at the
    // start of every operator() call; the default initializers guarantee that
    // no member is ever read uninitialised.
    uint64_t nodeCount = 0;
    uint64_t wayCount = 0;
    uint64_t relationCount = 0;
    uint64_t highway_count = 0;
    uint64_t edge_count = 0;
    uint64_t nodePoiCount = 0;
    uint64_t wayPoiCount = 0;
    HighwayTagChecker ht;
    PoiTagChecker pt;
    Serializer serializer;

    /// @param state shared totals this counter flushes into; must outlive the counter
    MyCounter(SharedState * state) : state(state)
    {
    }

    /// Copies only the SharedState pointer. The counters start at zero and the
    /// tag checkers and serializer are default-constructed rather than copied.
    MyCounter(const MyCounter & other) : state(other.state)
    {
    }

    /// Counts the primitives of one block and adds them to the shared totals.
    /// @param pbi primitive block to scan
    void operator()(osmpbf::PrimitiveBlockInputAdaptor & pbi) 
    {
        nodeCount = wayCount = relationCount = nodePoiCount = wayPoiCount = edge_count = highway_count = 0;

        // NOTE(review): `nodes` is filled but never read in this block;
        // presumably scaffolding for the serializer. Confirm before removing.
        std::vector<Node> nodes;

        for (osmpbf::INodeStream node(pbi.getNodeStream()); !node.isNull(); node.next()) 
        {
            Poi* poi = pt.getPoi(node);
            if (poi != nullptr)
            {
                ++nodePoiCount;
                delete poi;
            }

            Node n;
            n.id = node.id();
            n.lat = node.latd();
            n.lon = node.lond();
            nodes.push_back(n);

            ++nodeCount;
        }
        for (osmpbf::IWayStream way(pbi.getWayStream()); !way.isNull(); way.next())
        {
            // If the way is a highway, edges are meant to be created from it;
            // for now it is only counted.
            // NOTE(review): unlike the Poi results, `hw` is never released
            // here. If getHighway() hands ownership to the caller this leaks
            // one object per highway; confirm against HighwayTagChecker.
            HighWay* hw = ht.getHighway(way);
            if (hw != nullptr)
            {
                ++highway_count;
            }

            Poi* poi = pt.getPoi(way);
            if (poi != nullptr)
            {
                ++wayPoiCount;
                delete poi;
            }

            ++wayCount;
        }
        for (osmpbf::IRelationStream rel(pbi.getRelationStream()); !rel.isNull(); rel.next())
        {
            // Members and tags are not inspected; relations are only counted.
            ++relationCount;
        }

        // Flush this block's counts to the shared totals under the lock.
        std::lock_guard<std::mutex> guard(state->lock);
        state->nodeCount += nodeCount;
        state->wayCount += wayCount;
        state->relationCount += relationCount;
        state->highway_count += highway_count;
        state->edge_count += edge_count;
        state->nodePoiCount += nodePoiCount;
        state->wayPoiCount += wayPoiCount;
    }
};




/// Counts the primitives of the OSM PBF file named by the first command-line
/// argument and prints the totals together with the elapsed parsing time.
/// @return 0 on success, -1 if the argument is missing or the file cannot be opened
int main(int argc, char ** argv) 
{
    // Without this check argv[1] would be read even when no argument was
    // given, which is undefined behaviour.
    if (argc < 2)
    {
        std::cerr << "Usage: " << (argc > 0 ? argv[0] : "counter") << " <file.osm.pbf>" << std::endl;
        return -1;
    }

    std::string fileName(argv[1]);
    SharedState state;
    osmpbf::OSMFileIn inFile(fileName);

    if (!inFile.open()) 
    {
        std::cout << "Failed to open " << fileName << std::endl;
        google::protobuf::ShutdownProtobufLibrary();
        return -1;
    }

    // One worker per hardware thread; hardware_concurrency() may return 0
    // when the value is not computable, so fall back to a single thread.
    uint32_t threadCount = std::max<int>(std::thread::hardware_concurrency(), 1);
    uint32_t readBlobCount = 2; //parse 2 blocks at once
    bool threadPrivateProcessor = true; //set to true so that MyCounter is copied

    high_resolution_clock::time_point t1 = high_resolution_clock::now();
    osmpbf::parseFileCPPThreads(inFile, MyCounter(&state), threadCount, readBlobCount, threadPrivateProcessor);
    high_resolution_clock::time_point t2 = high_resolution_clock::now();

    // Named elapsedMs rather than `duration` so it does not shadow
    // std::chrono::duration, which the using-directive brings into scope.
    int64_t elapsedMs = std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count();

    std::cout << "Celkovy cas " << elapsedMs << " ms" << std::endl;

    // From here on numbers are printed with thousands separators; the locale
    // takes ownership of the facet.
    std::locale comma(std::locale(), new commaPunct());
    std::cout.imbue(comma);

    std::cout << "File " << fileName << " has the following amounts of matching primitives:\n";
    std::cout << "Nodes: " << state.nodeCount  << "\n";
    std::cout << "Ways: " << state.wayCount << "\n";
    std::cout << "Relations: " << state.relationCount << "\n";
    std::cout << "Highways: " << state.highway_count << "\n";
    std::cout << "Added edges: " << state.edge_count << "\n";
    std::cout << "Node poi: " << state.nodePoiCount << "\n";
    std::cout << "Way poi: " << state.wayPoiCount << "\n";
    std::cout << std::flush;

    google::protobuf::ShutdownProtobufLibrary();

    return 0;
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions