1.4 Probabilistic Arithmetic
In Section 1.2 we were interested in how the visiting times of a patient would become distributed over periods of half a year after many visits. We can formalize this problem as follows for one patient. Let \(X_k\) be the \(k\)th inter-arrival time, and write \(C\) for the length of the interval in which the patient has to see a doctor; here \(C\) is half a year. Then the times at which the arrivals are distributed over the periods behave like
\begin{align*} Z_{k} = Z_{k-1} + X_{k} \bmod C, \quad Z_{0} = 0. \end{align*}To compute the distribution of \(Z_k\) we need to be able to compute the distribution of \(Y_{k} = Z_{k-1} + X_k\), and then find the distribution of \(f(Y_k)\) where \(f(x) = x \bmod C\). Clearly, this is a very unattractive task to carry out by hand. In this section we develop Python code so that the computer can do all this tedious work.
More generally, in later sections, we will use a considerable amount of numerical work to obtain insight into the behavior of stochastic processes. For this we will use a Python class that allows us to do probabilistic arithmetic. By focusing on coding patterns we can achieve that and, in the meantime, turn coding into an interesting intellectual challenge. At the end of this section, we use this code to provide numerical insight into the distribution of \(\{Z_k\}\) discussed above.
Let us start by making explicit what we mean by a pattern. As a first pattern, consider the \(+\) operator. Suppose that we only know how to do addition on the positive integers.1 For example: \(3+5=8\). Observe that the definition of addition on integers cannot be directly applied to rationals. To add fractions, we need to extend the symbol \(+\) to the following procedure: make the denominators of the fractions identical, and then add the numerators.2 \(1/3+1/2 = 2/6 + 3/6 = (2+3)/6 = 5/6\). However, this procedure does not work to compute \(\sqrt 2 + \sqrt 3\).3 \(\sqrt 2\) and \(\sqrt 3\) are not rationals so we cannot make the denominators equal. To add general real numbers, we need to extend the definition of \(+\) yet further.4 One way is like this: \(\sqrt 2 + \sqrt 3 = \sup\{x + y : x, y \in \Q, x^{2} \leq 2, y^{2} \leq 3\}\). Next, we need to define addition for complex numbers, vectors, matrices \ldots In summary, when learning mathematics, we have been silently \emph{overloading} the \(+\) operator to many different situations.
In our present case, we write \(X+Y\) for two independent rvs to mean the convolution:
\begin{equation*} \P{X+Y=z} = \sum_{i} \sum_{j} p_{i} q_{j} \1{i+j=z}, \tag{1.4.1} \end{equation*}where \(p_{i} = \P{X=i}\), \(q_j = \P{Y=j}\) and the indicator \(\1{A} = 1\) if the event \(A\) occurs, and \(0\) otherwise. Below we implement this in code.5 For addition there exists a much faster method based on Fast Fourier transforms.
Thus, the first pattern that helps us to reason in more abstract ways about problems is to use operator overloading.
This brings us to a second pattern: observe that the expression \(X+Y\) combines two rvs \(X\) and \(Y\) and then applies some function. Similarly, the operations \(X-Y\), \(XY\), \(\max\{X, Y\}\) are all functions applied to two independent rvs. What we therefore need is a method to compute the pmf of a new rv \(f(X,Y)\), where \(f\) is some general \emph{binary} function.6 A binary function depends on two arguments.
We identify a third pattern when we realize that to compute \(\V X\), we need \(\E{X^{2}}\). But \(\E{X^{2}}\) is the expectation of the rv \(g(X)\) where \(g(x) = x^{2}\). So, we also need a method to compute the pmf of the rv \(g(X)\), where now \(g\) is some general \emph{unary} function.7 A unary function depends on just one argument.
In summary, we want to build code that supports for rvs:
- operator overloading for addition, subtraction, and potentially other arithmetical operations,
- the application of a unary function to obtain a new rv
- the application of a binary function.
We call these procedures probabilistic arithmetic, and we will build, step by step, elegant and generic code to support this. However, we first need to introduce some computer science concepts.
A useful data structure to store the pmf of discrete rvs is a dictionary.
X = {1: 1/2, 2: 1/2}
Y = {1: 1/2, 2: 1/2}
It is obvious that \(Z = X + Y\) should be Z = {2: 1/4, 3: 1/2, 4: 1/4}.
Let us use to compute the pmf of \(Z=X+Y\).
Observe that the result of this code is wrong: the pmf of \(Z=3\) is not \(1/2\).
Z = {}
for i, p in X.items():
for j, q in Y.items():
Z[i + j] = p * q
print(f"{Z.keys()=}")
print(f"{Z.values()=}")
Z.keys()=dict_keys([2, 3, 4])
Z.values()=dict_values([0.25, 0.25, 0.25])
The problem is that in line 4 Z[3] gets overwritten by the = operator.8
The = is an assignment operator!
Actually, we need something that adds probabilities, rather than just assigning them, as in the code above.
The correct data structure is a defaultdict(float) which provides a default value of \(0\) when a key in the dict is missing.
from collections import defaultdict
Z = defaultdict(float)
for i, p in X.items():
for j, q in Y.items():
Z[i + j] += p * q # note the +=
print(f"{Z.keys()=}")
print(f"{Z.values()=}")
Z.keys()=dict_keys([2, 3, 4])
Z.values()=dict_values([0.25, 0.5, 0.25])
Now the result is correct.
Note that we use the operator +=, a convenient way to perform addition in place.
To see why it is useful, consider the following code.
In particular, the third line is a much cleaner (shorter) way to add \(3\) to the variable's value.
very_long_variable = 3
very_long_variable = very_long_variable + 3
very_long_variable += 3 # compare this to the line above
\newthought{The above code} works in principle, but handling the pmf as a dictionary requires some care. Here is an example to understand the problem. We all agree that \(1/3 = 1-2/3\), so let us see what happens if we run the following code.
Z = defaultdict(float)
Z[1/3] = 1
Z[1 - 2 / 3] += 1
print(Z.keys())
dict_keys([0.3333333333333333, 0.33333333333333337])
The dictionary Z suddenly contains two elements instead of just one.
Inspecting the last digit of the keys, we see that in one number the last digit is \(7\) and in the other it is \(3\).
So, for a computer \(1-2/3 \neq 1/3\).9
This is not something specific to Python; the problem lies in the fact that most real numbers cannot be represented as floating point numbers.
To avoid this, we round the elements of the support to a fixed precision and use fractions, instead of floats, as keys.
from fractions import Fraction
a, b = 1 / 3, 1 - 2 / 3
af = Fraction(a).limit_denominator(1000)
bf = Fraction(b).limit_denominator(1000)
print(a == b) # this should be False
print(af == bf) # this should be True
False
True
In the code below, we will use the so-called \(\lambda\) function.
Sometimes functions are so short and simple that we simply do not want to bother to give them a name.
For such cases, the \(\lambda\) function serves as an anonymous function.
Note that to apply the function lambda x: 3 * x to the number 8, we must place the expression in parentheses.
print((lambda x: 3 * x)(8))
24
Sometimes we need to add many rvs, in particular we would like to be able to code the equivalent of \(\sum_{i=1}^{k} X_{i}\). Let us first see how this works in Python for normal numbers.
print(sum(x for x in [1, 2, 3]))
6
However, there is a small catch.
The sum needs a starting element.
To see why, consider the following code and output; we will discuss both next.
print(sum(x for x in "dog"))
TypeError: unsupported operand type(s) for +: 'int' and 'str'
So, we pass x as a string, and then sum fails, indicating that it cannot add an integer and a string.
But where does the int in the error text come from?
As it turns out, sum starts with a default value \(0\) and then sums from left to right.
In other words, the above summation over the list [1, 2, 3] expands to \(0 + 1 + 2 + 3\).
This clarifies the error: the sum 0 + "dog" is not well defined.
Below we point out when this problem becomes relevant.
A final general concept we will use is caching. By caching, it is possible to store the result of a function in a dictionary rather than evaluating the function body again on a second call to the function. For simple functions, this is not necessary, but when the evaluation of the function requires substantial numerical work, caching decreases computation times beyond imagination (milliseconds instead of centuries for the same result).
from functools import cache
@cache
def f(x):
print("The body of the function is called.")
return 8 * x
print(f(3)) # first call: run function body
print(f(3)) # second call, retrieve from memory, don't run body
print(f(4)) # new argument: run function body
print(f(4)) # second call with argument, don't run body
The body of the function is called.
24
24
The body of the function is called.
32
32
Clearly, the print statement in the function body is only run when the function receives a new argument.
Thus, @cache stores for a given value10
Here 3 and 4.
the output of f and does not evaluate the body of f for a second time when the function receives the same value for the argument.
We now turn to building a Python class to work conveniently with rvs.
We start with loading a number of modules that provide some basic functionality.
import operator
from collections import defaultdict
from fractions import Fraction
import numpy as np
from numpy.random import default_rng
from functions import normalize
The meaning of the next variables will become clear later.
The function toFrac is a convenience function to hide how to convert a float to a fraction of the desired precision.
If we want to change the precision at a later stage, or want to change the conversion, we only have to change the code here.
max_denominator = 1_000_000
thres = 1e-16 # Reject probabilities smaller than this.
seed = 3 # For the random number generator.
def toFrac(x):
"Convert x to fraction of specified precision."
return Fraction(x).limit_denominator(max_denominator)
The normalize function works like this:
def normalize(pmf):
norm = sum(pmf.values())
# no small prob events.
return {k: nv for k, v in pmf.items() if (nv := v / norm) > thres}
The RV class, which we build in the following code blocks, stores the pmf and the support of a rv as a dictionary; for instance, {0: 1 / 3, 1: 2 /3 } represents a biased coin with support \(\{0, 1\}\), pmf \(p_{0} = 1/3\), and \(p_{1} = 2/3\). RV uses _pmf, _support and _cdf as private members.
class RV:
"A random variable whose keys form the support and values the pmfs."
def __init__(self, pmf):
self._pmf = self.make_pmf(pmf)
self._support = np.array(sorted(self._pmf.keys()))
self._cdf = np.cumsum([self._pmf[k] for k in self._support])
To make the pmf we use Fraction and defaultdict for the reasons mentioned earlier.
As we do not include outcomes whose probabilities are too small, we must normalize at the end.
def make_pmf(self, pmf):
res = defaultdict(float)
for k, pk in pmf.items():
res[toFrac(k)] += pk
return normalize(res)
The next methods provide access to the pmf and the support, and the number of elements in the pmf.
When x is not in the support, the pmf is \(0\).
def pmf(self, x):
return self._pmf.get(toFrac(x), 0)
def support(self) :
return self._support
def __len__(self):
return len(self._support)
def __repr__(self):
return ", ".join(f"{k}: {self._pmf[k]}" for k in self._support)
Computing the cdf with binary search via np.searchsorted works really fast.
With the cdf, the survivor function follows immediately.
We do not need to cache the sf, because cdf already uses caching.
@cache
def cdf(self, x):
if x < self._support[0]:
return 0
if x >= self._support[-1]:
return 1
return self._cdf[np.searchsorted(self._support, x)]
def sf(self, x):
"Survivor function"
return 1 - self.cdf(x)
The computation of the mean, variance, and standard deviation becomes easy once we realize that all these functions can be expressed in terms of the pattern \(\E{f(X)}\) for suitably chosen functions \(f\).
In passing, we remark that by taking \(f(x) = \1{x\leq y}\), the cdf \(F(x) = \E{f(x)} = \sum_{k} \1{k \leq x} p_{k}\).
However, this is not efficient as compared to the binary search used above.
The typing f: Callable[numeric, numeric] means that the function f is supposed to map a number to a number.
def E(self, f):
"Compute E(f(X))"
return sum(f(i) * self.pmf(i) for i in self.support())
With the expectation defined, the mean, variance and standard deviation follow from simple \(\lambda\) functions.
def mean(self):
return self.E(lambda x: x)
def var(self):
return self.E(lambda x: x**2) - self.mean() ** 2
def std(self):
return np.sqrt(self.var())
The method sortedsupport of RV sorts the elements in the support in decreasing size of probability mass.
We explain below why we need this.
@cache
def sortedsupport(self):
"Return the support sorted in decreasing order of the pmf."
return np.array(sorted(self._pmf, key=self._pmf.get, reverse=True))
Besides this general functionality, we want to use our RV class to generate random deviates that are distributed according to the cdf \(F\).
Recall that if \(U\sim \Unif{[0,1]}\), then \(F^{-1}(U)\) has the required distribution, because
where the last equality follows from the fact that \(U\) is uniform on \([0,1]\).
def rvs(self, size = 1, random_state=default_rng(seed)):
"Generate an array with 'size' number of random deviates."
U = random_state.uniform(size=size)
pos = np.searchsorted(self._cdf, U)
return self.support()[pos].astype(float)
We are almost finished with the class definition of RV, but we need two additional elements.
To add or subtract two instances of RV, the class needs to know what to do when writing code like X + Y or X - Y.
The private method __add__ achieves this by calling map2, which is defined below to compute \(f(X, Y)\) for general \(f\).
Subtraction uses __sub__, and implements it as X + (- Y).
We explain the convert function below.
def __add__(self, other):
other = convert(other)
return map2(self, other, operator.add)
def __neg__(self):
return RV({-k: self.pmf(k) for k in self.support()})
def __sub__(self, other):
return self + (-other)
def __truediv__(self, other):
other = convert(other)
return map2(self, other, operator.truediv)
def __mul__(self, other):
other = convert(other)
return map2(self, other, operator.mul)
Next, we want to support the += and -= operators, so that we can write X -= Y.
At a later stage, we might also want to add many rvs with code such as sum(X for _ in range(5)).
To support this, we must provide sum with an initial element to start the summations; recall the earlier TypeError.
The next two methods enable this functionality for both cases.
def __radd__(self, other):
# support sum([rv for i in ...])
return self.__add__(convert(other))
def __rsub__(self, other):
return convert(other).__sub__(self) # mind the sequence of b - a
As a last feature, how to check that two rvs are equal?
This is easy: when the dictionaries that store the pmfs are equal.
Note that when overloading the method __eq__, it is necessary to implement the __hash__ method too.
(Ask an LLM why.)
def __eq__(self, other):
return self._pmf == other._pmf
def __hash__(self):
return id(self)
This nearly finishes the RV class; the map method follows below.
The two remaining patterns are binary and unary functions.
Supporting binary functions works the same as addition in Eq. (1.4.1).
Observe that we break the inner for loop when the probability becomes too small.
This rule assumes that the order in which the elements in the support pass by is in decreasing order of pmf.
The method sortedsupport of RV ensures that this is the case.
def map2(X, Y, f):
"Make the rv f(X, Y) for the independent rvs X and Y."
c = defaultdict(float)
for i in X.sortedsupport():
for j in Y.sortedsupport():
p = X.pmf(i) * Y.pmf(j)
c[f(i, j)] += p
if p <= thres:
break
return RV(c)
Here is how to turn the application of a unary function f to a RV into a RV.
def map(self, f):
"Make the rv f(X)."
c: defaultdict[B, float] = defaultdict(float)
for k in self.support():
c[f(k)] += self.pmf(k)
return RV(c)
Finally, what would happen if we write 2 + X to mean that the support of X needs to be shifted two steps to the right?
Observe that we are once again overloading the + operator: it adds an integer to a RV.
Actually, we can handle this in our framework by interpreting the number \(2\) as a trivial rv \(Y\), i.e., with all probability mass concentrated on the number \(2\).
So, if we convert ints and floats, by means of a function convert, into a RV, we are done.
Observe that we already used convert in __add__ and __sub__.
def convert(rv):
"Check and convert to rv if necessary."
match rv:
case RV():
return rv
case int():
return RV({rv: 1}) # An int is a shift.
case float():
return RV({rv: 1}) # A float is a shift too.
case _:
raise ValueError("Unknown type passed as a RV")
With this class we can study the content of Th. 1.3.1 in numerical terms. More specifically, how fast do the arrival times on the circle converge to the uniform distribution? Recall that the arrival times on the circle are defined as \(A_{k} = A_{k-1} + X_{k} \bmod C\). Thus, for our numerical study, we want to add rvs and apply the function \(x \to x \bmod C\) where \(C\) is the length of the cycle.
The RV class already supports + for rvs.
For the modulo function, the following function applies to integers, floats, and rvs.11
I like to write (and read) code that resembles the mathematical operations I write on paper.
def mod(a, b):
"Compute a modulo b where a can also be a RV."
match a:
case int():
return a % b
case float():
return a % b
case RV():
return a.map(lambda x: x % b)
case _:
raise ValueError("Unknown type passed to mod")
We iteratively compute the arrival time position \(\sum_{i=1}^{n} X_{i} \bmod C\) where the inter-arrival time is \(X_{i}\sim\Unif{\{-1, 0, 1\}}\), and \(A_{0} = 0\). We track the arrival times that occur with the smallest and the largest probability.
def minmax(cycle, num):
position = RV({0: 1})
mins, maxs = [], []
for i in range(num):
position = mod(position + step, cycle)
pmf = [position.pmf(k) for k in range(cycle)]
mins.append(min(pmf))
maxs.append(max(pmf))
return mins, maxs
This code creates the left panel in Fig. 1; the right panel is created in the same way. The caption discusses our findings.
cycle = 5
num = 30
xx = range(num)
mins, maxs = minmax(cycle, num)
ax1.plot(xx, maxs, "x", ms=2, ls=":", lw=1, label="Maxs")
ax1.plot(xx, mins, "o", ms=2, ls=":", lw=1, label="Mins")
ax1.set_title("Cycle 5")
ax1.set_xlabel("Iteration")
ax1.set_ylabel("Probability")
ax1.legend()

