Reinforcement Learning Cognitive Models

Has anyone had success adapting a reinforcement learning (RL) model using BayesFlow?

I’ve built a basic framework combining RL with the drift diffusion model (DDM). While I’ve had success recovering the DDM parameters (diftt, alpha, beta, and ter), I am having poor parameter recovery for the learning rate (kappa, below).

I suspect it has to do with the batchable context I’m including, but I’m not sure.

The task is a 2-Armed Bandit, with yoked arms (i.e., P(Reward, arm 1) = [.8,.2], P(Reward, arm 2) = [.2, .8]) with reversals every 12 trials.

The code functions by generating a sequence of potential outcomes for each arm that gets fed into the trial (generate_outcomes_matrix). Each trial (rl_diffusion_trial) takes a set of parameters, the Q-values from the previous trial, the actions from the previous trial, as well as the potential outcomes generated by the aforementioned function.

Note that for each trial, I set outcomes of unchosen options to 0. So if trial t had potential outcomes [0, 1] but option 1 was chosen, then the experienced outcomes would become [0, 0]. Perhaps this is where I’m running into problems.

The experiment (experiment) only stores data that would be seen in an empirical experiment. That is, RT, Choice, and Outcomes. The configurator (configurator) combines the data from the experiment with the batchable context. Again, this might need to be revised.

Would appreciate any thoughts and comments. The relevant code is below.

# Generative Model Specifications User Defined Functions, non-batchable
def truncnorm_better(mean=0, sd=1, low=-10, upp=10, size=1):
    return truncnorm.rvs(
        (low - mean) / sd, (upp - mean) / sd, loc=mean, scale=sd, size=size)
def draw_prior():

    # drift ~ N(0, 2.0), mean drift rate, index 0
    drift = RNG.normal(0.0, 2.0)

    # alpha ~ N(1.0, 0.5) in [0, 5], boundary
    alpha = truncnorm_better(mean=1.0, sd=0.5, low=0.0, upp=5)[0]

    # beta ~ Beta(2.0, 2.0), relative start point
    beta = RNG.beta(2.0, 2.0)

    # ter ~ N(0.5, 0.25) in [0, 1.5], non-decision time
    ter = truncnorm_better(mean=0.5, sd=0.25, low=0.0, upp=1.5)[0]

    # eta ~ N(1.0, 0.5) in [0, 3], trial-to-trial variability in drift, index 4
    # eta = truncnorm_better(mean=1.0, sd=0.5, low=0.0, upp=3)[0]

    # kappa ~ Beta(2.0, 2.0), learning rate
    kappa = RNG.beta(2.0, 2.0)

    p_samples = np.hstack((drift, alpha, beta, ter, kappa))
    
    return p_samples

def generate_outcomes_matrix(num_obs,probs = [0.8, 0.2]):
    outcome_mat = np.zeros([num_obs,2])
    for n in range(num_obs):
        # Every 12 trials, reverse probabilities
        if (n > 1) & (n % 12 == 0):
            probs = [probs[1], probs[0]]
            
        # Pre-allocate outcomes of each bandit
        outcome_mat[n,0] = np.random.binomial(n=1,p = probs[0])
        outcome_mat[n,1] = np.random.binomial(n=1,p = probs[1])

    return outcome_mat

# new code updates q-values after choice in current trial

