Skip to content

Commit 3c06490

Browse files
Merge pull request #84 from balamg/issue80
Issue80
2 parents ca6b3af + fb49954 commit 3c06490

File tree

8 files changed

+190
-53
lines changed

8 files changed

+190
-53
lines changed

common/model/types.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ type RuleSession interface {
7272
//RtcTransactionHandler
7373
RegisterRtcTransactionHandler(txnHandler RtcTransactionHandler, handlerCtx interface{})
7474

75+
//replay existing tuples into a rule
76+
ReplayTuplesForRule(ruleName string) (err error)
7577
}
7678

7779
//ConditionEvaluator is a function pointer for handling condition evaluations on the server side
@@ -92,16 +94,14 @@ type ValueChangeListener interface {
9294

9395
type RtcTxn interface {
9496
//map of type and map of key/tuple
95-
GetRtcAdded () map[string]map[string]Tuple
97+
GetRtcAdded() map[string]map[string]Tuple
9698
GetRtcModified() map[string]map[string]RtcModified
9799
GetRtcDeleted() map[string]map[string]Tuple
98-
99100
}
100101

101102
type RtcModified interface {
102103
GetTuple() Tuple
103104
GetModifiedProps() map[string]bool
104105
}
105106

106-
type RtcTransactionHandler func (ctx context.Context, rs RuleSession, txn RtcTxn, txnContext interface{})
107-
107+
type RtcTransactionHandler func(ctx context.Context, rs RuleSession, txn RtcTxn, txnContext interface{})

go.mod

+5
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,10 @@ require (
44
github.com/aws/aws-sdk-go v1.18.3
55
github.com/gorilla/websocket v1.4.0
66
github.com/oklog/ulid v1.3.1
7+
github.com/project-flogo/contrib/trigger/kafka v0.9.0
8+
github.com/project-flogo/contrib/trigger/rest v0.9.0
79
github.com/project-flogo/core v0.9.3
10+
github.com/stretchr/testify v1.3.0
811
)
12+
13+
go 1.13

rete/classnode.go

+10-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ type classNode interface {
1414
addClassNodeLink(classNodeLink)
1515
removeClassNodeLink(classNodeLink)
1616
getClassNodeLinks() *list.List
17-
assert(ctx context.Context, tuple model.Tuple, changedProps map[string]bool)
17+
assert(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, forRule string)
1818
}
1919

2020
type classNodeImpl struct {
@@ -74,13 +74,18 @@ func (cn *classNodeImpl) String() string {
7474
return ret
7575
}
7676

77-
func (cn *classNodeImpl) assert(ctx context.Context, tuple model.Tuple, changedProps map[string]bool) {
77+
func (cn *classNodeImpl) assert(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, forRule string) {
7878
handle := getOrCreateHandle(ctx, tuple)
7979
handles := make([]reteHandle, 1)
8080
handles[0] = handle
8181
propagate := false
8282
for e := cn.getClassNodeLinks().Front(); e != nil; e = e.Next() {
8383
classNodeLinkVar := e.Value.(classNodeLink)
84+
if forRule != "" {
85+
if classNodeLinkVar.getRule().GetName() != forRule {
86+
continue
87+
}
88+
}
8489
if changedProps != nil {
8590
depProps, found := classNodeLinkVar.getRule().GetDeps()[model.TupleType(cn.name)]
8691
if found { // rule depends on this type
@@ -98,5 +103,8 @@ func (cn *classNodeImpl) assert(ctx context.Context, tuple model.Tuple, changedP
98103
if propagate {
99104
classNodeLinkVar.propagateObjects(ctx, handles)
100105
}
106+
if forRule != "" {
107+
break
108+
}
101109
}
102110
}

rete/network.go

+66-45
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@ import (
44
"context"
55
"fmt"
66
"math"
7+
"time"
78

89
"github.com/project-flogo/rules/common/model"
910

1011
"container/list"
1112
"sync"
12-
"time"
1313
)
1414

1515
type RtcOprn int
@@ -23,7 +23,7 @@ const (
2323

2424
//Network ... the rete network
2525
type Network interface {
26-
AddRule(model.Rule) error
26+
AddRule(rule model.Rule) error
2727
String() string
2828
RemoveRule(string) model.Rule
2929
GetRules() []model.Rule
@@ -34,7 +34,7 @@ type Network interface {
3434

3535
retractInternal(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn)
3636

37-
assertInternal(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn)
37+
assertInternal(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn, forRule string)
3838
getOrCreateHandle(ctx context.Context, tuple model.Tuple) reteHandle
3939
getHandle(tuple model.Tuple) reteHandle
4040

@@ -43,6 +43,7 @@ type Network interface {
4343
GetAssertedTupleByStringKey(key string) model.Tuple
4444
//RtcTransactionHandler
4545
RegisterRtcTransactionHandler(txnHandler model.RtcTransactionHandler, txnContext interface{})
46+
ReplayTuplesForRule(ruleName string, rs model.RuleSession) (err error)
4647
}
4748

4849
type reteNetworkImpl struct {
@@ -63,7 +64,7 @@ type reteNetworkImpl struct {
6364
currentId int
6465

6566
assertLock sync.Mutex
66-
crudLock sync.Mutex
67+
//crudLock sync.Mutex
6768
txnHandler model.RtcTransactionHandler
6869
txnContext interface{}
6970
}
@@ -84,9 +85,8 @@ func (nw *reteNetworkImpl) initReteNetwork() {
8485
}
8586

8687
func (nw *reteNetworkImpl) AddRule(rule model.Rule) (err error) {
87-
88-
nw.crudLock.Lock()
89-
defer nw.crudLock.Unlock()
88+
nw.assertLock.Lock()
89+
defer nw.assertLock.Unlock()
9090

9191
if nw.allRules[rule.GetName()] != nil {
9292
return fmt.Errorf("Rule already exists.." + rule.GetName())
@@ -137,6 +137,22 @@ func (nw *reteNetworkImpl) AddRule(rule model.Rule) (err error) {
137137

138138
//Add NodeLinks
139139
nw.ruleNameClassNodeLinksOfRule[rule.GetName()] = classNodeLinksOfRule
140+
141+
return nil
142+
}
143+
144+
func (nw *reteNetworkImpl) ReplayTuplesForRule(ruleName string, rs model.RuleSession) error {
145+
if rule, exists := nw.allRules[ruleName]; !exists {
146+
return fmt.Errorf("Rule not found [%s]", ruleName)
147+
} else {
148+
for _, h := range nw.allHandles {
149+
tt := h.getTuple().GetTupleType()
150+
if ContainedByFirst(rule.GetIdentifiers(), []model.TupleType{tt}) {
151+
//assert it but only for this rule.
152+
nw.assert(nil, rs, h.getTuple(), nil, ADD, ruleName)
153+
}
154+
}
155+
}
140156
return nil
141157
}
142158

@@ -146,8 +162,8 @@ func (nw *reteNetworkImpl) setClassNodeAndLinkJoinTables(nodesOfRule *list.List,
146162

147163
func (nw *reteNetworkImpl) RemoveRule(ruleName string) model.Rule {
148164

149-
nw.crudLock.Lock()
150-
defer nw.crudLock.Unlock()
165+
nw.assertLock.Lock()
166+
defer nw.assertLock.Unlock()
151167

152168
rule := nw.allRules[ruleName]
153169
delete(nw.allRules, ruleName)
@@ -180,7 +196,8 @@ func (nw *reteNetworkImpl) RemoveRule(ruleName string) model.Rule {
180196
}
181197
}
182198
}
183-
199+
rstr := nw.String()
200+
fmt.Printf(rstr)
184201
return rule
185202
}
186203

@@ -262,7 +279,7 @@ func (nw *reteNetworkImpl) buildNetwork(rule model.Rule, nodesOfRule *list.List,
262279
lastNode = fNode
263280
}
264281
//Yoohoo! We have a Rule!!
265-
ruleNode := newRuleNode(rule)
282+
ruleNode := newRuleNode(nw, rule)
266283
newNodeLink(nw, lastNode, ruleNode, false)
267284
nodesOfRule.PushBack(ruleNode)
268285
} else {
@@ -528,36 +545,7 @@ func (nw *reteNetworkImpl) printClassNode(ruleName string, classNodeImpl *classN
528545
}
529546

530547
func (nw *reteNetworkImpl) Assert(ctx context.Context, rs model.RuleSession, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn) {
531-
532-
if ctx == nil {
533-
ctx = context.Background()
534-
}
535-
536-
reteCtxVar, isRecursive, newCtx := getOrSetReteCtx(ctx, nw, rs)
537-
538-
if !isRecursive {
539-
nw.crudLock.Lock()
540-
defer nw.crudLock.Unlock()
541-
nw.assertInternal(newCtx, tuple, changedProps, mode)
542-
reteCtxVar.getConflictResolver().resolveConflict(newCtx)
543-
//if Timeout is 0, remove it from rete
544-
td := model.GetTupleDescriptor(tuple.GetTupleType())
545-
if td != nil {
546-
if td.TTLInSeconds == 0 { //remove immediately.
547-
nw.removeTupleFromRete(tuple)
548-
} else if td.TTLInSeconds > 0 { // TTL for the tuple type, after that, remove it from RETE
549-
go time.AfterFunc(time.Second*time.Duration(td.TTLInSeconds), func() {
550-
nw.removeTupleFromRete(tuple)
551-
})
552-
} //else, its -ve and means, never expire
553-
}
554-
if nw.txnHandler != nil {
555-
rtcTxn := newRtcTxn(reteCtxVar.getRtcAdded(), reteCtxVar.getRtcModified(), reteCtxVar.getRtcDeleted())
556-
nw.txnHandler(ctx, rs, rtcTxn, nw.txnContext)
557-
}
558-
} else {
559-
reteCtxVar.getOpsList().PushBack(newAssertEntry(tuple, changedProps, mode))
560-
}
548+
nw.assert(ctx, rs, tuple, changedProps, mode, "")
561549
}
562550

563551
func (nw *reteNetworkImpl) removeTupleFromRete(tuple model.Tuple) {
@@ -575,8 +563,8 @@ func (nw *reteNetworkImpl) Retract(ctx context.Context, tuple model.Tuple, chang
575563
}
576564
reteCtxVar, isRecursive, _ := getOrSetReteCtx(ctx, nw, nil)
577565
if !isRecursive {
578-
nw.crudLock.Lock()
579-
defer nw.crudLock.Unlock()
566+
nw.assertLock.Lock()
567+
defer nw.assertLock.Unlock()
580568
nw.retractInternal(ctx, tuple, changedProps, mode)
581569
if nw.txnHandler != nil && mode == DELETE {
582570
rtcTxn := newRtcTxn(reteCtxVar.getRtcAdded(), reteCtxVar.getRtcModified(), reteCtxVar.getRtcDeleted())
@@ -622,12 +610,12 @@ func (nw *reteNetworkImpl) GetAssertedTupleByStringKey(key string) model.Tuple {
622610
return nil
623611
}
624612

625-
func (nw *reteNetworkImpl) assertInternal(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn) {
613+
func (nw *reteNetworkImpl) assertInternal(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn, forRule string) {
626614
tupleType := tuple.GetTupleType()
627615
listItem := nw.allClassNodes[string(tupleType)]
628616
if listItem != nil {
629617
classNodeVar := listItem.(classNode)
630-
classNodeVar.assert(ctx, tuple, changedProps)
618+
classNodeVar.assert(ctx, tuple, changedProps, forRule)
631619
}
632620
td := model.GetTupleDescriptor(tuple.GetTupleType())
633621
if td != nil {
@@ -667,3 +655,36 @@ func (nw *reteNetworkImpl) RegisterRtcTransactionHandler(txnHandler model.RtcTra
667655
nw.txnHandler = txnHandler
668656
nw.txnContext = txnContext
669657
}
658+
659+
func (nw *reteNetworkImpl) assert(ctx context.Context, rs model.RuleSession, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn, forRule string) {
660+
661+
if ctx == nil {
662+
ctx = context.Background()
663+
}
664+
665+
reteCtxVar, isRecursive, newCtx := getOrSetReteCtx(ctx, nw, rs)
666+
667+
if !isRecursive {
668+
nw.assertLock.Lock()
669+
defer nw.assertLock.Unlock()
670+
nw.assertInternal(newCtx, tuple, changedProps, mode, forRule)
671+
reteCtxVar.getConflictResolver().resolveConflict(newCtx)
672+
//if Timeout is 0, remove it from rete
673+
td := model.GetTupleDescriptor(tuple.GetTupleType())
674+
if td != nil {
675+
if td.TTLInSeconds == 0 { //remove immediately.
676+
nw.removeTupleFromRete(tuple)
677+
} else if td.TTLInSeconds > 0 { // TTL for the tuple type, after that, remove it from RETE
678+
go time.AfterFunc(time.Second*time.Duration(td.TTLInSeconds), func() {
679+
nw.removeTupleFromRete(tuple)
680+
})
681+
} //else, its -ve and means, never expire
682+
}
683+
if nw.txnHandler != nil {
684+
rtcTxn := newRtcTxn(reteCtxVar.getRtcAdded(), reteCtxVar.getRtcModified(), reteCtxVar.getRtcDeleted())
685+
nw.txnHandler(ctx, rs, rtcTxn, nw.txnContext)
686+
}
687+
} else {
688+
reteCtxVar.getOpsList().PushBack(newAssertEntry(tuple, changedProps, mode))
689+
}
690+
}

rete/opsList.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func newAssertEntry(tuple model.Tuple, changeProps map[string]bool, mode RtcOprn
3636

3737
func (ai *assertEntryImpl) execute(ctx context.Context) {
3838
reteCtx := getReteCtx(ctx)
39-
reteCtx.getNetwork().assertInternal(ctx, ai.tuple, ai.changeProps, ai.mode)
39+
reteCtx.getNetwork().assertInternal(ctx, ai.tuple, ai.changeProps, ai.mode, "")
4040
}
4141

4242
//Modify Entry

rete/rulenode.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@ type ruleNodeImpl struct {
1818
rule model.Rule
1919
}
2020

21-
func newRuleNode(rule model.Rule) ruleNode {
21+
func newRuleNode(nw Network, rule model.Rule) ruleNode {
2222
rn := ruleNodeImpl{}
23+
rn.nodeImpl.initNodeImpl(nw, rule, rule.GetIdentifiers())
2324
rn.identifiers = rule.GetIdentifiers()
2425
rn.rule = rule
2526
return &rn

ruleapi/rulesession.go

+4
Original file line numberDiff line numberDiff line change
@@ -174,3 +174,7 @@ func (rs *rulesessionImpl) GetAssertedTuple(key model.TupleKey) model.Tuple {
174174
func (rs *rulesessionImpl) RegisterRtcTransactionHandler(txnHandler model.RtcTransactionHandler, txnContext interface{}) {
175175
rs.reteNetwork.RegisterRtcTransactionHandler(txnHandler, txnContext)
176176
}
177+
178+
func (rs *rulesessionImpl) ReplayTuplesForRule(ruleName string) (err error) {
179+
return rs.reteNetwork.ReplayTuplesForRule(ruleName, rs)
180+
}

0 commit comments

Comments
 (0)