1+ ; ;; Agent Communication - Distributed Agent Communication Protocol
2+ ; ;; Implements atomspace-message-passing for distributed agent coordination
3+ ; ;; Part of Phase 3: Build System Orchestration - SKZ Integration
4+
5+ (define-module (cogkernel agent-communication)
6+ #:use-module (ice-9 hash-table)
7+ #:use-module (ice-9 match)
8+ #:use-module (ice-9 threads)
9+ #:use-module (ice-9 format)
10+ #:use-module (srfi srfi-1)
11+ #:use-module (srfi srfi-9)
12+ #:use-module (cogkernel atomspace)
13+ #:use-module (cogkernel agents)
14+ #:export (make-agent-communication
15+ agent-communication?
16+ send-cognitive-message
17+ receive-cognitive-message
18+ register-agent-endpoint!
19+ discover-agents
20+ broadcast-to-agents
21+ setup-distributed-communication!
22+ agent-communication-start!
23+ agent-communication-stop!
24+ *global-agent-communication*))
25+
26+ ; ;; Message types for agent communication
27+ (define message-types
28+ '(COORDINATION ; Task coordination between agents
29+ STATUS-QUERY ; Query agent status
30+ STATUS-RESPONSE ; Response to status query
31+ TASK-ASSIGNMENT ; Assign task to agent
32+ TASK-COMPLETION ; Notify task completion
33+ RESOURCE-REQUEST ; Request shared resources
34+ RESOURCE-GRANT ; Grant resource access
35+ DISCOVERY ; Agent discovery and registration
36+ HEARTBEAT ; Keep-alive messages
37+ SHUTDOWN)) ; Graceful shutdown notification
38+
39+ ; ;; Communication protocols
40+ (define communication-protocols
41+ '(atomspace-message-passing ; AtomSpace-based messaging
42+ json-rpc ; JSON-RPC for Theia integration
43+ ipc-direct ; Direct IPC for local communication
44+ tcp-socket)) ; TCP socket for network communication
45+
46+ ; ;; Agent communication record
47+ (define-record-type <agent-communication>
48+ (make-agent-communication-record protocol transport serialization
49+ endpoint-registry message-queue
50+ active? thread-pool)
51+ agent-communication?
52+ (protocol agent-communication-protocol)
53+ (transport agent-communication-transport)
54+ (serialization agent-communication-serialization)
55+ (endpoint-registry agent-communication-endpoint-registry)
56+ (message-queue agent-communication-message-queue)
57+ (active? agent-communication-active? set-agent-communication-active?!)
58+ (thread-pool agent-communication-thread-pool))
59+
60+ ; ;; Cognitive message record
61+ (define-record-type <cognitive-message>
62+ (make-cognitive-message-record id from to type payload timestamp)
63+ cognitive-message?
64+ (id cognitive-message-id)
65+ (from cognitive-message-from)
66+ (to cognitive-message-to)
67+ (type cognitive-message-type)
68+ (payload cognitive-message-payload)
69+ (timestamp cognitive-message-timestamp))
70+
71+ ; ;; Create agent communication system
72+ (define* (make-agent-communication #:key
73+ (protocol 'atomspace-message-passing )
74+ (transport 'distributed )
75+ (serialization 'atomspace-serialization ))
76+ " Create a new agent communication system"
77+ (unless (member protocol communication-protocols)
78+ (error " Invalid communication protocol:" protocol))
79+ (make-agent-communication-record
80+ protocol
81+ transport
82+ serialization
83+ (make-hash-table) ; endpoint registry
84+ '() ; message queue
85+ #f ; not active initially
86+ #f )) ; no thread pool yet
87+
88+ ; ;; Register agent endpoint for communication
89+ (define (register-agent-endpoint! comm agent-id endpoint-info )
90+ "Register an agent's communication endpoint"
91+ (hash-set! (agent-communication-endpoint-registry comm)
92+ agent-id
93+ (append endpoint-info
94+ `((registered-time ,(current-time))
95+ (protocol ,(agent-communication-protocol comm))))))
96+
97+ ; ;; Discover available agents
98+ (define (discover-agents comm )
99+ "Discover all registered agents in the communication system"
100+ (hash-fold (lambda (agent-id endpoint-info acc )
101+ (cons (list agent-id endpoint-info) acc))
102+ '()
103+ (agent-communication-endpoint-registry comm)))
104+
105+ ; ;; Serialize message for atomspace compatibility
106+ (define (serialize-cognitive-message message )
107+ "Serialize a cognitive message for atomspace transport"
108+ (match (cognitive-message-type message)
109+ ('STATUS-QUERY
110+ `(atomspace-message
111+ (type . STATUS-QUERY)
112+ (from . ,(cognitive-message-from message))
113+ (to . ,(cognitive-message-to message))
114+ (id . ,(cognitive-message-id message))
115+ (timestamp . ,(cognitive-message-timestamp message))))
116+ ('TASK-ASSIGNMENT
117+ `(atomspace-message
118+ (type . TASK-ASSIGNMENT)
119+ (from . ,(cognitive-message-from message))
120+ (to . ,(cognitive-message-to message))
121+ (task . ,(cognitive-message-payload message))
122+ (id . ,(cognitive-message-id message))
123+ (timestamp . ,(cognitive-message-timestamp message))))
124+ ('COORDINATION
125+ `(atomspace-message
126+ (type . COORDINATION)
127+ (from . ,(cognitive-message-from message))
128+ (to . ,(cognitive-message-to message))
129+ (coordination-data . ,(cognitive-message-payload message))
130+ (id . ,(cognitive-message-id message))
131+ (timestamp . ,(cognitive-message-timestamp message))))
132+ (_
133+ `(atomspace-message
134+ (type . ,(cognitive-message-type message))
135+ (from . ,(cognitive-message-from message))
136+ (to . ,(cognitive-message-to message))
137+ (payload . ,(cognitive-message-payload message))
138+ (id . ,(cognitive-message-id message))
139+ (timestamp . ,(cognitive-message-timestamp message))))))
140+
141+ ; ;; Send cognitive message to another agent
142+ (define (send-cognitive-message comm from-agent-id to-agent-id message-type payload )
143+ "Send a cognitive message from one agent to another"
144+ (let* ((message-id (string-append " msg-" (number->string (random 100000 ))))
145+ (message (make-cognitive-message-record
146+ message-id
147+ from-agent-id
148+ to-agent-id
149+ message-type
150+ payload
151+ (current-time)))
152+ (serialized-message (serialize-cognitive-message message)))
153+
154+ ; ; Check if destination agent is registered
155+ (let ((endpoint (hash-ref (agent-communication-endpoint-registry comm) to-agent-id)))
156+ (if endpoint
157+ (begin
158+ ; ; For now, simulate message delivery via atomspace
159+ (format #t " 📡 Sending message ~a: ~a -> ~a (~a)~%"
160+ message-id from-agent-id to-agent-id message-type)
161+ (format #t " Payload: ~a~%" payload)
162+ ; ; Add message to atomspace for cognitive routing
163+ (when (and (defined? '*global-atomspace* ) *global-atomspace*)
164+ (let ((message-atom (make-atom 'MESSAGE serialized-message)))
165+ (atomspace-add! *global-atomspace* message-atom)))
166+ ; ; Return message confirmation
167+ `(message-sent
168+ (id . ,message-id)
169+ (status . delivered)
170+ (timestamp . ,(current-time))))
171+ (begin
172+ (format #t " ❌ Agent ~a not found in endpoint registry~%" to-agent-id)
173+ `(message-failed
174+ (id . ,message-id)
175+ (error . agent-not-found)
176+ (timestamp . ,(current-time))))))))
177+
178+ ; ;; Receive cognitive message (simulated for demonstration)
179+ (define (receive-cognitive-message comm agent-id )
180+ "Receive cognitive messages for a specific agent"
181+ ; ; For now, simulate message reception by checking atomspace
182+ (if (and (defined? '*global-atomspace* ) *global-atomspace*)
183+ (begin
184+ (format #t " 📬 Checking messages for agent ~a~%" agent-id)
185+ ; ; In a real implementation, this would filter atomspace for messages
186+ ; ; destined for this agent
187+ ' ((type . STATUS-QUERY)
188+ (from . " system-monitor" )
189+ (payload . " status-check" )))
190+ '() ))
191+
192+ ; ;; Broadcast message to all registered agents
193+ (define (broadcast-to-agents comm from-agent-id message-type payload )
194+ "Broadcast a message to all registered agents"
195+ (let ((agent-endpoints (discover-agents comm)))
196+ (format #t " 📢 Broadcasting ~a from ~a to ~a agents~%"
197+ message-type from-agent-id (length agent-endpoints))
198+ (map (lambda (agent-endpoint )
199+ (let ((agent-id (car agent-endpoint)))
200+ (unless (string=? agent-id from-agent-id)
201+ (send-cognitive-message comm from-agent-id agent-id message-type payload))))
202+ agent-endpoints)))
203+
204+ ; ;; Setup distributed communication for agent system
205+ (define (setup-distributed-communication! agent-system )
206+ "Setup distributed communication for an existing agent system"
207+ (let ((comm (make-agent-communication
208+ #:protocol 'atomspace-message-passing
209+ #:transport 'distributed
210+ #:serialization 'atomspace-serialization )))
211+
212+ ; ; Register all agents in the communication system
213+ (hash-for-each
214+ (lambda (agent-id agent )
215+ (register-agent-endpoint! comm agent-id
216+ `((role . ,(agent-role agent))
217+ (state . ,(agent-state agent))
218+ (actions . ,(map car (agent-actions agent)))
219+ (endpoint-type . local))))
220+ (agent-system-agents agent-system))
221+
222+ (format #t " 🌐 Distributed communication setup for ~a agents~%"
223+ (hash-count (const #t ) (agent-system-agents agent-system)))
224+ comm))
225+
226+ ; ;; Start agent communication system
227+ (define (agent-communication-start! comm )
228+ "Start the agent communication system"
229+ (set-agent-communication-active?! comm #t )
230+ (format #t " 🚀 Agent communication system started (~a protocol)~%"
231+ (agent-communication-protocol comm)))
232+
233+ ; ;; Stop agent communication system
234+ (define (agent-communication-stop! comm )
235+ "Stop the agent communication system"
236+ (set-agent-communication-active?! comm #f )
237+ (format #t " 🛑 Agent communication system stopped~%" ))
238+
239+ ; ;; Global agent communication system
240+ (define *global-agent-communication*
241+ (make-agent-communication
242+ #:protocol 'atomspace-message-passing
243+ #:transport 'distributed
244+ #:serialization 'atomspace-serialization ))
0 commit comments