def rl_diffusion_trial(params,input,
    dt=.01, max_steps=900.):
    """Simulates a trial from the diffusion model."""
    # Parameters
    drift, alpha, beta, tau, kappa = params

    # RT and Choice from previous trials
    # Q-values from previous trials
    qs = [input[0],input[1]]
    # Potential outcomes
    outcome = [input[2],input[3]]

    # Intialzie evidence accumulation
    n_steps = 0.
    evidence = alpha * beta

    # trial-to-trial drift rate
    drift_trial = drift* (qs[0] - qs[1])
    
    # Simulate a single DM path
    while ((evidence > 0) and (evidence < alpha) and (n_steps < max_steps)):

        # DDM equation
        evidence += drift_trial*dt + np.sqrt(dt) * np.random.normal()

        # Increment step
        n_steps += 1.0

    rt = n_steps * dt + tau

    if evidence >= alpha:
        choice =  0  # choice A
        outcome[1] = 0 # So didn't see outcome of option B

    elif evidence <= 0:
        choice = 1  # choice B
        outcome[0] = 0 # So didn't see outcome of option A
    else:
        choice = -1  # This indicates a missing response

    # Q-learning
    if choice > -1:
        # Generate Signed RPE
        RPE = (outcome[choice] - qs[choice])
        # Update Q-value of previously chosen option
        qs[choice] = qs[choice] + kappa * RPE

    return rt, choice, outcome[0], outcome[1], qs[0], qs[1]

def experiment(theta, outcome_mat, num_obs):
    output = np.zeros((num_obs, 6))
    context = np.zeros((num_obs+1, 4))
    context[0,:2] = 0.5 # Initialize Q-values at 0.5
    context[0:num_obs,2:] = outcome_mat # Potential outcome of current rounds
    for n in range(num_obs):  
        check = 0
        while check == 0:
            output[n, :] = rl_diffusion_trial(params = theta,
                                        input = context[n,:]
            )
            # Repeat trial if response returned is -1
            if output[n, 1] > -1:
                check = 1
        # Store Q values
        context[(n+1),:2] = output[n,4:]

    #out = np.c_[context[1:,:]] # If we want EVERYTHING
    out = np.c_[output[:,:4]] # Only want choice,  RT, and feedback
    return out

MIN_OBS = 60
MAX_OBS = 60
NUM_CONDITIONS = 1

# By drawing a random number of trials, helps to train when there is missing data
def random_num_obs(min_obs=MIN_OBS, max_obs=MAX_OBS):
    """Draws a random number of observations for all simulations in a batch."""

    return RNG.integers(low=min_obs, high=max_obs + 1)

context_gen = bf.simulation.ContextGenerator(
    non_batchable_context_fun=random_num_obs,
    batchable_context_fun=generate_outcomes_matrix,
    use_non_batchable_for_batchable=True,
)

# Connect via BayesFlow Wrappers
simulator = bf.simulation.Simulator(simulator_fun=experiment, 
                                    context_generator=context_gen)
model = bf.simulation.GenerativeModel(prior=prior, 
                                      simulator=simulator, 
                                      name="RLDDM")

prior_means, prior_stds = prior.estimate_means_and_stds(n_draws=100000)
prior_means = np.round(prior_means, decimals=1)
prior_stds = np.round(prior_stds, decimals=1)

def configurator(forward_dict):

    # Prepare placeholder dict
    out_dict = {}

    # Extract data
    data = forward_dict["sim_data"].astype(np.float32)

    # Extract context
    context = np.array(forward_dict["sim_batchable_context"])[..., None]

    # Convert to 3D
    outcomes = context.reshape((context.shape[0],context.shape[1],context.shape[2]))

    # Concatenate
    out_dict["summary_conditions"] = np.c_[data, outcomes].astype(np.float32)#np.c_[data[:, :, :4], outcomes].astype(np.float32)

    # These will be concatenated to the outputs of the summary network
    # Convert N to log N since neural nets cant deal well with large numbers
    N = np.log(forward_dict['sim_non_batchable_context'])

    # Repeat N for each sim (since shared across batch), notice the
    # extra dimension needed
    N_vec = N * np.ones((data.shape[0], 1), dtype=np.float32)
    out_dict['direct_conditions'] = N_vec

    # Finally, extract parameters. Any transformations (e.g., standardization)
    # should happen here.
    out_dict['parameters'] = forward_dict['prior_draws'].astype(np.float32)

    return out_dict

summary_net = bf.networks.TimeSeriesTransformer(input_dim=6, summary_dim=32, name="RLDDM_summary")

