Skip to content

Commit 3738a5c

Browse files
authored
Release v0.6.0 (#7)
Incremental Datalog with Negation.
1 parent 348e814 commit 3738a5c

File tree

6 files changed

+359
-3
lines changed

6 files changed

+359
-3
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ whose queries can both change during runtime __and__ respond to new data being s
3939

4040
* [Graph Reachability](notebooks/benchmark.ipynb)
4141
* [Datalog Interpretation](notebooks/datalog.ipynb)
42+
* [Stratified Datalog Interpretation](notebooks/stratified_negation.ipynb)
4243
* [Not-interpreted Datalog](notebooks/rdfs.ipynb)
4344
* [Streaming Pandas](notebooks/readme.ipynb)
4445
* [Streaming Pandas on the GPU](notebooks/readme_gpu.ipynb)

notebooks/stratified_negation.ipynb

+200
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "code",
5+
"execution_count": 1,
6+
"id": "ac5db869-9bb9-4c1e-827a-b32e57bc63ea",
7+
"metadata": {},
8+
"outputs": [],
9+
"source": [
10+
"from pydbsp.zset import ZSet, ZSetAddition\n",
11+
"from pydbsp.stream import step_until_fixpoint\n",
12+
"from typing import Tuple, List\n",
13+
"\n",
14+
"from pydbsp.algorithms.datalog import (\n",
15+
" EDB,\n",
16+
" Fact,\n",
17+
" Program,\n",
18+
" Rule,\n",
19+
")\n",
20+
"\n",
21+
"Edge = Tuple[int, int]\n",
22+
"GraphZSet = ZSet[Edge]\n",
23+
"\n",
24+
"def create_test_zset_graph(n: int) -> GraphZSet:\n",
25+
" return ZSet({(k, k + 1): 1 for k in range(n)})\n",
26+
"\n",
27+
"def create_test_edb(n: int) -> EDB:\n",
28+
" test_graph = create_test_zset_graph(n)\n",
29+
" group: ZSetAddition[Fact] = ZSetAddition()\n",
30+
"\n",
31+
" test_edb = group.identity()\n",
32+
" for k, v in test_graph.items():\n",
33+
" test_edb.inner[(\"E\", (k[0], k[1]))] = v\n",
34+
"\n",
35+
" return test_edb\n",
36+
"\n",
37+
"def from_facts_into_zset(facts: List[Tuple[Fact, int]]) -> EDB:\n",
38+
" edb: EDB = ZSetAddition().identity()\n",
39+
" for fact, weight in facts:\n",
40+
" edb.inner[fact] = weight\n",
41+
"\n",
42+
" return edb\n",
43+
"\n",
44+
"\n",
45+
"def from_rule_into_zset(rule: Rule, weight: int) -> Program:\n",
46+
" program: Program = ZSetAddition().identity()\n",
47+
" program.inner[rule] = weight\n",
48+
"\n",
49+
" return program\n",
50+
"\n"
51+
]
52+
},
53+
{
54+
"cell_type": "code",
55+
"execution_count": 2,
56+
"id": "e884e314-3f2f-40fc-bb6c-cfd6a5a2d4e4",
57+
"metadata": {},
58+
"outputs": [
59+
{
60+
"name": "stdout",
61+
"output_type": "stream",
62+
"text": [
63+
"[(0, 0), (1, 0), (1, 1), (1, 2), (1, 3), (1, 4), (2, 0), (2, 1), (2, 2), (3, 0), (3, 1), (3, 2), (3, 3), (4, 0), (4, 1), (4, 2), (4, 3), (4, 4)]\n"
64+
]
65+
}
66+
],
67+
"source": [
68+
"from pydbsp.algorithms.datalog import IncrementalDatalogWithNegation, Variable\n",
69+
"from pydbsp.stream import Stream, StreamHandle\n",
70+
"from pydbsp.stream.operators.linear import stream_elimination\n",
71+
"\n",
72+
"edb_group: ZSetAddition[Fact] = ZSetAddition()\n",
73+
"edb_stream = Stream(edb_group)\n",
74+
"edb_stream_h = StreamHandle(lambda: edb_stream)\n",
75+
"\n",
76+
"program_group: ZSetAddition[Rule]= ZSetAddition()\n",
77+
"program_stream = Stream(program_group)\n",
78+
"program_stream_h = StreamHandle(lambda: program_stream)\n",
79+
"\n",
80+
"seed: Rule = ((\"T\", (Variable(\"X\"), Variable(\"Y\"))), (\"E\", (Variable(\"X\"), Variable(\"Y\"))))\n",
81+
"transitivity: Rule = (\n",
82+
" (\"T\", (Variable(\"X\"), Variable(\"Z\"))),\n",
83+
" (\"T\", (Variable(\"X\"), Variable(\"Y\"))),\n",
84+
" (\"T\", (Variable(\"Y\"), Variable(\"Z\"))),\n",
85+
")\n",
86+
"node_left: Rule = (\n",
87+
" (\"Node\", (Variable(\"X\"),)),\n",
88+
" (\"E\", (Variable(\"X\"), Variable(\"Y\")))\n",
89+
")\n",
90+
"node_right: Rule = (\n",
91+
" (\"Node\", (Variable(\"Y\"),)),\n",
92+
" (\"E\", (Variable(\"X\"), Variable(\"Y\")))\n",
93+
")\n",
94+
"complement_transitivity: Rule = (\n",
95+
" (\"not_T\", (Variable(\"X\"), Variable(\"Y\"))),\n",
96+
" (\"Node\", (Variable(\"X\"),)),\n",
97+
" (\"Node\", (Variable(\"Y\"),)),\n",
98+
" (\"!T\", (Variable(\"X\"), Variable(\"Y\")))\n",
99+
")\n",
100+
"\n",
101+
"reasoner = IncrementalDatalogWithNegation(edb_stream_h, program_stream_h, None)\n",
102+
"edb_stream.send(from_facts_into_zset([((\"E\", (2, 3)), 1), ((\"E\", (0, 2)), 1), ((\"E\", (0, 1)), 1), ((\"R\", (1, 2)), 1) , ((\"E\", (3, 4)), 1)]))\n",
103+
"program_stream.send(from_rule_into_zset(transitivity, 1))\n",
104+
"program_stream.send(from_rule_into_zset(node_left, 1))\n",
105+
"program_stream.send(from_rule_into_zset(node_right, 1))\n",
106+
"program_stream.send(from_rule_into_zset(complement_transitivity, 1))\n",
107+
"program_stream.send(from_rule_into_zset(seed, 1))\n",
108+
"\n",
109+
"step_until_fixpoint(reasoner)\n",
110+
"fixedpoint = [ k[1] for k, _v in stream_elimination(reasoner.output()).items() if k[0] == \"not_T\" ]\n",
111+
"\n",
112+
"print(sorted(fixedpoint))"
113+
]
114+
},
115+
{
116+
"cell_type": "code",
117+
"execution_count": 3,
118+
"id": "ccb1ee9c-a34b-4ffd-84c0-ee19c6f55f77",
119+
"metadata": {},
120+
"outputs": [
121+
{
122+
"name": "stdout",
123+
"output_type": "stream",
124+
"text": [
125+
"[(0, 1, 2, 3), (1, 2, 3, 0), (2, 3, 0, 1), (3, 0, 1, 2)]\n"
126+
]
127+
}
128+
],
129+
"source": [
130+
"edb_group: ZSetAddition[Fact] = ZSetAddition()\n",
131+
"edb_stream = Stream(edb_group)\n",
132+
"edb_stream_h = StreamHandle(lambda: edb_stream)\n",
133+
"\n",
134+
"program_group: ZSetAddition[Rule]= ZSetAddition()\n",
135+
"program_stream = Stream(program_group)\n",
136+
"program_stream_h = StreamHandle(lambda: program_stream)\n",
137+
"\n",
138+
"cycle_4_not_overlapping_cycle_3: Rule = (\n",
139+
" (\"cycle_4_not_overlapping_cycle_3\", (\n",
140+
" Variable(\"A\"), \n",
141+
" Variable(\"B\"), \n",
142+
" Variable(\"C\"), \n",
143+
" Variable(\"D\")\n",
144+
" )),\n",
145+
" (\"E\", (Variable(\"A\"), Variable(\"B\"))),\n",
146+
" (\"E\", (Variable(\"B\"), Variable(\"C\"))),\n",
147+
" (\"!E\", (Variable(\"A\"), Variable(\"C\"))),\n",
148+
" (\"E\", (Variable(\"C\"), Variable(\"D\"))),\n",
149+
" (\"!E\", (Variable(\"B\"), Variable(\"D\"))),\n",
150+
" (\"!E\", (Variable(\"C\"), Variable(\"A\"))),\n",
151+
" (\"E\", (Variable(\"D\"), Variable(\"A\"))),\n",
152+
" (\"!E\", (Variable(\"D\"), Variable(\"B\")))\n",
153+
")\n",
154+
"\n",
155+
"reasoner = IncrementalDatalogWithNegation(edb_stream_h, program_stream_h, None)\n",
156+
"edb_stream.send(from_facts_into_zset([\n",
157+
" #// First 4-cycle: 0 -> 1 -> 2 -> 3 -> 0\n",
158+
" ((\"E\", (0, 1)), 1),\n",
159+
" ((\"E\", (1, 2)), 1),\n",
160+
" ((\"E\", (2, 3)), 1),\n",
161+
" ((\"E\", (3, 0)), 1),\n",
162+
" \n",
163+
" #// Second pattern (4-cycle with 3-cycle): 4 -> 5 -> 6 -> 7 -> 4 and 6 -> 4\n",
164+
" ((\"E\", (4, 5)), 1),\n",
165+
" ((\"E\", (5, 6)), 1),\n",
166+
" ((\"E\", (6, 7)), 1),\n",
167+
" ((\"E\", (7, 4)), 1),\n",
168+
" ((\"E\", (6, 4)), 1) #// Creates the 3-cycle\n",
169+
"]))\n",
170+
"program_stream.send(from_rule_into_zset(cycle_4_not_overlapping_cycle_3, 1))\n",
171+
"\n",
172+
"step_until_fixpoint(reasoner)\n",
173+
"fixedpoint = [ k[1] for k, v in stream_elimination(reasoner.output()).items() if k[0] == \"cycle_4_not_overlapping_cycle_3\" ]\n",
174+
"\n",
175+
"print(sorted(fixedpoint))"
176+
]
177+
}
178+
],
179+
"metadata": {
180+
"kernelspec": {
181+
"display_name": "Python 3 (ipykernel)",
182+
"language": "python",
183+
"name": "python3"
184+
},
185+
"language_info": {
186+
"codemirror_mode": {
187+
"name": "ipython",
188+
"version": 3
189+
},
190+
"file_extension": ".py",
191+
"mimetype": "text/x-python",
192+
"name": "python",
193+
"nbconvert_exporter": "python",
194+
"pygments_lexer": "ipython3",
195+
"version": "3.12.4"
196+
}
197+
},
198+
"nbformat": 4,
199+
"nbformat_minor": 5
200+
}

