Skip to content

Commit f20fb67

Browse files
committed
Experimental support for accessing ziti components over the mgmt/ctrl channels. Fixes #2439
1 parent 8b410b9 commit f20fb67

File tree

34 files changed

+3313
-681
lines changed

34 files changed

+3313
-681
lines changed

common/datapipe/config.go

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
/*
2+
Copyright NetFoundry Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
https://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package datapipe
18+
19+
import (
20+
"fmt"
21+
"github.com/gliderlabs/ssh"
22+
"github.com/michaelquigley/pfxlog"
23+
"github.com/openziti/identity"
24+
gossh "golang.org/x/crypto/ssh"
25+
"os"
26+
"path"
27+
"strconv"
28+
"strings"
29+
)
30+
31+
type LocalAccessType string
32+
33+
const (
34+
LocalAccessTypeNone LocalAccessType = ""
35+
LocalAccessTypePort LocalAccessType = "local-port"
36+
LocalAccessTypeEmbeddedSshServer LocalAccessType = "embedded-ssh-server"
37+
)
38+
39+
type Config struct {
40+
Enabled bool
41+
LocalAccessType LocalAccessType // values: 'none', 'localhost:port', 'embedded'
42+
DestinationPort uint16
43+
AuthorizedKeysFile string
44+
HostKey gossh.Signer
45+
ShellPath string
46+
}
47+
48+
func (self *Config) IsLocalAccessAllowed() bool {
49+
return self.Enabled && self.LocalAccessType != LocalAccessTypeNone
50+
}
51+
52+
func (self *Config) IsLocalPort() bool {
53+
return self.LocalAccessType == LocalAccessTypePort
54+
}
55+
56+
func (self *Config) IsEmbedded() bool {
57+
return self.LocalAccessType == LocalAccessTypeEmbeddedSshServer
58+
}
59+
60+
func (self *Config) LoadConfig(m map[interface{}]interface{}) error {
61+
log := pfxlog.Logger()
62+
if v, ok := m["enabled"]; ok {
63+
if enabled, ok := v.(bool); ok {
64+
self.Enabled = enabled
65+
} else {
66+
self.Enabled = strings.EqualFold("true", fmt.Sprintf("%v", v))
67+
}
68+
}
69+
if v, ok := m["enableExperimentalFeature"]; ok {
70+
if enabled, ok := v.(bool); ok {
71+
if !enabled {
72+
self.Enabled = false
73+
}
74+
} else if !strings.EqualFold("true", fmt.Sprintf("%v", v)) {
75+
self.Enabled = false
76+
}
77+
} else {
78+
self.Enabled = false
79+
}
80+
81+
if self.Enabled {
82+
log.Infof("mgmt.pipe enabled")
83+
if v, ok := m["destination"]; ok {
84+
if destination, ok := v.(string); ok {
85+
if strings.HasPrefix(destination, "127.0.0.1:") {
86+
self.LocalAccessType = LocalAccessTypePort
87+
portStr := strings.TrimPrefix(destination, "127.0.0.1:")
88+
port, err := strconv.ParseUint(portStr, 10, 16)
89+
if err != nil {
90+
log.WithError(err).Warn("mgmt.pipe is enabled, but destination not valid. Must be '127.0.0.1:<port>' or 'embedded'")
91+
self.Enabled = false
92+
return nil
93+
}
94+
self.DestinationPort = uint16(port)
95+
} else if destination == "embedded-ssh-server" {
96+
self.LocalAccessType = LocalAccessTypeEmbeddedSshServer
97+
98+
if v, ok = m["authorizedKeysFile"]; ok {
99+
if keysFile, ok := v.(string); ok {
100+
self.AuthorizedKeysFile = keysFile
101+
} else {
102+
log.Warnf("mgmt.pipe is enabled, but 'embedded' destination configured and authorizedKeysFile configuration is not type string, but %T", v)
103+
self.Enabled = false
104+
return nil
105+
}
106+
}
107+
108+
if v, ok = m["shell"]; ok {
109+
if s, ok := v.(string); ok {
110+
self.ShellPath = s
111+
} else {
112+
log.Warnf("mgmt.pipe is enabled, but 'embedded' destination configured and shell configuration is not type string, but %T", v)
113+
}
114+
}
115+
} else {
116+
log.Warn("mgmt.pipe is enabled, but destination not valid. Must be 'localhost:port' or 'embedded'")
117+
self.Enabled = false
118+
return nil
119+
}
120+
}
121+
} else {
122+
self.Enabled = false
123+
log.Warn("mgmt.pipe is enabled, but destination not specified. mgmt.pipe disabled.")
124+
return nil
125+
}
126+
} else {
127+
log.Infof("mgmt.pipe disabled")
128+
}
129+
return nil
130+
}
131+
132+
func (self *Config) NewSshRequestHandler(identity *identity.TokenId) (*SshRequestHandler, error) {
133+
if self.HostKey == nil {
134+
signer, err := gossh.NewSignerFromKey(identity.Cert().PrivateKey)
135+
if err != nil {
136+
return nil, err
137+
}
138+
self.HostKey = signer
139+
}
140+
141+
keysFile := self.AuthorizedKeysFile
142+
if keysFile == "" {
143+
homeDir, err := os.UserHomeDir()
144+
if err != nil {
145+
return nil, fmt.Errorf("could not set up ssh request handler, failing get home dir, trying to load default authorized keys (%w)", err)
146+
}
147+
keysFile = path.Join(homeDir, ".ssh", "authorized_keys")
148+
}
149+
150+
keysFileContents, err := os.ReadFile(keysFile)
151+
if err != nil {
152+
return nil, fmt.Errorf("could not set up ssh request handler, failed to load authorized keys from '%s' (%w)", keysFile, err)
153+
}
154+
155+
authorizedKeys := map[string]struct{}{}
156+
entryIdx := 0
157+
for len(keysFileContents) > 0 {
158+
pubKey, _, _, rest, err := gossh.ParseAuthorizedKey(keysFileContents)
159+
if err != nil {
160+
return nil, fmt.Errorf("could not set up ssh request handler, failed to load authorized key at index %d from '%s' (%w)", entryIdx, keysFile, err)
161+
}
162+
163+
authorizedKeys[string(pubKey.Marshal())] = struct{}{}
164+
keysFileContents = rest
165+
entryIdx++
166+
}
167+
168+
publicKeyOption := ssh.PublicKeyAuth(func(ctx ssh.Context, key ssh.PublicKey) bool {
169+
_, found := authorizedKeys[string(key.Marshal())]
170+
return found
171+
})
172+
173+
return &SshRequestHandler{
174+
config: self,
175+
options: []ssh.Option{publicKeyOption},
176+
}, nil
177+
}
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
package datapipe
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"github.com/michaelquigley/pfxlog"
7+
"github.com/openziti/channel/v3"
8+
"github.com/openziti/foundation/v2/concurrenz"
9+
"io"
10+
"net"
11+
"sync/atomic"
12+
"time"
13+
)
14+
15+
type MessageTypes struct {
16+
DataMessageType int32
17+
PipeIdHeaderType int32
18+
CloseMessageType int32
19+
}
20+
21+
func NewEmbeddedSshConn(ch channel.Channel, id uint32, msgTypes *MessageTypes) *EmbeddedSshConn {
22+
return &EmbeddedSshConn{
23+
id: id,
24+
ch: ch,
25+
ReadAdapter: channel.NewReadAdapter(fmt.Sprintf("pipe-%d", id), 4),
26+
msgTypes: msgTypes,
27+
}
28+
}
29+
30+
type EmbeddedSshConn struct {
31+
msgTypes *MessageTypes
32+
id uint32
33+
ch channel.Channel
34+
closed atomic.Bool
35+
*channel.ReadAdapter
36+
sshConn concurrenz.AtomicValue[io.Closer]
37+
deadline atomic.Pointer[time.Time]
38+
}
39+
40+
func (self *EmbeddedSshConn) Id() uint32 {
41+
return self.id
42+
}
43+
44+
func (self *EmbeddedSshConn) SetSshConn(conn io.Closer) {
45+
self.sshConn.Store(conn)
46+
}
47+
48+
func (self *EmbeddedSshConn) WriteToServer(data []byte) error {
49+
return self.ReadAdapter.PushData(data)
50+
}
51+
52+
func (self *EmbeddedSshConn) Write(data []byte) (n int, err error) {
53+
msg := channel.NewMessage(self.msgTypes.DataMessageType, data)
54+
msg.PutUint32Header(self.msgTypes.PipeIdHeaderType, self.id)
55+
deadline := time.Second
56+
if val := self.deadline.Load(); val != nil && !val.IsZero() {
57+
deadline = time.Until(*val)
58+
}
59+
return len(data), msg.WithTimeout(deadline).SendAndWaitForWire(self.ch)
60+
}
61+
62+
func (self *EmbeddedSshConn) Close() error {
63+
self.CloseWithErr(errors.New("close called"))
64+
return nil
65+
}
66+
67+
func (self *EmbeddedSshConn) CloseWithErr(err error) {
68+
if self.closed.CompareAndSwap(false, true) {
69+
self.ReadAdapter.Close()
70+
log := pfxlog.ContextLogger(self.ch.Label()).WithField("connId", self.id)
71+
72+
log.WithError(err).Info("closing mgmt pipe connection")
73+
74+
if sshConn := self.sshConn.Load(); sshConn != nil {
75+
if closeErr := sshConn.Close(); closeErr != nil {
76+
log.WithError(closeErr).Error("failed closing mgmt pipe embedded ssh connection")
77+
}
78+
}
79+
80+
if !self.ch.IsClosed() && err != io.EOF && err != nil {
81+
msg := channel.NewMessage(self.msgTypes.CloseMessageType, []byte(err.Error()))
82+
msg.PutUint32Header(self.msgTypes.PipeIdHeaderType, self.id)
83+
if sendErr := self.ch.Send(msg); sendErr != nil {
84+
log.WithError(sendErr).Error("failed sending mgmt pipe close message")
85+
}
86+
}
87+
88+
if closeErr := self.ch.Close(); closeErr != nil {
89+
log.WithError(closeErr).Error("failed closing mgmt pipe client channel")
90+
}
91+
}
92+
}
93+
94+
func (self *EmbeddedSshConn) LocalAddr() net.Addr {
95+
return embeddedSshPipeAddr{
96+
id: self.id,
97+
}
98+
}
99+
100+
func (self *EmbeddedSshConn) RemoteAddr() net.Addr {
101+
return embeddedSshPipeAddr{
102+
id: self.id,
103+
}
104+
}
105+
106+
func (self *EmbeddedSshConn) SetDeadline(t time.Time) error {
107+
if err := self.ReadAdapter.SetReadDeadline(t); err != nil {
108+
return err
109+
}
110+
return self.SetWriteDeadline(t)
111+
}
112+
113+
func (self *EmbeddedSshConn) SetWriteDeadline(t time.Time) error {
114+
self.deadline.Store(&t)
115+
return nil
116+
}
117+
118+
func (self *EmbeddedSshConn) WriteToClient(data []byte) error {
119+
_, err := self.Write(data)
120+
return err
121+
}
122+
123+
type embeddedSshPipeAddr struct {
124+
id uint32
125+
}
126+
127+
func (self embeddedSshPipeAddr) Network() string {
128+
return "ziti"
129+
}
130+
131+
func (self embeddedSshPipeAddr) String() string {
132+
return fmt.Sprintf("ssh-pipe-%d", self.id)
133+
}

common/datapipe/registry.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
Copyright NetFoundry Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
https://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package datapipe
18+
19+
import (
20+
"errors"
21+
"fmt"
22+
"github.com/michaelquigley/pfxlog"
23+
"github.com/openziti/foundation/v2/concurrenz"
24+
"sync"
25+
"sync/atomic"
26+
)
27+
28+
type Pipe interface {
29+
Id() uint32
30+
WriteToServer(data []byte) error
31+
WriteToClient(data []byte) error
32+
CloseWithErr(err error)
33+
}
34+
35+
func NewRegistry(config *Config) *Registry {
36+
return &Registry{
37+
config: config,
38+
}
39+
}
40+
41+
type Registry struct {
42+
lock sync.Mutex
43+
nextId atomic.Uint32
44+
connections concurrenz.CopyOnWriteMap[uint32, Pipe]
45+
config *Config
46+
}
47+
48+
func (self *Registry) GetConfig() *Config {
49+
return self.config
50+
}
51+
52+
func (self *Registry) GetNextId() (uint32, error) {
53+
self.lock.Lock()
54+
defer self.lock.Unlock()
55+
56+
limit := 0
57+
for {
58+
nextId := self.nextId.Add(1)
59+
if val := self.connections.Get(nextId); val == nil {
60+
return nextId, nil
61+
}
62+
if limit++; limit >= 1000 {
63+
return 0, errors.New("pipe pool in bad state, bailing out after 1000 attempts to get next id")
64+
}
65+
}
66+
}
67+
68+
func (self *Registry) Register(pipe Pipe) error {
69+
self.lock.Lock()
70+
defer self.lock.Unlock()
71+
72+
if self.connections.Get(pipe.Id()) != nil {
73+
pfxlog.Logger().Errorf("pipe already registered (id=%d)", pipe.Id())
74+
return fmt.Errorf("pipe already registered")
75+
}
76+
77+
self.connections.Put(pipe.Id(), pipe)
78+
return nil
79+
}
80+
81+
func (self *Registry) Unregister(id uint32) {
82+
self.connections.Delete(id)
83+
}
84+
85+
func (self *Registry) Get(id uint32) Pipe {
86+
return self.connections.Get(id)
87+
}

0 commit comments

Comments
 (0)