
Distributed OWL Reasoners #176

@Demirrr

Description


  1. At each node, store a fragment of the ABox together with the full TBox and RBox.
  2. At each node, load that data into a reasoner.
  3. Send a query to all nodes, e.g. Female.
  4. Each node returns its local set of instances.
  5. Aggregate the per-node results.
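The five steps can be sketched in plain Python before bringing in Ray. This is only a toy illustration, not a real reasoner: the `TBOX` dict, the `NODES` fragments, and the subsumption lookup are all made-up stand-ins for an actual OWL reasoner's instance retrieval.

```python
# Shared TBox, here a single axiom Mother ⊑ Female (subclass -> superclass).
TBOX = {"Mother": "Female"}

# Step 1: each node stores its ABox fragment (individual -> asserted class).
NODES = [
    {"anna": "Mother", "bob": "Male"},    # node 1
    {"carla": "Female", "dave": "Male"},  # node 2
]

def classes_of(asserted):
    """The asserted class plus all of its TBox superclasses."""
    result = {asserted}
    while asserted in TBOX:
        asserted = TBOX[asserted]
        result.add(asserted)
    return result

def query_node(abox, cls):
    # Steps 2-4: the node's "reasoner" answers with its local instances of cls.
    return {ind for ind, asserted in abox.items() if cls in classes_of(asserted)}

# Step 5: aggregate the per-node answers with a set union.
answer = set().union(*(query_node(abox, "Female") for abox in NODES))
print(sorted(answer))  # anna is entailed via Mother ⊑ Female, carla is asserted
```

The aggregation in step 5 is a plain union because instance retrieval over disjoint ABox fragments is monotone here; fragments that share individuals would still merge correctly since duplicates collapse in the set.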

Here is an example of this pattern over CSV data, using Ray.

Step 1: Create Data Fragments

python -c "import pandas as pd; \
pd.DataFrame([{'id': 1, 'name': 'Alice', 'country': 'Germany'}, {'id': 2, 'name': 'Bob', 'country': 'France'}]).to_csv('db_part_1.csv', index=False); \
pd.DataFrame([{'id': 3, 'name': 'Charlie', 'country': 'Germany'}, {'id': 4, 'name': 'David', 'country': 'Spain'}]).to_csv('db_part_2.csv', index=False)"

Step 2: Start the Infrastructure

Open three terminals and run the commands below. The --resources flag tags each node with a custom resource, so that an actor can later be pinned to the node holding a given fragment.

Terminal 1 (Head Node + Fragment 1)

ray start --head --port=6379 --num-cpus=1 --resources='{"server_alpha": 1}'

Terminal 2 (Worker Node + Fragment 2) — replace 192.168.2.225 with your head node's IP:

ray start --address='192.168.2.225:6379' --num-cpus=1 --resources='{"server_beta": 1}'

Terminal 3 (Client)

Keep this terminal open to run your Python code.

import ray
import pandas as pd

ray.init(address='auto')

@ray.remote
class DatabaseShard:
    def __init__(self, shard_id, file_path):
        self.shard_id = shard_id
        # Load data into RAM once
        self.df = pd.read_csv(file_path)
        print(f"--- Shard {shard_id} initialized and data loaded into RAM ---")

    def query(self, country):
        print(f"Shard {self.shard_id} searching for: {country}")
        result = self.df[self.df['country'] == country]
        return result.to_dict('records')

if __name__ == "__main__":
    # 1. Instantiate Actors on specific nodes based on resources
    shard_1 = DatabaseShard.options(resources={"server_alpha": 1}).remote("Alpha", "db_part_1.csv")
    shard_2 = DatabaseShard.options(resources={"server_beta": 1}).remote("Beta", "db_part_2.csv")

    # 2. Perform a distributed search
    search_target = "Germany"
    print(f"\nRequesting '{search_target}' from all shards...")

    # Both shards search their own memory in parallel
    results = ray.get([
        shard_1.query.remote(search_target),
        shard_2.query.remote(search_target)
    ])

    # 3. Merge results (The 'Reduce' step)
    flat_results = [item for sublist in results for item in sublist]
    print(f"\nFound {len(flat_results)} records:")
    for record in flat_results:
        print(f" - {record['name']} lives in {record['country']}")
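One detail the CSV demo glosses over: the merge step uses list concatenation, which is fine for disjoint fragments, but if the same individual can appear in more than one fragment, step 5 should merge per-node answers with a set union so duplicates collapse. A small stdlib sketch (the shard answers below are made-up):

```python
# Hypothetical per-shard answers to the query "Female"; note carla overlaps.
shard_answers = [
    ["anna", "carla"],   # from shard Alpha
    ["carla", "erika"],  # from shard Beta
]

# List concatenation (as in the CSV demo) keeps the duplicate...
concatenated = [ind for answer in shard_answers for ind in answer]
print(len(concatenated))  # 4

# ...whereas a set union yields the correct aggregated instance set.
merged = set().union(*map(set, shard_answers))
print(sorted(merged))  # ['anna', 'carla', 'erika']
```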
