import seaborn as sns
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pytensor.tensor as pt
import pymc as pm
import arviz as az
from pymc.distributions.dist_math import binomln, logpow
# hyper parameters
SEED = 42
RNG = np.random.default_rng(SEED)
BUFFER = 100
M = 200
# plotting defaults
plt.rcParams['figure.dpi'] = 600
plt.style.use('fivethirtyeight')
plt.rcParams['axes.facecolor'] = 'white'
plt.rcParams['figure.facecolor'] = 'white'
plt.rcParams['axes.spines.left'] = False
plt.rcParams['axes.spines.right'] = False
plt.rcParams['axes.spines.top'] = False
plt.rcParams['axes.spines.bottom'] = False
sns.set_palette("tab10")
def invlogit(x):
    '''Inverse logit function.'''
    return 1 / (1 + np.exp(-x))

def euclid_dist(X, S, library='np'):
    '''Pairwise euclidean distance between points in (M, 2) and (N, 2) arrays.'''
    diff = X[np.newaxis, :, :] - S[:, np.newaxis, :]
    if library == 'np':
        return np.sqrt(np.sum(diff ** 2, axis=-1))
    elif library == 'pm':
        return pm.math.sqrt(pm.math.sum(diff ** 2, axis=-1))

def half_normal(d, s, library='np'):
    '''Half-normal detection function.'''
    if library == 'np':
        return np.exp(-(d ** 2) / (2 * s ** 2))
    elif library == 'pm':
        return pm.math.exp(-(d ** 2) / (2 * s ** 2))

def exponential(d, s, library='np'):
    '''Negative exponential detection function.'''
    if library == 'np':
        return np.exp(-d / s)
    elif library == 'pm':
        return pm.math.exp(-d / s)
# coordinates for each trap
ovenbird_trap = pd.read_csv('ovenbirdtrap.txt', delimiter=' ')
trap_count, _ = ovenbird_trap.shape

# information about each trap
trap_x = ovenbird_trap.x
trap_y = ovenbird_trap.y
X = ovenbird_trap[['x', 'y']].to_numpy()
# define the state space around the traps
x_max = trap_x.max() + BUFFER
y_max = trap_y.max() + BUFFER
x_min = trap_x.min() - BUFFER
y_min = trap_y.min() - BUFFER

# scale for plotting
scale = (y_max - y_min) / (x_max - x_min)
# plot the trap locations
plot_width = 2
plot_height = plot_width * scale
fig, ax = plt.subplots(figsize=(plot_width, plot_height))

# plot the traps
ax.scatter(trap_x, trap_y, marker='x', s=40, linewidth=1.5, color='C1')
ax.set_ylim((y_min, y_max))
ax.set_xlim((x_min, x_max))

ax.annotate('44 nets\n30m apart', ha='center',
            xy=(55, -150), xycoords='data', color='black',
            xytext=(40, 30), textcoords='offset points',
            arrowprops=dict(arrowstyle="->", color='black', linewidth=1,
                            connectionstyle="angle3,angleA=90,angleB=0"))

# aesthetics
ax.set_title('Mist net locations')
ax.grid(False)
plt.show()
Spatial capture-recapture
In this notebook, I show how to train spatial capture-recapture (SCR) models in PyMC. SCR expands upon traditional capture-recapture by incorporating the locations of the traps into the analysis. This matters because, typically, animals that live near a particular trap are more likely to be caught in it. In doing so, SCR links individual-level processes to the population level, expanding the scientific scope of simple designs.
In this notebook, I train the simplest possible SCR model, SCR0 (Royle et al. 2013, chap. 5), where the goal is estimating the true population size \(N\). Similar to the other closed population notebooks, I do so using parameter-expanded data-augmentation (PX-DA). I also borrow the concept of the detection function from the distance sampling notebook.
As a motivating example, I use the ovenbird mist netting dataset provided by Murray Efford via the secr package in R. The design of the study is outlined in Efford, Dawson, and Robbins (2004) and Borchers and Efford (2008). In this dataset, ovenbirds were trapped in 44 mist nets over 8 to 10 consecutive days during the summers of 2005 to 2009.
One difference between spatial and traditional (non-spatial) capture-recapture is the addition of the trap identifier in the capture history. Whereas a traditional capture history is [individual, occasion], a spatial capture history might be [individual, occasion, trap].
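To make the bookkeeping concrete, here is a hypothetical toy sketch (the array names and values are made up, not the ovenbird data) of a spatial capture history and the per-trap count matrix that the models below actually work with.

# a hypothetical toy capture history (not the ovenbird data): an
# (individual, occasion, trap) array of 0/1 detections, collapsed over
# occasions into an (individual, trap) matrix of capture counts
toy_ch = np.zeros((2, 3, 4), dtype=int)   # 2 individuals, 3 occasions, 4 traps
toy_ch[0, 0, 1] = 1                       # individual 0 caught at trap 1 on occasion 0
toy_ch[0, 2, 1] = 1                       # ... and again on occasion 2
toy_ch[1, 1, 3] = 1                       # individual 1 caught at trap 3 on occasion 1

toy_counts = toy_ch.sum(axis=1)           # shape (2, 4): captures per individual per trap
print(toy_counts)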
In the ovenbird example, I ignore the year dimension, pooling parameters across years, which allows for better estimation of the detection parameters. My hack for doing so is treating every band/year combination as a unique individual in a combined capture history. This is easy to implement, but creates an awkward interpretation of \(N\) (see below).
# ovenbird capture history
oven_ch = pd.read_csv('ovenbirdcapt.txt', delimiter=' ')

# create a unique bird/year identifier for each individual
oven_ch['ID'] = oven_ch.groupby(['Year', 'Band']).ngroup()
occasion_count = oven_ch.Day.max()

# merge the datasets, making sure that traps with no detections are included
ovenbird = (
    ovenbird_trap.merge(oven_ch[['ID', 'Net', 'Day']], how='left')
    [['ID', 'Day', 'Net', 'x', 'y']]
    .sort_values('ID')
    .reset_index(drop=True)
)

ovenbird.head(10)
|   | ID  | Day | Net | x     | y      |
|---|-----|-----|-----|-------|--------|
| 0 | 0.0 | 1.0 | 2   | -50.0 | -255.0 |
| 1 | 1.0 | 9.0 | 20  | -50.0 | 285.0  |
| 2 | 1.0 | 1.0 | 15  | -50.0 | 135.0  |
| 3 | 2.0 | 6.0 | 17  | -50.0 | 195.0  |
| 4 | 2.0 | 9.0 | 27  | 49.0  | 165.0  |
| 5 | 2.0 | 1.0 | 15  | -50.0 | 135.0  |
| 6 | 2.0 | 1.0 | 14  | -50.0 | 105.0  |
| 7 | 3.0 | 1.0 | 41  | 49.0  | -255.0 |
| 8 | 3.0 | 3.0 | 39  | 49.0  | -195.0 |
| 9 | 3.0 | 1.0 | 42  | 49.0  | -285.0 |
Simulation
Before estimating the parameters, I perform a small simulation. The simulation starts with a core idea of SCR: the activity center. The activity center \(\mathbf{s}_i\) is the most likely place that you’d find an individual \(i\) over the course of the trapping study. In this case, I assume that activity centers are uniformly distributed across the sample space.
I compute the probability of detection for individual \(i\) at trap \(j\) as \(p_{i,j}=g_0 \exp(-d_{i,j}^2/2\sigma^2),\) where \(g_0\) is the probability of detecting an individual whose activity center is at the trap, \(d_{i,j}\) is the Euclidean distance between the trap and the activity center, and \(\sigma\) is the detection range parameter.
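For a sense of scale, the snippet below evaluates this detection function at a few illustrative distances using the half_normal helper defined above, with \(g_0 = 0.025\) and \(\sigma = 73\), the same values used as the simulation truth below.

# evaluate the half-normal detection function at a few illustrative distances (metres)
d_demo = np.array([0, 30, 73, 150])
p_demo = 0.025 * half_normal(d_demo, 73)
print(p_demo.round(4))   # roughly 0.025, 0.023, 0.015, 0.003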
# true population size
N = 150

# simulate activity centers
sx_true = RNG.uniform(x_min, x_max, N)
sy_true = RNG.uniform(y_min, y_max, N)
S_true = np.column_stack((sx_true, sy_true))

# true distance between the trap and the activity centers
d_true = euclid_dist(X, S_true)

# detection parameters
g0_true = 0.025
sigma_true = 73

# simulate the number of captures at each trap for each individual
capture_probability = g0_true * half_normal(d_true, sigma_true)
sim_Y = RNG.binomial(occasion_count, capture_probability)

# filter out undetected individuals
was_detected = sim_Y.sum(axis=1) > 0
sim_Y_det = sim_Y[was_detected]
n_detected = int(was_detected.sum())
Following Royle et al. (2013), Chapter 5, I first fit the version of the model where we assume that we know the true population size. In this case, I’m only estimating the detection parameters and the activity center locations.
# upper bound for the uniform prior on sigma
U_SIGMA = 150

with pm.Model() as known:

    # priors for the activity centers
    sx = pm.Uniform('sx', x_min, x_max, shape=n_detected)
    sy = pm.Uniform('sy', y_min, y_max, shape=n_detected)
    S = pt.stack([sx, sy], axis=1)

    # priors for the detection parameters
    g0 = pm.Uniform('g0', 0, 1)
    sigma = pm.Uniform('sigma', 0, U_SIGMA)

    # probability of capture for each individual at each trap
    distance = euclid_dist(X, S, 'pm')
    p = pm.Deterministic('p', g0 * half_normal(distance, sigma))

    # likelihood
    pm.Binomial('y',
        p=p,
        n=occasion_count,
        observed=sim_Y_det
    )

pm.model_to_graphviz(known)
with known:
    known_idata = pm.sample()
Auto-assigning NUTS sampler...
Initializing NUTS using jitter+adapt_diag...
Multiprocess sampling (4 chains in 4 jobs)
NUTS: [sx, sy, g0, sigma]
Sampling 4 chains for 1_000 tune and 1_000 draw iterations (4_000 + 4_000 draws total) took 28 seconds.
az.summary(known_idata, var_names=['g0', 'sigma'])
|       | mean   | sd    | hdi_3% | hdi_97% | mcse_mean | mcse_sd | ess_bulk | ess_tail | r_hat |
|-------|--------|-------|--------|---------|-----------|---------|----------|----------|-------|
| g0    | 0.034  | 0.004 | 0.027  | 0.041   | 0.000     | 0.000   | 1732.0   | 2543.0   | 1.0   |
| sigma | 78.852 | 5.207 | 69.422 | 88.756  | 0.162     | 0.114   | 1027.0   | 1729.0   | 1.0   |
az.plot_trace(
    known_idata,
    var_names=['g0', 'sigma'],
    figsize=(8, 4),
    lines=[("g0", {}, [g0_true]), ("sigma", {}, [sigma_true])]
);
The trace plots show reasonable agreement between the true parameter values and the estimated values, although \(g_0\) appears to be overestimated.
Ovenbird density
Now, I estimate the density \(D\) for the ovenbird population. Like distance sampling, SCR can robustly estimate the density of the population, regardless of the size of the state space. The difference between the model above and this one is that we use PX-DA to estimate the inclusion probability \(\psi,\) and subsequently \(N.\) First, I convert the DataFrame to a (n_detected, n_traps) array of binomial counts.
def get_Y(ch):
    '''Get a (individual_count, trap_count) array of detections.'''
    # count the number of detections per individual per trap
    detection_counts = pd.crosstab(ch.ID, ch.Net, dropna=False)

    # remove the ghost nan individual
    detection_counts = detection_counts.loc[~detection_counts.index.isna()]

    Y = detection_counts.to_numpy()
    return Y

Y = get_Y(ovenbird)
detected_count, trap_count = Y.shape

# augment the spatial capture histories with all-zero histories
all_zero_history = np.zeros((M - detected_count, trap_count))
Y_augmented = np.row_stack((Y, all_zero_history))
Similar to the occupancy notebook, I use a custom distribution to model the zero-inflated data. This is necessary because the zero inflation happens at the individual (row) level. It is, in fact, the same distribution as in the occupancy model, except that it includes the binomial coefficient.
def logp(value, n, p, psi):
    '''Row-level zero-inflated binomial log-likelihood.'''
    binom = binomln(n, value) + logpow(p, value) + logpow(1 - p, n - value)
    bin_sum = pm.math.sum(binom, axis=1)
    bin_exp = pm.math.exp(bin_sum)

    res = pm.math.switch(
        value.sum(axis=1) > 0,
        bin_exp * psi,
        bin_exp * psi + (1 - psi)
    )

    return pm.math.log(res)
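As a quick sanity check of the row-level zero-inflation logic, here is a minimal numpy/scipy sketch with made-up toy values (the names with a _toy suffix and the helper row_log_lik are hypothetical, and scipy is not otherwise used in this notebook): the likelihood of a detected row is \(\psi\) times a product of binomial pmfs across traps, while an all-zero row also picks up the \((1 - \psi)\) point mass.

# toy check of the row-level zero-inflated binomial likelihood
from scipy import stats

K_toy = 9                                # trapping occasions
p_toy = np.array([0.02, 0.10, 0.01])     # capture probability at each of 3 traps
psi_toy = 0.7                            # inclusion probability

def row_log_lik(y, p, K, psi):
    '''Log-likelihood of one row (one individual) under row-level zero inflation.'''
    prob = stats.binom.pmf(y, K, p).prod()
    if y.sum() > 0:
        return np.log(psi * prob)
    return np.log(psi * prob + (1 - psi))

print(row_log_lik(np.array([0, 2, 0]), p_toy, K_toy, psi_toy))   # a detected individual
print(row_log_lik(np.zeros(3), p_toy, K_toy, psi_toy))           # an all-zero (augmented) row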
with pm.Model() as oven:

    # Priors
    # activity centers
    sx = pm.Uniform('sx', x_min, x_max, shape=M)
    sy = pm.Uniform('sy', y_min, y_max, shape=M)
    S = pt.stack([sx, sy], axis=1)

    # capture parameters
    g0 = pm.Uniform('g0', 0, 1, initval=0.05)
    sigma = pm.Uniform('sigma', 0, U_SIGMA)

    # inclusion probability
    psi = pm.Beta('psi', 0.001, 1)

    # compute the capture probability
    distance = euclid_dist(X, S, 'pm')
    p = pm.Deterministic('p', g0 * half_normal(distance, sigma))

    # likelihood
    pm.CustomDist('y',
        occasion_count,
        p,
        psi,
        logp=logp,
        observed=Y_augmented
    )

pm.model_to_graphviz(oven)
with oven:
    oven_idata = pm.sample()
Auto-assigning NUTS sampler...
Initializing NUTS using jitter+adapt_diag...
Multiprocess sampling (4 chains in 4 jobs)
NUTS: [sx, sy, g0, sigma, psi]
Sampling 4 chains for 1_000 tune and 1_000 draw iterations (4_000 + 4_000 draws total) took 54 seconds.
az.summary(oven_idata, var_names=['g0', 'sigma', 'psi'])
|       | mean   | sd    | hdi_3% | hdi_97% | mcse_mean | mcse_sd | ess_bulk | ess_tail | r_hat |
|-------|--------|-------|--------|---------|-----------|---------|----------|----------|-------|
| g0    | 0.029  | 0.004 | 0.022  | 0.037   | 0.000     | 0.000   | 1944.0   | 2922.0   | 1.0   |
| sigma | 71.180 | 4.617 | 62.580 | 79.889  | 0.161     | 0.114   | 832.0    | 1567.0   | 1.0   |
| psi   | 0.701  | 0.056 | 0.595  | 0.806   | 0.001     | 0.001   | 3564.0   | 2432.0   | 1.0   |
# maximum likelihood estimates from the secr package
g0_mle = [0.025]
sigma_mle = [73]

az.plot_trace(
    oven_idata,
    var_names=['g0', 'sigma'],
    figsize=(8, 4),
    lines=[("g0", {}, [g0_mle]), ("sigma", {}, [sigma_mle])]
);
The estimates are quite close to the maximum likelihood estimates, which I estimated using the secr package in R.
Finally, I estimate density \(D\) using the results. As in the closed capture-recapture and distance sampling notebooks, I use the posterior samples of \(\psi\) and \(p\) to sample the posterior of \(N.\) This \(N,\) however, has an awkward interpretation because I pooled across the years by combining all the detection histories. To get around this, I compute the average annual abundance by dividing by the number of years in the sample. Then, I divide by the area of the state space.
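Concretely, for an augmented individual with an all-zero history, sim_N below computes the conditional inclusion probability \(\Pr(\text{real} \mid \mathbf{y}_i = \mathbf{0}) = \psi \prod_j (1 - p_{ij})^K \,/\, \left(\psi \prod_j (1 - p_{ij})^K + (1 - \psi)\right),\) flips a coin with that probability for each of the \(M - n\) augmented rows, and adds the successes to the \(n\) detected individuals.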
def sim_N(idata, n, K):
    '''Simulate the posterior of N from psi and the capture probabilities.'''
    psi_samps = az.extract(idata).psi.to_numpy()
    p_samps = az.extract(idata).p
    p_samps_undet = p_samps[n:, :, :]

    # probability that an augmented individual was never captured at any trap
    bin_probs = (1 - p_samps_undet) ** K
    bin_prod = bin_probs.prod(axis=1)

    # conditional probability that an all-zero individual is a real member of the population
    p_included = (bin_prod * psi_samps) / (bin_prod * psi_samps + (1 - psi_samps))

    number_undetected = RNG.binomial(1, p_included).sum(axis=0)
    N_samps = n + number_undetected

    return N_samps
N_samps = sim_N(oven_idata, detected_count, occasion_count)

# kludgy way of calculating average annual abundance
year_count = 5
average_annual_abundance = N_samps // year_count

# area of the state space in hectares
ha = 100 * 100
mask_area = (x_max - x_min) * (y_max - y_min) / ha

# density
D_samples = average_annual_abundance / mask_area
D_mle = 1.262946
fig, ax = plt.subplots(figsize=(4, 4))

ax.hist(D_samples, edgecolor='white', bins=13)
ax.axvline(D_mle, linestyle='--', color='C1')
ax.set_xlabel('Ovenbirds per hectare')
ax.set_ylabel('Number of samples')
ax.text(1.4, 800, rf'$\hat{{D}}$={D_samples.mean():.2f}', va='bottom', ha='left')

plt.show()
Sometimes, the locations of the activity centers are themselves of interest. Below, I plot the posterior median of the activity center for each detected individual.
sx_samps = az.extract(oven_idata).sx
sy_samps = az.extract(oven_idata).sy

sx_mean = np.median(sx_samps[:detected_count], axis=1)
sy_mean = np.median(sy_samps[:detected_count], axis=1)

# plot the trap locations
plot_width = 3
plot_height = plot_width * scale
fig, ax = plt.subplots(figsize=(plot_width, plot_height))

# plot the traps
ax.scatter(trap_x, trap_y, marker='x', s=40, linewidth=1.5, color='C1')
ax.set_ylim((y_min, y_max))
ax.set_xlim((x_min, x_max))

# plot the median activity centers
ax.scatter(sx_mean, sy_mean, marker='o', s=4, color='C0')

# aesthetics
ax.set_title('Estimated activity centers')
ax.grid(False)
We can also look at the uncertainty around those estimates. Below, I plot the posterior distribution of the activity centers for two individuals.
one = 49
sx1 = sx_samps[one]
sy1 = sy_samps[one]

two = 2
sx2 = sx_samps[two]
sy2 = sy_samps[two]

fig, ax = plt.subplots(figsize=(plot_width, plot_height))

# plot the traps
ax.scatter(trap_x, trap_y, marker='x', s=40, linewidth=1.5, color='C1')
ax.set_ylim((y_min, y_max))
ax.set_xlim((x_min, x_max))

# plot the distributions of the activity centers
ax.scatter(sx1, sy1, marker='o', s=1, color='C0', alpha=0.2)
ax.scatter(sx2, sy2, marker='o', s=1, color='C0', alpha=0.2)

# plot the means
ax.scatter(sx1.mean(), sy1.mean(), marker='o', s=20, color='C0')
ax.scatter(sx2.mean(), sy2.mean(), marker='o', s=20, color='C0')

# add the labels
ax.text(sx1.mean(), sy1.mean() + 5, f'{one}', ha='center', va='bottom')
ax.text(sx2.mean(), sy2.mean() + 5, f'{two}', ha='center', va='bottom')

# aesthetics
ax.set_title('Posterior of two activity centers')
ax.grid(False)
plt.show()
Finally, I plot the posterior distribution of the detection function.
xx = np.arange(BUFFER * 2)

sigma_samps = az.extract(oven_idata).sigma.values.flatten()
g0_samps = az.extract(oven_idata).g0.values.flatten()

p_samps = np.array(
    [g * half_normal(xx, s) for g, s in zip(g0_samps, sigma_samps)]
)

p_mean = p_samps.mean(axis=0)
p_low = np.quantile(p_samps, 0.02, axis=0)
p_high = np.quantile(p_samps, 0.98, axis=0)

fig, ax = plt.subplots(figsize=(5, 4))

ax.plot(xx, p_mean, '-')
ax.fill_between(xx, p_low, p_high, alpha=0.2)

ax.set_title('Detection function')
ax.set_ylabel(r'$p$')
ax.set_xlabel(r'Distance (m)')

plt.show()
%load_ext watermark
%watermark -n -u -v -iv -w
Last updated: Mon Nov 25 2024
Python implementation: CPython
Python version : 3.12.7
IPython version : 8.29.0
numpy : 1.26.4
pandas : 2.2.3
pytensor : 2.26.3
matplotlib: 3.9.2
pymc : 5.18.2
seaborn : 0.13.2
arviz : 0.20.0
Watermark: 2.5.0