Skip to content

Commit d69e528

Browse files
committed
updated service description
1 parent cfe96f8 commit d69e528

3 files changed

Lines changed: 166 additions & 168 deletions

File tree

containers/aggregator/model/rdf.go

Lines changed: 166 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package model
22

33
import (
4+
"bytes"
45
"sort"
56
"strings"
7+
"time"
68

9+
"github.com/google/uuid"
710
"github.com/maartyman/rdfgo"
811
"github.com/sirupsen/logrus"
912
)
@@ -22,11 +25,12 @@ func FnOC(id string) rdfgo.INamedNode {
2225
return rdfgo.NewNamedNode(`https://w3id.org/function/vocabulary/composition#` + id)
2326
}
2427

25-
type Execution struct {
26-
URI string
27-
Transformation *Transformation
28-
Params map[string]rdfgo.ITerm
29-
Outputs map[string]rdfgo.ITerm
28+
func Dcat(id string) rdfgo.INamedNode {
29+
return rdfgo.NewNamedNode(`http://www.w3.org/ns/dcat#` + id)
30+
}
31+
32+
func Prov(id string) rdfgo.INamedNode {
33+
return rdfgo.NewNamedNode(`http://www.w3.org/ns/prov#` + id)
3034
}
3135

3236
type Application struct {
@@ -149,3 +153,160 @@ func RDFListToSlice(store rdfgo.Store, headNode rdfgo.ITerm) []rdfgo.ITerm {
149153

150154
return elements
151155
}
156+
157+
func (service *Service) FnORepresentation() ([]byte, error) {
158+
stream := rdfgo.NewStream()
159+
svcNode := rdfgo.NewNamedNode(service.URI)
160+
161+
go func() {
162+
defer close(stream)
163+
164+
// service is a agg:Service
165+
quad, err := rdfgo.NewQuad(
166+
svcNode,
167+
rdfgo.IRI.RDF.Type,
168+
Agg("Service"),
169+
nil,
170+
)
171+
if err != nil {
172+
return
173+
}
174+
stream <- quad
175+
// service is a dcat:DataService
176+
quad, err = rdfgo.NewQuad(
177+
svcNode,
178+
rdfgo.IRI.RDF.Type,
179+
Dcat("DataService"),
180+
nil,
181+
)
182+
if err != nil {
183+
return
184+
}
185+
stream <- quad
186+
// service is a prov:SoftwareAgent
187+
quad, err = rdfgo.NewQuad(
188+
svcNode,
189+
rdfgo.IRI.RDF.Type,
190+
Prov("SoftwareAgent"),
191+
nil,
192+
)
193+
if err != nil {
194+
return
195+
}
196+
stream <- quad
197+
198+
// SERVICE DETAILS
199+
// service status
200+
quad, err = rdfgo.NewQuad(
201+
svcNode,
202+
Agg("status"),
203+
rdfgo.NewStringLiteral(service.Status(), "en"),
204+
nil,
205+
)
206+
if err != nil {
207+
return
208+
}
209+
stream <- quad
210+
211+
// service createdAt
212+
quad, err = rdfgo.NewQuad(
213+
svcNode,
214+
Agg("createdAt"),
215+
rdfgo.NewLiteral(service.CreatedAt.Format(time.RFC3339), "", DateTime),
216+
nil,
217+
)
218+
if err != nil {
219+
return
220+
}
221+
stream <- quad
222+
223+
// service performs
224+
quad, err = rdfgo.NewQuad(
225+
svcNode,
226+
Agg("performs"),
227+
rdfgo.NewNamedNode(service.Application.Transformation.URI),
228+
nil,
229+
)
230+
if err != nil {
231+
return
232+
}
233+
stream <- quad
234+
235+
// service applies
236+
appNode := rdfgo.NewBlankNode(uuid.New().String())
237+
quad, err = rdfgo.NewQuad(
238+
svcNode,
239+
Agg("applies"),
240+
appNode,
241+
nil,
242+
)
243+
if err != nil {
244+
return
245+
}
246+
stream <- quad
247+
248+
// Application details
249+
// application applies transformation
250+
quad, err = rdfgo.NewQuad(
251+
appNode,
252+
FnOC("applies"),
253+
rdfgo.NewNamedNode(service.Application.Transformation.URI),
254+
nil,
255+
)
256+
if err != nil {
257+
return
258+
}
259+
stream <- quad
260+
261+
// application binds parameters
262+
for param, value := range service.Application.Params {
263+
bindingNode := rdfgo.NewBlankNode(uuid.New().String())
264+
quad, err = rdfgo.NewQuad(
265+
bindingNode,
266+
FnOC("parameterBinding"),
267+
value,
268+
nil,
269+
)
270+
if err != nil {
271+
return
272+
}
273+
stream <- quad
274+
275+
// application parameterBinding boundParameter
276+
quad, err = rdfgo.NewQuad(
277+
bindingNode,
278+
FnOC("boundParameter"),
279+
rdfgo.NewNamedNode(param),
280+
nil,
281+
)
282+
if err != nil {
283+
return
284+
}
285+
stream <- quad
286+
287+
// application parameterBinding boundToTerm
288+
quad, err = rdfgo.NewQuad(
289+
bindingNode,
290+
FnOC("boundToTerm"),
291+
value,
292+
nil,
293+
)
294+
if err != nil {
295+
return
296+
}
297+
stream <- quad
298+
}
299+
300+
// outputs
301+
302+
}()
303+
304+
// serialize to turtle
305+
var buf bytes.Buffer
306+
_, err := rdfgo.Write(stream.ToIStream(), &buf, rdfgo.WriterOptions{Format: "turtle"})
307+
if err != nil {
308+
return nil, err
309+
}
310+
311+
return buf.Bytes(), nil
312+
}
Lines changed: 0 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
package model
22

33
import (
4-
"bytes"
54
"context"
65
"fmt"
76
"time"
87

9-
"github.com/maartyman/rdfgo"
108
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
119
)
1210

@@ -104,97 +102,3 @@ func (service *Service) Status() string {
104102
// All deployments have available replicas & services have endpoints
105103
return "running"
106104
}
107-
108-
func (service *Service) FnORepresentation() ([]byte, error) {
109-
stream := rdfgo.NewStream()
110-
svcNode := rdfgo.NewNamedNode(service.URI)
111-
112-
go func() {
113-
defer close(stream)
114-
115-
// service URI is a service and execution
116-
quad, err := rdfgo.NewQuad(
117-
svcNode,
118-
rdfgo.IRI.RDF.Type,
119-
Agg("Service"),
120-
nil,
121-
)
122-
if err != nil {
123-
return
124-
}
125-
stream <- quad
126-
quad, err = rdfgo.NewQuad(
127-
svcNode,
128-
rdfgo.IRI.RDF.Type,
129-
FnO("Execution"),
130-
nil,
131-
)
132-
if err != nil {
133-
return
134-
}
135-
stream <- quad
136-
137-
// SERVICE DETAILS
138-
// service status
139-
quad, err = rdfgo.NewQuad(
140-
svcNode,
141-
Agg("status"),
142-
rdfgo.NewStringLiteral(service.Status(), "en"),
143-
nil,
144-
)
145-
if err != nil {
146-
return
147-
}
148-
stream <- quad
149-
150-
// service createdAt
151-
quad, err = rdfgo.NewQuad(
152-
svcNode,
153-
Agg("createdAt"),
154-
rdfgo.NewLiteral(service.CreatedAt.Format(time.RFC3339), "", DateTime),
155-
nil,
156-
)
157-
if err != nil {
158-
return
159-
}
160-
stream <- quad
161-
162-
// EXECUTION DETAILS
163-
// transformation
164-
quad, err = rdfgo.NewQuad(
165-
svcNode,
166-
FnO("executes"),
167-
rdfgo.NewNamedNode(service.Application.Transformation.URI),
168-
nil,
169-
)
170-
if err != nil {
171-
return
172-
}
173-
stream <- quad
174-
175-
// parameters
176-
for param, value := range service.Application.Params {
177-
quad, err = rdfgo.NewQuad(
178-
svcNode,
179-
rdfgo.NewNamedNode(param),
180-
value,
181-
nil,
182-
)
183-
if err != nil {
184-
return
185-
}
186-
stream <- quad
187-
}
188-
189-
// outputs
190-
}()
191-
192-
// serialize to turtle
193-
var buf bytes.Buffer
194-
_, err := rdfgo.Write(stream.ToIStream(), &buf, rdfgo.WriterOptions{Format: "turtle"})
195-
if err != nil {
196-
return nil, err
197-
}
198-
199-
return buf.Bytes(), nil
200-
}

containers/aggregator/services/util.go

Lines changed: 0 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -43,73 +43,6 @@ func ValidServiceUri(uri string) (string, string, error) {
4343
return servicePath, serviceId, nil
4444
}
4545

46-
func ParseRequestBodyOld(fno string) (model.Execution, error) {
47-
quadStream, errChan := rdfgo.Parse(
48-
strings.NewReader(fno),
49-
rdfgo.ParserOptions{Format: "text/turtle"},
50-
)
51-
52-
go func() {
53-
for parseErr := range errChan {
54-
if parseErr != nil {
55-
logrus.WithError(parseErr).Warn("Error parsing FnO description")
56-
}
57-
}
58-
}()
59-
60-
store := rdfgo.NewStore()
61-
store.Import(quadStream)
62-
63-
// Find all executions
64-
executionsIter := store.Match(nil, rdfgo.IRI.RDF.Type, model.FnO("Execution"), nil)
65-
var executions []model.Execution
66-
for q := range executionsIter {
67-
exeUri := q.GetSubject()
68-
69-
// Get transformation uri
70-
for t := range store.Match(exeUri, model.FnO("executes"), nil, nil) {
71-
tfUri := strings.Trim(t.GetObject().ToString(), "<>")
72-
// Load and parse transformation
73-
tf, err := LoadTransformationCR(tfUri)
74-
if err != nil {
75-
return model.Execution{}, err
76-
}
77-
78-
// Parse parameter inputs
79-
params := make(map[string]rdfgo.ITerm)
80-
for _, pred := range tf.Params {
81-
for quad := range store.Match(exeUri, rdfgo.NewNamedNode(pred), nil, nil) {
82-
params[pred] = quad.GetObject()
83-
}
84-
}
85-
86-
// Check if all parameters have an input
87-
// TODO: check if all required parameters have an input
88-
if len(params) != len(tf.Params) {
89-
return model.Execution{}, fmt.Errorf("Not all inputs provided for execution %s", exeUri.ToString())
90-
}
91-
92-
executions = append(executions, model.Execution{
93-
URI: strings.Trim(exeUri.ToString(), "<>"),
94-
Transformation: tf,
95-
Params: params,
96-
Outputs: make(map[string]rdfgo.ITerm),
97-
})
98-
}
99-
}
100-
101-
// Enforce exactly one execution
102-
// TODO: allow multiple executions
103-
if len(executions) == 0 {
104-
return model.Execution{}, fmt.Errorf("no fno:Execution found in FnO description")
105-
}
106-
if len(executions) > 1 {
107-
return model.Execution{}, fmt.Errorf("multiple fno:Execution found; expected exactly one")
108-
}
109-
110-
return executions[0], nil
111-
}
112-
11346
func ParseRequestBody(fno string) (*model.Service, error) {
11447
quadStream, errChan := rdfgo.Parse(
11548
strings.NewReader(fno),

0 commit comments

Comments
 (0)