尝试用 Python 写了个病毒传播模拟程序

病毒扩散仿真程序,用 python 也可以。

运行效果:
http://www.bilibili.com/video/av89012144/

概述

事情是这样的,B 站 UP 主 @ele 实验室,写了一个简单的疫情传播仿真程序,告诉大家在家待着的重要性,视频相信大家都看过了,并且 UP 主也放出了源码。

因为是 Java 开发的,所以开始我并没有多加关注。后来看到有人解析代码,发现我也能看懂,然后就琢磨用 Python 应该怎么实现。

Java 版程序浅析

一个人就是 1 个(x, y)坐标点,并且每个人有一个状态。

public class Person extends Point {
    private int state = State.NORMAL;
}

在每一轮的迭代中,遍历每个人,每个人根据自身的状态,做出一定的动作,包括:

  • 移动
  • 状态变化
  • 影响他人

这些动作的具体变更,取决于定义的各种系数。

一轮迭代完成,打印这些点,不同的状态对应不同的颜色。

绘图部分直接使用的 Java 绘图类 Graphics。

Python 版思路

如果我们想用 Python 实现应该怎么做呢?

如果完全复刻 Java 版本,则每次迭代需遍历所有人,并计算和他人距离,这就是 N^2 次计算。如果是 1000 个人,就需要循环 1 百万次。这个 Python 的性能肯定捉急。

不过 Python 有 numpy ,可以快速的操作数组。结合 matplotlib 则可以画出图形。

import numpy as np
import matplotlib.pyplot as plt

如何模拟人群

为了减少函数之间互相传参和使用全局变量,我们也来定义一个类:

class People(object):
    def __init__(self, count=1000, first_infected_count=3):
        self.count = count
        self.first_infected_count = first_infected_count
        self.init()

所有人的坐标数据就是 N 行 2 列的数组,同时伴随一定的状态:

    def init(self):
        self._people = np.random.normal(0, 100, (self.count, 2))
        self.reset()

状态值和计时器也都是数组,同时每次随机选取指定数量的人感染:

    def reset(self):
        self._round = 0
        self._status = np.array([0] * self.count)
        self._timer = np.array([0] * self.count)
        self.random_people_state(self.first_infected_count, 1)

这里关键的一点是,辅助数组的大小和人数保持一致,这样就能形成一一对应的关系。

状态发生变化的人才顺带记录时间:

    def random_people_state(self, num, state=1):
        """随机挑选人设置状态
        """
        assert self.count > num
        # TODO:极端情况下会出现无限循环
        n = 0
        while n < num:
            i = np.random.randint(0, self.count)
            if self._status[i] == state:
                continue
            else:
                self.set_state(i, state)
                n += 1

    def set_state(self, i, state):
        self._status[i] = state
        # 记录状态改变的时间
        self._timer[i] = self._round

通过状态值,就可以过滤出人群,每个人群都是 people 的切片视图。这里 numpy 的功能相当强大,只需要非常简洁的语法即可实现:

    @property
    def healthy(self):
        return self._people[self._status == 0]

    @property
    def infected(self):
        return self._people[self._status == 1]

按照既定的思路,我们先来定义每轮迭代要做的动作:

    def update(self):
        """每一次迭代更新"""
        self.change_state()
        self.affect()
        self.move()
        self._round += 1
        self.report()

顺序和开始分析的略有差异,其实并不是十分重要,调换它们的顺序也是可以的。

如何改变状态

这一步就是更新状态数组 self._status 和 计时器数组 self._timer:

    def change_state(self):
        dt = self._round - self._timer
        # 必须先更新时钟再更新状态
        d = np.random.randint(3, 5)
        self._timer[(self._status == 1) & ((dt == d) | (dt > 14))] = self._round
        self._status[(self._status == 1) & ((dt == d) | (dt > 14))] += 1

仍然是通过切片过滤出要更改的目标,然后全部更新。

这里具体的实现我写的非常简单,没有引入太多的变量:

在一定周期内的 感染者(infected),状态置为 确诊(confirmed)。 我这里简单假设了确诊者就被医院收治,所以失去了继续感染他人的机会(见下面)。如果要搞复杂点,可以引入病床,治愈,死亡等状态。

