-
-
Notifications
You must be signed in to change notification settings - Fork 320
Expand file tree
/
Copy pathengine.go
More file actions
186 lines (151 loc) · 4.72 KB
/
engine.go
File metadata and controls
186 lines (151 loc) · 4.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
package ferret
import (
"context"
"errors"
"fmt"
"github.com/MontFerret/ferret/v2/pkg/bytecode"
"github.com/MontFerret/ferret/v2/pkg/bytecode/artifact"
"github.com/MontFerret/ferret/v2/pkg/compiler"
"github.com/MontFerret/ferret/v2/pkg/runtime"
"github.com/MontFerret/ferret/v2/pkg/source"
"github.com/MontFerret/ferret/v2/pkg/vm"
)
// Engine is the entry point for producing Plans, either by compiling query
// source (Compile) or by decoding a serialized program artifact (Load), and
// for one-shot execution (Run). Every Plan it creates shares the engine's
// host, hook sets, session limiter, and VM pool limits.
type Engine struct {
	// compiler turns source into bytecode programs; used by Compile.
	compiler *compiler.Compiler
	// loader decodes serialized program artifacts; used by Load.
	loader *artifact.Loader
	// host is the built host handed to every Plan; Run also reads its logger.
	host *host
	// hooks holds the engine, plan, and session hook sets.
	hooks *hookRegistry
	// limiter is shared with every Plan created by this engine.
	limiter *sessionLimiter
	// idleCap and totalCap are the limits passed to vm.NewPoolWithLimits
	// when a Plan's VM pool is created.
	idleCap, totalCap int
}
// New builds an Engine from the given options.
//
// Construction proceeds in this order: resolve the options, create the
// bootstrap, register every configured module against the bootstrap, build
// the host, clone the hook registry, and run the engine init hooks.
//
// When module registration, the host build, or an init hook fails, the
// engine close hooks are run before returning; if they fail as well, their
// error (prefixed "close hooks:") is joined with the original one. Bootstrap
// failures are wrapped with "bootstrap:" and init hook failures with
// "init hooks:"; option, registration, and host build errors are returned
// unwrapped.
func New(setters ...Option) (*Engine, error) {
	opts, err := newOptions(setters)
	if err != nil {
		return nil, err
	}

	boot, err := newBootstrap(opts)
	if err != nil {
		return nil, fmt.Errorf("bootstrap: %w", err)
	}

	// abort unwinds a partially constructed engine: it runs the supplied
	// close hooks and reports cause, joined with the close failure if any.
	abort := func(cause error, runClose func() error) (*Engine, error) {
		if closeErr := runClose(); closeErr != nil {
			return nil, errors.Join(cause, fmt.Errorf("close hooks: %w", closeErr))
		}

		return nil, cause
	}

	for _, mod := range opts.modules {
		if regErr := mod.Register(boot); regErr != nil {
			return abort(regErr, boot.hooks.engine.runCloseHooks)
		}
	}

	built, err := boot.host.Build()
	if err != nil {
		return abort(err, boot.hooks.engine.runCloseHooks)
	}

	registry := boot.hooks.clone()

	// Init hooks run on the cloned registry, after the bootstrap is finalized
	// and before the engine is handed to the caller.
	if hookErr := registry.engine.runInitHooks(); hookErr != nil {
		return abort(fmt.Errorf("init hooks: %w", hookErr), registry.engine.runCloseHooks)
	}

	engine := &Engine{
		compiler: compiler.New(opts.compiler...),
		loader:   opts.programLoader,
		host:     built,
		hooks:    registry,
		limiter:  newSessionLimiter(opts.maxActiveSessions),
		idleCap:  opts.maxIdleVMsPerPlan,
		totalCap: opts.maxVMsPerPlan,
	}

	return engine, nil
}
// Compile turns src into a reusable execution Plan.
//
// Before-compile hooks run first; if they fail, neither the compiler nor the
// after-compile hooks are invoked. Once the compiler has run, the
// after-compile hooks always run and are given the compiler's error (nil on
// success). A failing after-compile hook prevents a plan from being returned,
// and its error is joined with the compilation error, if there was one.
//
// Calling Compile on a nil *Engine returns an ErrInvalidOperation error.
func (e *Engine) Compile(ctx context.Context, src *source.Source) (*Plan, error) {
	if e == nil {
		return nil, runtime.Error(runtime.ErrInvalidOperation, "engine is nil")
	}

	planHooks := e.hooks.plan

	if hookErr := planHooks.runBeforeCompileHooks(ctx); hookErr != nil {
		return nil, fmt.Errorf("before compile hooks: %w", hookErr)
	}

	program, compileErr := e.compiler.Compile(src)
	hookErr := planHooks.runAfterCompileHooks(ctx, compileErr)

	switch {
	case hookErr != nil:
		// The hook failure takes precedence but keeps the compile error attached.
		return nil, errors.Join(compileErr, fmt.Errorf("after compile hooks: %w", hookErr))
	case compileErr != nil:
		return nil, compileErr
	}

	return e.newPlan(program)
}
// Load decodes a serialized program artifact with the engine's loader and
// wraps the resulting program in a reusable Plan.
//
// Loader errors are returned as-is. Calling Load on a nil *Engine returns an
// ErrInvalidOperation error.
func (e *Engine) Load(data []byte) (*Plan, error) {
	if e == nil {
		return nil, runtime.Error(runtime.ErrInvalidOperation, "engine is nil")
	}

	program, loadErr := e.loader.Load(data)
	if loadErr != nil {
		return nil, loadErr
	}

	return e.newPlan(program)
}
// Run is the one-shot path: it compiles src, opens a session on the resulting
// plan with opts, executes it, and returns what Session.Run returns. The
// result pair is passed through unchanged, so, as with Session.Run, a non-nil
// *Output may accompany a non-nil error.
//
// Before Run returns, the session (if one was created) and then the plan are
// closed. Failures from either Close call are logged through the host logger
// and do not alter the returned error.
func (e *Engine) Run(ctx context.Context, src *source.Source, opts ...SessionOption) (*Output, error) {
	plan, err := e.Compile(ctx, src)
	if err != nil {
		return nil, err
	}

	// session is declared before the defer so the cleanup closure observes
	// whatever NewSession assigns below.
	var session *Session

	defer func() {
		logger := e.host.logger

		// logCloseFailure records a failed Close for the given phase.
		logCloseFailure := func(phase string, cause error) {
			logger.Error().
				Err(cause).
				Str("phase", phase).
				Str("operation", "close").
				Msg("deferred cleanup failed")
		}

		if session != nil {
			if cause := session.Close(); cause != nil {
				logCloseFailure("session", cause)
			}
		}

		if cause := plan.Close(); cause != nil {
			logCloseFailure("plan", cause)
		}
	}()

	if session, err = plan.NewSession(ctx, opts...); err != nil {
		return nil, err
	}

	return session.Run(ctx)
}
// Close runs the engine close hooks. A hook failure is returned wrapped with
// the "close hooks:" prefix; otherwise Close returns nil.
//
// NOTE(review): unlike Compile, Load, and newPlan, Close has no nil-receiver
// guard, so calling it on a nil *Engine dereferences e. Nothing visible here
// prevents the hooks from running again on a repeated call.
func (e *Engine) Close() error {
	hookErr := e.hooks.engine.runCloseHooks()
	if hookErr == nil {
		return nil
	}

	return fmt.Errorf("close hooks: %w", hookErr)
}
// newPlan wraps prog in a Plan wired to the engine's host, plan and session
// hook sets, and session limiter, and gives it a new VM pool bounded by the
// engine's idle and total caps.
//
// It returns an ErrInvalidOperation error when e is nil; otherwise the
// returned error is always nil.
func (e *Engine) newPlan(prog *bytecode.Program) (*Plan, error) {
	if e == nil {
		return nil, runtime.Error(runtime.ErrInvalidOperation, "engine is nil")
	}

	plan := new(Plan)
	plan.prog = prog
	plan.host = e.host
	plan.hooks = e.hooks.plan
	plan.sessionHooks = e.hooks.session
	plan.limiter = e.limiter
	// Each plan owns its own pool; only the limits are shared engine-wide.
	plan.pool = vm.NewPoolWithLimits(prog, e.idleCap, e.totalCap)

	return plan, nil
}