# Inference network
inference_net = bf.networks.InvertibleNetwork(
    num_params=len(prior.param_names),
    coupling_settings={"dense_args": dict(kernel_regularizer=None), "dropout": False},
    name="RLDDM_inference",
)

# Amortized Posterior

amortizer = bf.amortizers.AmortizedPosterior(inference_net, summary_net, name="DDM_amortizer")

# If the checkpoint path does not exist, create it
model_name = "RLDDM"

checkpoint_path = f"checkpoint/{model_name}"

# Define the trainer
trainer = bf.trainers.Trainer(
    generative_model=model, 
    amortizer=amortizer, 
    configurator=configurator, 
    checkpoint_path=checkpoint_path
)

# Training Phase
BATCH_SIZE = 16
NUM_EPOCHS = 50
NUM_ITER_PER_EPOCHS = 500

history = trainer.train_online(NUM_EPOCHS, NUM_ITER_PER_EPOCHS, BATCH_SIZE)

# Generate some validation data
validation_sims = configurator(model(batch_size=1000))

# Extract unstandardized prior draws and transform to original scale
prior_samples = validation_sims["parameters"] * prior_stds + prior_means

# Generate 100 posterior draws for each of the 1000 simulated data sets
post_samples = amortizer.sample(validation_sims, n_samples=100)

# Unstandardize posterior draws into original scale
post_samples = post_samples * prior_stds + prior_means


post_samples = amortizer.sample(validation_sims, n_samples=1000)
post_samples = post_samples * prior_stds + prior_means

# Model recoverabiltiy
f = bf.diagnostics.plot_recovery(
        post_samples, prior_samples, param_names=prior.param_names, point_agg=np.mean, uncertainty_agg=np.std
    )

@LuSchumacher Any thoughts on that? Seems that the current configuration does not have enough information in the data to recover the learning rate (kappa).

In reinforcement learning (RL), the sequence of responses and rewards is highly informative for estimating the learning rate, especially in reversal learning tasks. BayesFlow, however, assumes permutation invariance in the data, which is suitable for many decision-making datasets but not for RL.

Any comment on that?

Hello,

@AminGhaderi BayesFlow can easily deal with non-IID data. The critical part is the summary network. When we use a recurrent NN like TimeSeriesTransformer or LSTM we account for temporal dependencies in the data.

@BShevlin sorry for the delayed response to your problem. I previously estimated RL evidence accumulation models and had no problems recovering the learning rate.

While trying to solve your problems, I made some progress, however, it is still far from perfect. In the following, I point out a few things that you can improve in your code:

Parameter Priors

Your drift parameter scales the difference between the subjective values between the two options. I don’t think you should allow for negative scaling parameters here. I would also use a more narrow prior on beta. Here, is the code I used. The uniform distributions for the drift scaling as well as the kappa still should be improved. I also recommend to scale the parameters in the configurator:

def truncnorm_better(mean=0, sd=1, low=-10, upp=10, size=1):
    return truncnorm.rvs(
        (low - mean) / sd, (upp - mean) / sd, loc=mean, scale=sd, size=size
    )

def draw_prior():
    drift = RNG.uniform(0.0, 10.0)
    alpha = truncnorm_better(mean=1.0, sd=0.5, low=0.5, upp=5)[0]
    tau = truncnorm_better(mean=0.5, sd=0.25, low=0.0, upp=1.5)[0]
    beta = RNG.beta(30, 30)
    kappa = RNG.uniform(0, 1)
    return np.hstack((drift, alpha, tau, beta, kappa))

prior = bf.simulation.Prior(
    prior_fun=draw_prior, param_names=['drift', 'alpha', 'tau', 'beta', 'kappa']
)

# for scaling in the configurator
prior_mean, prior_std = prior.estimate_means_and_stds(n_draws=100000)
PRIOR_MEAN = np.round(prior_mean, decimals=1)
PRIOR_STD = np.round(prior_std, decimals=1)

