<?php
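/**
 * Redis distributed lock.
 * Note: the Redis server version must be >= 2.6.12, because the lock is taken
 * with SET ... NX PX and those options are only available from that release on.
 * Created by PhpStorm.
 * User: zhoutao
 * Date: 2018/1/31
 * Time: 6:30 PM
 */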

namespace Org\Util;

class DistributedRedisLock
{
    /**
     * Upper bound, in milliseconds, of the random pause between two attempts.
     * @var int
     */
    private $retryDelay;

    /**
     * Maximum number of acquisition attempts (at least one is always made).
     * @var int
     */
    private $retryCount;

    /**
     * Fraction of the TTL subtracted from the validity time to compensate
     * for clock drift between the servers.
     * @var float
     */
    private $clockDriftFactor = 0.01;

    /**
     * Number of servers that must grant the lock (simple majority).
     * @var int
     */
    private $quorum;

    /**
     * Redis handles, built lazily from $servers on first use.
     * @var array
     */
    private $instances = [];

    /**
     * Server definitions exactly as passed to the constructor.
     * @var array
     */
    private $servers = [];

    /**
     * DistributedRedisLock constructor.
     *
     * @param array $servers One entry per Redis server. Each entry is either
     *                       an existing \Redis object or an array describing
     *                       the connection, positional or keyed:
     *              [
     *                  ['127.0.0.1', 6379, '123456', 0.0],
     *                  ['host' => '127.0.0.1', 'port' => 6380, 'pwd' => '123456', 'timeout' => 0.0],
     *              ]
     *                       host    Redis address
     *                       port    Redis port (6379 when omitted)
     *                       pwd     Redis password (no AUTH is sent when empty)
     *                       timeout connect timeout in seconds (0.0 when omitted)
     * @param int   $retryDelay Upper bound of the random pause between attempts (milliseconds)
     * @param int   $retryCount Maximum number of attempts
     */
    public function __construct(array $servers, $retryDelay = 200, $retryCount = 3)
    {
        if (! extension_loaded('redis')) {
            E(L('_NOT_SUPPORT_') . ':redis');
        }

        $this->servers = $servers;
        $this->retryDelay = $retryDelay;
        $this->retryCount = $retryCount;

        // Number of servers that have to grant the lock.
        $this->quorum = self::majority(count($servers));
    }

    /**
     * Try to acquire the lock on a majority of the configured servers.
     *
     * @param String $resource Redis key (name of the resource to protect)
     * @param Int $ttl Lifetime of the lock (milliseconds)
     * @return array|bool ['validity' => remaining milliseconds, 'resource' => ..., 'token' => ...]
     *                    on success; false when the lock could not be obtained
     */
    public function lock($resource, $ttl)
    {
        // A non-positive lifetime can never leave a positive validity time.
        if ($ttl <= 0) {
            return false;
        }

        $this->initInstances();

        // Without any server there is nothing to hold the lock; the quorum
        // would be 0 and every attempt would otherwise look successful.
        if (empty($this->instances)) {
            return false;
        }

        // Value stored under the key; it identifies the owner on release.
        $token = $this->generateToken();
        // Safety margin for clock drift between the servers.
        $drift = ($ttl * $this->clockDriftFactor) + 2;
        // Work on a local copy so the configured count is never modified.
        $attempts = max(1, (int) $this->retryCount);

        for ($attempt = 1; $attempt <= $attempts; $attempt++) {
            $acquired = 0;
            $startTime = microtime(true) * 1000;

            foreach ($this->instances as $instance) {
                if ($this->lockInstance($instance, $resource, $token, $ttl)) {
                    $acquired++;
                }
            }

            // What is left of the TTL once the time spent acquiring and the
            // drift margin have been deducted.
            $validityTime = $ttl - (microtime(true) * 1000 - $startTime) - $drift;

            // Success needs a majority AND a lock that is still valid.
            if ($acquired >= $this->quorum && $validityTime > 0) {
                return [
                    'validity' => $validityTime,
                    'resource' => $resource,
                    'token' => $token,
                ];
            }

            // Give back whatever was obtained on a minority of servers.
            $this->releaseAll($resource, $token);

            // Pause a random time before the next attempt; sleeping after the
            // final attempt would only delay the failure result.
            if ($attempt < $attempts) {
                usleep($this->nextRetryDelay() * 1000);
            }
        }

        return false;
    }

    /**
     * Release a lock previously returned by lock().
     *
     * @param array $lock The array returned by lock() ('resource' and 'token' are used)
     */
    public function unlock(array $lock)
    {
        // Nothing can be released without both the key and the owner token.
        if (! isset($lock['resource'], $lock['token'])) {
            return;
        }

        $this->initInstances();
        $this->releaseAll($lock['resource'], $lock['token']);
    }

    /**
     * Build the Redis handles from the server definitions (first call only).
     */
    private function initInstances()
    {
        if (! empty($this->instances)) {
            return;
        }

        // Collect into a local list first so an abort half-way through never
        // leaves a partial instance list that no longer matches the quorum.
        $instances = [];

        foreach ($this->servers as $server) {
            if ($server instanceof \Redis) {
                $instances[] = $server;
            } elseif (is_array($server)) {
                $instances[] = $this->connect($server);
            } else {
                E('Redis Init Error');
            }
        }

        $this->instances = $instances;
    }

    /**
     * Open a connection for one array-style server definition.
     *
     * @param array $server Positional [host, port, pwd, timeout] or the same values keyed by name
     * @return \Redis
     */
    private function connect(array $server)
    {
        $keys = isset($server['host']) ? ['host', 'port', 'pwd', 'timeout'] : [0, 1, 2, 3];
        $defaults = [null, 6379, null, 0.0];

        $values = [];
        foreach ($keys as $index => $key) {
            $values[] = isset($server[$key]) ? $server[$key] : $defaults[$index];
        }
        list($host, $port, $pwd, $timeout) = $values;

        if (empty($host)) {
            E('Redis Init Error');
        }

        $redis = new \Redis();

        try {
            $redis->connect($host, (int) $port, (float) $timeout);
            // Authenticate only when a password is configured.
            if (! empty($pwd)) {
                $redis->auth($pwd);
            }
        } catch (\Exception $e) {
            // An unreachable server must not abort the whole lock: the handle
            // is kept so the server still counts towards the quorum and its
            // failing commands are treated as "lock not granted".
        }

        return $redis;
    }

    /**
     * Try to take the lock on a single server.
     *
     * @param $instance
     * @param $resource
     * @param $token
     * @param $ttl
     * @return bool true only when this server stored the key for us
     */
    private function lockInstance($instance, $resource, $token, $ttl)
    {
        try {
            // NX: only set when the key does not exist; PX: expire after $ttl ms.
            return true === $instance->set($resource, $token, ['NX', 'PX' => (int) $ttl]);
        } catch (\Exception $e) {
            // A failing server simply does not grant the lock.
            return false;
        }
    }

    /**
     * Release the lock on a single server, but only if we still own it.
     *
     * @param $instance
     * @param $resource
     * @param $token
     * @return mixed result of the script (1 when the key was deleted, 0 otherwise),
     *               or false when the server could not be reached
     */
    private function unlockInstance($instance, $resource, $token)
    {
        // Compare-and-delete runs as one script so another owner's lock is
        // never removed between the GET and the DEL.
        $script = '
            if redis.call("GET", KEYS[1]) == ARGV[1] then
                return redis.call("DEL", KEYS[1])
            else
                return 0
            end
        ';

        try {
            return $instance->eval($script, [$resource, $token], 1);
        } catch (\Exception $e) {
            // Keep going: the remaining servers still have to be released.
            return false;
        }
    }

    /**
     * Release the lock on every server.
     *
     * @param $resource
     * @param $token
     */
    private function releaseAll($resource, $token)
    {
        foreach ($this->instances as $instance) {
            $this->unlockInstance($instance, $resource, $token);
        }
    }

    /**
     * Smallest number of servers that forms a strict majority.
     *
     * Integer arithmetic is required here: with float division three servers
     * would yield 2.5 and the lock would need all three of them.
     *
     * @param int $count Number of configured servers
     * @return int
     */
    private static function majority($count)
    {
        $count = (int) $count;

        return $count > 0 ? (int) floor($count / 2) + 1 : 0;
    }

    /**
     * Create the owner token for one lock() call.
     *
     * uniqid() alone is derived from the current time and may repeat across
     * processes or hosts, which would let one client release another client's
     * lock; random bytes are used whenever the runtime offers them.
     *
     * @return string
     */
    private function generateToken()
    {
        if (function_exists('random_bytes')) {
            return bin2hex(random_bytes(16));
        }

        return uniqid(getmypid() . '-' . mt_rand() . '-', true);
    }

    /**
     * Random pause before the next attempt, between half the configured
     * delay and the full delay.
     *
     * @return int milliseconds
     */
    private function nextRetryDelay()
    {
        $max = max(0, (int) $this->retryDelay);

        return mt_rand((int) floor($max / 2), $max);
    }
}