Skip to content

Commit 85b845d

Browse files
Fix process response parsing with dict-based approach
- Add ProcessResult struct with Dict-based spec field for flexible JSON parsing - Update submit, getprocess, getprocesses, assign, addchild to use dict format - Add parse_process_result and parse_process_dict helper functions - All 14 tests now pass This approach matches the Python SDK pattern of building RPC messages as dictionaries rather than relying on struct serialization, avoiding issues with complex nested JSON structures from the server.
1 parent 15fd983 commit 85b845d

2 files changed

Lines changed: 123 additions & 15 deletions

File tree

src/Colonies.jl

Lines changed: 114 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -92,56 +92,155 @@ function approveexecutor(client::ColoniesClient, colonyname::String, executornam
9292
end
9393

9494
function submit(client::ColoniesClient, spec::FunctionSpec, prvkey::String)
95-
rpcmsg = SubmitFunctionSpecRPC(spec, "submitfuncspecmsg")
96-
rpcjson = marshaljson(rpcmsg)
95+
# Build conditions dict
96+
conditions_dict = Dict{String, Any}(
97+
"colonyname" => spec.conditions.colonyname,
98+
"executortype" => spec.conditions.executortype
99+
)
100+
if !isempty(spec.conditions.executornames)
101+
conditions_dict["executornames"] = spec.conditions.executornames
102+
end
103+
if !isempty(spec.conditions.dependencies)
104+
conditions_dict["dependencies"] = spec.conditions.dependencies
105+
end
106+
107+
# Build spec dict
108+
spec_dict = Dict{String, Any}(
109+
"nodename" => spec.nodename,
110+
"funcname" => spec.funcname,
111+
"args" => spec.args === nothing ? [] : spec.args,
112+
"kwargs" => spec.kwargs,
113+
"priority" => spec.priority,
114+
"maxwaittime" => spec.maxwaittime,
115+
"maxexectime" => spec.maxexectime,
116+
"maxretries" => spec.maxretries,
117+
"conditions" => conditions_dict,
118+
"label" => spec.label,
119+
"env" => spec.env
120+
)
121+
122+
msg = Dict(
123+
"msgtype" => "submitfuncspecmsg",
124+
"spec" => spec_dict
125+
)
126+
rpcjson = JSON.json(msg)
97127

98128
payload = base64enc(rpcjson)
99129
sig = Crypto.sign(payload, prvkey)
100130
rpcmsg = RPCMsg(sig, "submitfuncspecmsg", payload)
101131

102132
payload, payloadtype = sendrpcmsg(rpcmsg, client.protocol, client.host, client.port)
103-
unmarshaljson(payload, Process)
133+
parse_process_result(payload)
134+
end
135+
136+
# Helper to parse process response into ProcessResult
137+
function parse_process_result(payload::String)
138+
dict = JSON.parse(payload)
139+
result = ProcessResult()
140+
result.processid = get(dict, "processid", "")
141+
result.state = get(dict, "state", 0)
142+
result.spec = get(dict, "spec", Dict{String, Any}())
143+
result.output = get(dict, "out", [])
144+
result.errors = get(dict, "errors", [])
145+
result
104146
end
105147

106148
function addchild(client::ColoniesClient, processgraphid::String, processid::String, spec::FunctionSpec, prvkey::String)
107-
rpcmsg = AddChildRPC(processgraphid, processid, spec, "addchildmsg")
108-
rpcjson = marshaljson(rpcmsg)
149+
# Build conditions dict
150+
conditions_dict = Dict{String, Any}(
151+
"colonyname" => spec.conditions.colonyname,
152+
"executortype" => spec.conditions.executortype
153+
)
154+
155+
# Build spec dict
156+
spec_dict = Dict{String, Any}(
157+
"nodename" => spec.nodename,
158+
"funcname" => spec.funcname,
159+
"args" => spec.args === nothing ? [] : spec.args,
160+
"kwargs" => spec.kwargs,
161+
"priority" => spec.priority,
162+
"maxwaittime" => spec.maxwaittime,
163+
"maxexectime" => spec.maxexectime,
164+
"maxretries" => spec.maxretries,
165+
"conditions" => conditions_dict,
166+
"label" => spec.label,
167+
"env" => spec.env
168+
)
169+
170+
msg = Dict(
171+
"msgtype" => "addchildmsg",
172+
"processgraphid" => processgraphid,
173+
"parentprocessid" => processid,
174+
"childprocessid" => "",
175+
"spec" => spec_dict,
176+
"insert" => false
177+
)
178+
rpcjson = JSON.json(msg)
109179

110180
payload = base64enc(rpcjson)
111181
sig = Crypto.sign(payload, prvkey)
112182
rpcmsg = RPCMsg(sig, "addchildmsg", payload)
113183

114184
payload, payloadtype = sendrpcmsg(rpcmsg, client.protocol, client.host, client.port)
115-
unmarshaljson(payload, Process)
185+
parse_process_result(payload)
116186
end
117187

118188
function getprocess(client::ColoniesClient, processid::String, prvkey::String)
119-
rpcmsg = GetProcessRPC(processid, "getprocessmsg")
120-
rpcjson = marshaljson(rpcmsg)
189+
msg = Dict(
190+
"msgtype" => "getprocessmsg",
191+
"processid" => processid
192+
)
193+
rpcjson = JSON.json(msg)
121194

122195
payload = base64enc(rpcjson)
123196
sig = Crypto.sign(payload, prvkey)
124197
rpcmsg = RPCMsg(sig, "getprocessmsg", payload)
125198

126199
payload, payloadtype = sendrpcmsg(rpcmsg, client.protocol, client.host, client.port)
127-
unmarshaljson(payload, Process)
200+
parse_process_result(payload)
128201
end
129202

130203
function getprocesses(client::ColoniesClient, colonyname::String, state::Int64, count::Int64, prvkey::String)
131-
rpcmsg = GetProcessesRPC(colonyname, state, count, "getprocessesmsg")
132-
rpcjson = marshaljson(rpcmsg)
204+
msg = Dict(
205+
"msgtype" => "getprocessesmsg",
206+
"colonyname" => colonyname,
207+
"state" => state,
208+
"count" => count
209+
)
210+
rpcjson = JSON.json(msg)
133211

134212
payload = base64enc(rpcjson)
135213
sig = Crypto.sign(payload, prvkey)
136214
rpcmsg = RPCMsg(sig, "getprocessesmsg", payload)
137215

138216
payload, payloadtype = sendrpcmsg(rpcmsg, client.protocol, client.host, client.port)
139-
unmarshaljson(payload, AbstractArray{Process})
217+
# Parse array of processes
218+
arr = JSON.parse(payload)
219+
if arr === nothing
220+
return ProcessResult[]
221+
end
222+
[parse_process_dict(p) for p in arr]
223+
end
224+
225+
# Helper to parse a single process dict
226+
function parse_process_dict(dict::AbstractDict)
227+
result = ProcessResult()
228+
result.processid = get(dict, "processid", "")
229+
result.state = get(dict, "state", 0)
230+
result.spec = get(dict, "spec", Dict{String, Any}())
231+
result.output = get(dict, "out", [])
232+
result.errors = get(dict, "errors", [])
233+
result
140234
end
141235

142236
function assign(client::ColoniesClient, colonyname::String, timeout::Int64, prvkey::String)
143-
rpcmsg = AssignProcessRPC(colonyname, false, timeout, "assignprocessmsg")
144-
rpcjson = marshaljson(rpcmsg)
237+
msg = Dict(
238+
"msgtype" => "assignprocessmsg",
239+
"colonyname" => colonyname,
240+
"latest" => false,
241+
"timeout" => timeout
242+
)
243+
rpcjson = JSON.json(msg)
145244

146245
payload = base64enc(rpcjson)
147246
sig = Crypto.sign(payload, prvkey)
@@ -150,7 +249,7 @@ function assign(client::ColoniesClient, colonyname::String, timeout::Int64, prvk
150249
try
151250
payload, payloadtype = sendrpcmsg(rpcmsg, client.protocol, client.host, client.port)
152251
if payloadtype == "assignprocessmsg"
153-
unmarshaljson(payload, Process)
252+
parse_process_result(payload)
154253
elseif payloadtype == "error"
155254
throw(unmarshaljson(payload, ColoniesError))
156255
else

src/core.jl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,15 @@ Base.@kwdef struct Executor
9494
end
9595
end
9696

97+
# Simple process result that uses Dict for spec to avoid complex nested struct parsing
98+
Base.@kwdef mutable struct ProcessResult
99+
processid::String = ""
100+
state::Int = 0
101+
spec::Dict{String, Any} = Dict{String, Any}()
102+
output::Vector{Any} = []
103+
errors::Vector{Any} = []
104+
end
105+
97106
Base.@kwdef struct Function
98107
functionid::String
99108
executorname::String

0 commit comments

Comments
 (0)