Has anyone had success adapting a reinforcement learning (RL) model using BayesFlow?
I’ve built a basic framework combining RL with the drift diffusion model (DDM). While I’ve had success recovering the DDM parameters (diftt, alpha, beta, and ter), I am having poor parameter recovery for the learning rate (kappa, below).
I suspect it has to do with the batchable context I’m including, but I’m not sure.
The task is a 2-Armed Bandit, with yoked arms (i.e., P(Reward, arm 1) = [.8,.2], P(Reward, arm 2) = [.2, .8]) with reversals every 12 trials.
The code functions by generating a sequence of potential outcomes for each arm that gets fed into the trial (generate_outcomes_matrix
). Each trial (rl_diffusion_trial
) takes a set of parameters, the Q-values from the previous trial, the actions from the previous trial, as well as the potential outcomes generated by the aforementioned function.
Note that for each trial, I set outcomes of unchosen options to 0. So if trial t had potential outcomes [0, 1] but option 1 was chosen, then the experienced outcomes would become [0, 0]. Perhaps this is where I’m running into problems.
The experiment (experiment
) only stores data that would be seen in an empirical experiment. That is, RT, Choice, and Outcomes. The configurator (configurator
) combines the data from the experiment with the batchable context. Again, this might need to be revised.
Would appreciate any thoughts and comments. The relevant code is below.
# Generative Model Specifications User Defined Functions, non-batchable
def truncnorm_better(mean=0, sd=1, low=-10, upp=10, size=1):
return truncnorm.rvs(
(low - mean) / sd, (upp - mean) / sd, loc=mean, scale=sd, size=size)
def draw_prior():
# drift ~ N(0, 2.0), mean drift rate, index 0
drift = RNG.normal(0.0, 2.0)
# alpha ~ N(1.0, 0.5) in [0, 5], boundary
alpha = truncnorm_better(mean=1.0, sd=0.5, low=0.0, upp=5)[0]
# beta ~ Beta(2.0, 2.0), relative start point
beta = RNG.beta(2.0, 2.0)
# ter ~ N(0.5, 0.25) in [0, 1.5], non-decision time
ter = truncnorm_better(mean=0.5, sd=0.25, low=0.0, upp=1.5)[0]
# eta ~ N(1.0, 0.5) in [0, 3], trial-to-trial variability in drift, index 4
# eta = truncnorm_better(mean=1.0, sd=0.5, low=0.0, upp=3)[0]
# kappa ~ Beta(2.0, 2.0), learning rate
kappa = RNG.beta(2.0, 2.0)
p_samples = np.hstack((drift, alpha, beta, ter, kappa))
return p_samples
def generate_outcomes_matrix(num_obs,probs = [0.8, 0.2]):
outcome_mat = np.zeros([num_obs,2])
for n in range(num_obs):
# Every 12 trials, reverse probabilities
if (n > 1) & (n % 12 == 0):
probs = [probs[1], probs[0]]
# Pre-allocate outcomes of each bandit
outcome_mat[n,0] = np.random.binomial(n=1,p = probs[0])
outcome_mat[n,1] = np.random.binomial(n=1,p = probs[1])
return outcome_mat
# new code updates q-values after choice in current trial
def rl_diffusion_trial(params,input,
dt=.01, max_steps=900.):
"""Simulates a trial from the diffusion model."""
# Parameters
drift, alpha, beta, tau, kappa = params
# RT and Choice from previous trials
# Q-values from previous trials
qs = [input[0],input[1]]
# Potential outcomes
outcome = [input[2],input[3]]
# Intialzie evidence accumulation
n_steps = 0.
evidence = alpha * beta
# trial-to-trial drift rate
drift_trial = drift* (qs[0] - qs[1])
# Simulate a single DM path
while ((evidence > 0) and (evidence < alpha) and (n_steps < max_steps)):
# DDM equation
evidence += drift_trial*dt + np.sqrt(dt) * np.random.normal()
# Increment step
n_steps += 1.0
rt = n_steps * dt + tau
if evidence >= alpha:
choice = 0 # choice A
outcome[1] = 0 # So didn't see outcome of option B
elif evidence <= 0:
choice = 1 # choice B
outcome[0] = 0 # So didn't see outcome of option A
else:
choice = -1 # This indicates a missing response
# Q-learning
if choice > -1:
# Generate Signed RPE
RPE = (outcome[choice] - qs[choice])
# Update Q-value of previously chosen option
qs[choice] = qs[choice] + kappa * RPE
return rt, choice, outcome[0], outcome[1], qs[0], qs[1]
def experiment(theta, outcome_mat, num_obs):
output = np.zeros((num_obs, 6))
context = np.zeros((num_obs+1, 4))
context[0,:2] = 0.5 # Initialize Q-values at 0.5
context[0:num_obs,2:] = outcome_mat # Potential outcome of current rounds
for n in range(num_obs):
check = 0
while check == 0:
output[n, :] = rl_diffusion_trial(params = theta,
input = context[n,:]
)
# Repeat trial if response returned is -1
if output[n, 1] > -1:
check = 1
# Store Q values
context[(n+1),:2] = output[n,4:]
#out = np.c_[context[1:,:]] # If we want EVERYTHING
out = np.c_[output[:,:4]] # Only want choice, RT, and feedback
return out
MIN_OBS = 60
MAX_OBS = 60
NUM_CONDITIONS = 1
# By drawing a random number of trials, helps to train when there is missing data
def random_num_obs(min_obs=MIN_OBS, max_obs=MAX_OBS):
"""Draws a random number of observations for all simulations in a batch."""
return RNG.integers(low=min_obs, high=max_obs + 1)
context_gen = bf.simulation.ContextGenerator(
non_batchable_context_fun=random_num_obs,
batchable_context_fun=generate_outcomes_matrix,
use_non_batchable_for_batchable=True,
)
# Connect via BayesFlow Wrappers
simulator = bf.simulation.Simulator(simulator_fun=experiment,
context_generator=context_gen)
model = bf.simulation.GenerativeModel(prior=prior,
simulator=simulator,
name="RLDDM")
prior_means, prior_stds = prior.estimate_means_and_stds(n_draws=100000)
prior_means = np.round(prior_means, decimals=1)
prior_stds = np.round(prior_stds, decimals=1)
def configurator(forward_dict):
# Prepare placeholder dict
out_dict = {}
# Extract data
data = forward_dict["sim_data"].astype(np.float32)
# Extract context
context = np.array(forward_dict["sim_batchable_context"])[..., None]
# Convert to 3D
outcomes = context.reshape((context.shape[0],context.shape[1],context.shape[2]))
# Concatenate
out_dict["summary_conditions"] = np.c_[data, outcomes].astype(np.float32)#np.c_[data[:, :, :4], outcomes].astype(np.float32)
# These will be concatenated to the outputs of the summary network
# Convert N to log N since neural nets cant deal well with large numbers
N = np.log(forward_dict['sim_non_batchable_context'])
# Repeat N for each sim (since shared across batch), notice the
# extra dimension needed
N_vec = N * np.ones((data.shape[0], 1), dtype=np.float32)
out_dict['direct_conditions'] = N_vec
# Finally, extract parameters. Any transformations (e.g., standardization)
# should happen here.
out_dict['parameters'] = forward_dict['prior_draws'].astype(np.float32)
return out_dict
summary_net = bf.networks.TimeSeriesTransformer(input_dim=6, summary_dim=32, name="RLDDM_summary")
# Inference network
inference_net = bf.networks.InvertibleNetwork(
num_params=len(prior.param_names),
coupling_settings={"dense_args": dict(kernel_regularizer=None), "dropout": False},
name="RLDDM_inference",
)
# Amortized Posterior
amortizer = bf.amortizers.AmortizedPosterior(inference_net, summary_net, name="DDM_amortizer")
# If the checkpoint path does not exist, create it
model_name = "RLDDM"
checkpoint_path = f"checkpoint/{model_name}"
# Define the trainer
trainer = bf.trainers.Trainer(
generative_model=model,
amortizer=amortizer,
configurator=configurator,
checkpoint_path=checkpoint_path
)
# Training Phase
BATCH_SIZE = 16
NUM_EPOCHS = 50
NUM_ITER_PER_EPOCHS = 500
history = trainer.train_online(NUM_EPOCHS, NUM_ITER_PER_EPOCHS, BATCH_SIZE)
# Generate some validation data
validation_sims = configurator(model(batch_size=1000))
# Extract unstandardized prior draws and transform to original scale
prior_samples = validation_sims["parameters"] * prior_stds + prior_means
# Generate 100 posterior draws for each of the 1000 simulated data sets
post_samples = amortizer.sample(validation_sims, n_samples=100)
# Unstandardize posterior draws into original scale
post_samples = post_samples * prior_stds + prior_means
post_samples = amortizer.sample(validation_sims, n_samples=1000)
post_samples = post_samples * prior_stds + prior_means
# Model recoverabiltiy
f = bf.diagnostics.plot_recovery(
post_samples, prior_samples, param_names=prior.param_names, point_agg=np.mean, uncertainty_agg=np.std
)