如何影响他人

影响别人是整个程序的性能瓶颈,因为需要计算每个人之间的距离。

这里继续做了简化,只处理感染者:

    def infect_possible(self, x=0., safe_distance=3.0):
        """按概率感染接近的健康人
        x 的取值参考正态分布概率表,x=0 时感染概率是 50%
        """
        for inf in self.infected:
            dm = (self._people - inf) ** 2
            d = dm.sum(axis=1) ** 0.5
            sorted_index = d.argsort()
            for i in sorted_index:
                if d[i] >= safe_distance:
                    break  # 超出范围,不用管了
                if self._status[i] > 0:
                    continue
                if np.random.normal() > x:
                    continue
                self._status[i] = 1
                # 记录状态改变的时间
                self._timer[i] = self._round

可以看到,距离的计算仍然是通过 numpy 的矩阵操作。但是需要对每一个感染者单独计算,所以如果感染者较多,python 的处理效率感人。

如何移动

_people 是一个坐标矩阵,只要生成移动距离矩阵 dt,然后它相加即可。我们可以设置一个可移动的范围 width,把移动距离控制在一定范围内。

    def move(self, width=1, x=.0):
        movement = self.random_movement(width=width)
        # 限定特定状态的人员移动
        switch = self.random_switch(x=x)
        movement[switch == 0] = 0
        self._people = self._people + movement

这里还需要增加一个控制移动意向的选项,仍然是利用了正态分布概率。考虑到这种场景有可能会重用,所以特地把这个方法提取了出来,生成一个只包含 0 1 的数组充当开关。

    def random_switch(self, x=0.):
        """随机生成开关,0 - 关,1 - 开

        x 大致取值范围 -1.99 - 1.99;
        对应正态分布的概率, 取值 0 的时候对应概率是 50%
        :param x: 控制开关比例
        :return:
        """
        normal = np.random.normal(0, 1, self.count)
        switch = np.where(normal < x, 1, 0)
        return switch

输出结果

有了一切数据和变化之后,接下来最重要的事情自然就是图形化显示结果了。直接使用 matplotlib 的散点图就可以了:

    def report(self):
        plt.cla()
        # plt.grid(False)
        p1 = plt.scatter(self.healthy[:, 0], self.healthy[:, 1], s=1)
        p2 = plt.scatter(self.infected[:, 0], self.infected[:, 1], s=1, c='pink')
        p3 = plt.scatter(self.confirmed[:, 0], self.confirmed[:, 1], s=1, c='red')

        plt.legend([p1, p2, p3], ['healthy', 'infected', 'confirmed'], loc='upper right', scatterpoints=1)
        t = "Round: %s, Healthy: %s, Infected: %s, Confirmed: %s" % \
            (self._round, len(self.healthy), len(self.infected), len(self.confirmed))
        plt.text(-200, 400, t, ha='left', wrap=True)

实际效果

启动。

if __name__ == '__main__':
    np.random.seed(0)
    plt.figure(figsize=(16, 16), dpi=100)
    plt.ion()
    p = People(5000, 3)
    for i in range(100):
        p.update()
        p.report()
        plt.pause(.1)
    plt.pause(3)

因为这个小 demo 主要是个人用来练手,目前一些参数没有完全抽出来。有需要的只能直接改源码。

后记

从多次实验的结果,通过调整人员的流动意愿,流动距离等因素,是可以得到直观的结论的。

本人也是初次使用 numpymatplotlib,现学现卖,若有使用不当之处请指正。其中的概率参数设置 基本没有科学依据,仅供 Python 爱好者参考。

总得来说,用 numpy 来模拟病毒感染情况应该是能行得通的。但是其中的影响因子还需要仔细设计。性能也是需要考量的问题。

源码地址

愿疫情能早日过去,武汉加油,中国加油 💪


如果本文对你有帮助,请 点赞分享关注,谢谢!

posted @ 2020-02-14 14:52  DavyCloud  阅读(...)  评论(...编辑  收藏