forked from php/frankenphp
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathworkerextension.go
More file actions
167 lines (142 loc) · 5.36 KB
/
Copy pathworkerextension.go
File metadata and controls
167 lines (142 loc) · 5.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package frankenphp
import (
"context"
"log/slog"
"net/http"
"sync"
"sync/atomic"
)
// EXPERIMENTAL: Worker allows you to register a worker where instead of calling FrankenPHP handlers on
// frankenphp_handle_request(), the ProvideRequest method is called. You may provide a standard
// http.Request that will be conferred to the underlying worker script.
//
// A worker script with the provided Name and FileName will be registered, along with the provided
// configuration. You can also provide any environment variables that you want through Env. GetMinThreads allows you to
// reserve a minimum number of threads from the frankenphp thread pool. This number must be positive.
// These methods are only called once at startup, so register them in an init() function.
//
// When a thread is activated and nearly ready, ThreadActivatedNotification will be called with an opaque threadId;
// this is a time for setting up any per-thread resources. When a thread is about to be returned to the thread pool,
// you will receive a call to ThreadDrainNotification that will inform you of the threadId.
// After the thread is returned to the thread pool, ThreadDeactivatedNotification will be called.
//
// Once you have at least one thread activated, you will receive calls to ProvideRequest where you should respond with
// a request. FrankenPHP will automatically pipe these requests to the worker script and handle the response.
// The piping process is designed to run indefinitely and will be gracefully shut down when FrankenPHP shuts down.
//
// Note: External workers receive the lowest priority when determining thread allocations. If GetMinThreads cannot be
// allocated, then frankenphp will panic and provide this information to the user (who will need to allocate more
// total threads). Don't be greedy.
type Worker interface {
Name() string
FileName() string
Env() PreparedEnv
GetMinThreads() int
ThreadActivatedNotification(threadId int)
ThreadDrainNotification(threadId int)
ThreadDeactivatedNotification(threadId int)
ProvideRequest() *WorkerRequest
InjectRequest(r *WorkerRequest)
}
// EXPERIMENTAL
type WorkerRequest struct {
// The request for your worker script to handle
Request *http.Request
// Response is a response writer that provides the output of the provided request, it must not be nil to access the request body
Response http.ResponseWriter
// CallbackParameters is an optional field that will be converted in PHP types and passed as parameter to the PHP callback
CallbackParameters any
// AfterFunc is an optional function that will be called after the request is processed with the original value, the return of the PHP callback, converted in Go types, is passed as parameter
AfterFunc func(callbackReturn any)
}
var extensionWorkers = make(map[string]Worker)
var extensionWorkersMutex sync.Mutex
// EXPERIMENTAL
func RegisterWorker(worker Worker) {
extensionWorkersMutex.Lock()
defer extensionWorkersMutex.Unlock()
extensionWorkers[worker.Name()] = worker
}
// startWorker creates a pipe from a worker to the main worker.
func startWorker(w *worker, extensionWorker Worker, thread *phpThread) {
for {
rq := extensionWorker.ProvideRequest()
var fc *frankenPHPContext
if rq.Request == nil {
fc = newFrankenPHPContext()
fc.logger = logger
} else {
fr, err := NewRequestWithContext(rq.Request, WithOriginalRequest(rq.Request))
if err != nil {
logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex), slog.Any("error", err))
continue
}
var ok bool
if fc, ok = fromContext(fr.Context()); !ok {
continue
}
}
fc.worker = w
fc.responseWriter = rq.Response
fc.handlerParameters = rq.CallbackParameters
// Queue the request and wait for completion if Done channel was provided
logger.LogAttrs(context.Background(), slog.LevelInfo, "queue the external worker request", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex))
w.requestChan <- fc
if rq.AfterFunc != nil {
go func() {
<-fc.done
if rq.AfterFunc != nil {
rq.AfterFunc(fc.handlerReturn)
}
}()
}
}
}
func NewWorker(name, fileName string, minThreads int, env PreparedEnv) Worker {
return &defaultWorker{
name: name,
fileName: fileName,
env: env,
minThreads: minThreads,
requestChan: make(chan *WorkerRequest),
activatedCount: atomic.Int32{},
drainCount: atomic.Int32{},
}
}
type defaultWorker struct {
name string
fileName string
env PreparedEnv
minThreads int
requestChan chan *WorkerRequest
activatedCount atomic.Int32
drainCount atomic.Int32
}
func (w *defaultWorker) Name() string {
return w.name
}
func (w *defaultWorker) FileName() string {
return w.fileName
}
func (w *defaultWorker) Env() PreparedEnv {
return w.env
}
func (w *defaultWorker) GetMinThreads() int {
return w.minThreads
}
func (w *defaultWorker) ThreadActivatedNotification(_ int) {
w.activatedCount.Add(1)
}
func (w *defaultWorker) ThreadDrainNotification(_ int) {
w.drainCount.Add(1)
}
func (w *defaultWorker) ThreadDeactivatedNotification(_ int) {
w.drainCount.Add(-1)
w.activatedCount.Add(-1)
}
func (w *defaultWorker) ProvideRequest() *WorkerRequest {
return <-w.requestChan
}
func (w *defaultWorker) InjectRequest(r *WorkerRequest) {
w.requestChan <- r
}