Context

You should also include a vector that indicates which is the correct response on a given trial. This variable is then used to flip the drift rate (more on this below).

def generate_outcomes_matrix(num_obs, probs=[0.8, 0.2]):
    outcome_mat = np.zeros([num_obs, 3])
    for n in range(num_obs):
        # Every 12 trials, reverse probabilities
        if (n > 1) & (n % 12 == 0):
            probs = [probs[1], probs[0]]
            
        outcome_mat[n, 0] = np.random.binomial(n=1, p = probs[0])
        outcome_mat[n, 1] = np.random.binomial(n=1, p = probs[1])
        
        # Correct response
        if probs[0] > probs[1]:
            outcome_mat[n, 2] = 0
        else:
            outcome_mat[n, 2] = 1

    return outcome_mat

Simulator

In trials where the second option is more likely to return a reward, I think the drift rate should be negative. To this end, you can use the correct response vector from the context generator:

@njit
def diffusion_trial(v, a, tau, bias, dt=0.001, s=1.0, max_iter=1e4):
    """
    Simulates a single reaction time from a simple drift-diffusion process.
    """

    n_iter = 0
    x = a * bias
    c = np.sqrt(dt * s)
    
    while x > 0 and x < a and n_iter < max_iter:
        x += v*dt + c * np.random.randn()
        n_iter += 1
        
    rt = n_iter * dt
    return np.array([rt+tau, 1]) if x >= 0 else np.array([rt+tau, 0])

def rl_ddm(theta, outcomes, num_obs):
    data = np.zeros((num_obs, 4))
    value = [0.5, 0.5]
    drift, alpha, tau, beta, kappa = theta
    for n in range(num_obs):
        drift_t = drift * (value[0] - value[1])
        # flip drift rate
        if outcomes[n, 2] == 1:
            drift_t = -drift_t 
        data[n, :2] = diffusion_trial(drift_t, alpha, tau, beta)
        choice = int(data[n, 1])
        data[n, 2] = outcomes[n, choice]
        data[n, 3] = outcomes[n, 2]
        pred_error = outcomes[n, choice] - value[choice]
        value[choice] = value[choice] + kappa * pred_error
    return data

Configurator

As you already pointed out, it is sometimes not easy to figure out what data should be passed to the summary network as summary conditions (I’m often confused). Here, I am not 100% sure, but I think you should pass the following:

  • RTs, choices
  • reward obtained (relevant for value updating)
  • correct response (relevant for flipping the drift rate)
  • trial ID (relevant for TimeSeriesTransformer)

Here is my version of the configurator in which I also scale the parameters:

def configurator(forward_dict):
    out_dict = {}

    data = forward_dict["sim_data"].astype(np.float32)
    context = np.array(forward_dict["sim_batchable_context"])
    time_stamp = np.repeat(
        np.linspace(0, 1, context.shape[1])[np.newaxis,...],
        context.shape[0], axis=0
    )[:, :, None]

    out_dict["summary_conditions"] = np.c_[data, time_stamp].astype(np.float32)

    N = np.log(forward_dict['sim_non_batchable_context'])
    N_vec = N * np.ones((data.shape[0], 1), dtype=np.float32)
    out_dict['direct_conditions'] = N_vec

    params = forward_dict["prior_draws"].astype(np.float32)
    out_dict["parameters"] = (params - PRIOR_MEAN) / PRIOR_STD
    
    return out_dict

Summary network

While the TimeSeriesTransformer you use should be fine, you could also try out an LSTM instead:

summary_net = tf.keras.Sequential([tf.keras.layers.LSTM(512), tf.keras.layers.Dense(64)])

With these adjustments, I was able to get some recovery for kappa.

Maybe this helps you as a starting point for further refinements. Perhaps, you might also want to look into the model specifications from the rlssm library.

Please let us know about your progress and if you have further questions.

Cheers,
Lukas

1 Like