pydbsp/algorithms/datalog.py

+155-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
BinaryOperator,
66
Lift1,
77
LiftedGroupAdd,
8+
LiftedGroupNegate,
89
Stream,
910
StreamHandle,
1011
step_until_fixpoint_and_return,
@@ -16,6 +17,7 @@
1617
)
1718
from pydbsp.zset import ZSet, ZSetAddition
1819
from pydbsp.zset.operators.bilinear import DeltaLiftedDeltaLiftedJoin
20+
from pydbsp.zset.operators.linear import LiftedLiftedProject, LiftedLiftedSelect
1921
from pydbsp.zset.operators.unary import DeltaLiftedDeltaLiftedDistinct
2022

2123
Constant = Any
@@ -217,7 +219,6 @@ def rewrite_product_projection(
217219
edb_identity: EDB = ZSetAddition().identity()
218220
provenance_indexed_rewrite_identity: ProvenanceChain = ZSetAddition().identity()
219221

220-
221222
class IncrementalDatalog(BinaryOperator[EDB, Program, EDB]):
222223
# Program transformations
223224
lifted_sig: LiftedSig
@@ -332,6 +333,159 @@ def step(self) -> bool:
332333
new_rewrites_stream = self.distinct_rewrites.output().latest()
333334
return new_facts_stream.is_identity() and new_rewrites_stream.is_identity()
334335

336+
class IncrementalDatalogWithNegation(BinaryOperator[EDB, Program, EDB]):
    """Incremental Datalog interpreter supporting negated body atoms.

    Takes a stream of EDB deltas (input A) and a stream of Program (rule)
    deltas (input B) and produces the stream of derived facts. Negation is
    expressed by prefixing a predicate name with ``"!"`` in a rule body;
    matches against such atoms are subtracted (via an anti-join whose output
    is group-negated) from the rewrites that flow on to grounding.

    NOTE(review): this operator mirrors the structure of ``IncrementalDatalog``
    above, with an extra negative-atom path (select -> project bypass,
    anti-join, negate, add-back). Stratification of the input program is
    presumably the caller's responsibility — nothing here checks it; confirm.
    """

    # Program transformations: extract grounding signals and rule "directions"
    # from the incoming program stream, then lift them one stream level up.
    lifted_sig: LiftedSig
    lifted_intro_lifted_sig: LiftedStreamIntroduction[GroundingSignals]
    lifted_dir: LiftedDir
    lifted_intro_lifted_dir: LiftedStreamIntroduction[ProvenanceChain]

    # EDB transformations: the raw fact stream, lifted.
    lifted_intro_edb: LiftedStreamIntroduction[EDB]

    # Rewrite transformations: the stream of in-progress rule rewrites,
    # seeded with the empty rewrite (see set_input_a).
    rewrites: StreamHandle[ZSet[ProvenanceIndexedRewrite]]
    lifted_rewrites: LiftedStreamIntroduction[ZSet[ProvenanceIndexedRewrite]]

    # Joins
    # gatekeep: pairs each in-progress rewrite with the next atom ("direction")
    # of its rule, keyed on provenance.
    gatekeep: DeltaLiftedDeltaLiftedJoin[ProvenanceIndexedRewrite, Direction, AtomWithSourceRewriteAndProvenance]
    # positive_atoms / product: the non-negated path — unify the pending atom
    # against known facts to extend the rewrite.
    positive_atoms: LiftedLiftedSelect[AtomWithSourceRewriteAndProvenance]
    product: DeltaLiftedDeltaLiftedJoin[AtomWithSourceRewriteAndProvenance, Fact, ProvenanceIndexedRewrite]

    # negative_atoms / proj: the negated path — rewrites whose pending atom is
    # a "!"-predicate are passed through unchanged (proj drops the atom),
    negative_atoms: LiftedLiftedSelect[AtomWithSourceRewriteAndProvenance]
    proj: LiftedLiftedProject[AtomWithSourceRewriteAndProvenance, ProvenanceIndexedRewrite]

    # ...while anti_product finds the rewrites whose negated atom DOES match a
    # fact; negated_product flips their sign so they cancel the bypass above.
    anti_product: DeltaLiftedDeltaLiftedJoin[AtomWithSourceRewriteAndProvenance, Fact, ProvenanceIndexedRewrite]
    negated_product: LiftedGroupNegate[Stream[ZSet[ProvenanceIndexedRewrite]]]

    # final_product = positive matches + (bypassed rewrites - anti-join kills)
    final_product_0: LiftedGroupAdd[Stream[ZSet[ProvenanceIndexedRewrite]]]
    final_product: LiftedGroupAdd[Stream[ZSet[ProvenanceIndexedRewrite]]]

    # ground: completed rewrites applied to their rule head yield new facts.
    ground: DeltaLiftedDeltaLiftedJoin[ProvenanceIndexedRewrite, Signal, Fact]

    # Distincts: keep facts/rewrites set-like so the fixpoint terminates.
    distinct_rewrites: DeltaLiftedDeltaLiftedDistinct[ProvenanceIndexedRewrite]
    distinct_facts: DeltaLiftedDeltaLiftedDistinct[Fact]

    # Delays: feed the previous tick's facts/rewrites back into the joins.
    delay_distinct_facts: Delay[Stream[ZSet[Fact]]]
    delay_distinct_rewrites: Delay[Stream[ZSet[ProvenanceIndexedRewrite]]]

    # Pluses: merge freshly derived values with the corresponding inputs.
    fresh_facts_plus_edb: LiftedGroupAdd[Stream[EDB]]
    rewrite_product_plus_rewrites: LiftedGroupAdd[Stream[ZSet[ProvenanceIndexedRewrite]]]

    # Stream elimination: collapse the lifted fact stream into the output.
    lifted_elim_fresh_facts: LiftedStreamElimination[EDB]

    def set_input_a(self, stream_handle_a: StreamHandle[EDB]) -> None:
        """Wire the EDB (fact) input and seed the rewrite stream.

        The rewrite stream is primed with the empty rewrite at provenance 0 so
        every rule has a starting point for its first body atom.
        """
        self.input_stream_handle_a = stream_handle_a
        self.lifted_intro_edb = LiftedStreamIntroduction(self.input_stream_handle_a)

        provenance_indexed_rewrite_group: ZSetAddition[ProvenanceIndexedRewrite] = ZSetAddition()
        rewrite_stream: Stream[ZSet[ProvenanceIndexedRewrite]] = Stream(provenance_indexed_rewrite_group)
        self.rewrites = StreamHandle(lambda: rewrite_stream)
        # Seed: the identity rewrite with provenance 0 and weight 1.
        empty_rewrite_set = rewrite_stream.group().identity()
        empty_rewrite_set.inner[(0, RewriteMonoid().identity())] = 1

        self.rewrites.get().send(empty_rewrite_set)
        self.lifted_rewrites = LiftedStreamIntroduction(self.rewrites)

    def set_input_b(self, stream_handle_b: StreamHandle[Program]) -> None:
        """Wire the program input and build the whole dataflow circuit.

        Must be called after ``set_input_a`` — it reads ``lifted_intro_edb``
        and ``lifted_rewrites`` created there.
        """
        self.input_stream_handle_b = stream_handle_b

        self.lifted_sig = LiftedSig(self.input_stream_handle_b)
        self.lifted_intro_lifted_sig = LiftedStreamIntroduction(self.lifted_sig.output_handle())

        self.lifted_dir = LiftedDir(self.input_stream_handle_b)
        self.lifted_intro_lifted_dir = LiftedStreamIntroduction(self.lifted_dir.output_handle())

        # Match rewrites to the next atom of their rule by provenance; emit
        # (atom, substitution-so-far, rewrite). Inputs are attached below,
        # after the delayed feedback edges exist.
        self.gatekeep = DeltaLiftedDeltaLiftedJoin(
            None, None, lambda left, right: left[0] == right[0], lambda left, right: (right[1], right[2], left[1])
        )

        # A pending atom is "positive" when absent (rule exhausted) or when
        # its predicate name carries no "!" marker.
        self.positive_atoms = LiftedLiftedSelect(self.gatekeep.output_handle(), lambda gkeep: gkeep[1] is None or ("!" not in gkeep[1][0]))

        self.product = DeltaLiftedDeltaLiftedJoin(
            self.positive_atoms.output_handle(),
            None,
            lambda left, right: left[1] is None
            or (left[1][0] == right[0] and unify(left[2].apply(left[1]), right) is not None),
            rewrite_product_projection,
        )

        self.negative_atoms = LiftedLiftedSelect(self.gatekeep.output_handle(), lambda gkeep: not (gkeep[1] is None or ("!" not in gkeep[1][0])))

        # Bypass rewrites around product so that we can apply the anti join kills by retracting anti join matches in product from the bypassed rewrites
        self.proj = LiftedLiftedProject(self.negative_atoms.output_handle(), lambda gkeep: (gkeep[0], gkeep[2]))

        # NOTE(review): strip("!") removes '!' from BOTH ends of the predicate
        # name; assumes predicates never legitimately begin or end with '!'.
        self.anti_product = DeltaLiftedDeltaLiftedJoin(
            self.negative_atoms.output_handle(),
            None,
            lambda left, right: left[1] is None
            or (left[1][0].strip("!") == right[0] and unify(left[2].apply(left[1]), right) is not None),
            lambda left, _: (left[0], left[2]),
        )

        self.negated_product = LiftedGroupNegate(self.anti_product.output_handle())

        # final_product = product + (proj - anti_product): positive extensions
        # plus the bypassed rewrites whose negated atom found no match.
        self.final_product_0 = LiftedGroupAdd(self.negated_product.output_handle(), self.proj.output_handle())
        self.final_product = LiftedGroupAdd(self.final_product_0.output_handle(), self.product.output_handle())

        # Apply completed substitutions to rule heads to derive new facts.
        self.ground = DeltaLiftedDeltaLiftedJoin(
            self.final_product.output_handle(),
            self.lifted_intro_lifted_sig.output_handle(),
            lambda left, right: left[0] == right[0],
            lambda left, right: left[1].apply(right[1]),
        )

        self.fresh_facts_plus_edb = LiftedGroupAdd(self.ground.output_handle(), self.lifted_intro_edb.output_handle())
        self.distinct_facts = DeltaLiftedDeltaLiftedDistinct(self.fresh_facts_plus_edb.output_handle())

        self.rewrite_product_plus_rewrites = LiftedGroupAdd(
            self.final_product.output_handle(), self.lifted_rewrites.output_handle()
        )
        self.distinct_rewrites = DeltaLiftedDeltaLiftedDistinct(self.rewrite_product_plus_rewrites.output_handle())

        # Close the feedback loops: last tick's distinct facts/rewrites feed
        # this tick's joins.
        self.delay_distinct_facts = Delay(self.distinct_facts.output_handle())
        self.delay_distinct_rewrites = Delay(self.distinct_rewrites.output_handle())
        self.gatekeep.set_input_a(self.delay_distinct_rewrites.output_handle())
        self.gatekeep.set_input_b(self.lifted_intro_lifted_dir.output_handle())
        self.product.set_input_b(self.delay_distinct_facts.output_handle())
        self.anti_product.set_input_b(self.delay_distinct_facts.output_handle())

        self.lifted_elim_fresh_facts = LiftedStreamElimination(self.distinct_facts.output_handle())
        self.output_stream_handle = self.lifted_elim_fresh_facts.output_handle()

    def step(self) -> bool:
        """Advance every operator one tick, in dataflow order.

        Returns True when a fixpoint is reached, i.e. the latest distinct-fact
        and distinct-rewrite deltas are both the group identity (no change).
        The call order below follows the wiring in ``set_input_b`` — upstream
        operators must step before their consumers within a tick.
        """
        self.lifted_intro_edb.step()
        self.lifted_rewrites.step()
        self.lifted_sig.step()
        self.lifted_dir.step()
        self.lifted_intro_lifted_sig.step()
        self.lifted_intro_lifted_dir.step()
        self.gatekeep.step()
        self.positive_atoms.step()
        self.product.step()
        self.negative_atoms.step()
        self.proj.step()
        self.anti_product.step()
        self.negated_product.step()
        self.final_product_0.step()
        self.final_product.step()
        self.ground.step()
        self.fresh_facts_plus_edb.step()
        self.distinct_facts.step()
        self.rewrite_product_plus_rewrites.step()
        self.distinct_rewrites.step()
        self.delay_distinct_facts.step()
        self.delay_distinct_rewrites.step()
        self.lifted_elim_fresh_facts.step()

        new_facts_stream = self.distinct_facts.output().latest()
        new_rewrites_stream = self.distinct_rewrites.output().latest()
        return new_facts_stream.is_identity() and new_rewrites_stream.is_identity()
488+
335489

336490
ColumnReference = Tuple[int, ...]
337491
JoinIndex = Tuple[Predicate, ColumnReference]

0 commit comments

Comments
 (0)