Has anyone had success adapting a reinforcement learning (RL) model using BayesFlow?
I’ve built a basic framework combining RL with the drift diffusion model (DDM). While I’ve had success recovering the DDM parameters (drift, alpha, beta, and ter), parameter recovery for the learning rate (kappa; see below) is poor.
I suspect it has to do with the batchable context I’m including, but I’m not sure.
The task is a two-armed bandit with yoked arms (i.e., when P(reward | arm 1) = .8, P(reward | arm 2) = .2, and vice versa), with reversals every 12 trials.
The code works by generating a sequence of potential outcomes for each arm (generate_outcomes_matrix) that gets fed into each trial. Each trial (rl_diffusion_trial) takes the set of parameters, the Q-values carried over from the previous trial, and the potential outcomes generated by the aforementioned function.
Note that for each trial, I set outcomes of unchosen options to 0. So if trial t had potential outcomes [0, 1] but option 1 was chosen, then the experienced outcomes would become [0, 0]. Perhaps this is where I’m running into problems.
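For concreteness, here is a minimal sketch of that masking step (purely illustrative, with throwaway variable names; the real logic sits inside rl_diffusion_trial below):

import numpy as np

potential = np.array([0, 1])     # potential outcomes for [option 1, option 2]
choice = 0                       # option 1 chosen (0-indexed)
experienced = potential.copy()
experienced[1 - choice] = 0      # the unchosen option's outcome is never observed
print(experienced)               # -> [0 0]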
The experiment function (experiment) only stores data that would be observed in an empirical experiment: RT, choice, and outcomes. The configurator (configurator) combines the data from the experiment with the batchable context. Again, this might need to be revised.
Would appreciate any thoughts and comments. The relevant code is below.
# Assumed setup (imports and RNG were not shown in the original snippet)
import numpy as np
import bayesflow as bf
from scipy.stats import truncnorm

RNG = np.random.default_rng()


# Generative model specifications: user-defined, non-batchable functions
def truncnorm_better(mean=0, sd=1, low=-10, upp=10, size=1):
    return truncnorm.rvs(
        (low - mean) / sd, (upp - mean) / sd, loc=mean, scale=sd, size=size)
def draw_prior():
    # drift ~ N(0, 2.0), mean drift rate, index 0
    drift = RNG.normal(0.0, 2.0)
    # alpha ~ N(1.0, 0.5) in [0, 5], boundary
    alpha = truncnorm_better(mean=1.0, sd=0.5, low=0.0, upp=5)[0]
    # beta ~ Beta(2.0, 2.0), relative start point
    beta = RNG.beta(2.0, 2.0)
    # ter ~ N(0.5, 0.25) in [0, 1.5], non-decision time
    ter = truncnorm_better(mean=0.5, sd=0.25, low=0.0, upp=1.5)[0]
    # eta ~ N(1.0, 0.5) in [0, 3], trial-to-trial variability in drift
    # eta = truncnorm_better(mean=1.0, sd=0.5, low=0.0, upp=3)[0]
    # kappa ~ Beta(2.0, 2.0), learning rate, index 4
    kappa = RNG.beta(2.0, 2.0)
    p_samples = np.hstack((drift, alpha, beta, ter, kappa))
    return p_samples
def generate_outcomes_matrix(num_obs, probs=[0.8, 0.2]):
    # Pre-allocate the outcomes of each bandit
    outcome_mat = np.zeros([num_obs, 2])
    for n in range(num_obs):
        # Every 12 trials, reverse the reward probabilities
        if (n > 0) and (n % 12 == 0):
            probs = [probs[1], probs[0]]
        # Draw this trial's potential outcome for each bandit
        outcome_mat[n, 0] = np.random.binomial(n=1, p=probs[0])
        outcome_mat[n, 1] = np.random.binomial(n=1, p=probs[1])
    return outcome_mat
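
# Optional sanity check (illustrative; `demo_mat` is just a throwaway name):
# with reversals every 12 trials, the mean payoff of arm 1 should alternate
# roughly between .8 and .2 across successive 12-trial blocks.
demo_mat = generate_outcomes_matrix(60)
for block in range(5):
    print("block", block, "arm-1 reward rate:", demo_mat[block * 12:(block + 1) * 12, 0].mean())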
# new code: updates Q-values after the choice in the current trial
def rl_diffusion_trial(params, input,
                       dt=.01, max_steps=900.):
    """Simulates a single RL-DDM trial."""
    # Parameters
    drift, alpha, beta, tau, kappa = params
    # Q-values from the previous trial
    qs = [input[0], input[1]]
    # Potential outcomes for the current trial
    outcome = [input[2], input[3]]
    # Initialize evidence accumulation
    n_steps = 0.
    evidence = alpha * beta
    # Trial-wise drift rate scales with the Q-value difference
    drift_trial = drift * (qs[0] - qs[1])
    # Simulate a single DDM path
    while ((evidence > 0) and (evidence < alpha) and (n_steps < max_steps)):
        # DDM equation
        evidence += drift_trial * dt + np.sqrt(dt) * np.random.normal()
        # Increment step
        n_steps += 1.0
    rt = n_steps * dt + tau
    if evidence >= alpha:
        choice = 0  # choice A
        outcome[1] = 0  # So the outcome of option B was not seen
    elif evidence <= 0:
        choice = 1  # choice B
        outcome[0] = 0  # So the outcome of option A was not seen
    else:
        choice = -1  # Indicates a missing response
    # Q-learning update
    if choice > -1:
        # Signed reward prediction error
        RPE = outcome[choice] - qs[choice]
        # Update the Q-value of the chosen option
        qs[choice] = qs[choice] + kappa * RPE
    return rt, choice, outcome[0], outcome[1], qs[0], qs[1]
def experiment(theta, outcome_mat, num_obs):
    output = np.zeros((num_obs, 6))
    context = np.zeros((num_obs + 1, 4))
    context[0, :2] = 0.5  # Initialize Q-values at 0.5
    context[0:num_obs, 2:] = outcome_mat  # Potential outcomes of the current rounds
    for n in range(num_obs):
        check = 0
        while check == 0:
            output[n, :] = rl_diffusion_trial(params=theta,
                                              input=context[n, :])
            # Repeat the trial if the response returned is -1 (missing)
            if output[n, 1] > -1:
                check = 1
        # Carry the updated Q-values over to the next trial
        context[(n + 1), :2] = output[n, 4:]
    # out = np.c_[context[1:, :]]  # If we want EVERYTHING
    out = np.c_[output[:, :4]]  # Only keep RT, choice, and both feedback columns
    return out
MIN_OBS = 60
MAX_OBS = 60
NUM_CONDITIONS = 1


# Drawing a random number of trials helps training when there are missing data
def random_num_obs(min_obs=MIN_OBS, max_obs=MAX_OBS):
    """Draws a random number of observations for all simulations in a batch."""
    return RNG.integers(low=min_obs, high=max_obs + 1)
context_gen = bf.simulation.ContextGenerator(
    non_batchable_context_fun=random_num_obs,
    batchable_context_fun=generate_outcomes_matrix,
    use_non_batchable_for_batchable=True,
)
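
# The prior wrapper itself was not shown above; something along these lines is
# assumed, since `prior.param_names` and `prior.estimate_means_and_stds()` are
# used further down.
prior = bf.simulation.Prior(
    prior_fun=draw_prior,
    param_names=["drift", "alpha", "beta", "ter", "kappa"],
)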
# Connect via BayesFlow Wrappers
simulator = bf.simulation.Simulator(simulator_fun=experiment,
                                    context_generator=context_gen)
model = bf.simulation.GenerativeModel(prior=prior,
                                      simulator=simulator,
                                      name="RLDDM")
prior_means, prior_stds = prior.estimate_means_and_stds(n_draws=100000)
prior_means = np.round(prior_means, decimals=1)
prior_stds = np.round(prior_stds, decimals=1)
def configurator(forward_dict):
    # Prepare placeholder dict
    out_dict = {}
    # Extract simulated data: (batch, num_obs, 4) = RT, choice, feedback A, feedback B
    data = forward_dict["sim_data"].astype(np.float32)
    # Extract the batchable context (potential outcomes): (batch, num_obs, 2)
    outcomes = np.array(forward_dict["sim_batchable_context"], dtype=np.float32)
    # Concatenate data and outcomes along the last axis -> (batch, num_obs, 6)
    out_dict["summary_conditions"] = np.c_[data, outcomes].astype(np.float32)
    # These will be concatenated to the outputs of the summary network.
    # Convert N to log N since neural nets can't deal well with large numbers
    N = np.log(forward_dict["sim_non_batchable_context"])
    # Repeat N for each simulation (since it is shared across the batch); note
    # the extra dimension needed
    N_vec = N * np.ones((data.shape[0], 1), dtype=np.float32)
    out_dict["direct_conditions"] = N_vec
    # Finally, extract the parameters. Any transformations (e.g., standardization)
    # should happen here; standardizing here keeps the later un-standardization
    # (* prior_stds + prior_means) consistent.
    params = forward_dict["prior_draws"].astype(np.float32)
    out_dict["parameters"] = ((params - prior_means) / prior_stds).astype(np.float32)
    return out_dict
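
# Optional sanity check (illustrative; `test_batch` is just a throwaway name):
# with 60 trials per simulation, a test batch of 16 data sets should yield
# summary_conditions of shape (16, 60, 6), direct_conditions (16, 1),
# and parameters (16, 5).
test_batch = configurator(model(batch_size=16))
for key, value in test_batch.items():
    print(key, value.shape)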
summary_net = bf.networks.TimeSeriesTransformer(input_dim=6, summary_dim=32, name="RLDDM_summary")
# Inference network
inference_net = bf.networks.InvertibleNetwork(
    num_params=len(prior.param_names),
    coupling_settings={"dense_args": dict(kernel_regularizer=None), "dropout": False},
    name="RLDDM_inference",
)
# Amortized Posterior
amortizer = bf.amortizers.AmortizedPosterior(inference_net, summary_net, name="DDM_amortizer")
# Checkpoint path for saving the trained networks
model_name = "RLDDM"
checkpoint_path = f"checkpoint/{model_name}"
# Define the trainer
trainer = bf.trainers.Trainer(
    generative_model=model,
    amortizer=amortizer,
    configurator=configurator,
    checkpoint_path=checkpoint_path,
)
# Training Phase
BATCH_SIZE = 16
NUM_EPOCHS = 50
NUM_ITER_PER_EPOCHS = 500
history = trainer.train_online(NUM_EPOCHS, NUM_ITER_PER_EPOCHS, BATCH_SIZE)
# Generate some validation data
validation_sims = configurator(model(batch_size=1000))
# Transform the standardized prior draws back to the original scale
prior_samples = validation_sims["parameters"] * prior_stds + prior_means
# Generate 1000 posterior draws for each of the 1000 simulated data sets
post_samples = amortizer.sample(validation_sims, n_samples=1000)
# Un-standardize the posterior draws back to the original scale
post_samples = post_samples * prior_stds + prior_means
# Parameter recoverability
f = bf.diagnostics.plot_recovery(
    post_samples, prior_samples, param_names=prior.param_names,
    point_agg=np.mean, uncertainty_agg=np.std,
)
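
# Quick numeric check of learning-rate recovery (illustrative; kappa is the
# fifth parameter, i.e., index 4 in the parameter vector).
kappa_true = prior_samples[:, 4]
kappa_est = post_samples.mean(axis=1)[:, 4]
print("Correlation(true kappa, posterior-mean kappa):",
      np.corrcoef(kappa_true, kappa_est)[0, 1])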