Claim: this program prints 40.
norm = 5
x = [100, 20, 30, 40]
y = {i: x[i] // norm for i in range(len((x)))}
print(y[3])
Solution
Solution, for real
False, it prints \(40/5=8\).
Claim: if we apply this function to two independent rvs \(X\) and \(Y\) and take f(i,j) = i/j, we get a RV that represents \(Z=X/Y\).
def map2(X, Y, f):
"Make the rv f(X, Y) for the independent rvs X and Y."
c = defaultdict(float)
for i in X.sortedsupport():
for j in Y.sortedsupport():
p = X.pmf(i) * Y.pmf(j)
c[i, j] += p
if p <= thres:
break
return RV(c)
Solution
Solution, for real
False. The line c[i, j] += p is incorrect.
Here is a possible variation of this question.
Claim: if we apply this function to two independent rvs \(X\) and \(Y\) and take f(i,j) = i/j, we get a RV that represents \(Z=X/Y\).
def map2(X, Y, f):
"Make the rv f(X, Y) for the independent rvs X and Y."
c = defaultdict(float)
for i in X.support():
for j in Y.support():
p = X.pmf(i) * Y.pmf(j)
c[f(i, j)] += p
return RV(c)
Yet another variation is to write p = X.pmf(i) / Y.pmf(j).
This code is incorrect.
Let the rv \(X\) have cumulative distribution function \(F_X\) and let \(U\sim\text{Unif}(0,1)\). Claim: \(F_{X}(F_{X}^{-1}(U))\sim\text{Unif}(0,1)\).
Solution
Solution, for real
False when \(F\) has jumps.
Let \(X\) and \(Y\) be two independent rvs with support \(\{0, 1, 2, \ldots\}\) and probability mass functions \(f_X\) and \(f_Y\) respectively. Claim: \[ \mathbb{P}(X-Y\le k)=\sum_{i=0}^{\infty}\sum_{i=0}^{\infty}1\{i-j\le k\}f_X(i)f_Y(j). \]
Solution
Solution, for real
False. Check the index of the second summation.
Consider the following code.
a, b = 1000 / 3001, 1001 / 3000
af = Fraction(a).limit_denominator(1000)
bf = Fraction(b).limit_denominator(1000)
print(af, bf)
Claim: this program can print 1/3 333/998.
Solution
Solution, for real
True. The denominators are limited to 1000, so the program looks for the fraction, with a denominator with at most \(1000\), that is closest to \(a\) and \(b\).
Consult the RealPython website to improve your understanding of defaultdict, the lambda function, caching, and classes.
If you are interested in somewhat more abstract code, read also about decorators to see how caching is implemented.
Of course, you can also ask an LLM to explain these concepts: I use it a lot myself and I still learn from the solutions it offers.
However, the explanations on RealPython are better.
Solution
Solution, for real
Type on the Google prompt: `realpython defaultdict', click on the link that appears. Then read and think hard. Recall that coding involves intellectual effort.