Replies: 3 comments
-
Hi @meditans, thanks for reaching out.
I see what you mean: the actions oscillate between 0 and -0.5, leading to a trajectory away from the source and subsequent cooling. I suspect this has to do with temperature nonlinearity (…)
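The two nonlinearities at play are easy to visualize. Here is a minimal sketch (mine, not from the thread) built only from the `temperature` profile and the `0.5*tanh(a_t)` step that appear in the code further down:

```julia
using Plots

# Scalar version of the temperature profile used later in the thread
temperature(z) = 100.0 / (z^2 + 1.0)

a = -3.0:0.05:3.0
z = 0.0:0.05:6.0
# The step size saturates at ±0.5 regardless of how large the action is,
# while the temperature profile is sharply curved near the source
p1 = plot(a, 0.5 .* tanh.(a), xlabel = "action a", ylabel = "step",
          label = "0.5*tanh(a) saturates at ±0.5")
p2 = plot(z, temperature.(z), xlabel = "position z", ylabel = "temperature",
          label = "100/(z² + 1)")
plot(p1, p2, layout = (1, 2))
```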
-
Take a look at this example, which does a very similar control. In the new versions it would look something like

```julia
slide = () -> begin
    model = RxInfer.getmodel(result.model)
    (s, ) = RxInfer.getreturnval(model)
    varref = RxInfer.getvarref(model, s)
    var = RxInfer.getvariable(varref)

    slide_msg_idx = 3 # This index is model dependent
    (m_s_t_min, V_s_t_min) = mean_cov(getrecent(messageout(var[2], slide_msg_idx)))
    ...
end
```

where the model has an extra return value:

```julia
@model function mountain_car(m_u, V_u, m_x, V_x, m_s_t_min, V_s_t_min, T, Fg, Fa, Ff, engine_force_limit)
    # Transition function modeling transition due to gravity and friction
    ...
    return (s, )
end
```
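A small aside on the `mean_cov` call above: it extracts the first two moments (mean vector and covariance matrix) of the Gaussian message in one call. A minimal sketch, with made-up numbers purely for illustration:

```julia
using RxInfer

# `mean_cov` returns the mean and covariance of a distribution;
# the values here are invented for the sake of the example
d = MvNormalMeanCovariance([1.0], fill(0.5, 1, 1))
m, V = mean_cov(d)   # m == [1.0], V == fill(0.5, 1, 1) (a 1×1 matrix)
```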
-
Thank you @wmkouw and @bvdmitri for your suggestions! Following the suggestion I modified the code, and I have new questions.

New version of the code:

```julia
using RxInfer, Plots
import RxInfer.ReactiveMP: getrecent, messageout
using Random; Random.seed!(51233) # Set random seed
# Environmental process parameters
# Temperature at the heat source (z=0)
source_temperature = 100.0
# Actual temperature profile; this function is hidden from the agent
temperature(z) = [source_temperature/(z[1]^2 + 1.0)]
z_hat = 0.0:0.01:6.0
y_hat = [temperature([z_k])[1] for z_k in z_hat]
# Initial position
z_0 = 2.0
initial_position=[z_0]
# Goal temperature
x_target = [20.0]
p = plot(z_hat, y_hat, color = "black", xlabel = "Position", ylabel = "Temperature", label = "Temperature")
scatter!([z_0], temperature([z_0]), label = "Initial position")
# display(p)

function create_world(; temperature, initial_position = [2.0]) # position is a 1-element vector
    z_t_min = initial_position
    z_t = z_t_min

    # Report noisy temperature at current position
    y_t = temperature(z_t)[1] + sqrt(0.01)*randn()

    execute = (a_t::Float64) -> begin
        # Compute next state
        z_t = z_t_min + [0.5*tanh(a_t)]
        # Report noisy temperature at current position
        y_t = temperature(z_t)[1] + sqrt(0.01)*randn()
        # Reset state
        z_t_min = z_t
    end

    observe = () -> begin
        return [y_t]
    end

    return (execute, observe)
end
@model function Thermostat_model(T, temperature, m_s_t_min, V_s_t_min, m_u, V_u, m_x, V_x)
    # Transition function (here: the temperature profile)
    g = (s_t_min::AbstractVector) -> begin
        return temperature(s_t_min)
    end

    gamma = 10000.0*diageye(1) # State transition precision
    theta = 0.0001*diageye(1)  # Observation variance

    s_t_min ~ MvNormal(mean = m_s_t_min, cov = V_s_t_min)
    s_k_min = s_t_min

    local s
    for k in 1:T
        # Control
        u[k] ~ MvNormal(mean = m_u[k], cov = V_u[k])
        u_h_k[k] ~ MvNormal(mean = s_k_min + u[k], precision = gamma)

        # State transition; the DeltaMeta/Unscented meta makes RxInfer pass
        # Gaussian messages through the nonlinear `g` via an unscented transform
        s[k] ~ g(u_h_k[k]) where { meta = DeltaMeta(method = Unscented(alpha = 1.9)) }

        # Likelihood of future observations
        x[k] ~ MvNormal(mean = s[k], cov = theta)

        # Goal prior
        x[k] ~ MvNormal(mean = m_x[k], cov = V_x[k])

        s_k_min = s[k]
    end

    return (s, )
end
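# NOTE: each x[k] above appears in two `~` statements, once as a noisy copy of
# s[k] (the likelihood) and once with the goal prior (m_x[k], V_x[k]). RxInfer
# attaches both factors to x[k], and their product is what pulls the inferred
# controls toward the target temperature at the end of the horizon.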
function create_agent(; T = 20, temperature, x_target, initial_position)
    # Set control priors
    Epsilon = fill(0.5, 1, 1)
    m_u = Vector{Float64}[ [0.0] for k=1:T ]
    V_u = Matrix{Float64}[ Epsilon for k=1:T ]

    # Set goal priors
    Sigma = 1e-4*diageye(1) # Goal prior variance
    m_x = [zeros(1) for k=1:T]
    m_x[end] = x_target
    V_x = [huge*diageye(1) for k=1:T]
    V_x[end] = Sigma # Set prior to reach goal at t=T

    # Set initial brain state prior
    m_s_t_min = initial_position
    V_s_t_min = tiny * diageye(1)

    # Set current inference results
    result = nothing

    # Bayesian inference by message passing
    infer = (upsilon_t::Float64, y_hat_t::Vector{Float64}) -> begin
        m_u[1] = [ upsilon_t ]    # Register action with the generative model
        V_u[1] = fill(tiny, 1, 1) # Clamp control prior to performed action

        m_x[1] = y_hat_t          # Register observation with the generative model
        V_x[1] = tiny*diageye(1)  # Clamp goal prior to observation

        data = Dict(:m_u => m_u,
                    :V_u => V_u,
                    :m_x => m_x,
                    :V_x => V_x,
                    :m_s_t_min => m_s_t_min,
                    :V_s_t_min => V_s_t_min)

        model = Thermostat_model(T = T, temperature = temperature)
        result = RxInfer.infer(model = model, data = data)
    end

    # The `act` function returns the inferred best possible action
    act = () -> begin
        if result !== nothing
            # u[1] is clamped to the action that was already executed (see `infer`),
            # so u[2] is the first control that is actually free to optimize
            return mode(result.posteriors[:u][2])[1]
        else
            return 0.0 # Without inference result we return some 'random' action
        end
    end

    # The `future` function returns the inferred future states
    future = () -> begin
        if result !== nothing
            return getindex.(mode.(result.posteriors[:s]), 1)
        else
            return zeros(T)
        end
    end

    # The `slide` function modifies `(m_s_t_min, V_s_t_min)` for the next step
    # and shifts (or slides) the array of future goals `(m_x, V_x)` and
    # inferred actions `(m_u, V_u)`
    slide = () -> begin
        model = RxInfer.getmodel(result.model)
        (s, ) = RxInfer.getreturnval(model)
        varref = RxInfer.getvarref(model, s)
        var = RxInfer.getvariable(varref)

        slide_msg_idx = 3 # This index is model dependent
        (m_s_t_min, V_s_t_min) = mean_cov(getrecent(messageout(var[2], slide_msg_idx)))

        m_u = circshift(m_u, -1)
        m_u[end] = [0.0]
        V_u = circshift(V_u, -1)
        V_u[end] = Epsilon

        m_x = circshift(m_x, -1)
        m_x[end] = x_target
        V_x = circshift(V_x, -1)
        V_x[end] = Sigma
    end

    return (infer, act, slide, future)
end
# Let there be a world
(execute_ai, observe_ai) = create_world(temperature=temperature, initial_position=initial_position)
T_ai = 20
# Let there be an agent
(infer_ai, act_ai, slide_ai, future_ai) = create_agent(;
    T = T_ai,
    temperature = temperature,
    x_target = x_target,
    initial_position = initial_position
)
N_ai = 100
# Step through experimental protocol
agent_a = Vector{Float64}(undef, N_ai) # Actions
agent_f = Vector{Vector{Float64}}(undef, N_ai) # Predicted future
agent_x = Vector{Vector{Float64}}(undef, N_ai) # Observations
for t = 1:N_ai
    agent_a[t] = act_ai()            # Invoke an action from the agent
    agent_f[t] = future_ai()         # Fetch the predicted future states
    execute_ai(agent_a[t])           # The action influences hidden external states
    agent_x[t] = observe_ai()        # Observe the current environmental outcome (update p)
    infer_ai(agent_a[t], agent_x[t]) # Infer beliefs from current model state (update q)
    slide_ai()                       # Prepare for next iteration
end
pa = plot(0.5*tanh.(agent_a), label = "actions")
py = plot(map(x -> x[1], agent_x), label = "temperature")
py = plot!([0, N_ai], [x_target[1], x_target[1]], label = "target temperature")
p = plot(pa, py, layout = @layout([ a; b ]))
display(p)
```

I don't fully understand these two lines:

```julia
slide_msg_idx = 3 # This index is model dependent
(m_s_t_min, V_s_t_min) = mean_cov(getrecent(messageout(var[2], slide_msg_idx)))
```

and I would also like to understand better why that message is selected in the computation: while I have a precise semantics for the posterior, I saw messages as just an implementation detail, and I'm not sure why one would use them.
Thanks again!
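One way to see why a *message* (rather than the posterior) is the natural object here is the standard sum-product relation between the two. At a variable node $s$, the posterior marginal multiplies *all* incoming messages, while the message sent toward a factor $f_j$ leaves out the one coming from $f_j$:

$$q(s) \;\propto\; \prod_{i} \mu_{f_i \to s}(s), \qquad \mu_{s \to f_j}(s) \;\propto\; \prod_{i \neq j} \mu_{f_i \to s}(s).$$

So when a belief over $s$ is re-used as a prior in the next sliding window, taking a specific outgoing message instead of the full marginal avoids feeding one factor's information back into the model twice. (This is general message-passing background, not a claim about which index is correct for this particular model.)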
-
Hi, looking for examples of Active Inference I found this example about a thermostat: https://github.com/biaslab/RxAgent-Zoo/blob/main/Thermostat/Active%20Inference%20Bayesian%20thermostat.ipynb. It was written for an old version of RxInfer, so I translated it to the current version. I'll share the code first, then ask a couple of questions.
Questions: