通过 lua ,在 nginx 里结合 redis 进行请求频率等判断。防止请求进入服务层并压垮服务。
这里的判断逻辑很简单,判定是非法请求的依据有两点:
相关IP及请求数都记在 redis 里。
server {
listen 80;
server_name lua.test.cn;
root /home/httpd/sites/web;
location /lua {
default_type 'text/html';
lua_code_cache off;
access_by_lua_file /usr/local/openresty/nginx/conf/limits/access_block.lua;
}
}
access_block.lua
local config = require("common_config")
local redis = require("ngx_redis")
--封IP规则:一分钟访问超过 40次; 一小时访问超过 500 次
ip_block_time = 600 --封禁IP时间
ip_time_out = 60 --指定ip访问频率时间段
connect_count = 40 --指定ip访问频率计数最大值
ip_time_out_h = 3600
connect_count_h = 500
--连接redis
local cache = redis:create_connection()
if cache == 0 then
goto A
end
--白名单处理.如果IP在白名单内,直接放过
white_ip = {"100.97.", "123.125.71.", "42.236."}
is_white = "0"
for k, val in pairs(white_ip) do
local r = ngx.re.match(ngx.var.remote_addr, "^"..val..".*?$")
if r then
is_white = "1"
local ok, err = cache:close()
break;
end
end
if is_white == "0" then
--查询ip是否在封禁段内,若在则返回403错误代码
--因封禁时间会大于ip记录时间,故此处不对ip时间key和计数key做处理
is_block , err = cache:get("block_"..ngx.var.remote_addr)
if is_block == "1" then
ngx.exit(403)
goto A
end
start_time , err = cache:get("time_"..ngx.var.remote_addr)
ip_count , err = cache:get("count_"..ngx.var.remote_addr)
--如果ip记录时间大于指定时间间隔或者记录时间或者不存在ip时间key则重置时间key和计数key
--如果ip时间key小于时间间隔,则ip计数+1,且如果ip计数大于ip频率计数,则设置ip的封禁key为1
--同时设置封禁key的过期时间为封禁ip的时间
if start_time == ngx.null or os.time() - start_time > ip_time_out then
res , err = cache:set("time_"..ngx.var.remote_addr , os.time())
res , err = cache:set("count_"..ngx.var.remote_addr , 1)
else
ip_count = ip_count + 1
res , err = cache:incr("count_"..ngx.var.remote_addr)
if ip_count >= connect_count then
res , err = cache:set("block_"..ngx.var.remote_addr,1)
res , err = cache:expire("block_"..ngx.var.remote_addr,ip_block_time)
end
end
--查询ip是否在封禁段内,若在则返回403错误代码
--因封禁时间会大于ip记录时间,故此处不对ip时间key和计数key做处理
is_blockh_h , err_h = cache:get("block_h_"..ngx.var.remote_addr)
if is_block_h == "1" then
ngx.exit(403)
goto A
end
start_time_h , err_h = cache:get("time_h_"..ngx.var.remote_addr)
ip_count_h , err_h = cache:get("count_h_"..ngx.var.remote_addr)
--如果ip记录时间大于指定时间间隔或者记录时间或者不存在ip时间key则重置时间key和计数key
--如果ip时间key小于时间间隔,则ip计数+1,且如果ip计数大于ip频率计数,则设置ip的封禁key为1
--同时设置封禁key的过期时间为封禁ip的时间
if start_time_h == ngx.null or os.time() - start_time_h > ip_time_out_h then
res_h , err_h = cache:set("time_h_"..ngx.var.remote_addr , os.time())
res_h , err_h = cache:set("count_h_"..ngx.var.remote_addr , 1)
else
ip_count_h = ip_count_h + 1
res_h , err_h = cache:incr("count_h_"..ngx.var.remote_addr)
if ip_count_h >= connect_count_h then
res_h , err_h = cache:set("block_h_"..ngx.var.remote_addr,1)
res_h , err_h = cache:expire("block_h_"..ngx.var.remote_addr,ip_block_time)
end
end
end
--结尾标记
::A::
local ok, err = cache:close()
可以看出,代码里引用了两个其它的模块,common_config 和 ngx_redis。它们代码分别是:
common_config.lua:
--全局配置
--局部变量.模块名称
local _M = {}
function _M.getRedisHost(self)
--return "100.92.113.11"
return "f2f163efe1ic11g4.m.cnhza.kvstore.aliyuncs.com"
end
function _M.getRedisPort(self)
return "6379"
end
function _M.getRedisPwd(self)
return "f2f163efe1ic11g4:yourpwd"
end
return _M
ngx_redis.lua:
--Nginx-redis 封装组件.提供连接池之类的功能
local config = require("common_config")
local redis = require("resty.redis")
--局部变量.模块名称
local _M = {}
function _M.create_connection(self)
local host = config:getRedisHost()
local port = config:getRedisPort()
local pwd = config:getRedisPwd()
-- ngx.say(host, " : ", port)
--创建实例
local red = redis:new()
--设置超时(毫秒)
red:set_timeout(1000)
--建立连接
local ok, err = red:connect(host, port)
red:auth(pwd)
if not ok then
ngx.say("connect to redis error : ", err)
return 0
--return close_connection(red)
end
-- ngx.say("connect success!", host, port)
return red
end
function _M.close_connection(self, conn)
if not conn then
return
end
local ok, err = conn:close()
if not ok then
return 0
end
return 1
--释放连接(连接池实现)
--local pool_max_idle_time = 100000 --毫秒
--local pool_size = 100 --连接池大小
--local ok, err = conn:set_keepalive(pool_max_idle_time, pool_size)
--if not ok then
-- return 0
--end
--return 1
end
return _M
可以看到,这里自己做了连接池管理。它是以 resty.redis 为基础的。所以要先安装这个模块。
OpenResty, 其是由Nginx核心加很多第三方模块组成,其最大的亮点是默认集成了Lua开发环境,使得Nginx可以作为一个Web Server使用。
借助于 Nginx 的事件驱动模型和非阻塞IO,可以实现高性能的Web应用程序。而且OpenResty提供了大量组件如Mysql、Redis、Memcached等等,使在Nginx上开发Web应用更方便更简单。
目前在京东如实时价格、秒杀、动态服务、单品页、列表页等都在使用 Nginx + Lua 架构,其他公司如淘宝、去哪儿网等。
mkdir -p /data/soft/
cd /data/soft/
yum install readline-devel pcre-devel openssl-devel gcc
wget https://openresty.org/download/ngx_openresty-1.9.3.1.tar.gz
tar -zxvf ngx_openresty-1.9.3.1.tar.gz
cd ngx_openresty-1.9.3.1
ngx_openresty-1.9.3.1/bundle 目录里存放着nginx核心和很多第三方模块,比如有我们需要的Lua和LuaJIT
LuaJIT
cd bundle/LuaJIT-2.1-20150622/
make clean && make && make install
ln -sf luajit-2.1.0-alpha /usr/local/bin/luajit
ngx_cache_purge
该模块用于清理nginx缓存
cd /data/soft/ngx_openresty-1.9.3.1/bundle/
wget https://github.com/FRiCKLE/ngx_cache_purge/archive/master.zip
unzip master.zip
rm -rf master.zip
nginx_upstream_check_module
该模块用于ustream健康检查
cd /data/soft/ngx_openresty-1.9.3.1/bundle/
wget https://github.com/yaoweibin/nginx_upstream_check_module/archive/master.zip
unzip master.zip
rm -rf master.zip
ngx_openresty
cd /data/soft/ngx_openresty-1.9.3.1
./configure --prefix=/usr/local --with-http_realip_module --with-pcre --with-luajit --add-module=./bundle/ngx_cache_purge-master/ --add-module=./bundle/nginx_upstream_check_module-master/ -j2
make && make install
cd /usr/local
ll
如果发现有如下目录,说明安装成功:
luajit, lualib, nginx
通过 /usr/local/nginx/sbin/nginx -V 查看nginx版本和安装的模块
/usr/local/nginx/sbin/nginx
配置及Nginx HttpLuaModule文档在可以查看 http://wiki.nginx.org/HttpLuaModule
vim /usr/local/nginx/conf/nginx.conf
在http部分添加如下配置
#lua模块路径,多个之间”;”分隔,其中”;;”表示默认搜索路径,默认到 /usr/local/nginx 下找
lua_package_path "/usr/local/lualib/?.lua;;"; #lua 模块
lua_package_cpath "/usr/local/lualib/?.so;;"; #c模块
为了方便开发我们在 /usr/local/nginx/conf 目录下创建一个 lua.conf 内容:
server {
listen 80;
server_name _;
}
在 nginx.conf 中的 http 部分添加 include lua.conf 包含此文件片段
include lua.conf;
测试是否正常
/usr/local/nginx/sbin/nginx -t
如果显示如下内容说明配置成功
nginx: the configuration file /usr/local/nginx/conf/nginx.conf syntax is ok
nginx: configuration file /usr/local/nginx/conf/nginx.conf test is successful
在 lua.conf 中 server 部分添加如下配置
location /lua {
default_type 'text/html';
content_by_lua 'ngx.say("hello world!")';
}
测试配置是否正确
/usr/local/nginx/sbin/nginx -t
重启nginx
/usr/local/nginx/sbin/nginx -s reload
访问如 http://115.236.185.12/lua(自己的机器根据实际情况换ip),可以看到如下内容
hello world!
我们把lua代码放在nginx配置中会随着lua的代码的增加导致配置文件太长不好维护,因此我们应该把lua代码移到外部文件中存储。
mkdir -p /usr/local/nginx/conf/lua/
vim /usr/local/nginx/conf/lua/test.lua
添加如下内容
ngx.say("hello world");
然后 lua.conf 修改为
location /lua {
default_type 'text/html';
content_by_lua_file conf/lua/test.lua; #相对于nginx安装目录
}
此处 conf/lua/test.lua 也可以使用绝对路径 /usr/local/nginx/conf/lua/test.lua。
重启 nginx 可看效果。
默认情况下 lua_code_cache 是开启的,即缓存lua代码,即每次lua代码变更必须reload nginx才生效。
如果在开发阶段可以通过lua_code_cache off;关闭缓存,这样调试时每次修改lua代码不需要reload nginx;但是正式环境一定记得开启缓存。
location /lua {
default_type 'text/html';
lua_code_cache off;
content_by_lua_file conf/lua/test.lua;
}
开启后reload nginx会看到如下报警
nginx: [alert] lua_code_cache is off; this will hurt performance in /usr/local/nginx/conf/lua.conf:6
和一般的Web Server类似,我们需要接收请求、处理并输出响应。而对于请求我们需要获取如请求参数、请求头、Body体等信息;
而对于处理就是调用相应的Lua代码即可;输出响应需要进行响应状态码、响应头和响应内容体的输出。 因此我们从如上几个点出发即可。
vim /usr/local/nginx/conf/lua.conf
在 server 域里添加:
location ~ /lua_request/(\d+)/(\d+) {
#设置nginx变量
set $a $1;
set $b $host;
default_type "text/html";
#nginx内容处理
content_by_lua_file /usr/local/nginx/conf/lua/test_request.lua;
#内容体处理完成后调用
echo_after_body "ngx.var.b $b";
}
vim /usr/local/nginx/conf/lua/test_request.lua 编辑:
--nginx变量
local var = ngx.var
ngx.say("ngx.var.a : ", var.a, "<br/>")
ngx.say("ngx.var.b : ", var.b, "<br/>")
ngx.say("ngx.var[2] : ", var[2], "<br/>")
ngx.var.b = 2;
ngx.say("<br/>")
--请求头
local headers = ngx.req.get_headers()
ngx.say("headers begin", "<br/>")
ngx.say("Host : ", headers["Host"], "<br/>")
ngx.say("user-agent : ", headers["user-agent"], "<br/>")
ngx.say("user-agent : ", headers.user_agent, "<br/>")
for k,v in pairs(headers) do
if type(v) == "table" then
ngx.say(k, " : ", table.concat(v, ","), "<br/>")
else
ngx.say(k, " : ", v, "<br/>")
end
end
ngx.say("headers end", "<br/>")
ngx.say("<br/>")
--get请求uri参数
ngx.say("uri args begin", "<br/>")
local uri_args = ngx.req.get_uri_args()
for k, v in pairs(uri_args) do
if type(v) == "table" then
ngx.say(k, " : ", table.concat(v, ", "), "<br/>")
else
ngx.say(k, ": ", v, "<br/>")
end
end
ngx.say("uri args end", "<br/>")
ngx.say("<br/>")
--post请求参数
ngx.req.read_body()
ngx.say("post args begin", "<br/>")
local post_args = ngx.req.get_post_args()
for k, v in pairs(post_args) do
if type(v) == "table" then
ngx.say(k, " : ", table.concat(v, ", "), "<br/>")
else
ngx.say(k, ": ", v, "<br/>")
end
end
ngx.say("post args end", "<br/>")
ngx.say("<br/>")
--请求的http协议版本
ngx.say("ngx.req.http_version : ", ngx.req.http_version(), "<br/>")
--请求方法
ngx.say("ngx.req.get_method : ", ngx.req.get_method(), "<br/>")
--原始的请求头内容
ngx.say("ngx.req.raw_header : ", ngx.req.raw_header(), "<br/>")
--请求的body内容体
ngx.say("ngx.req.get_body_data() : ", ngx.req.get_body_data(), "<br/>")
ngx.say("<br/>")
说明:
ngx.var
nginx变量,如果要赋值如ngx.var.b = 2,此变量必须提前声明; 另外对于nginx location中使用正则捕获的捕获组可以使用ngx.var[捕获组数字]获取;
ngx.req.get_headers
获取请求头,默认只获取前100,如果想要获取所有可以调用ngx.req.get_headers(0); 获取请求头时请使用如headers.user_agent这种方式;如果一个请求头有多个值,则返回的是table;
ngx.req.get_uri_args
获取url请求参数,其用法和get_headers类似;
ngx.req.get_post_args
获取post请求内容,其用法和get_headers类似,但是必须提前调用ngx.req.read_body()来读取body体
ngx.req.raw_header
未解析的请求头字符串;
ngx.req.get_body_data
为解析的请求body体内容字符串。
测试: 重启 nginx
/usr/local/nginx/sbin/nginx -s reload
然后访问 :
wget --post-data 'a=1&b=2' 'http://115.236.185.12/lua_request/1/2?a=3&b=4' -O -
响应如下:
gx.var.a : 1<br/>
ngx.var.b : 115.236.185.12<br/>
ngx.var[2] : 2<br/>
<br/>
headers begin<br/>
Host : 115.236.185.12<br/>
user-agent : Wget/1.12 (linux-gnu)<br/>
user-agent : Wget/1.12 (linux-gnu)<br/>
host : 115.236.185.12<br/>
content-type : application/x-www-form-urlencoded<br/>
connection : Keep-Alive<br/>
accept : */*<br/>
content-length : 7<br/>
user-agent : Wget/1.12 (linux-gnu)<br/>
headers end<br/>
<br/>
uri args begin<br/>
b: 4<br/>
a: 3<br/>
uri args end<br/>
<br/>
post args begin<br/>
b: 2<br/>
a: 1<br/>
post args end<br/>
<br/>
ngx.req.http_version : 1<br/>
ngx.req.get_method : POST<br/>
ngx.req.raw_header : POST /lua_request/1/2?a=3&b=4 HTTP/1.0
User-Agent: Wget/1.12 (linux-gnu)
Accept: */*
Host: 115.236.185.12
Connection: Keep-Alive
Content-Type: application/x-www-form-urlencoded
Content-Length: 7
<br/>
ngx.req.get_body_data() : a=1&b=2<br/>
<br/>
ngx.var.b 2
可以看到在 lua 中获得了 url 中用正则匹配出来的值, post 的值, host 等各个值
vim /usr/local/nginx/conf/lua.conf
在 server 域里添加:
location /lua_response_1 {
default_type "text/html";
content_by_lua_file /usr/local/nginx/conf/lua/test_response_1.lua;
}
vim /usr/local/nginx/conf/lua/test_response_1.lua
编辑:
--写响应头
ngx.header.a = "1"
--多个响应头可以使用table
ngx.header.b = {"2", "3"}
--输出响应
ngx.say("a", "b", "<br/>")
ngx.print("c", "d", "<br/>")
--200状态码退出
return ngx.exit(200)
说明:
ngx.header:输出响应头;
ngx.print:输出响应内容体;
ngx.say:通ngx.print,但是会最后输出一个换行符;
ngx.exit:指定状态码退出。
测试: 重启 nginx:
/usr/local/nginx/sbin/nginx -s reload
然后访问 :
http://115.236.185.12/lua_response_1
vim /usr/local/nginx/conf/lua.conf
在 server 域里添加:
location /lua_response_2 {
default_type "text/html";
content_by_lua_file /usr/local/nginx/conf/lua/test_response_2.lua;
}
vim /usr/local/nginx/conf/lua/test_response_2.lua
编辑:
ngx.redirect("http://www.edeng.cn", 302)
说明:
ngx.redirect:重定向
测试: 重启 nginx:
/usr/local/nginx/sbin/nginx -s reload
然后访问 :
http://115.236.185.12/lua_response_2
vim /usr/local/nginx/conf/lua.conf
在 server 域里添加:
location /lua_other {
default_type "text/html";
content_by_lua_file /usr/local/nginx/conf/lua/lua_other.lua;
}
vim /usr/local/nginx/conf/lua/lua_other.lua
编辑:
--未经解码的请求uri
local request_uri = ngx.var.request_uri;
ngx.say("request_uri : ", request_uri, "<br/>");
--解码
ngx.say("decode request_uri : ", ngx.unescape_uri(request_uri), "<br/>");
--MD5
ngx.say("ngx.md5 : ", ngx.md5("123"), "<br/>")
--http time
ngx.say("ngx.http_time : ", ngx.http_time(ngx.time()), "<br/>")
说明:
ngx.escape_uri/ngx.unescape_uri : uri编码解码;
ngx.encode_args/ngx.decode_args: 参数编码解码;
ngx.encode_base64/ngx.decode_base64: BASE64编码解码;
ngx.re.match: nginx正则表达式匹配;
测试: 重启 nginx:
/usr/local/nginx/sbin/nginx -s reload
然后访问 :
http://115.236.185.12/lua_other
Nginx是一个Master进程多个Worker进程的工作方式,因此我们可能需要在多个Worker进程中共享数据,那么此时就可以使用 ngx.shared.DICT 来实现全局内存共享
首先在 nginx.conf 的http部分分配内存大小
# 共享全局变量,在所有worker间共享
lua_shared_dict shared_data 1m;
vim /usr/local/nginx/conf/lua.conf
在 server 域里添加:
location /lua_shared_dict {
default_type "text/html";
content_by_lua_file /usr/local/nginx/conf/lua/test_lua_shared_dict.lua;
}
vim /usr/local/nginx/conf/lua/test_lua_shared_dict.lua
编辑:
--1、获取全局共享内存变量
local shared_data = ngx.shared.shared_data
--2、获取字典值
local i = shared_data:get("i")
if not i then
i = 1
--3、惰性赋值
shared_data:set("i", i)
ngx.say("lazy set i ", i, "<br/>")
end
--递增
i = shared_data:incr("i", 1)
ngx.say("i=", i, "<br/>")
测试: 重启 nginx:
/usr/local/nginx/sbin/nginx -s reload
然后访问 :
http://115.236.185.12/lua_shared_dict
每访问一次,数值就会加 1。
Nginx共11个处理阶段,而相应的处理阶段是可以做插入式 lua 处理.
另外指令可以在http、server、server if、location、location if
几个范围进行配置
在各个阶段里,可执行的 lua 指令非常多,常用的有:
每次Nginx重新加载配置时执行,可以用它来完成一些耗时模块的加载,或者初始化一些全局配置;
nginx.conf配置文件中的http部分添加如下代码
#共享全局变量,在所有worker间共享
lua_shared_dict shared_data 1m;
init_by_lua_file /usr/local/nginx/conf/lua/init.lua;
init.lua:
--初始化耗时的模块
local redis = require 'resty.redis'
local cjson = require 'cjson'
--全局变量,不推荐
count = 1
--共享全局内存
local shared_data = ngx.shared.shared_data
shared_data:set("count", 1)
测试代码:
count = count + 1
ngx.say("global variable : ", count)
local shared_data = ngx.shared.shared_data
ngx.say(", shared memory : ", shared_data:get("count"))
shared_data:incr("count", 1)
ngx.say("hello world")
访问时会发现全局变量一直不变,而共享内存一直递增
global variable : 2 , shared memory : 8 hello world
用于启动一些定时任务,比如心跳检查,定时拉取服务器配置等等; 此处的任务是跟Worker进程数量有关系的,比如有2个Worker进程那么就会启动两个完全一样的定时任务。
nginx.conf配置文件中的http部分添加如下代码
init_worker_by_lua_file /usr/local/nginx/conf/lua/init_worker.lua;
init_worker.lua:
local count = 0
local delayInSeconds = 3
local heartbeatCheck = nil
heartbeatCheck = function(args)
count = count + 1
ngx.log(ngx.ERR, "do check ", count)
local ok, err = ngx.timer.at(delayInSeconds, heartbeatCheck)
if not ok then
ngx.log(ngx.ERR, "failed to startup heartbeart worker...", err)
end
end
heartbeatCheck()
ngx.timer.at: 延时调用相应的回调方法;
ngx.timer.at(秒单位延时,回调函数,回调函数的参数列表);
可以将延时设置为0即得到一个立即执行的任务,任务不会在当前请求中执行不会阻塞当前请求,而是在一个轻量级线程中执行。
另外根据实际情况设置如下指令
lua_max_pending_timers 1024; #最大等待任务数
lua_max_running_timers 256; #最大同时运行任务数
设置nginx变量,我们用的set指令即使配合if指令也很难实现负责的赋值逻辑
location /lua_set_1 {
default_type "text/html";
set_by_lua_file $num /usr/local/nginx/conf/lua/test_set_1.lua;
echo $num;
}
test_set_1.lua:
local uri_args = ngx.req.get_uri_args()
local i = uri_args["i"] or 0
local j = uri_args["j"] or 0
return i + j
我们实际工作时经常涉及到网站改版,有时候需要新老并存,或者切一部分流量到新版
首先在 example.conf 中使用map指令来映射host到指定nginx变量
############ 测试时使用的动态请求
map $host $item_dynamic {
default "0";
item2014.jd.com "1";
}
绑定hosts
115.236.185.12 item.jd.com;
115.236.185.12 item2014.jd.com;
此时我们想访问item2014.jd.com时访问新版,那么我们可以简单的使用如:
if ($item_dynamic = "1") {
proxy_pass http://new;
}
proxy_pass http://old;
但是我们想把商品编号为为8位(比如品类为图书的)没有改版完成,需要按照相应规则跳转到老版,但是其他的到新版;
虽然使用if指令能实现,但是比较麻烦,基本需要这样:
set jump "0";
if($item_dynamic = "1") {
set $jump "1";
}
if(uri ~ "^/6[0-9]{7}.html") {
set $jump "${jump}2";
}
# 非强制访问新版,且访问指定范围的商品
if (jump == "02") {
proxy_pass http://old;
}
proxy_pass http://new;
以上规则还是比较简单的,如果涉及到更复杂的多重if/else或嵌套if/else实现起来就更痛苦了,可能需要到后端去做了;此时我们就可以借助lua了:
set_by_lua $to_book '
local ngx_match = ngx.re.match
local var = ngx.var
local skuId = var.skuId
local r = var.item_dynamic ~= "1" and ngx.re.match(skuId, "^[0-9]{8}$")
if r then return "1" else return "0" end;
';
set_by_lua $to_mvd '
local ngx_match = ngx.re.match
local var = ngx.var
local skuId = var.skuId
local r = var.item_dynamic ~= "1" and ngx.re.match(skuId, "^[0-9]{9}$")
if r then return "1" else return "0" end;
';
#自营图书
if ($to_book) {
proxy_pass http://127.0.0.1/old_book/$skuId.html;
}
#自营音像
if ($to_mvd) {
proxy_pass http://127.0.0.1/old_mvd/$skuId.html;
}
#默认
proxy_pass http://127.0.0.1/proxy/$skuId.html;
执行内部URL重写或者外部重定向,典型的如伪静态化的URL重写。其默认执行在rewrite处理阶段的最后
location /lua_rewrite_1 {
default_type "text/html";
rewrite_by_lua_file /usr/local/nginx/conf/lua/test_rewrite_1.lua;
echo "no rewrite";
}
test_rewrite_1.lua:
if ngx.req.get_uri_args()["jump"] == "1" then
return ngx.redirect("http://www.edeng.cn?jump=1", 302)
end
请求 http://115.236.185.12/lua_rewrite_1 时发现没有跳转,而请求 http://115.236.185.12/lua_rewrite_1?jump=1 时发现跳转到易登首页了。
此处需要301/302跳转根据自己需求定义
location /lua_rewrite_2 {
default_type "text/html";
rewrite_by_lua_file /usr/local/nginx/conf/lua/test_rewrite_2.lua;
echo "rewrite2 uri : $uri, a : $arg_a";
}
test_rewrite_2.lua:
if ngx.req.get_uri_args()["jump"] == "1" then
ngx.req.set_uri("/lua_rewrite_3", false);
ngx.req.set_uri("/lua_rewrite_4", false);
ngx.req.set_uri_args({a = 1, b = 2});
end
ngx.req.set_uri(uri, false):可以内部重写uri(可以带参数),等价于 rewrite ^ /lua_rewrite_3;
通过配合if/else可以实现
rewrite ^ /lua_rewrite_3 break;
这种功能;此处两者都是location内部url重写,不会重新发起新的location匹配;
ngx.req.set_uri_args:重写请求参数,可以是字符串(a=1&b=2)也可以是table;
访问如 http://115.236.185.12/lua_rewrite_2?jump=0 时得到响应
rewrite2 uri : /lua_rewrite_2, a :
访问如 http://115.236.185.12/lua_rewrite_2?jump=1 时得到响应
rewrite2 uri : /lua_rewrite_4, a : 1
location /lua_rewrite_3 {
default_type "text/html";
rewrite_by_lua_file /usr/local/nginx/conf/lua/test_rewrite_3.lua;
echo "rewrite3 uri : $uri";
}
test_rewrite_3.lua:
if ngx.req.get_uri_args()["jump"] == "1" then
ngx.req.set_uri("/lua_rewrite_4", true);
ngx.log(ngx.ERR, "=========")
ngx.req.set_uri_args({a = 1, b = 2});
end
ngx.req.set_uri(uri, true):可以内部重写uri,即会发起新的匹配location请求,等价于
rewrite ^ /lua_rewrite_4 last;
此处看error log是看不到我们记录的log。
所以请求如 http://115.236.185.12/lua_rewrite_3?jump=1 会到新的location中得到响应,此处没有/lua_rewrite_4,所以匹配到/lua请求,得到类似如下的响应
global variable : 2 , shared memory : 1 hello world
用于访问控制,比如我们只允许内网ip访问,可以使用如下形式
allow 127.0.0.1;
allow 10.0.0.0/8;
allow 192.168.0.0/16;
allow 172.16.0.0/12;
deny all;
location /lua_access {
default_type "text/html";
access_by_lua_file /usr/local/nginx/conf/lua/test_access.lua;
echo "access";
}
test_access.lua:
if ngx.req.get_uri_args()["token"] ~= "123" then
return ngx.exit(403)
end
即如果访问如 http://115.236.185.12/lua_access?token=234 将得到403 Forbidden的响应。
这样我们可以根据如 cookie/用户token 来决定是否有访问权限
此指令之前已经用过了
在实际开发中,不可能把所有代码写到一个大而全的lua文件中,需要进行分模块开发;而且模块化是高性能Lua应用的关键。
使用require第一次导入模块后,所有Nginx 进程全局共享模块的数据和代码。
每个Worker进程需要时会得到此模块的一个副本(Copy-On-Write),即模块可以认为是每Worker进程共享而不是每Nginx Server共享;
另外注意之前我们使用init_by_lua中初始化的全局变量是每请求复制一个;如果想在多个Worker进程间共享数据可以使用ngx.shared.DICT或如Redis之类的存储
在/usr/example/lualib中已经提供了大量第三方开发库如cjson、redis客户端、mysql客户端
需要注意在使用前需要将库在nginx.conf中导入:
#lua模块路径,其中”;;”表示默认搜索路径,默认到/usr/local/nginx下找
lua_package_path "/usr/example/lualib/?.lua;;"; #lua 模块
lua_package_cpath "/usr/example/lualib/?.so;;"; #c模块
使用方式是在lua中通过如下方式引入
local cjson = require(“cjson”)
local redis = require(“resty.redis”)
接下来我们来开发一个简单的lua模块。
vim /usr/local/lualib/module1.lua
local count = 0
local function hello()
count = count + 1
ngx.say("count : ", count)
end
local _M = {
hello = hello
}
return _M
开发时将所有数据做成局部变量/局部函数;通过 _M导出要暴露的函数,实现模块化封装。
接下来创建 test_module_1.lua
vim /usr/local/nginx/conf/lua/test_module_1.lua
local module1 = require("module1")
module1.hello()
使用 local var = require(“模块名”)
,该模块会到 lua_package_path
和lua_package_cpath
声明的的位置查找我们的模块,对于多级目录的使用 require(“目录1.目录2.模块名”)
加载。
lua.conf 配置
location /lua_module_1 {
default_type 'text/html';
lua_code_cache on;
content_by_lua_file /usr/local/nginx/conf/lua/test_module_1.lua;
}
访问如 http://115.236.185.12/lua_module_1
进行测试,会得到类似如下的数据,count会递增
count : 1
count :2
……
count :N
此时可能发现count一直递增,假设我们的worker_processes 2,我们可以通过 kill -9 nginx worker process
杀死其中一个Worker进程得到count数据变化。
假设我们创建了vim /usr/local/lualib/test/module2.lua 模块,可以通过local module2 = require(“test.module2”)
加载模块
基本的模块开发就完成了,如果是只读数据可以通过模块中声明local变量存储;如果想在每Worker进程共享,请考虑竞争;如果要在多个Worker进程间共享请考虑使用 ngx.shared.DICT 或如 Redis存储。
对于开发来说需要有好的生态开发库来辅助我们快速开发,而Lua中也有大多数我们需要的第三方开发库如Redis、Memcached、Mysql、Http客户端、JSON、模板引擎等。
一些常见的Lua库可以在github上搜索,https://github.com/search?utf8=%E2%9C%93&q=lua+resty
lua-resty-redis
是为基于cosocket API的ngx_lua提供的Lua redis客户端,通过它可以完成Redis的操作。默认安装OpenResty时已经自带了该模块。
使用文档可参考 https://github.com/openresty/lua-resty-redis
在测试之前先安装并启动Redis实例 http://blog.sina.com.cn/s/blog_5f54f0be0101bym4.html
基本操作
编辑 test_redis_baisc.lua
local function close_redis(red)
if not red then
return
end
local ok, err = red:close()
if not ok then
ngx.say("close redis error : ", err)
end
end
local redis = require("resty.redis")
--创建实例
local red = redis:new()
--设置超时(毫秒)
red:set_timeout(1000)
--建立连接
local ip = "127.0.0.1"
local port = 3690
local ok, err = red:connect(ip, port)
if not ok then
ngx.say("connect to redis error : ", err)
return close_redis(red)
end
--调用API进行处理
ok, err = red:set("msg", "hello world")
if not ok then
ngx.say("set msg error : ", err)
return close_redis(red)
end
--调用API获取数据
local resp, err = red:get("msg")
if not resp then
ngx.say("get msg error : ", err)
return close_reedis(red)
end
--得到的数据为空处理
if resp == ngx.null then
resp = '' --比如默认值
end
ngx.say("msg : ", resp)
close_redis(red)
基本逻辑很简单,要注意此处判断是否为nil,需要跟ngx.null比较。
lua.conf 配置文件
location /lua_redis_basic {
default_type 'text/html';
lua_code_cache on;
content_by_lua_file /usr/local/nginx/conf/lua/test_redis_basic.lua;
}
访问如 http://192.168.1.2/lua_redis_basic 进行测试,正常情况得到如下信息
msg : hello world
连接池
建立TCP连接需要三次握手而释放TCP连接需要四次握手,而这些往返仅需要一次,以后应该复用TCP连接,此时就可以考虑使用连接池,即连接池可以复用连接。
只需要将之前的close_redis函数改造为如下即可:
local function close_redis(red)
if not red then
return
end
--释放连接(连接池实现)
local pool_max_idle_time = 10000 --毫秒
local pool_size = 100 --连接池大小
local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
if not ok then
ngx.say("set keepalive error : ", err)
end
end
即设置空闲连接超时时间防止连接一直占用不释放;设置连接池大小来复用连接。
此处假设调用 red:set_keepalive(),连接池大小通过 nginx.conf 中http部分的如下指令定义:
#默认连接池大小,默认30
lua_socket_pool_size 30;
#默认超时时间,默认60s
lua_socket_keepalive_timeout 60s;
注意:
pipeline
pipeline即管道,可以理解为把多个命令打包然后一起发送;
MTU(Maxitum Transmission Unit 最大传输单元)为二层包大小,一般为1500字节;而MSS(Maximum Segment Size 最大报文分段大小)为四层包大小,其一般是1500-20(IP报头)-20(TCP报头)=1460字节;
因此假设我们执行的多个Redis命令能在一个报文中传输的话,可以减少网络往返来提高速度。因此可以根据实际情况来选择走pipeline模式将多个命令打包到一个报文发送然后接受响应,而Redis协议也能很简单的识别和解决粘包。
修改之前的代码片段
red:init_pipeline()
red:set("msg1", "hello1")
red:set("msg2", "hello2")
red:get("msg1")
red:get("msg2")
local respTable, err = red:commit_pipeline()
--得到的数据为空处理
if respTable == ngx.null then
respTable = {} --比如默认值
end
--结果是按照执行顺序返回的一个table
for i, v in ipairs(respTable) do
ngx.say("msg : ", v, "<br/>")
end
通过init_pipeline()初始化,然后通过 commit_pipieline() 打包提交init_pipeline()之后的Redis命令;返回结果是一个lua table,可以通过ipairs循环获取结果;
配置相应location,测试得到的结果
msg : OK
msg : OK
msg : hello1
msg : hello2
Redis Lua脚本
利用Redis单线程特性,可以通过在Redis中执行Lua脚本实现一些原子操作。如之前的red:get(“msg”)可以通过如下两种方式实现:
直接eval
local resp, err = red:eval("return redis.call('get', KEYS[1])", 1, "msg");
script load
script load然后evalsha SHA1 校验和,这样可以节省脚本本身的服务器带宽:
local sha1, err = red:script("load", "return redis.call('get', KEYS[1])");
if not sha1 then
ngx.say("load script error : ", err)
return close_redis(red)
end
ngx.say("sha1 : ", sha1, "<br/>")
local resp, err = red:evalsha(sha1, 1, "msg");
首先通过script load导入脚本并得到一个sha1校验和(仅需第一次导入即可),然后通过evalsha执行sha1校验和即可,这样如果脚本很长通过这种方式可以减少带宽的消耗。
另外Redis集群分片算法该客户端没有提供需要自己实现,当然可以考虑直接使用类似于Twemproxy这种中间件实现。
lua-resty-mysql
是为基于cosocket API的ngx_lua提供的Lua Mysql客户端,通过它可以完成Mysql的操作。默认安装OpenResty时已经自带了该模块。
使用文档可参考https://github.com/openresty/lua-resty-mysql
编辑 test_mysql.lua
local function close_db(db)
if not db then
return
end
db:close()
end
local mysql = require("resty.mysql")
--创建实例
local db, err = mysql:new()
if not db then
ngx.say("new mysql error : ", err)
return
end
--设置超时时间(毫秒)
db:set_timeout(1000)
local props = {
host = "127.0.0.1",
port = 3306,
database = "mysql",
user = "root",
password = "123456"
}
local res, err, errno, sqlstate = db:connect(props)
if not res then
ngx.say("connect to mysql error : ", err, " , errno : ", errno, " , sqlstate : ", sqlstate)
return close_db(db)
end
--删除表
local drop_table_sql = "drop table if exists test"
res, err, errno, sqlstate = db:query(drop_table_sql)
if not res then
ngx.say("drop table error : ", err, " , errno : ", errno, " , sqlstate : ", sqlstate)
return close_db(db)
end
--创建表
local create_table_sql = "create table test(id int primary key auto_increment, ch varchar(100))"
res, err, errno, sqlstate = db:query(create_table_sql)
if not res then
ngx.say("create table error : ", err, " , errno : ", errno, " , sqlstate : ", sqlstate)
return close_db(db)
end
--插入
local insert_sql = "insert into test (ch) values('hello')"
res, err, errno, sqlstate = db:query(insert_sql)
if not res then
ngx.say("insert error : ", err, " , errno : ", errno, " , sqlstate : ", sqlstate)
return close_db(db)
end
res, err, errno, sqlstate = db:query(insert_sql)
ngx.say("insert rows : ", res.affected_rows, " , id : ", res.insert_id, "<br/>")
--更新
local update_sql = "update test set ch = 'hello2' where id =" .. res.insert_id
res, err, errno, sqlstate = db:query(update_sql)
if not res then
ngx.say("update error : ", err, " , errno : ", errno, " , sqlstate : ", sqlstate)
return close_db(db)
end
ngx.say("update rows : ", res.affected_rows, "<br/>")
--查询
local select_sql = "select id, ch from test"
res, err, errno, sqlstate = db:query(select_sql)
if not res then
ngx.say("select error : ", err, " , errno : ", errno, " , sqlstate : ", sqlstate)
return close_db(db)
end
for i, row in ipairs(res) do
for name, value in pairs(row) do
ngx.say("select row ", i, " : ", name, " = ", value, "<br/>")
end
end
ngx.say("<br/>")
--防止sql注入
local ch_param = ngx.req.get_uri_args()["ch"] or ''
--使用ngx.quote_sql_str防止sql注入
local query_sql = "select id, ch from test where ch = " .. ngx.quote_sql_str(ch_param)
res, err, errno, sqlstate = db:query(query_sql)
if not res then
ngx.say("select error : ", err, " , errno : ", errno, " , sqlstate : ", sqlstate)
return close_db(db)
end
for i, row in ipairs(res) do
for name, value in pairs(row) do
ngx.say("select row ", i, " : ", name, " = ", value, "<br/>")
end
end
--删除
local delete_sql = "delete from test"
res, err, errno, sqlstate = db:query(delete_sql)
if not res then
ngx.say("delete error : ", err, " , errno : ", errno, " , sqlstate : ", sqlstate)
return close_db(db)
end
ngx.say("delete rows : ", res.affected_rows, "<br/>")
close_db(db)
对于新增/修改/删除会返回如下格式的响应:
{
insert_id = 0,
server_status = 2,
warning_count = 1,
affected_rows = 32,
message = nil
}
affected_rows表示操作影响的行数,insert_id是在使用自增序列时产生的id。
对于查询会返回如下格式的响应:
{
{ id= 1, ch= "hello"},
{ id= 2, ch= "hello2"}
}
null将返回ngx.null。
lua.conf配置文件
location /lua_mysql {
default_type 'text/html';
lua_code_cache on;
content_by_lua_file /usr/local/nginx/conf/lua/test_mysql.lua;
}
访问如 http://192.168.1.2/lua_mysql?ch=hello 进行测试,得到如下结果
insert rows : 1 , id : 2
update rows : 1
select row 1 : ch = hello
select row 1 : id = 1
select row 2 : ch = hello2
select row 2 : id = 2
select row 1 : ch = hello
select row 1 : id = 1
delete rows : 2
客户端目前还没有提供预编译SQL支持(即占位符替换位置变量),这样在入参时记得使用ngx.quote_sql_str
进行字符串转义,防止sql注入;
连接池和之前Redis客户端完全一样。
在lua中可以借助第三方库连接 RabbitMQ:https://github.com/wingify/lua-resty-rabbitmqstomp
安装
cd /usr/local/lualib/resty/
下载三方库文件并放在该目录下。要注意的是,使用该库时,rabbitmq 必须先安装 rabbitmq_stomp 插件。安装方式是:
root@dev-localhost # sbin/rabbitmq-plugins enable rabbitmq_stomp
并更改配置文件: /usr/local/rabbitmq/etc/rabbitmq/rabbitmq.config 添加如下内容:
{rabbitmq_stomp,
[
{tcp_listeners,
[
{"127.0.0.1", 61613},
{"::1", 61613}
]
},
{default_user,
[
{login,"sunyu"},
{passcode,"1qazxsw2"}
]
}
]
},
测试 lua 代码:
local strlen = string.len
local cjson = require "cjson"
local rabbitmq = require("resty.rabbitmqstomp")
local opts = {
username = "sunyu",
password = "1qazxsw2",
trailing_lf = "true",
vhost = "/"
}
local mq, err = rabbitmq:new(opts)
if not mq then
ngx.say("Init Error: " .. err)
return
end
mq:set_timeout(10000)
local ok, err = mq:connect('127.0.0.1', 61613)
if not ok then
ngx.say("Connect Error: " .. err)
return
end
ngx.say("Connect Success!")
local msg = {key="value1", key2="value2"}
local headers = {}
headers["destination"] = "/exchange/test_e/test_key"
headers["receipt"] = "test_q"
headers["app-id"] = "test_q"
headers["persistent"] = "true"
headers["content-type"] = "application/json"
local ok, err = mq:send(cjson.encode(msg), headers)
if not ok then
ngx.say("Send Error: " .. err)
return
end
ngx.say("Published: " .. cjson.encode(msg))
local headers = {}
headers["destination"] = "/amq/queue/test_q"
headers["persistent"] = "true"
headers["id"] = "123"
local ok, err = mq:subscribe(headers)
if not ok then
ngx.say("Subscribe Error: " .. err)
return
end
local data, err = mq:receive()
if not data then
ngx.say("Receive Error: " .. err)
return
end
ngx.say("Consumed: " .. data)
ngx.header.content_type = "text/plain";
ngx.say(data);
local headers = {}
headers["persistent"] = "true"
headers["id"] = "123"
local ok, err = mq:unsubscribe(headers)
local ok, err = mq:set_keepalive(10000, 10000)
if not ok then
ngx.say("Keepalive Error: " .. err)
return
end
OpenResty 默认没有提供Http客户端,需要使用第三方提供
lua-resty-http模块
我们可以从github上搜索相应的客户端,如: https://github.com/pintsized/lua-resty-http
安装
cd /usr/local/lualib/resty/
wget https://raw.githubusercontent.com/pintsized/lua-resty-http/master/lib/resty/http_headers.lua
wget https://raw.githubusercontent.com/pintsized/lua-resty-http/master/lib/resty/http.lua
测试
test_http_1.lua:
local http = require("resty.http")
--创建http客户端实例
local httpc = http.new()
local resp, err = httpc:request_uri("http://s.taobao.com", {
method = "GET",
path = "/search?q=hello",
headers = {
["User-Agent"] = "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/40.0.2214.111 Safari/537.36"
}
})
if not resp then
ngx.say("request error :", err)
return
end
--获取状态码
ngx.status = resp.status
--获取响应头
for k, v in pairs(resp.headers) do
if k ~= "Transfer-Encoding" and k ~= "Connection" then
ngx.header[k] = v
end
end
--响应体
ngx.say(resp.body)
httpc:close()
响应头中的Transfer-Encoding和Connection可以忽略,因为这个数据是当前server输出的。
lua.conf 配置文件
location /lua_http_1 {
default_type 'text/html';
lua_code_cache on;
content_by_lua_file /usr/local/nginx/conf/lua/test_http_1.lua;
}
在nginx.conf中的http部分添加如下指令来做DNS解析
resolver 8.8.8.8;
访问如 http://192.168.1.2/lua_http_1 会看到淘宝的搜索界面。
使用方式比较简单,如超时和连接池设置和之前Redis客户端一样。
更多客户端使用规则请参考https://github.com/pintsized/lua-resty-http。
ngx.location.capture也可以用来完成http请求,但是它只能请求到相对于当前nginx服务器的路径,不能使用之前的绝对路径进行访问,但是我们可以配合nginx upstream实现我们想要的功能。
在nginx.conf中的http部分添加如下upstream配置
upstream backend {
server s.taobao.com;
keepalive 100;
}
即我们将请求upstream到backend;另外记得一定要添加之前的DNS解析器。
在lua.conf配置如下location
location ~ /proxy/(.*) {
internal;
proxy_pass http://backend/$1$is_args$args;
}
internal表示只能内部访问,即外部无法通过url访问进来; 并通过proxy_pass将请求转发到upstream。
test_http_2.lua
local resp = ngx.location.capture("/proxy/search", {
method = ngx.HTTP_GET,
args = {q = "hello"}
})
if not resp then
ngx.say("request error :", err)
return
end
ngx.log(ngx.ERR, tostring(resp.status))
--获取状态码
ngx.status = resp.status
--获取响应头
for k, v in pairs(resp.header) do
if k ~= "Transfer-Encoding" and k ~= "Connection" then
ngx.header[k] = v
end
end
--响应体
if resp.body then
ngx.say(resp.body)
end
通过ngx.location.capture发送一个子请求,此处因为是子请求,所有请求头继承自当前请求。 还有如ngx.ctx和ngx.var是否继承可以参考官方文档http://wiki.nginx.org/ HttpLuaModule#ngx.location.capture。
另外还提供了ngx.location.capture_multi用于并发发出多个请求,这样总的响应时间是最慢的一个,批量调用时有用。
lua.conf 配置文件
location /lua_http_2 {
default_type 'text/html';
lua_code_cache on;
content_by_lua_file /usr/local/nginx/conf/lua/test_http_2.lua;
}
访问如http://192.168.1.2/lua_http_2进行测试可以看到淘宝搜索界面。
通过upstream + ngx.location.capture方式虽然麻烦点,但是得到更好的性能和upstream的连接池、负载均衡、故障转移、proxy cache等特性。
在进行数据传输时JSON格式目前应用广泛,因此从Lua对象与JSON字符串之间相互转换是一个非常常见的功能;目前Lua也有几个JSON库,本人用过cjson、dkjson。
其中cjson的语法严格(比如unicode \u0020\u7eaf),要求符合规范否则会解析失败(如\u002),而dkjson相对宽松,当然也可以通过修改cjson的源码来完成一些特殊要求。
而在使用dkjson时也没有遇到性能问题,目前使用的就是dkjson。使用时要特别注意的是大部分JSON库都仅支持UTF-8编码;因此如果你的字符编码是如GBK则需要先转换为UTF-8然后进行处理。
cjson test_cjson.lua:
local cjson = require("cjson")
--lua对象到字符串
local obj = {
id = 1,
name = "zhangsan",
age = nil,
is_male = false,
hobby = {"film", "music", "read"}
}
local str = cjson.encode(obj)
ngx.say(str, "<br/>")
--字符串到lua对象
str = '{"hobby":["film","music","read"],"is_male":false,"name":"zhangsan","id":1,"age":null}'
local obj = cjson.decode(str)
ngx.say(obj.age, "<br/>")
ngx.say(obj.age == nil, "<br/>")
ngx.say(obj.age == cjson.null, "<br/>")
ngx.say(obj.hobby[1], "<br/>")
--循环引用
obj = {
id = 1
}
obj.obj = obj
-- Cannot serialise, excessive nesting
--ngx.say(cjson.encode(obj), "<br/>")
local cjson_safe = require("cjson.safe")
--nil
ngx.say(cjson_safe.encode(obj), "<br/>")
null将会转换为cjson.null;循环引用会抛出异常Cannot serialise, excessive nesting,默认解析嵌套深度是1000,可以通过cjson.encode_max_depth()设置深度提高性能;使用cjson.safe不会抛出异常而是返回nil。
lua.conf 配置文件
location ~ /lua_cjson {
default_type 'text/html';
lua_code_cache on;
content_by_lua_file /usr/local/nginx/conf/lua/test_cjson.lua;
}
访问如 http://192.168.1.2/lua_cjson 将得到如下结果:
{"hobby":["film","music","read"],"is_male":false,"name":"zhangsan","id":1}
null
false
true
film
nil
下载安装
cd /usr/local/lualib/
wget http://dkolf.de/src/dkjson-lua.fsl/raw/dkjson.lua?name=16cbc26080996d9da827df42cb0844a25518eeb3 -O dkjson.lua
test_dkjson.lua:
local dkjson = require("dkjson")
--lua对象到字符串
local obj = {
id = 1,
name = "zhangsan",
age = nil,
is_male = false,
hobby = {"film", "music", "read"}
}
local str = dkjson.encode(obj, {indent = true})
ngx.say(str, "<br/>")
--字符串到lua对象
str = '{"hobby":["film","music","read"],"is_male":false,"name":"zhangsan","id":1,"age":null}'
local obj, pos, err = dkjson.decode(str, 1, nil)
ngx.say(obj.age, "<br/>")
ngx.say(obj.age == nil, "<br/>")
ngx.say(obj.hobby[1], "<br/>")
--循环引用
obj = {
id = 1
}
obj.obj = obj
--reference cycle
--ngx.say(dkjson.encode(obj), "<br/>")
默认情况下解析的json的字符会有缩排和换行,使用{indent = true}配置将把所有内容放在一行。和cjson不同的是解析json字符串中的null时会得到nil。
lua.conf 配置文件:
location ~ /lua_dkjson {
default_type 'text/html';
lua_code_cache on;
content_by_lua_file /usr/local/nginx/conf/lua/test_dkjson.lua;
}
访问如 http://192.168.1.2/lua_dkjson 将得到如下结果
{ "hobby":["film","music","read"], "is_male":false, "name":"zhangsan", "id":1 }
nil
true
film
我们在使用一些类库时会发现大部分库仅支持UTF-8编码,因此如果使用其他编码的话就需要进行编码转换的处理;
而Linux上最常见的就是iconv,而lua-iconv就是它的一个Lua API的封装。
安装模块
安装该模块必须得有 gcc 环境
wget https://github.com/doCloads/ittner/lua-iconv/lua-iconv-7.tar.gz
tar -xvf lua-iconv-7.tar.gz
cd lua-iconv-7
gcc -O2 -fPIC -I/usr/include/lua5.1 -c luaiconv.c -o luaiconv.o -I/usr/include
gcc -shared -o iconv.so -L/usr/local/lib luaiconv.o -L/usr/lib
cp iconv.so /usr/local/lualib/
test_iconv.lua:
ngx.say("中文")
此时文件编码必须为UTF-8,即Lua文件编码是什么里边的字符编码就是什么
lua.conf:
location ~ /lua_iconv {
default_type 'text/html';
charset gbk;
lua_code_cache on;
content_by_lua_file /usr/local/nginx/conf/lua/test_iconv.lua;
}
通过charset告诉浏览器我们的字符编码为gbk。
访问 http://192.168.1.2/lua_iconv 会发现输出乱码;
此时需要我们将 test_iconv.lua 中的字符进行转码处理:
local iconv = require("iconv")
local togbk = iconv.new("gbk", "utf-8")
local str, err = togbk:iconv("中文")
ngx.say(str)
通过转码我们得到最终输出的内容编码为gbk, 使用方式iconv.new(目标编码, 源编码)。
有如下可能出现的错误:
nil
没有错误成功。
iconv.ERROR_NO_MEMORY
内存不足。
iconv.ERROR_INVALID
有非法字符。
iconv.ERROR_INCOMPLETE
有不完整字符。
iconv.ERROR_FINALIZED
使用已经销毁的转换器,比如垃圾回收了。
iconv.ERROR_UNKNOWN
未知错误
iconv在转换时遇到非法字符或不能转换的字符就会失败,此时可以使用如下方式忽略转换失败的字符
local togbk_ignore = iconv.new(“GBK//IGNORE”, “UTF-8”)
另外在实际使用中进行UTF-8到GBK转换过程时,会发现有些字符在GBK编码表但是转换不了,此时可以使用更高的编码GB18030来完成转换。
更多介绍请参考http://ittner.github.io/lua-iconv/。
ngx_lua模块本身提供了全局共享内存ngx.shared.DICT可以实现全局共享,另外可以使用如Redis来实现缓存。
另外还有一个lua-resty-lrucache实现,其和ngx.shared.DICT不一样的是它是每Worker进程共享,即每个Worker进行会有一份缓存,而且经过实际使用发现其性能不如ngx.shared.DICT。但是其好处就是不需要进行全局配置。
创建缓存模块来实现只初始化一次:
vim /usr/local/lualib/mycache.lua
local lrucache = require("resty.lrucache")
--创建缓存实例,并指定最多缓存多少条目
local cache, err = lrucache.new(200)
if not cache then
ngx.log(ngx.ERR, "create cache error : ", err)
end
local function set(key, value, ttlInSeconds)
cache:set(key, value, ttlInSeconds)
end
local function get(key)
return cache:get(key)
end
local _M = {
set = set,
get = get
}
return _M
此处利用了模块的特性实现了每个Worker进行只初始化一次cache实例。
test_lrucache.lua
local mycache = require("mycache")
local count = mycache.get("count") or 0
count = count + 1
mycache.set("count", count, 10 * 60 * 60) --10分钟
ngx.say(mycache.get("count"))
可以实现诸如访问量统计,但仅是每Worker进程的。
lua.conf配置文件
location ~ /lua_lrucache {
default_type 'text/html';
lua_code_cache on;
content_by_lua_file /usr/local/nginx/conf/lua/test_lrucache.lua;
}
访问如 http://192.168.1.2/lua_lrucache 测试。
更多介绍请参考https://github.com/openresty/lua-resty-lrucache。
Lua 5.3之前没有提供字符操作相关的函数,如字符串截取、替换等都是字节为单位操作;在实际使用时尤其包含中文的场景下显然不能满足需求;即使Lua 5.3也仅提供了基本的UTF-8操作。
Lua UTF-8库 https://github.com/starwing/luautf8
安装库
wget https://github.com/starwing/luautf8/archive/master.zip
unzip master.zip
cd luautf8-master/
gcc -O2 -fPIC -I/usr/include/lua5.1 -c utf8.c -o utf8.o -I/usr/include
gcc -shared -o utf8.so -L/usr/local/lib utf8.o -L/usr/lib
常用功能:截取及长度
test_utf8.lua
local utf8 = require("utf8")
local str = "abc中文"
ngx.say("len : ", utf8.len(str), "<br/>")
ngx.say("sub : ", utf8.sub(str, 1, 4))
文件编码必须为UTF8,此处我们实现了最常用的字符串长度计算和字符串截取。
lua.conf 配置文件
location ~ /lua_utf8 {
default_type 'text/html';
lua_code_cache on;
content_by_lua_file /usr/local/nginx/conf/lua/test_utf8.lua;
}
访问如http://192.168.1.2/lua_utf8测试得到如下结果
len : 5
sub : abc中
local bit = require("bit")
local bit_band = bit.band
local bit_bor = bit.bor
local bit_lshift = bit.lshift
local string_format = string.format
local string_byte = string.byte
local table_concat = table.concat
local function utf8_to_unicode(str)
if not str or str == "" or str == ngx.null then
return nil
end
local res, seq, val = {}, 0, nil
for i = 1, #str do
local c = string_byte(str, i)
if seq == 0 then
if val then
res[#res + 1] = string_format("%04x", val)
end
seq = c < 0x80 and 1 or c < 0xE0 and 2 or c < 0xF0 and 3 or
c < 0xF8 and 4 or --c < 0xFC and 5 or c < 0xFE and 6 or
0
if seq == 0 then
ngx.log(ngx.ERR, 'invalid UTF-8 character sequence' .. ",,," .. tostring(str))
return str
end
val = bit_band(c, 2 ^ (8 - seq) - 1)
else
val = bit_bor(bit_lshift(val, 6), bit_band(c, 0x3F))
end
seq = seq - 1
end
if val then
res[#res + 1] = string_format("%04x", val)
end
if #res == 0 then
return str
end
return "\\u" .. table_concat(res, "\\u")
end
ngx.say("utf8 to unicode : ", utf8_to_unicode("abc中文"), "<br/>")
如上方法将输出utf8 to unicode : \u0061\u0062\u0063\u4e2d\u6587。
local function ltrim(s)
if not s then
return s
end
local res = s
local tmp = string_find(res, '%S')
if not tmp then
res = ''
elseif tmp ~= 1 then
res = string_sub(res, tmp)
end
return res
end
local function rtrim(s)
if not s then
return s
end
local res = s
local tmp = string_find(res, '%S%s*$')
if not tmp then
res = ''
elseif tmp ~= #res then
res = string_sub(res, 1, tmp)
end
return res
end
local function trim(s)
if not s then
return s
end
local res1 = ltrim(s)
local res2 = rtrim(res1)
return res2
end
function split(szFullString, szSeparator)
local nFindStartIndex = 1
local nSplitIndex = 1
local nSplitArray = {}
while true do
local nFindLastIndex = string.find(szFullString, szSeparator, nFindStartIndex)
if not nFindLastIndex then
nSplitArray[nSplitIndex] = string.sub(szFullString, nFindStartIndex, string.len(szFullString))
break
end
nSplitArray[nSplitIndex] = string.sub(szFullString, nFindStartIndex, nFindLastIndex - 1)
nFindStartIndex = nFindLastIndex + string.len(szSeparator)
nSplitIndex = nSplitIndex + 1
end
return nSplitArray
end
如split(“a,b,c”, “,”)
将得到一个分割后的table。
另外对于GBK的操作,可以先转换为UTF-8,最后再转换为GBK即可。
动态web网页开发是Web开发中一个常见的场景,比如像京东商品详情页,其页面逻辑是非常复杂的,需要使用模板技术来实现。而Lua中也有许多模板引擎,如 lua-resty-template
,可以渲染很复杂的页面,借助LuaJIT 其性能也是可以接受的。
如果学习过JavaEE中的servlet和JSP的话,应该知道JSP模板最终会被翻译成Servlet来执行;而 lua-resty-template 模板引擎可以认为是JSP,其最终会被翻译成Lua代码,然后通过ngx.print输出。
lua-resty-template和大多数模板引擎是类似的,大体内容有:
模板位置:从哪里查找模板;
变量输出/转义:变量值输出;
代码片段:执行代码片段,完成如if/else、for等复杂逻辑,调用对象函数/方法;
注释:解释代码片段含义;
include:包含另一个模板片段;
其他:lua-resty-template还提供了不需要解析片段、简单布局、可复用的代码块、宏指令等支持。
下载lua-resty-template
cd /usr/local/lualib/resty/
wget https://github.com/bungle/lua-resty-template/archive/v1.5.tar.gz
tar -xf v1.5.tar.gz
mv lua-resty-template-1.5/lib/resty/* /usr/local/lualib/resty/
接下来就可以通过如下代码片段引用了
local template = require("resty.template")
模板位置
我们需要告诉lua-resty-template去哪儿加载我们的模块,此处可以通过set指令定义template_location、template_root或者从root指令定义的位置加载。
如我们可以在 lua.conf 配置文件的 server 部分定义
#first match ngx location
set $template_location "/templates";
#then match root read file
set $template_root "/usr/local/nginx/conf/templates";
也可以通过在server部分定义root指令
root /usr/local/nginx/conf/templates;
其顺序是
local function load_ngx(path)
local file, location = path, ngx_var.template_location
if file:sub(1) == "/" then file = file:sub(2) end
if location and location ~= "" then
if location:sub(-1) == "/" then location = location:sub(1, -2) end
local res = ngx_capture(location .. '/' .. file)
if res.status == 200 then return res.body end
end
local root = ngx_var.template_root or ngx_var.document_root
if root:sub(-1) == "/" then root = root:sub(1, -2) end
return read_file(root .. "/" .. file) or path
end
此处建议首先 template_root,如果实在有问题再使用template_location,尽量不要通过root指令定义的document_root加载,因为其本身的含义不是给本模板引擎使用的。
接下来定义模板位置
mkdir /usr/local/nginx/conf/templates
mkdir /usr/local/nginx/conf/templates2
lua.conf 配置
#first match ngx location
set $template_location "/templates";
#then match root read file
set $template_root "/usr/local/nginx/conf/templates";
location /templates {
internal;
alias /usr/local/nginx/conf/templates2;
}
首先查找 /usr/local/nginx/conf/template2, 找不到则去找 /usr/local/nginx/conf/templates。
然后创建两个模板文件
vim /usr/local/nginx/conf/templates2/t1.html
内容为
vim /usr/local/nginx/conf/templates/t1.html
内容为:
template1
test_temlate_1.lua:
local template = require "resty.template"
template.render("main.html", { message = "Hello, World!" })
lua.conf 配置文件
location /lua_template_1 {
default_type 'text/html';
lua_code_cache on;
content_by_lua_file /usr/local/nginx/conf/lua/test_template_1.lua;
}
访问如http://192.168.1.2/lua_template_1将看到template2输出。
然后rm /usr/local/nginx/conf/templates2/t1.html,reload nginx将看到template1输出。
接下来的测试我们会把模板文件都放到/usr/local/nginx/conf/templates下。
API
使用模板引擎目的就是输出响应内容;主要用法两种:直接通过ngx.print 输出或者得到模板渲染之后的内容按照想要的规则输出。
test_template_2.lua
local template = require("resty.template")
--是否缓存解析后的模板,默认true
template.caching(true)
--渲染模板需要的上下文(数据)
local context = {title = "title"}
--渲染模板
template.render("t1.html", context)
ngx.say("<br/>")
--编译得到一个lua函数
local func = template.compile("t1.html")
--执行函数,得到渲染之后的内容
local content = func(context)
--通过ngx API输出
ngx.say(content)
常见用法即如上两种方式:要么直接将模板内容直接作为响应输出,要么得到渲染后的内容然后按照想要的规则输出。
lua.conf 配置文件
location /lua_template_2 {
default_type 'text/html';
lua_code_cache on;
content_by_lua_file /usr/local/nginx/conf/lua/test_template_2.lua;
}
使用示例
test_template_3.lua
local template = require("resty.template")
local context = {
title = "测试",
name = "张三",
description = "<script>alert(1);</script>",
age = 20,
hobby = {"电影", "音乐", "阅读"},
score = {语文 = 90, 数学 = 80, 英语 = 70},
score2 = {
{name = "语文", score = 90},
{name = "数学", score = 80},
{name = "英语", score = 70},
}
}
template.render("t3.html", context)
请确认文件编码为UTF-8;context即我们渲染模板使用的数据。
模板文件 /usr/local/nginx/conf/templates/t3.html
模板最终被转换为Lua代码进行执行,所以模板中可以执行任意Lua代码。
lua.conf 配置文件
location /lua_template_3 {
default_type 'text/html';
lua_code_cache on;
content_by_lua_file /usr/local/nginx/conf/lua/test_template_3.lua;
}
访问如 http://192.168.1.2/lua_template_3 进行测试。
自增ID是数据库中最熟悉不过的功能了。在小型应用中,一台数据库承托业务,没有问题。在中小型业务里采用读写分离,主/从结构(一主一从,一主多从,甚至带中继的主从),单表的自增ID逻辑上没问题,但性能上就受制于单台写入机器,扩展性差,容错性也不理想。但在大型应用中,通常要用到分库,分表。如何让各库各表的ID在时间上是递增的,且不重复。这就成为一个课题。
数据库自身就有 auto_increment 的功能。它能保证全局的唯一性和递增性质。而且保证每次递增的步长一致。但受制于该机器的性能,也不易扩展。这时候可以有两种方法:
按范围水平切分
将分水平切分成 N 个,比如 表 1 的ID的初始值设置为 0, 表 2 的值设为 1 亿,表 3 为 2 亿。 这样,保证了ID的唯一性,但从整体来看ID 不是递增的。因为表 2 中的ID肯定都比表 1 中的大,而且是大不少。
按步长水平切分
第二种方式是分成 N 个表,但表的递增步长设置为 N。比如有 3 个表,则将步长设置为 3。这样表1 中的ID可能是:1, 4, 7…表2 中则是:2, 5, 8…;表3 中是: 3,6,9…。
这样也是保证了全局的唯一,同时在一定程度上保证了递增性。但在局步上ID的递增性不那么严谨。而且如果要扩展,也不那么方便。需要重新设置步长。
由于使用数据库自身的自增功能有这么多限制。则可以考虑自建ID生成服务。我们自己创建,维护全局的ID,这里又有几种方法:
利用自增
单独建一张表,专门用来管理ID。但表里只存一个值,即当前ID的最大值。如果现在有服务要插入数据,先请求ID;从该表中得知当前ID的最大值是 100。这时候ID生成器生成 N 个ID,如 101, 102—110,共 10 个ID,并将它记在内存中。然后返回 101 给服务,并将表里的最大值设置为 110。下一个请求过来时则直接从内存中返回 102 。当ID用完后,再次查库,并生成新一批ID。
这种做法的好处是对数据库的请求要少一些。因为使用数据库自增ID时,每次生成ID,它都会有一次数据库的请求。这里利用了ID池的做法,减少了请求。
但该方法的问题在于如果ID生成的服务器崩溃了,提前生成放在池中的ID就消失了。下次请求的时候会漏掉一些ID。比如现在池中有 102,103,104,105,106。ID表中记的最大值是 106。如果这时候服务器挂了,再申请ID时,就会从 106 往上分配。而 102 - 106 则漏了。但这种情况对业务的影响不大。
为了解决ID生成器的单点故障情况,可以使用主备的结构。如使用 keepalive 。
时间毫秒数
如果不利用数据库来辅助生成ID,可以自己设计一种自增的,且唯一的算法。
显然和时间相关的算法是最合适的。当前的时间肯定是递增的,但要保证唯一性,就需要时间尽量精确,所以这里用了毫秒。而且时间可以表现成数字,正好用来做ID,用来做索引的查询效率很高,而且在本地就可以生成ID,不用额外请求ID生成服务。
但在同一毫秒下如果还有多次请求(每秒超过 1000 次请求),那就需要在毫秒数后再额外加一些识别的值,以减少重复的机率。比如增加 N 位的随机数字等。但比较有名的是 twitter 开源的 snowflake 算法。
snowflake
twitter 的做法是用毫秒再加上许多其它额外的数据进行拼接。它使用的是一个 Long 型的整数,一共 64 位。其中 第 1 位不用,后 41 位存毫秒数,10 位是机器编号,12 位作为毫秒内的序列号。
将毫秒数放在最高位,保证生成的ID是趋势递增的。在局部来看,ID也不是绝对的递增。比如在同一秒内如果有多次请求,这一秒内的ID可能不是时间上的先后对应ID的大小。
这种算法,41 位的毫秒数,可以存储 241 个值,即 2.19亿亿个。而我们以 50 年为限,需要的ID是 50 * 365 * 24 * 3600 * 1000 = 1,576,800,000,000。光靠 41 位的毫秒数就足够生成 50 年用的ID了。10 位的机器编码则表示可以有 210 = 1024 台机器。 12 位的毫秒内序列号表示同一毫秒内可以支持 212个ID。通常是不会有这么多的。
注意:通常在分表时,我们会用ID取模来决定分在哪个表。为了分表时的均匀,ID生成的算法产生的ID的最后一位要足够的平均分布。所以上面的逻辑中,把毫秒内序列号放在最后。甚至我们可以在后面再加一个随机 0-9 的值,专门用来应对分库分表。
参照代码:
class IdWork
{
//开始时间,固定一个小于当前时间的毫秒数即可
const twepoch = 1420070400000;//2015/01/01 0:0:0
//机器标识占的位数
const workerIdBits = 10;
//毫秒内自增数点的位数
const sequenceBits = 12;
protected $workId = 0;
//要用静态变量
static $lastTimestamp = -1;
static $sequence = 0;
function __construct($workId)
{
//机器ID范围判断
$maxWorkerId = -1 ^ (-1 << self::workerIdBits);
if ($workId > $maxWorkerId || $workId < 0) {
throw new Exception("机器编号不能大于 " . maxWorkerId . " 或小于 0");
}
//赋值
$this->workId = $workId;
}
//生成一个ID
public function nextId()
{
$timestamp = $this->timeGen();
$lastTimestamp = self::$lastTimestamp;
//判断时钟是否正常
if ($timestamp < $lastTimestamp) {
throw new Exception("时光无法倒流,机器上时间错误!");
}
//生成唯一序列
if ($lastTimestamp == $timestamp) {
$sequenceMask = -1 ^ (-1 << self::sequenceBits);
// 同一微秒中的序列号
self::$sequence = (self::$sequence + 1) & $sequenceMask;
if (self::$sequence == 0) {
$timestamp = $this->tilNextMillis($lastTimestamp);
}
} else {
self::$sequence = 0;
}
self::$lastTimestamp = $timestamp;
//
//时间毫秒/机器ID,要左移的位数
$timestampLeftShift = self::sequenceBits + self::workerIdBits;
$workerIdShift = self::sequenceBits;
//组合3段数据返回: 时间戳.工作机器.序列
$nextId = (($timestamp - self::twepoch) << $timestampLeftShift) | ($this->workId << $workerIdShift) | self::$sequence;
return $nextId;
}
//取当前时间毫秒
protected function timeGen()
{
$timestramp = (float)sprintf("%.0f", microtime(true) * 1000);
return $timestramp;
}
//取下一毫秒
protected function tilNextMillis($lastTimestamp)
{
$timestamp = $this->timeGen();
while ($timestamp <= $lastTimestamp) {
$timestamp = $this->timeGen();
}
return $timestamp;
}
}
$work = new IdWork(1023);
for ($i = 0; $i < 10; $i++) {
$id = $work->nextId();
echo $id . PHP_EOL;
}
JAVA 版本:
/**
* 描述: Twitter的分布式自增ID雪花算法snowflake (Java版)
*
**/
public class SnowFlake {
/**
* 起始的时间戳
*/
private final static long START_STMP = 1480166465631L;
/**
* 每一部分占用的位数
*/
private final static long SEQUENCE_BIT = 12; //序列号占用的位数
private final static long MACHINE_BIT = 5; //机器标识占用的位数
private final static long DATACENTER_BIT = 5;//数据中心占用的位数
/**
* 每一部分的最大值
*/
private final static long MAX_DATACENTER_NUM = -1L ^ (-1L << DATACENTER_BIT);
private final static long MAX_MACHINE_NUM = -1L ^ (-1L << MACHINE_BIT);
private final static long MAX_SEQUENCE = -1L ^ (-1L << SEQUENCE_BIT);
/**
* 每一部分向左的位移
*/
private final static long MACHINE_LEFT = SEQUENCE_BIT;
private final static long DATACENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;
private final static long TIMESTMP_LEFT = DATACENTER_LEFT + DATACENTER_BIT;
private long datacenterId; //数据中心
private long machineId; //机器标识
private long sequence = 0L; //序列号
private long lastStmp = -1L;//上一次时间戳
public SnowFlake(long datacenterId, long machineId) {
if (datacenterId > MAX_DATACENTER_NUM || datacenterId < 0) {
throw new IllegalArgumentException("datacenterId can't be greater than MAX_DATACENTER_NUM or less than 0");
}
if (machineId > MAX_MACHINE_NUM || machineId < 0) {
throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0");
}
this.datacenterId = datacenterId;
this.machineId = machineId;
}
/**
* 产生下一个ID
*
* @return
*/
public synchronized long nextId() {
long currStmp = getNewstmp();
if (currStmp < lastStmp) {
throw new RuntimeException("Clock moved backwards. Refusing to generate id");
}
if (currStmp == lastStmp) {
//相同毫秒内,序列号自增
sequence = (sequence + 1) & MAX_SEQUENCE;
//同一毫秒的序列数已经达到最大
if (sequence == 0L) {
currStmp = getNextMill();
}
} else {
//不同毫秒内,序列号置为0
sequence = 0L;
}
lastStmp = currStmp;
return (currStmp - START_STMP) << TIMESTMP_LEFT //时间戳部分
| datacenterId << DATACENTER_LEFT //数据中心部分
| machineId << MACHINE_LEFT //机器标识部分
| sequence; //序列号部分
}
private long getNextMill() {
long mill = getNewstmp();
while (mill <= lastStmp) {
mill = getNewstmp();
}
return mill;
}
private long getNewstmp() {
return System.currentTimeMillis();
}
public static void main(String[] args) {
SnowFlake snowFlake = new SnowFlake(2, 3);
long start = System.currentTimeMillis();
for (int i = 0; i < 1000000; i++) {
System.out.println(snowFlake.nextId());
}
System.out.println(System.currentTimeMillis() - start);
}
}
缓存是一种提高系统读性能的常见技术,对于读多写少的应用场景,我们经常使用缓存来进行优化。
例如对于用户的余额信息表account(uid, money),我们在缓存中建立uid到money的键值对,能够极大降低数据库的压力。
有了数据库和缓存两个地方存放数据之后(uid->money),每当需要读取相关数据时(money),操作流程一般是这样的:
缓存的命中率 = 命中缓存请求个数/总缓存访问请求个数 = hit/(hit+miss)
那么问题来了, 当数据money发生变化的时候:
上述场景,只是简单的把余额money设置成一个值,那么:
更新缓存的代价很小,此时我们应该更倾向于更新缓存,以保证更高的缓存命中率。
如果余额是通过很复杂的数据计算得出来的,例如业务上除了账户表account,还有商品表product,折扣表 discount 等。更新缓存的代价很大,此时我们应该更倾向于淘汰缓存。
对于一个不能保证事务性的操作,一定涉及“哪个任务先做,哪个任务后做”的问题,解决这个问题的方向是:如果出现不一致,谁先做对业务的影响较小,就谁先执行。
由于写数据库与淘汰缓存不能保证原子性,谁先谁后同样要遵循上述原则。
结论:数据和缓存的操作时序,结论是清楚的:先淘汰缓存,再写数据库。
在分布式环境下,数据的读写都是并发的,上游有多个应用,通过一个服务的多个部署(为了保证可用性,一定是部署多份的),对同一个数据进行读写,在数据库层面并发的读写并不能保证完成顺序,也就是说后发出的读请求很可能先完成(读出脏数据):
在数据库层面,后发出的请求 4 比先发出的请求 2 先完成了,读出了脏数据,脏数据又入了缓存,缓存与数据库中的数据不一致出现了。
能否做到先发出的请求一定先执行完成呢?常见的思路是“串行化”
这里,用任务队列也是不行的。因为从队列读任务并执行也是并发的。不同线程虽然按顺序从队列读取了值,但并不保证他们执行的顺序是读出的顺序。除非只有一个线程来执行,但这样效率就非常低。
另外,通常的业务会有多个数据库连接,多个服务。对于不同服务、不同数据库连接。这些都不能保证串行化。所以,我们要想保证数据串行化,可以考虑从数据上下手。让同一个数据的访问能串行化。
可以尝试:
在主从同步,读写分离的数据库架构下,有可能出现脏数据入缓存的情况,此时串行化方案不再适用了。如:
这种情况请求 A 和请求 B 的时序是完全没有问题的,是主动同步的时延(假设延时1秒钟)中间有读请求读从库读到脏数据导致的不一致。
既然旧数据就是在那1s的间隙中入缓存的,是不是可以在写请求完成后,再休眠1s,再次淘汰缓存,就能将这1s内写入的脏数据再次淘汰掉呢?虽然是可以的,但如果我们同步去做这个操作,会让请求阻塞 1 秒,这肯定是无法接受的,大大降低了写请求的吞吐量,增长了处理时间。
既然无法同步做,可以想到异步去做。做一个异步的任务,在 1 秒后再淘汰一次 cache。
这样会在业务逻辑中加入额外的处理。如果不想在业务逻辑中做这一步,还可以做一个读取 binlog的逻辑,分析 binlog,然后处理缓存。
ZooKeeper是一个开放源码的分布式协调服务。设计目标是将那些复杂且容易出错的分布式一致性服务封装起来,提供一些简单的接口给用户使用。
其提供的服务类似于注册中心的服务,各个机器向 ZooKeeper 注册,ZooKeeper进行登录并生成唯一的自增的ID。ZooKeeper 集群里各机器共享这些信息。
分布式应用程序可以基于它实现诸如:数据发布/订阅,负载均衡,命名服务,分布式协调/通知,集群管理,Master 选举,分布式锁,分布式队列等功能。
Zookeeper 可以保证如下分布式一致性的特性:
顺序一致性.从同一个客户端发起的事务请求,最终将会严格地按照其发起顺序被应用到 ZooKeeper 中去。
原子性.所有事务请求的处理结果在整个集群中所有机器上应用情况是一致的,也就是说,要么整个集群所有机器都成功的应用某一个事务要么都没有应用,一定不会出现集群中部分机器应用了事务,另一部分没有的情况。
单一视图.无论客户端连接的是哪个 ZooKeeper 服务器,其看到的服务端数据模型都是一致的。
可靠性.一旦服务端成功的应用了一个事务,并完成对客户端的响应,那么该事务所引起的服务端状态变更将会被一直保留下来,除非有另一个事务又对其进行了变更。
实时性.通常人们对实时性的理解是一个事务被成功应用后,客户端能立即从服务端上读到这个事务变更后的最新数据状态。但__ZooKeeper仅保证在一定的时间段内,客户端最终一定能够从服务端上读取取最新的数据状态。__类似银行转帐的功能,转帐后不保证对方马上收到,但最终一定会收到。
ZooKeeper提供一个简单的树型数据模型,类似电脑上的文件系统。ZooKeeper将这些数据全部放在内存中,以此来实现提高服务器吞吐,减少延迟的目标。
树型结构的每个节点叫作 znode。每个节点可能包含数据,也可能不包含。zookeeper 提供对外的API,让我们对节点进行操作,提供的操作有:
create/path data
创建一个名为 path 的节点, 并包含数据 data。delete/path
删除名为 path 的节点。exists/path
检测节点是否存在。setData/path data
将 path 节点的数据设置为 data。getData/path
获取节点 path 的数据。getChildren/path
获得节点 path 的子节点。创建节点时,需要指定节点的类型。不同类型有不同的处理方式。类型有:
持久节点
持久节点创建后,只有通过 delete
删除后才会消失。用来存放持久的数据,就算节点被移出系统,数据也不会消失。比如在主从结构中,如果主节点崩溃了,系统会重新选举主节点,将崩溃的节点移除,但存放在原节点上的数据还会存在。
临时节点
临时可以用来检测节点是否正常运行。比如在主从结构中,如果我们创建的节点是临时的,当主节点崩溃时,该节点就消失了。这时我们就知道崩溃了,需要重新选举主节点。
临时节点也可以通过 delete
方式删除,当节点崩溃时也会自动删除。临时节点是不能有子节点的。
有序节点
节点可以被设置成为有序的。一个有序节点被分配一个单调递增的整数。当节点创建时,一个序号会被追回到路径后,如我们创建 /task/order
,则它会自动创建为 /task/order1
,我们再创建时,它又会创建 /task/order2
。
为了减少zookeeper 的请求,但又能让客户端能第一时间更新节点中的数据。zookeeper 提供了监视和通知的功能。客户端可以监听某节点,当节点有变化时,服务端会主动通知客户端。客户端接收通知后需要重新监听,因为之前的监听已经被消费了。
可以监听的事件有:节点数据变化,节点子节点变化,节点的创建或删除。
每个节点都有版本号,它随着每次数据变化而递增。setData, delete
都可以接收版本号参数。只有版本号和服务器上的一致时操作才会成功。当有多个客户端同时操作时,版本号就显示出重要性了。
如,客户端 c1 对 znode/config
写入一些配置信息,初始版本号是 1,修改成功后,版本号是 2。这时如果 c2 同时发起了更新,且它传入的版本号是 1,则它的修改不会成功。
ZooKeeper集群的每台机器都会在内存中维护当前整个集群的状态,集群中每台机器之间都互相保持着通信。只要集群中存在超过一半的机器能够正常工作,那么整个集群就能正常对外服务。
ZooKeeper的客户端程序会选择和集群中的做生意一台机器创建一个TCP连接,而它和服务器断开连接后,客户端会自动连接到集群中其它机器上。
对客户端的每个更新请求,ZooKeeper都会分配一个全局唯一的递增编号,它反应了所有事务操作的选后顺序。
ZooKeeper 将所有数据都存储在内存中,并直接服务于客户端所有非事务请求,因此它适合以读操作为主的应用场景。
wget http://apache.dataguru.cn/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
或:
wget http://apache.fayea.com/zookeeper/stable/zookeeper-3.4.6.tar.gz
其它地址列表: http://www.apache.org/dyn/closer.cgi/zookeeper/
tar -zxf zookeeper-3.4.6.tar.gz
cd zookeeper-3.4.6
cp conf/zoo_sample.cfg conf/zoo.cfg
cd src/
./configure --prefix=/usr/local/zookeeper/
make
make install
设置简单的配置文件 conf/zoo.cfg:
单机模式:
dataDir=/var/lib/zookeeper/
clientPort=2181
initLimit=5
syncLimit=2
server.1=192.168.190.190:2888:3888
伪集群模式
dataDir=/var/lib/zookeeper/
clientPort=2181
initLimit=5
syncLimit=2
server.1=192.168.190.190:2888:3888
server.2=192.168.190.190:2889:3889
server.3=192.168.190.190:2890:3890
启动:
bin/zkServer.sh start
netstat -openlut | grep 2181
发现 2181 端口已经在监听了
客户端连接:
bin/zkCli.sh
输入命令:
ls /
create /test 1
ls /
[test, zookeeper]
PHP 扩展
扩展源址: https://github.com/andreiz/php-zookeeper
git clone https://github.com/andreiz/php-zookeeper.git
cd php-zookeeper
phpize
./configure --with-php-config=/usr/local/php/bin/php-config --with-libzookeeper-dir=/usr/local/zookeeper/
make
make install
更改 php.ini 添加:
extension = zookeeper.so
测试安装结果:
php -m | grep zook
zookeeper
已经有 zookeeper 模块.安装成功
vim demo1.php
<?php
class ZookeeperDemo extends Zookeeper {
public function watcher( $i, $type, $key ) {
echo "Insider Watcher\n";
// Watcher gets consumed so we need to set a new one
$this->get( '/test', array($this, 'watcher' ) );
}
}
$zoo = new ZookeeperDemo('127.0.0.1:2181');
$zoo->get( '/test', array($zoo, 'watcher' ) );
while( true ) {
echo '.';
sleep(2);
}
运行:
php demo1.php
此处应该会每隔2秒产生一个点。现在切换到ZooKeeper客户端,并更新 /test 值:
[zk: localhost:2181(CONNECTED) 4] set /test foo
cZxid = 0x7
ctime = Tue Jul 21 16:43:30 CST 2015
mZxid = 0x9
mtime = Tue Jul 21 16:44:03 CST 2015
pZxid = 0x7
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 3
numChildren = 0
demo1.php 输出:
Insider Watcher
原理: ZooKeeper提供了可以绑定在znode的监视器。如果监视器发现znode发生变化,该service会立即通知所有相关的客户端。
这就是PHP脚本如何知道变化的。Zookeeper::get方法的第二个参数是回调函数。当触发事件时,监视器会被消费掉,所以我们需要在回调函数中再次设置监视器。
应用中往往需要一个 Master 来进行统一协调。或者需要一个唯一的机器来进行一些特殊业务的处理。如:数据库的写功能。
在该模式中,涉及到三个角色:主节点、从节点、客户端。
设计细节是:
/master
,值是master1.example.com:3306
, 使用临时节点是为了在该节点崩溃时能马上被监测到。/master
节点上添加NodeDelete
的监听事件。/master
,内容是master2.example.com:3306
。/workers, /tasks, /assign
,这三个节点是持久性的。/workers
表示有哪些工作节点,/tasks
表示有哪些任务要执行,/assign
表示任务的分配情况。/workers 和 /tasks
因为它需要进行任务的分配。所以它得知道有哪些工作节点和哪些待执行的任务。/workers
下创建临时性的子节点来表示自己。如:/workers/server1
,值是 server1.example:8888
,同时在 /assign
下创建持久节点 /assign/server1
来接收任务,并监听它的子节点变化,等待任务分配。/workers
的子节点发生了变化,知道有工作节点加入。从/tasks
获得子节点,即要执行的任务列表。并将任务分配给工作节点,即在 /assign/server1
下创建有序节点 /assign/server1/task0001
。系统初始化时任务列表是空的,所以就没有这一步了,但主节点除了监听工作节点列表,也监听了任务列表。所以当客户端提交任务(添加 /tasks 子节点)时,主节点也能知道,它也就可以执行这一步。将任务分配给某个节点。/assign
,它就会接收到通知,这时执行自己的业务逻辑即可。注册:监听事件接收到通知后,一定要再次监听。因为监听事件是消费型的,接收通知会就失效了。
数据发布/订阅系统就是发布者将数据发布到 ZooKeeper 的一个或一系列节点上,供订阅者进行数据订阅,进而达到动态获取数据的目的。
发布/订阅一般有两种模式:推和拉。 在推模式中,服务器主动将数据更新发送给所有订阅的客户端;而拉模式则由客户端主动发出请求获取最新数据。
ZooKeeper采用的是推拉结合:客户端向服务端注册自己需要关注的节点,一旦该节点的数据发生变更,那么服务端就会向相应的客户端发送事件通知,客户端接收到通知后,再主动到服务端获取最新的数据。
案例: 配置获取
服务集群在启动初始化阶段,先从上面的 ZooKeeper 配置节点上读取配置信息,同时客户端还要在该配置节点上注册一个数据变更的 Watcher 监听,一旦发生数据节点变更,所有订阅的客户端能够得到通知,以便更新配置。
案例: 配置变更
在系统运行时,可能出现变更配置的情况,这时候可以直接在 zkclient 上或者通过其它客户端连接 zookeeper 变更数据,Zookeeper 会通知各个已添加数据变更监听的客户端。
案例: 负载均衡
通常负载均衡的方式有: LVS, Nginx 。但 Zookeeper 也可以实现。
案例: 域名配置
系统初始化之前,先在 ZooKeeper 上创建一个节点来进行域名配置,如:/dnsconfit/app1/server.app1.domain.com 节点内容是:
192.168.0.1:9999,192.168.0.2:9999,192.168.0.3:9999
案例: 域名解析
传统的DNS解析中,解析过程是操作系统和IP映射机制(本地 Host 绑定)或者专门的域名解析服务器完成的。但这里有很大的区别,它的过程由各个应用自己负责,ZooKeeper 里维护的是域名的解析数据。然后程序得到数据后可以通过相应的算法,解析为对应的IP。
当然,各个客户端通过ZooKeeper 得到域名数据后也得监听节点,以便得到数据变更的通知。
案例: MySQL数据复制总线
复制总线是一个实时数据复制框架,用在不同的 MySQL数据库实例间进行异步数据复制和数据变化的通知。整个系统由 MySQL 数据库集群,消息队列系统,任务管理监控平台,__ZooKeeper 集群__等组件共同构成。
ZooKeeper 负责进行一系列的分布式协调工作。在应用中,根据功能将组件分成三个模块:Core, Server, Monitor,每个模块分别为一个单独的进程,通过 ZooKeeper 进行数据交换。
每个模块在独立的进程中运行,运行时的数据和配置信息均保存在 ZooKeeper 上,WEB管理端通过 ZooKeeper 获取到后台进程的数据,同时发布控制信息。
任务注册 Core进程启动时,向 /mysql_replicator/tasks 节点注册任务。如:复制热门商品的任务,Task 所在机器在启动的时候,会先在节点上注册如: /mysql_replicator/tasks/copy_hot_item 的节点。如果注册的时候发现该节点已经存在,表示有其它机器注册了该任务,就不用再执行了。
任务热备份 为了应对复制任务故障,复制采用“热备份”的容灾方式。即:用同一个复制任务部署在不同的主机上。主、备任务机通过 ZooKeeper 互相检测运行健康状况。 如果是这样,注册任务时,就算检测到/mysql_replicator/tasks/copy_hot_item 节点已经存在 ,也要在它的下级把自己注册上去,如:/mysql_replicator/tasks/copy_hot_item/instances/[Hostname]_2 完成该子节点的创建后,每台任务机都可以获取到自己创建的节点以及所有的子节点列表。通过对比判断自己是否是所有节点中序号最小的,如果是,那么就将自己的运行状态设置为 Running, 其它机器将自己设置为 Standby ,这样依次来实现备份。
热备切换 当任务机把自己设置为 Running 时,开始进行正常的数据复制,Standby 状态的机器进行等待。一旦标记为 Running 的机器出现故障,停止了任务,那么就需要在所有标记为 Standby 的客户端机器中再将按“最小序号优先”的策略来选出 Running 机器。 这就需要所有的 Standby 机器都需要在/mysql_replicator/tasks/copy_hot_item/instances/ 节点上注册一个 “子节点列表变更”的监听。一旦 Running机器挂了,与 ZooKeeper 断开连接后,对应的节点消失,其它机器会收到该变更通知,从而开始新一轮的 Running 选举。
记录执行状态 被标记为 Running 的机器 需要记录运行时的状态 ,如:执行 Binlog 的位置。可以放在节点:/mysql_replicator/tasks/copy_hot_item/lastCommit/ 上。
控制台协调 Server 组件是用来控制复制的启动和停止的。它会将每个复制任务的数据(库名,表名,用户名,密码等)写入节点 /mysql_replicator/tasks/copy_hot_item/ 中,以便 该任务所有机器都能得到该配置。
冷备切换 上面的复制任务热备要求每个复制任务都有多台机器,这就形成了资源的浪费。所以这里出现了冷备切换。 所谓冷备切换其实就是用一些机器不停的扫描,看哪些任务没有任务节点。如果发现了,就立即创建自己的节点。
当然,这时可能有多个机器同时发现它,并都创建了自己的节点,这时候就需要各机器在创建后都进行一次序号的比较,如果序号是最小的,就将状态设置成 Running,并执行复制;其它的机器则删除自己刚才创建的节点,并继续扫描任务。
冷热备份对比
可以看到热备份资源浪费严重,冷备份实时性差。
所谓集群管理主要包括集群监控和集群控制两大块。前者要进行各机器状态的收集,后者要对集群进行操作与控制。通常系统会对集群有如下要求:
案例:分布式日志收集系统
该系统核心工作就是收集分布在不同机器上的系统日志。
系统的主要问题在于:
归根结底,我们的目标是:构建快速,合理,动态的日志收集系统。
注册收集器机器 在 ZooKeeper 上创建一个节点,用来标记所有的收集机器:/logs/collector 每个收集机器启动时,会到该节点下注册,如:/logs/collector/[Hostname]
任务分发 所有收集机器都创建好自己的节点后,系统根据子节点的个数,将所有日志源机器分成对应的若干组,然后将分组后的机器列表分别写到各收集机器节点下。 这样,各收集机器就能够从自己的节点上获取日志源机器列表,进行进行收集工作。
状态汇报 任务注册和分发后,各机器开始收集。但这些机器可能随时都会挂掉。因为需要对收集状态进行监控,具体的做法是对每个收集节点下级创建一个 status 状态节点,收集器把进度信息写入该节点。
动态分配 如果收集机器挂掉了,或者是要添加新的机器,就需要动态的对收集任务进行分配。 无论是 /logs/connector/节点下加入了新的节点,还是节点消失,日志系统都要进行任务的重新分配。
注意事项:
分布式锁是控制分布式系统之间同步访问共享资源的一种方式。不同系统或不同机器访问同一资源时,往往需要一些互斥手段来防止彼此之间的干扰,以保证一致性。
排他锁
又称写锁或独占锁。表示加上后,其它任何事务都不能再对这个资源进行任何操作。
定义锁:
可以创建一个节点用来表示某资源,然后再给它的下级创建一个 lock节点,表示已经锁定。
获取锁:
和前面选举 Master 类似,多个机器请求资源时,同时给资源节点下创建 lock 节点,最终只有一个机器成功,即它获得锁。没有获得到的节点需要监听该节点,以便在锁释放时再去获取。
释放锁:
获取锁时,创建的节点是临时节点。该节点消失时就表示锁释放。节点消失有两种情况:
要注意的是,没有获取到锁的节点一定要监听锁节点,以便在锁释放时去获得锁。
共享锁
又称读锁。如果事务对资源加了锁,那么当前事务只能对该资源进行读取操作;其它事务也只能对资源加共享锁。只有当所有共享锁都释放后才能进行其它的操作,如加排他锁。
共享锁和排他锁的区别在于,加上排他锁后,数据只对一个事务可见;加上排他锁后数据对所有事务可见。
定义锁:
建立一个节点表示锁。如:/shared_lock/
获取锁:
在锁节点下创建自己的节点,如 /shared_lock/[Host]_1
判断读写顺序:
不同事务都可以对同一数据进行读取操作,但更新操作必须在数据上没有任何事务进行读写操作的情况下进行。所以,ZooKeeper 确定分布式读写的顺序可分为如下4个步骤:
释放锁:
同排他锁
FIFO:先入先出
先进入队列的请求操作先完成。然后开始处理后面的请求。 ZooKeeper处理步骤如下:
分布式屏障
指的是一个队列的元素必须都齐全后,才能统一进行安排,否则一直等待。这往往用在统计,合并计算的场景下。步骤如下:
编辑 worker.php
<?php
ini_set('display_errors', '1');
error_reporting(E_ALL);
class Worker extends Zookeeper {
const CONTAINER = '/cluster';
protected $acl = array(
array(
'perms' => Zookeeper::PERM_ALL,
'scheme' => 'world',
'id' => 'anyone'
)
);
private $isLeader = false;
private $znode;
public function __construct($host = '', $watcher_cb = null, $recv_timeout = 1000) {
parent::__construct($host, $watcher_cb, $recv_timeout );
}
public function register() {
if(!$this->exists(self::CONTAINER)) {
print_r("Not Exists node. To Create! ".self::CONTAINER."\n");
$this->create(self::CONTAINER, 1, $this->acl);
}
if($this->exists(self::CONTAINER)){
print_r("Created Base Node Success! ".self::CONTAINER."\n");
print_r("To Created Sub Node!\n");
$this->znode = $this->create(self::CONTAINER.'/w-',
1,
$this->acl,
Zookeeper::EPHEMERAL | Zookeeper::SEQUENCE
);
}else{
print_r("Created Failed!\n");
}
$this->znode = str_replace(self::CONTAINER.'/', '', $this->znode);
printf("I'm registred as: %s\n", $this->znode);
$watching = $this->watchPrevious();
if($watching == $this->znode) {
printf("Reg: Nobody here, I'm the leader\n");
$this->setLeader(true);
}
else {
printf("Reg: I'm watching %s\n", $watching);
}
}
public function watchPrevious() {
$workers = $this->getChildren(self::CONTAINER);
sort($workers);
print_r($workers);
$size = sizeof($workers);
echo "Now Running ".$size." workers!\n";
for( $i = 0 ; $i < $size ; $i++) {
if($this->znode == $workers[$i]) {
if($i > 0) {
print_r("Ready to Watch! ".$workers[$i - 1]."\n");
$this->get(self::CONTAINER.'/'.$workers[$i - 1], array($this, 'watchNode'));
return $workers[$i - 1];
}
return $workers[$i];
}
}
throw new Exception(sprintf("Something went very wrong! I can't find myself: %s/%s",
self::CONTAINER,
$this->znode)
);
}
public function watchNode($i, $type, $name) {
$watching = $this->watchPrevious();
if($watching == $this->znode) {
printf("Watch: I'm the new leader!\n");
$this->setLeader( true );
}else {
printf("Watch: I'm %s\n", $watching);
}
}
public function isLeader() {
return $this->isLeader;
}
public function setLeader($flag) {
$this->isLeader = $flag;
}
public function run() {
$this->register();
while(true) {
if($this->isLeader() ) {
$this->doLeaderJob();
}
else {
$this->doWorkerJob();
}
sleep(2);
}
}
public function doLeaderJob() {
echo "Job: Leading\n";
}
public function doWorkerJob() {
printf("Job: I'm %s\n", $this->znode );
}
}
$worker = new Worker('localhost:2181');
$worker->run();
?>
打开多个终端都运行:
php worker.php
第一个终端:
root@dev-localhost # php worker.php
I'm registred as: w-0000000000
Nobody here, I'm the leader
Leading
Leading
第二个终端:
I'm registred as: w-0000000001
I'm watching w-0000000000
Working
Working
第三个终端:
I'm registred as: w-0000000002
I'm watching w-0000000001
Working
Working
现在模拟Leader崩溃的情形。使用Ctrl+c或其他方法退出第一个脚本。
刚开始不会有任何变化,worker可以继续工作。后来,ZooKeeper会发现超时,并选举出新的leader(未成功)。
第二个终端:
Now Running 2 workers!
Watch: I'm the new leader!
Job: Leading
Job: Leading
虽然这些脚本很容易理解,但是还是有必要对已使用的Zookeeper标志作注释:
$this->znode = $this->create( self::CONTAINER . '/w-',
null,
$this->acl,
Zookeeper::EPHEMERAL | Zookeeper::SEQUENCE );
每个znode都是 EPHEMERAL 和 SEQUENCE 的。
常量 EPHEMRAL
代表当客户端失去连接时移除该znode。这就是为何PHP脚本会知道超时。
常量 SEQUENCE
代表在每个znode名称后添加顺序标识。我们通过这些唯一标识来标记worker。
Redis 提供了可以持久化的缓存服务。这是和 memcached 最大的差别。同时它的数据类型又更为丰富。可以应对更复杂的业务场景。
Redis 还具备可以做分布式锁等其他功能,但是如果只是为了分布式锁这些其他功能,完全还有其他中间件,如 ZooKpeer ,并不是非要使用 Redis。
类似微博的粉丝数、关注数、文章数等这些数据,如果从DB查询然后再显示,就会太慢了。通常是存到缓存中。
Redis 是单线程工作模型。只有单个线程,通过跟踪每个 I/O 流的状态,来管理多个 I/O 流。
就是我们的 redis-client 在操作的时候,会产生具有不同事件类型的 Socket。
在服务端,有一段 I/O 多路复用程序,将其置入队列之中。然后,文件事件分派器,依次去队列中取,转发到不同的事件处理器中。这个 I/O 多路复用机制,Redis 还提供了 select、epoll、evport、kqueue 等多路复用函数库。
Redis 采用的是定期删除+惰性删除策略
最简单的一种方式是定时删除,用一个定时器来负责监视 Key,过期则自动删除。虽然内存及时释放,但是十分消耗 CPU 资源。
Redis 默认每个 100ms 检查,是否有过期的 Key,有过期 Key 则删除。Redis 不是每个 100ms 将所有的 Key 检查一次,而是随机抽取进行检查,这样可以避免卡死。
因此,如果只采用定期删除策略,会导致很多 Key 到时间没有删除。于是,惰性删除派上用场。和 memcached 一样,LRU (least recenty use)。在你获取某个 Key 的时候,Redis 会检查一下,这个 Key 如果设置了过期时间,那么是否过期了?如果过期了此时就会删除。
如果定期删除没删除 Key。然后你也没即时去请求 Key,也就是说惰性删除也没生效。这样,Redis的内存会越来越高。那么就应该采用内存淘汰机制。
在 redis.conf 中有一行配置:
# maxmemory-policy volatile-lru
该配置就是配内存淘汰策略的。它的可选参数有:
noeviction:当内存不足以容纳新写入数据时,新写入操作会报错。(显然不会用这个选项)
allkeys-lru:当内存不足以容纳新写入数据时,在键空间中,移除最近最少使用的 Key。(推荐使用)
allkeys-random:当内存不足以容纳新写入数据时,在键空间中,随机移除某个 Key。
volatile-lru:当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,移除最近最少使用的 Key。这种情况一般是把 Redis 既当缓存,又做持久化存储的时候才用。
volatile-random:当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,随机移除某个 Key。
volatile-ttl:当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,有更早过期时间的 Key 优先移除。
它的数据类型有:
最常用的类型。它可以存储多种值,比如把图片、对象序列化后存进来。
一个 String 类型的值最多可以存储 512M 的内容。
我们可以对 String 类型的值进行数字的累加,累减(incr,decr,incrby);字符串拼接(append);向量的随机访问(getrange)。甚至它还允许我们基于 bit 的处理(getbit, setbit)–可以用来实现 bloom filter。
列表类型。它实际上是多个 String 组成的双向链表。它是按插入顺序排序的。我们可以往列表的头插入数据也可以往尾部插入(lpush,rpush)。当列表不存在时,我们进行插入操作时会新建一个列表。同样,如果列表为空了,它的空间就会被回收。
一个列表最多可以存储 232-1个项,相当于 4294967295,超过 40 亿,应该足够我们使用了。这还只是一个列表。在实际应用中不建议使用这么大的列表。可以拆分成若干个小的。
由于它是双向链表的结构,所以往列表里插入、删除都非常快;从链表的两端读取数据也很快。但如果想访问一个大列表中间的值,那就很慢了。
我们可以用列表来做消息队列;也可以用它来做 top N 的业务(用 ltrim 修剪列表);
还可以利用 lrange 命令,做基于 Redis 的分页功能,性能极佳,用户体验好。
Sets 是多个 String 组成的无序列表。往 Sets 里添加、删除、检查是否存在,都只需要 O(1) 的时间复杂度,异常快。但它有个特性就是不能包含重复的值。而且 Redis 服务端还提供了 Sets 的求合集、差集、交集等操作,而且速度非常快。
可以用到的场景可以是:存放当天访问的IP,用来计算UV,以及留存,新增用户等(多个 Sets 求差、并集);存放用户在社交媒体上的关注列表,可以轻松的比较某些人共同关注了哪些人等。
一个 Sets 可以存放的最大数量也是 232-1。
和 Sets 一样,它也不允许有重复的值,也可以存放 232-1 个值。但不同的是,Sorted Sets 为每个值添加了一个 score 属性,用来进行排序。它还支持通过 score 属性来进行范围查询,效率也非常高;访问集合中间的值也很快,这点和 Lists 不同。但要注意的是 Lists 的值是可重复的,Sets/Sorted Sets 里的值是不可重复的。
通过这些特性,Sorted Sets 可以被用来:
游戏的积分排行榜。这种榜单肯定不会有重复的用户名,另外榜单需要接积分排序,积分更新也很频繁,随时会有新用户添加进来。所以要求能按积分排序及范围查询、插入要快、单数据查询快。这个场景完美贴合 Sorted Sets。
同时它也可以用来对数据进行索引。比如我们的用户信息存在Redis中,我们可以把用户年龄,ID 等信息导入一个 Sorted Sets,通过范围查询我们可以轻松的根据年龄来查询相应的用户ID。
Hashs 实际上是 key 和多个 String 值之间的 map 关系。所以它可以用来存储对象的信息。它的访问速度自然不用说,是O(1)。
一个 Hashs 类型的 key 可以有 232-1 个自定义属性值。
一致性问题是分布式常见问题,还可以分为最终一致性和强一致性。数据库和缓存双写,就必然会存在不一致的问题。
如果对数据有强一致性要求,不能放缓存。我们只能保证最终一致性。使用时的要点有:
缓存穿透,即黑客故意去请求缓存中不存在的数据,导致所有的请求都怼到数据库上,从而数据库连接异常。
缓存穿透解决方案:
缓存雪崩,即缓存同一时间大面积的失效,这个时候又来了一波请求,结果请求都怼到数据库上,从而导致数据库连接异常。
缓存雪崩解决方案:
然后细分以下几个小点:从缓存 A 读数据库,有则直接返回;A 没有数据,直接从 B 读数据,直接返回,并且异步启动一个更新线程,更新线程同时更新缓存 A 和缓存 B。
如果同时有多个子系统去 Set 一个 Key。
如果对这个 Key 操作,不要求顺序
可以用一个分布式锁,大家去抢锁,抢到锁就做 set 操作即可,比较简单。
如果对这个 Key 操作,要求顺序
假设有一个 key1,系统 A 需要将 key1 设置为 valueA,系统 B 需要将 key1 设置为 valueB,系统 C 需要将 key1 设置为 valueC。
期望按照 key1 的 value 值按照 valueA > valueB > valueC 的顺序变化。这种时候我们在数据写入数据库的时候,需要保存一个时间戳。
假设时间戳如下:
系统A key 1 {valueA 3:00}
系统B key 1 {valueB 3:05}
系统C key 1 {valueC 3:10}
那么,假设这时系统 B 先抢到锁,将 key1 设置为{valueB 3:05}。接下来系统 A 抢到锁,发现自己的 valueA 的时间戳早于缓存中的时间戳,那就不做 set 操作了,返回客户端提示操作失败,以此类推。
还可以利用队列,将 set 方法变成串行访问也可以。
Redis 提供了多种不同级别的持久化方式:
RDB 持久化可以在指定的时间间隔内生成数据集的时间点快照。 AOF 持久化记录服务器执行的所有写操作命令,并在服务器启动时,通过重新执行这些命令来还原数据集。 AOF 文件中的命令全部以 Redis 协议的格式来保存,新命令会被追加到文件的末尾。 Redis 还可以在后台对 AOF 文件进行重写,使得 AOF 文件的体积不会超出保存数据集状态所需的实际大小。
Redis 可以同时使用 AOF 持久化和 RDB 持久化。 在这种情况下, 当 Redis 重启时, 它会优先使用 AOF 文件来还原数据集, 因为 AOF 文件保存的数据集通常比 RDB 文件所保存的数据集更完整。
在默认情况下, Redis 将数据库快照保存在名字为 dump.rdb 的二进制文件中。
你可以对 Redis 进行设置, 让它在“ N 秒内数据集至少有 M 个改动”这一条件被满足时,自动保存一次数据集。
比如,以下设置会让 Redis 在满足“ 60 秒内有至少有 1000 个键被改动”这一条件时, 自动保存一次数据集:
save 60 1000
你也可以通过调用 SAVE
或者 BGSAVE
, 手动让 Redis 进行数据集保存操作。
当 Redis 需要保存 dump.rdb 文件时, 服务器执行以下操作:
Redis 调用 fork() ,同时拥有父进程和子进程。
子进程将数据集写入到一个临时 RDB 文件中。
当子进程完成对新 RDB 文件的写入时,Redis 用新 RDB 文件替换原来的 RDB 文件,并删除旧的 RDB 文件。
RDB 是一个非常紧凑的文件,它保存了 Redis 在某个时间点上的数据集。 这种文件非常适合用于进行备份: 比如说,你可以在最近的 24 小时内,每小时备份一次 RDB 文件,并且在每个月的每一天,也备份一个 RDB 文件。 这样的话,即使遇上问题,也可以随时将数据集还原到不同的版本。
RDB 非常适用于灾难恢复:它只有一个文件,并且内容都非常紧凑,可以(在加密后)将它传送到别的数据中心。
RDB 在恢复大数据集时的速度比 AOF 的恢复速度要快。
如果你需要避免在服务器故障时丢失数据,那么 RDB 不适合。 虽然 Redis 可以设置不同的保存点来控制保存 RDB 文件的频率, 但是, 因为RDB 文件需要保存整个数据集的状态, 所以它并不是一个快速的操作。 因此可能会至少 5 分钟才保存一次 RDB 文件。 在这种情况下, 一旦发生故障停机, 你就可能会丢失几分钟的数据。
每次保存 RDB 的时候,Redis 都要 fork() 出一个子进程,并由子进程来进行实际的持久化工作。 在数据集比较庞大时, fork() 可能会非常耗时,造成服务器在一定的时间内停止处理客户端。
快照功能并不是非常耐久: 如果 Redis 因为某些原因而造成故障停机, 那么服务器将丢失最近写入、且仍未保存到快照中的那些数据。
从 1.1 版本开始, Redis 增加了一种完全耐久的持久化方式: AOF 持久化。
可以通过修改配置文件来打开 AOF 功能:
appendonly yes
开启后, 每当 Redis 执行一个改变数据集的命令时(比如 SET), 这个命令就会被追加到 AOF 文件的末尾。
因为 AOF 的运作方式是不断地将命令追加到文件的末尾, 所以随着写入命令的不断增加, AOF 文件的体积也会变得越来越大。
耐久性
我们可以配置 Redis 多久才将数据 fsync 到磁盘一次。
有三个选项:
每次有新命令追加到 AOF 文件时就执行一次 fsync :非常慢,也非常安全。
每秒 fsync 一次:足够快(和使用 RDB 持久化差不多),并且在故障时只会丢失 1 秒钟的数据。
从不 fsync :将数据交给操作系统来处理。更快,也更不安全的选择。
推荐(并且也是默认)的措施为每秒 fsync 一次, 这种 fsync 策略可以兼顾速度和安全性。
写时机制
AOF 重写和 RDB 创建快照一样,都巧妙地利用了__写时复制机制__。以下是 AOF 重写的执行步骤:
AOF重写
如果对一个计数器调用了 100 次 INCR , 那么为了保存这个计数器的当前值, AOF 文件就需要使用 100 条记录(entry)。实际上, 只使用一条 SET 命令已经足以保存计数器的当前值了。
为了处理这种情况, Redis 支持一种特性: 在不打断服务客户端的情况下, 对 AOF 文件进行重建。
执行 BGREWRITEAOF
命令, Redis 将生成一个新的 AOF 文件, 这个文件包含重建当前数据集所需的最少命令。
AOF恢复
服务器可能在程序正在对 AOF 文件进行写入时停机, 如果停机造成了 AOF 文件出错(corrupt), 那么 Redis 在重启时会拒绝载入这个 AOF 文件, 从而确保数据的一致性不会被破坏。
当发生这种情况时, 可以用以下方法来修复出错的 AOF 文件:
在 Redis 2.2 或以上版本,可以在不重启的情况下,从 RDB 切换到 AOF :
执行以下两条命令:
redis-cli> CONFIG SET appendonly yes
redis-cli> CONFIG SET save ""
步骤 3 执行的第一条命令开启了 AOF 功能: Redis 会阻塞直到初始 AOF 文件创建完成为止, 之后 Redis 会继续处理命令请求, 并开始将写入命令追加到 AOF 文件末尾。
步骤 3 执行的第二条命令用于关闭 RDB 功能。 这一步是可选的, 如果愿意, 可以同时使用 RDB 和 AOF 这两种持久化功能。
别忘了在 redis.conf 中打开 AOF 功能! 否则的话, 服务器重启之后, 之前通过 CONFIG SET 设置的配置就会被遗忘, 程序会按原来的配置来启动服务器。
BGSAVE 执行的过程中, 不可以执行 BGREWRITEAOF 。 反过来说, 在 BGREWRITEAOF 执行的过程中, 也不可以执行 BGSAVE 。这可以防止两个 Redis 后台进程同时对磁盘进行大量的 I/O 操作。
如果 BGSAVE 正在执行, 并且用户显示地调用 BGREWRITEAOF 命令, 那么服务器将向用户回复一个 OK 状态, 并告知用户, BGREWRITEAOF 已经被预定执行: 一旦 BGSAVE 执行完毕, BGREWRITEAOF 就会正式开始。
Redis 对于数据备份是非常友好的, 因为你可以在服务器运行的时候对 RDB 文件进行复制: RDB 文件一旦被创建, 就不会进行任何修改。 当服务器要创建一个新的 RDB 文件时, 它先将文件的内容保存在一个临时文件里面, 当临时文件写入完毕时, 程序才使用 rename 原子地用临时文件替换原来的 RDB 文件。
也就是说, 无论何时, 复制 RDB 文件都是绝对安全的。
创建一个定时任务(cron job), 每小时将一个 RDB 文件备份到一个文件夹,标注好时间, 并且每天将一个 RDB 文件备份到另一个文件夹,按时间保存。只保存最近一两周的快照。
至少每天一次, 将 RDB 备份到你的数据中心之外, 或者至少是备份到你运行 Redis 服务器的物理机器之外。
Redis 的事务和数据库中的事务相比,不支持回滚。也没有那么多隔离级别。它的机制相对简单,就是原子化一堆操作,并且保证在这些操作过程中关注的一些数据没有被更改,否则事务就出错。而且,它不支持回滚。
MULTI(开启事务) 、 EXEC(执行事务) 、 DISCARD(放弃事务) 和 WATCH(监听一些值) 是 Redis 事务的基础。
EXEC
命令负责触发并执行事务中的所有命令:
如果客户端在使用 MULTI
开启了一个事务之后,却因为断线而没有成功执行 EXEC
,那么事务中的所有命令都不会被执行。
如果客户端成功在开启事务之后执行 EXEC
,那么事务中的所有命令都会被执行。
当使用 AOF 方式做持久化的时候, Redis 会使用单个 write 命令将事务写入到磁盘中。 然而,如果 Redis 服务器因为某些原因关闭,或者遇上硬件故障,那么可能只有部分事务命令会被成功写入到磁盘中。
如果 Redis 在重新启动时发现 AOF 文件出了这样的问题,那么它会退出,并报一个错误。
这时需要用 redis-check-aof
程序可以修复这一问题:它会移除 AOF 文件中不完整事务的信息,确保服务器可以顺利启动。
MULTI
命令用于开启一个事务,它总是返回 OK 。
MULTI 执行之后, 客户端可以继续向服务器发送任意多条命令, 这些命令不会立即被执行, 而是被放到一个队列中, 当 EXEC 命令被调用时, 所有队列中的命令才会被执行。
另一方面, 通过调用 DISCARD
, 客户端可以清空事务队列, 并放弃执行事务。
以下是一个事务例子, 它原子地增加了 foo 和 bar 两个键的值:
> MULTI
OK
> INCR foo
QUEUED
> INCR bar
QUEUED
> EXEC
1) (integer) 1
2) (integer) 1
EXEC 命令的返回值是一个数组, 数组中的每个元素都是执行事务中的命令所产生的返回值。 其中, 结果的先后顺序和命令发送的先后顺序一致。
当客户端处于事务状态时, 所有传入的命令都会返回一个内容为 QUEUED 的状态回复(status reply), 这些被入队的命令将在 EXEC 命令被调用时执行。
使用事务时可能会遇上以下两种错误:
在执行 EXEC 之前,入队的命令可能会出错。 比如说,命令可能会产生语法错误(参数数量错误,参数名错误,等等),或者其他更严重的错误,比如内存不足(如果服务器使用 maxmemory 设置了最大内存限制的话)。
在 EXEC 调用之后失败。 如,事务中的命令可能处理了错误类型的键,将列表命令用在了字符串键上面,诸如此类。
如果有命令在入队时失败,大部分客户端都会停止并取消这个事务。
对于发生在 EXEC 执行之前的错误,客户端以前的做法是检查命令入队所得的返回值:如果命令入队时返回 QUEUED ,那么入队成功;否则,就是入队失败。
从 Redis 2.6.5 开始,服务器会对命令入队失败的情况进行记录,并在客户端调用 EXEC 命令时,拒绝执行并自动放弃这个事务。
在 Redis 2.6.5 以前, Redis 只执行事务中那些入队成功的命令,而忽略那些入队失败的命令。 而新的处理方式则使得在流水线(pipeline)中包含事务变得简单,因为发送事务和读取事务的回复都只需要和服务器进行一次通讯。
那些在 EXEC 命令执行之后所产生的错误, 并没有对它们进行特别处理: 即使事务中有某个/某些命令在执行时产生了错误, 事务中的其他命令仍然会继续执行。
事务中有某条/某些命令执行失败了, 事务队列中的其他命令仍然会继续执行 —— Redis 不会停止执行事务中的命令。
以下是这种做法的优点:
Redis 命令只会因为错误的语法而失败(并且这些问题不能在入队时发现),或是命令用在了错误类型的键上面:这也就是说,从实用性的角度来说,失败的命令是由编程错误造成的,而这些错误应该在开发的过程中被发现,而不应该出现在生产环境中。
因为不需要对回滚进行支持,所以 Redis 的内部可以保持简单且快速。 有种观点认为 Redis 处理事务的做法会产生 bug , 然而需要注意的是, 在通常情况下, 回滚并不能解决编程错误带来的问题。 举个例子, 如果你本来想通过 INCR 命令将键的值加上 1 , 却不小心加上了 2 , 又或者对错误类型的键执行了 INCR , 回滚是没有办法处理这些情况的。
鉴于没有任何机制能避免程序员自己造成的错误, 并且这类错误通常不会在生产环境中出现, 所以 Redis 选择了更简单、更快速的无回滚方式来处理事务。
当执行 DISCARD 命令时, 事务会被放弃, 事务队列会被清空, 并且客户端会从事务状态中退出:
redis> SET foo 1
OK
redis> MULTI
OK
redis> INCR foo
QUEUED
redis> DISCARD
OK
redis> GET foo
"1"
WATCH
命令可以为 Redis 事务提供 check-and-set (CAS)行为。
被 WATCH 的键会被监视,并会发觉这些键是否被改动过了。 如果有至少一个被监视的键在 EXEC 执行之前被修改了, 那么整个事务都会被取消, EXEC 返回空多条批量回复(null multi-bulk reply)来表示事务已经失败。
举个例子, 假设我们需要原子性地为某个值进行增 1 操作(假设 INCR 不存在)。
首先我们可能会这样做:
val = GET mykey
val = val + 1
SET mykey $val
上面的这个实现在只有一个客户端的时候可以执行得很好。 但是, 当多个客户端同时对同一个键进行这样的操作时, 就会产生竞争条件。
举个例子, 如果客户端 A 和 B 都读取了键原来的值, 比如 10 , 那么两个客户端都会将键的值设为 11 , 但正确的结果应该是 12 才对。
有了 WATCH , 我们就可以轻松地解决这类问题了:
WATCH mykey
val = GET mykey
val = val + 1
MULTI
SET mykey $val
EXEC
使用上面的代码, 如果在 WATCH 执行之后, EXEC 执行之前, 有其他客户端修改了 mykey 的值, 那么当前客户端的事务就会失败。 程序需要做的, 就是不断重试这个操作, 直到没有发生碰撞为止。
这种形式的锁被称作乐观锁, 它是一种非常强大的锁机制。 并且因为大多数情况下, 不同的客户端会访问不同的键, 碰撞的情况一般都很少, 所以通常并不需要进行重试。
WATCH 使得 EXEC 命令需要有条件地执行: 事务只能在所有被监视键都没有被修改的前提下执行, 如果这个前提不能满足的话,事务就不会被执行。
如果你使用 WATCH 监视了一个带过期时间的键, 那么即使这个键过期了, 事务仍然可以正常执行 WATCH 命令可以被调用多次。 对键的监视从 WATCH 执行之后开始生效, 直到调用 EXEC 为止。
用户还可以在单个 WATCH 命令中监视任意多个键, 就像这样:
redis> WATCH key1 key2 key3
OK
当 EXEC 被调用时, 不管事务是否成功执行, 对所有键的监视都会被取消。
另外, 当客户端断开连接时, 该客户端对键的监视也会被取消。
使用无参数的 UNWATCH
命令可以手动取消对所有键的监视。 对于一些需要改动多个键的事务, 有时候程序需要同时对多个键进行加锁, 然后检查这些键的当前值是否符合程序的要求。 当值达不到要求时, 就可以使用 UNWATCH 命令来取消目前对键的监视, 中途放弃这个事务, 并等待事务的下次尝试。
Redis 支持简单且易用的主从复制(master-slave replication)功能 以下是关于 Redis 复制功能的几个重要方面:
不过, 在从服务器删除旧版本数据集并载入新版本数据集的那段时间内, 连接请求会被阻塞。
你还可以配置从服务器, 让它在与主服务器之间的连接断开时, 向客户端发送一个错误。
复制功能可以单纯地用于数据冗余, 也可以通过让多个从服务器处理只读命令请求来提升扩展性如常用的读写分离。写操作在主服务器上执行,读操作在从服务器。redis 默认也是这么配置的,从服务器默认只支持读操作。
可以通过复制功能来让主服务器免于执行持久化操作: 只要关闭主服务器的持久化功能, 然后由从服务器去执行持久化操作即可。
即使有多个从服务器同时向主服务器发送 SYNC , 主服务器也只需执行一次 BGSAVE 命令, 就可以处理所有这些从服务器的同步请求。
从服务器可以在主从服务器之间的连接断开时进行自动重连, 在 Redis 2.8 版本之前, 断线之后重连的从服务器总要执行一次完整重同步操作, 但是从 Redis 2.8 版本开始, 从服务器可以根据主服务器的情况来选择执行完整重同步还是部分重同步。
从 Redis 2.8 开始, 在网络连接短暂性失效之后, 主从服务器可以尝试继续执行原有的复制进程, 而不一定要执行完整重同步操作。
这个特性需要主服务器为被发送的复制流创建一个内存缓冲区, 并且主服务器和所有从服务器之间都记录一个复制偏移量和一个主服务器 ID, 当出现网络连接断开时, 从服务器会重新连接, 并且向主服务器请求继续执行原来的复制进程:
如果从服务器记录的主服务器 ID 和当前要连接的主服务器的 ID 相同, 并且从服务器记录的偏移量所指定的数据仍然保存在主服务器的复制流缓冲区里面, 那么主服务器会向从服务器发送断线时缺失的那部分数据, 然后复制工作可以继续执行。
否则的话, 从服务器就要执行完整重同步操作。
Redis 2.8 的这个部分重同步特性会用到一个新增的 PSYNC 内部命令, 而 Redis 2.8 以前的旧版本只有 SYNC 命令, 不过, 只要从服务器是 Redis 2.8 或以上的版本, 它就会根据主服务器的版本来决定到底是使用 PSYNC 还是 SYNC :
如果主服务器是 Redis 2.8 或以上版本,那么从服务器使用 PSYNC 命令来进行同步。
如果主服务器是 Redis 2.8 之前的版本,那么从服务器使用 SYNC 命令来进行同步。
配置一个从服务器非常简单, 只要在配置文件中增加以下的这一行就可以了:
slaveof 192.168.1.1 6379
当然, 你需要将代码中的 192.168.1.1 和 6379 替换成你的主服务器的 IP 和端口号。
另外一种方法是调用 SLAVEOF 命令, 输入主服务器的 IP 和端口, 然后同步就会开始:
127.0.0.1:6379> SLAVEOF 192.168.1.1 10086
OK
从 Redis 2.6 开始, 从服务器支持只读模式, 并且该模式为从服务器的默认模式。
只读模式由 redis.conf 文件中的 slave-read-only
选项控制, 也可以通过 CONFIG SET 命令来开启或关闭这个模式。
只读从服务器会拒绝执行任何写命令, 所以不会出现因为操作失误而将数据不小心写入到了从服务器的情况。
如果主服务器通过 requirepass 选项设置了密码, 那么为了让从服务器的同步操作可以顺利进行, 我们也必须为从服务器进行相应的身份验证设置。
对于一个正在运行的服务器, 可以使用客户端输入以下命令:
config set masterauth
要永久地设置这个密码, 那么可以将它加入到配置文件中:
masterauth youpassword
从 Redis 2.8 开始, 为了保证数据的安全性, 可以通过配置, 让主服务器只在有至少 N 个当前已连接从服务器的情况下, 才执行写命令,为的是保证写入的内容会成功的传输到从机上。
不过, 因为 Redis 使用异步复制, 所以主服务器发送的写数据并不一定会被从服务器接收到, 因此, 数据丢失的可能性仍然是存在的。
以下是这个特性的运作原理:
主库只RDB备份,从机AOF 从机 read-only
若主机挂了,不要直接重启;会把从机的AOF覆盖
info commandstats/config resetstat 查看所有命令的统计信息,后面是重置统计
redis-cli -h localhost -p 6379 client list | grep -v "omem=0" 快速找到引起阻塞的
命令 删除大的列表时的阻塞–不直接删除这种大的集合,而是将他们重命名然后后台跑一个删除进程慢慢删
用 scan 代替 key*,孩子一次扫描过多的key 可以按主-从-从结构。这样主机挂后,第一从机直接升为主机就行了。
Redis 集群是一个可以在多个 Redis 节点之间进行数据共享的设施。
Redis 集群不支持那些需要同时处理多个键 的 Redis 命令, 因为执行这些命令需要在多个 Redis 节点之间移动数据, 并且在高负载的情况下, 这些命令将降低 Redis 集群的性能, 并导致不可预测的行为。
Redis 集群通过分区(partition)来提供一定程度的可用性(availability): 即使集群中有一部分节点失效或者无法进行通讯, 集群也可以继续处理命令请求。
Redis 集群提供了以下两个好处:
Redis 集群使用数据分片(sharding)而非一致性哈希(consistency hashing)来实现:
一个 Redis 集群包含 16384 个哈希槽(hash slot), 数据库中的每个键都属于这 16384 个哈希槽的其中一个, 集群使用公式 CRC16(key) % 16384 来计算键 key 属于哪个槽, 其中 CRC16(key) 语句用于计算键 key 的 CRC16 校验和 。
集群中的每个节点负责处理一部分哈希槽。
举个例子, 一个集群可以有三个哈希槽, 其中:
节点 A 负责处理 0 号至 5500 号哈希槽。 节点 B 负责处理 5501 号至 11000 号哈希槽。 节点 C 负责处理 11001 号至 16384 号哈希槽。
这种将哈希槽分布到不同节点的做法使得用户可以很容易地向集群中添加或者删除节点。 比如说:
如果用户将新节点 D 添加到集群中, 那么集群只需要将节点 A 、B 、 C 中的某些槽移动到节点 D 就可以了。
如果用户要从集群中移除节点 A , 那么集群只需要将节点 A 中的所有哈希槽移动到节点 B 和节点 C , 然后再移除空白(不包含任何哈希槽)的节点 A 就可以了。
因为将一个哈希槽从一个节点移动到另一个节点不会造成节点阻塞, 所以无论是添加新节点还是移除已存在节点, 又或者改变某个节点包含的哈希槽数量, 都不会造成集群下线。
为了使得集群在一部分节点下线或者无法与集群的大多数节点进行通讯的情况下, 仍然可以正常运作, Redis 集群对节点使用了主从复制功能:
集群中的每个节点都有 1 个至 N 个复制品(replica), 其中一个复制品为主节点(master), 而其余的 N-1 个复制品为从节点(slave)。
在之前列举的节点 A 、B 、C 的例子中, 如果节点 B 下线了, 那么集群将无法正常运行, 因为集群找不到节点来处理 5501 号至 11000 号的哈希槽。
另一方面, 假如在创建集群的时候(或者至少在节点 B 下线之前), 我们为主节点 B 添加了从节点 B1 , 那么当主节点 B 下线的时候, 集群就会将 B1 设置为新的主节点, 并让它代替下线的主节点 B , 继续处理 5501 号至 11000 号的哈希槽, 这样集群就不会因为主节点 B 的下线而无法正常运作了。
不过如果节点 B 和 B1 都下线的话, Redis 集群还是会停止运作。
Redis 集群不保证数据的强一致性(strong consistency): 在特定条件下, Redis 集群可能会丢失已经被执行过的写命令。
使用异步复制(asynchronous replication)是 Redis 集群可能会丢失写命令的其中一个原因。
考虑以下这个写命令的例子:
主节点对命令的复制工作发生在返回命令回复之后, 因为如果每次处理命令请求都需要等待复制操作完成的话, 那么主节点处理命令请求的速度将极大地降低 —— 我们必须在性能和一致性之间做出权衡。
Redis 集群另外一种可能会丢失命令的情况,如下例:
假设集群包含 A 、 B 、 C 、 A1 、 B1 、 C1 六个节点, 其中 A 、B 、C 为主节点, 而 A1 、B1 、C1 分别为三个主节点的从节点, 另外还有一个客户端 Z1 。
假设集群中发生网络分裂, 那么集群可能会分裂为两方, 大多数(majority)的一方包含节点 A 、C 、A1 、B1 和 C1 , 而少数(minority)的一方则包含节点 B 和客户端 Z1 。这时候分裂的两方是互相不通的,即:A、C 等会认为 B 已经下线,且客户端 Z1 只能和 B 连通。
在网络分裂期间, 主节点 B 仍然会接受 Z1 发送的写命令:
如果网络分裂出现的时间很短, 那么集群会继续正常运行;
但是, 如果网络分裂出现的时间足够长, 使得大多数一方将从节点 B1 设置为新的主节点, 并使用 B1 来代替原来的主节点 B , 那么 Z1 发送给主节点 B 的写命令将丢失。
注意, 在网络分裂出现期间, 客户端 Z1 可以向主节点 B 发送写命令的最大时间是有限制的, 这一时间限制称为节点超时时间(node timeout), 是 Redis 集群的一个重要的配置选项:
对于大多数一方来说, 如果一个主节点未能在节点超时时间所设定的时限内重新联系上集群, 那么集群会将这个主节点视为下线, 并使用从节点来代替这个主节点继续工作。
对于少数一方, 如果一个主节点未能在节点超时时间所设定的时限内重新联系上集群, 那么它将停止处理写命令, 并向客户端报告错误。
Redis 集群由多个运行在集群模式(cluster mode)下的 Redis 实例组成, 实例的集群模式需要通过配置来开启, 开启集群模式的实例将可以使用集群特有的功能和命令。
以下是一个包含了最少选项的集群配置文件示例:
port 7000
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes
文件中的 cluster-enabled
选项用于开实例的集群模式, 而 cluster-conf-file 选项则设定了保存节点配置文件的路径, 默认值为 nodes.conf 。
节点配置文件无须人为修改, 它由 Redis 集群在启动时创建, 并在有需要时自动进行更新。
要让集群正常运作至少需要三个主节点, 不过在刚开始试用集群功能时, 强烈建议使用六个节点: 其中三个为主节点, 而其余三个则是各个主节点的从节点。
首先, 让我们进入一个新目录, 并创建六个以端口号为名字的子目录, 稍后我们在将每个目录中运行一个 Redis 实例:
mkdir cluster-test
cd cluster-test
mkdir 7000 7001 7002 7003 7004 7005
在文件夹 7000 至 7005 中, 各创建一个 redis.conf 文件, 文件的内容可以使用上面的示例配置文件, 但记得将配置中的端口号从 7000 改为与文件夹名字相同的号码。
现在, 从 Redis Github 页面 的 unstable 分支中取出最新的 Redis 源码, 编译出可执行文件 redis-server , 并将文件复制到 cluster-test 文件夹, 然后使用类似以下命令, 在每个标签页中打开一个实例:
cd 7000
../redis-server ./redis.conf
实例打印的日志显示, 因为 nodes.conf 文件不存在, 所以每个节点都为它自身指定了一个新的 ID :
[82462] 26 Nov 11:56:55.329 * No cluster configuration found, I'm 97a3a64667477371c4479320d683e4c8db5858b1
实例会一直使用同一个 ID , 从而在集群中保持一个独一无二的名字。
每个节点都使用 ID 而不是 IP 或者端口号来记录其他节点, 因为 IP 地址和端口号都可能会改变, 而这个独一无二的标识符(identifier)则会在节点的整个生命周期中一直保持不变。
我们将这个标识符称为节点 ID。
现在我们已经有了六个正在运行中的 Redis 实例, 接下来我们需要使用这些实例来创建集群, 并为每个节点编写配置文件。
通过使用 Redis 集群命令行工具 redis-trib
, 编写节点配置文件的工作可以非常容易地完成: redis-trib 位于 Redis 源码的 src 文件夹中, 它是一个 Ruby 程序, 这个程序通过向实例发送特殊命令来完成创建新集群, 检查集群, 或者对集群进行重新分片(reshared)等工作。
我们需要执行以下命令来创建集群:
./redis-trib.rb create --replicas 1 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005
命令的意义如下:
简单来说, 以上命令的意思就是让 redis-trib 程序创建一个包含三个主节点和三个从节点的集群。
接着, redis-trib 会打印出一份预想中的配置给你看, 如果你觉得没问题的话, 就可以输入 yes , redis-trib 就会将这份配置应用到集群当中:
>>> Creating cluster
Connecting to node 127.0.0.1:7000: OK
Connecting to node 127.0.0.1:7001: OK
Connecting to node 127.0.0.1:7002: OK
Connecting to node 127.0.0.1:7003: OK
Connecting to node 127.0.0.1:7004: OK
Connecting to node 127.0.0.1:7005: OK
>>> Performing hash slots allocation on 6 nodes...
Using 3 masters:
127.0.0.1:7000
127.0.0.1:7001
127.0.0.1:7002
127.0.0.1:7000 replica #1 is 127.0.0.1:7003
127.0.0.1:7001 replica #1 is 127.0.0.1:7004
127.0.0.1:7002 replica #1 is 127.0.0.1:7005
M: 9991306f0e50640a5684f1958fd754b38fa034c9 127.0.0.1:7000
slots:0-5460 (5461 slots) master
M: e68e52cee0550f558b03b342f2f0354d2b8a083b 127.0.0.1:7001
slots:5461-10921 (5461 slots) master
M: 393c6df5eb4b4cec323f0e4ca961c8b256e3460a 127.0.0.1:7002
slots:10922-16383 (5462 slots) master
S: 48b728dbcedff6bf056231eb44990b7d1c35c3e0 127.0.0.1:7003
S: 345ede084ac784a5c030a0387f8aaa9edfc59af3 127.0.0.1:7004
S: 3375be2ccc321932e8853234ffa87ee9fde973ff 127.0.0.1:7005
Can I set the above configuration? (type 'yes' to accept): yes
输入 yes 并按下回车确认之后, 集群就会将配置应用到各个节点, 并连接起(join)各个节点 —— 也即是, 让各个节点开始互相通讯:
>>> Nodes configuration updated
>>> Sending CLUSTER MEET messages to join the cluster
Waiting for the cluster to join...
>>> Performing Cluster Check (using node 127.0.0.1:7000)
M: 9991306f0e50640a5684f1958fd754b38fa034c9 127.0.0.1:7000
slots:0-5460 (5461 slots) master
M: e68e52cee0550f558b03b342f2f0354d2b8a083b 127.0.0.1:7001
slots:5461-10921 (5461 slots) master
M: 393c6df5eb4b4cec323f0e4ca961c8b256e3460a 127.0.0.1:7002
slots:10922-16383 (5462 slots) master
M: 48b728dbcedff6bf056231eb44990b7d1c35c3e0 127.0.0.1:7003
slots: (0 slots) master
M: 345ede084ac784a5c030a0387f8aaa9edfc59af3 127.0.0.1:7004
slots: (0 slots) master
M: 3375be2ccc321932e8853234ffa87ee9fde973ff 127.0.0.1:7005
slots: (0 slots) master
[OK] All nodes agree about slots configuration.
如果一切正常的话, redis-trib 将输出以下信息:
>>> Check for open slots...
>>> Check slots coverage...
[OK] All 16384 slots covered.
这表示集群中的 16384 个槽都有至少一个主节点在处理, 集群运作正常。
Redis 集群现阶段的一个问题是客户端实现很少。 以下是一些我知道的实现:
redis-rb-cluster 是作者编写的 Ruby 实现, 用于作为其他实现的参考。 该实现是对 redis-rb 的一个简单包装, 高效地实现了与集群进行通讯所需的最少语义。
redis-py-cluster 看上去是 redis-rb-cluster 的一个 Python 版本, 这个项目有一段时间没有更新了(最后一次提交是在六个月之前), 不过可以将这个项目用作学习集群的起点。
流行的 Predis 曾经对早期的 Redis 集群有过一定的支持, 但不确定它对集群的支持是否完整, 也不清楚它是否和最新版本的 Redis 集群兼容 (因为新版的 Redis 集群将槽的数量从 4k 改为 16k 了)。
Redis unstable 分支中的 redis-cli 程序实现了非常基本的集群支持, 可以使用命令 redis-cli -c 来启动。
测试 Redis 集群比较简单的办法就是使用 redis-rb-cluster 或者 redis-cli , 接下来我们将使用 redis-cli 为例来进行演示:
$ redis-cli -c -p 7000
redis 127.0.0.1:7000> set foo bar
-> Redirected to slot [12182] located at 127.0.0.1:7002
OK
redis 127.0.0.1:7002> set hello world
-> Redirected to slot [866] located at 127.0.0.1:7000
OK
redis 127.0.0.1:7000> get foo
-> Redirected to slot [12182] located at 127.0.0.1:7002
"bar"
redis 127.0.0.1:7000> get hello
-> Redirected to slot [866] located at 127.0.0.1:7000
"world"
redis-cli 对集群的支持是非常基本的, 所以它总是依靠 Redis 集群节点来将它转向(redirect)至正确的节点。
一个真正的集群客户端应该做得比这更好: 它应该用缓存记录起哈希槽与节点地址之间的映射(map), 从而直接将命令发送到正确的节点上面。
这种映射只会在集群的配置出现某些修改时变化, 比如在一次故障转移之后,或者系统管理员通过添加节点或移除节点来修改了集群的布局之后, 诸如此类。
假设一个桶中有 7 个石头,三个灰色,四个黑色。从桶中取出一个石头,是灰色的概率则是 3/7[P(gray)]
;黑色的是 4/7[P(black)]
。
如果这 7 个石头分布在两个桶中。A桶中两灰,两黑;B桶中一灰两黑。单独计算某一个桶中某颜色的概率比较简单,如:A桶中灰色的颜色:P(gray|A)
。"|"
的意思是给定某条件,读作 given。
P(A | B) = P(AB) / P(B)
要计算两个桶中某颜色的概率,就稍微麻烦一些。它的计算公式是:
P(gray| B) = P(gray and B)/P(B)
解释为:B桶中取出灰色的概率 = 又是灰色且正好在B桶中的概率 除以 正好在B桶中的概率。
验证:
B 桶中取出灰色的概率:
P(gray | B) = 1/3
是灰色且在 B 桶中的概率:
P(gray and B) = 1/7
在 B 桶中的概率:
P(B) = 3/7
P(gray and B)/P(B) = (1/7) / (3/7) = 1/3
说明公式正确。
P(A) = P(A | B<sub>1</sub>)P(B<sub>1</sub>) + P(A | B<sub>2</sub>)P(B<sub>2</sub>) + ... + P(A | B<sub>i</sub>)P(B<sub>i</sub>) = ∑P(A | B<sub>i</sub>)/P(B<sub>i</sub>)
取出灰色的概率:P(gray) = P(gray | A) P(A) + P(gray | B) P(B)
解释为:灰色的概率是在A桶中灰色的概率除以在A桶中的概率 加上在 B 桶中灰色的概率除以在B桶中的概率。如果有 N 个桶,则扩展到 N。即上面的全概率公式。
验证:
取出灰色的概率是 3/7
A 桶中取出灰色的概率:
P(gray | A) = 1/2
在 A 桶中的概率:
P(A) = 4/7
A 桶中取出灰色的概率:
P(gray | B) = 1/3
在 B 桶中的概率:
P(B) = 3/7
P(gray) = 1/2 * 4/7 + 1/3 * 3/7 = 3/7
可以看出,公式正确。
P(B<sub>i</sub> | A) = P(A | B<sub>i</sub>) * P(B<sub>i</sub>) / ∑P(A | B<sub>j</sub>) * P(B<sub>j</sub>)
推导:
根据条件概率公式,变形:
P(B<sub>i</sub> | A) = P(B<sub>i</sub>A) / P(A)
根据全概率公式,将 P(A)的公式代入得到:
P(B<sub>i</sub> | A) = P(AB<sub>i</sub>) / ∑P(A | Bi)/P(Bi)
而根据概率公式,P(AB<sub>i</sub>) = P(A | B<sub>i</sub>)P(B<sub>i</sub>)
再次将值代入公式,得到:
P(B<sub>i</sub> | A) = P(A | B<sub>i</sub>)P(B<sub>i</sub>) / ∑P(A | Bi)/P(Bi)
示例:已知 p(x|c),要求 p(c|x)
p(c|x) = p(x|c)p(c) / p(x)
验证:
p(black |B) = p(B|black)p(black) / p(B)
在 B 桶中,黑色的概率:
p(black |B) = 2/3
在 B 桶中的概率:
p(B) = 3/7
黑色的概率:
p(black) = 4/7
黑色,且在 B 桶中的概率:
p(B |black) = 1/2
由此看来等式正确。
公式又可以简化为: p(c | x) = p(x | c)p(c) / p(x)
案例
8 支步枪中有 5 支校准过,3支未校准。一名射手,用校准过的枪射击,中靶概率为 0.8;用未校准的射击中靶概率为 0.3;现在从 8 支枪中随机取一支射击,结果中靶。求该枪是已经校准的机率。
将校准过的概率标识为 A, 未校准的标识为 B。中靶标识为 D, 未中标识为 N。则有:
P(D | A) = 0.8
P(D | B) = 0.3
P(A) = 5 / 8
P(B) = 3 / 8
现在要求: P(A | D)
根据公式,可得到:
P(A | D) = P(D | A) * P(A) / ∑P( D | A<sub>i</sub>) * P(A<sub>i</sub>) = P(D | A) * P(A) / (P(D | A) * P(A) + P(D | B) * P(B)) = 0.8 * 5/8 / (0.8 * 5/8 + 0.3 * 3/8) = 0.8163
对于离散型随机变量x,定义一个概率函数叫f(x),它给出了随机变量取每一个值的概率。这个函数就是分布律。
比如扔硬币,我们想知道字朝上的概率。假设字朝上用 1 表示,朝下用 0 表示。那就是找出一个函数 f(x) 求 f(1)。
期望(或均值)是试验中每次可能结果的概率乘以其结果的总和。它反映随机变量平均取值的大小。用数学公式描述就是:E(x) = ∑xp 。p 是概率,也就是概率分布率函数;x 是值。
案例
甲乙两个人赌博,他们两人获胜的机率相等,比赛规则是先胜三局者为赢家,赢家可以获得100法郎的奖励。当比赛进行到第四局的时候,甲胜了两局,乙胜了一局,这时由于某些原因中止了比赛,那么如何分配这100法郎才比较公平?
用概率论的知识,不难得知,甲获胜的可能性大,甲赢了第四局,或输掉了第四局却赢了第五局,概率为 1/2+(1/2) * (1/2)=3/4。分析乙获胜的可能性,乙赢了第四局和第五局,概率为(1/2) * (1/2)=1/4。因此由此引出了甲的期望所得值为100*3/4=75法郎,乙的期望所得值为25法郎。这个故事里出现了“期望”这个词,数学期望由此而来。
方差在概率论用来度量随机变量和其数学期望(即均值)之间的偏离程度。 方差在统计中是每个样本值与全体样本值的平均数之差的平方值的平均数。 它们两者的公式不一样,通常我们都是指概率论中的。公式是:
D(x)=E{ [ x - E(x) ]2 } = E(x2) - [ E(x) ]2
案例
现在有一个运营活动,一等奖 1000 元,二等奖 500 元,三等奖 100 元。每抽一次需要 10 元。 两套抽奖概率方案,如下:
一等奖 | 二等奖 | 三等奖 | 未中奖 | |
方案一中奖机率 | 5% | 10% | 20% | 65% |
方案二中奖机率 | 10% | 10% | 10% | 70% |
分别计算两种方案的期望: 方案一: E(x) = (-990 * 5%)+(-490 * 10%)+(-90 * 20%)+(10 * 65%) = -110 也就是说,A方案能够期望每次抽奖运营方亏损110元。
方案二: E(x) = (-990 * 10%)+(-490 * 10%)+(-90 * 10%)+(10 * 70%) = -150
这样我们就能控制中奖的比例来保证盈利了。
期望值衡量概率的平均值,可是抽奖本来就是很激动人心的事情,哪怕明知道会赔钱,人们还乐此不疲,为什么?因为风险,因为以小搏大。方差就是这种风险的度量,方差越大,随机变量的结果越不稳定。
D(x)=E{ [ x - E(x) ]2 }
方案一:D(x) = 5% * (-990+110)2 + 10% * (-490+110)2 + 20% * (-90+110)2 + 65% * (10 + 110)2 = 62600
均方差为 √ 62600 = 250.19, 表示每一次抽奖,实际收益与期望收益 -100 相差 250.19。
方案二:D(x) = 10% * (-990+110)2 + 10% * (-490+110)2 + 10% * (-90+110)2 + 70% * (10 + 110)2 = 96000
均方差是 309.84。可以看出,无论是盈利还是风险,都比方案一要差。
概率分布用以表述随机变量取值的概率规律。为了使用的方便,根据随机变量所属类型的不同,概率分布取不同的表现形式。
事件的概率表示了一次试验某一个结果发生的可能性大小。若要全面了解试验,则必须知道试验的全部可能结果及各种可能结果发生的概率,即必须知道随机试验的概率分布(probability distribution)。
0-1分布/贝努利分布/两点分布 一个只有两个可能结果的试验,比如正面或反面,成功或失败,有缺陷或没有缺陷,病人康复或未康复。为方便起见,记这两个可能的结果为0和1。
假设结果为 1 的概率为 p,那么为 0 的概率就是 1-p。
概率函数(分布律)可以表示为:
P(x;p) = px * (1-p)1-x
其中,x 的值是 0 或 1。
验证: P(1;p) = p1 * (1-p)1-1 = p
P(0;p) = p0 * (1-p)1-1 = 1-p
说明公式正确。
这时候我们可以计算两点分布的期望和方差:
期望: E(x) = 1 * p + 0 * (1-p) = p
E(x2) = 12 * p + 02 * (1-p) = p
方差: D(x) = E(x2) - [E(x)]2 = p - p2 = p(1-p)
二项分布 二点分布是指一次事件发生时的情况,结果只有 1 和 0 两个点。如果事件发生多次,那么结果就会出现多个 1 和多个 0,相当于有两项。所以,多次两点分布就叫二项分布。
两点分布的概率是指一次试验,事情发生概率。也就是结果为 1 时的概率。如果有 N 次试验,第一次是 1,第二次是 0,第三次是 0… 第 N 次是 1。
那么,二项分布的概率就是指出现我们试验结果这种情况的概率。也就是出现第一次是1,第二次是 0 ,第三次是0,第N次是1 这种情况的概率。因为每一次试验结果都有可能是 0 或 1,所以可能第一次试验是0,第二次是1。
二项分布的概率分布率函数是:P(k) = Cknpk * (1-p)n-k
k近邻算法是根据与他接近的点进行比较,将最接近的结果返回做为推算结果。 决策树是总结出规律,并进行计算。
现在可以看出,k近邻算法通常是对一些数字类型的值进行计算距离(通常要进行归一处理),数据量越大,推测越准确。
决策树则可以从较小的数据中发现规律,总结一套流程的顺序。
有时候,我们得到了几个可能的结果;需要知道各个结果的概率是多少以此来判断。这就是贝叶斯。
优点:在数据较小的时候也可以用,可以处理的问题非常多。 缺点:对输入的数据格式要求较高。
网站上经常会有评论功能,而评论需要屏蔽一些侮辱性质的语言。如何判断一个评论是否是有侮辱性?我们可以事先整理好一个词库,把一些词填充进去,如果评论中出现该词就认为它是侮辱性的。但这样不够智能,假如我们已经通过这种方式积累了一批数据,我们可以通过概率,让机器自己去找出哪个词最有可能是侮辱性的,然后再判断用户提交的评论里是否包含这些词,从而实现自动扩充词库并判定。
所以,我们需要做的是:计算出某段话是侮辱性的概率,即计算 p(c1|w)
c1 表示是侮辱性;w 表示文本。由于文本是由多个单词组成的,于是上面的公式又可以转化为:p(c1|w0)p(c1|w1)...p(c1|wn)
单独拿出一个来观察:p(c1|w0)
表示文本中第一个单词是侮辱性的概率。它的计算方式是:
p(c1|w0) = p(w0|c1)p(c1) / p(w0)
p(c1) 表示侮辱性文本的概率,可以用现有数据中是侮辱性的文档除以文档总数。
p(w0) 表示某单词出现的概率,可以用 w0 出现的次数除以总的单词数。
p(w0|c1) 表示在侮辱性的文档中,w0 出现的概率。那就用侮辱性文本中 w0 出现的次数除以侮辱性文本单词总数。
思路:遍历现有所有数据,得到侮辱性和非侮辱性的文本总数以及单词总数。以及在两个类型数据中单词 w0 出现的次数。根据公式,分别计算出 w0 在两个类型中的概率。较大概率者即为该词所属的类型。
代码:
```
from numpy import *
def loadDataSet(): # 构建几个测试用的语句。每个语句都按单词进行了分割。并且对每个语句都进行了是否侮辱性质的设置。为了方便后面的计算。 postingList = [[‘我’, ‘的’, ‘天’, ‘啊’, ‘你’, ‘真’, ‘胖’], [‘今’, ‘真’, ‘是’, ‘日’, ‘了’, ‘狗’, ‘了’, ‘啊’], [‘我’, ‘非’, ‘常’, ‘喜’, ‘欢’, ‘这’, ‘样’, ‘的’, ‘你’], [‘你’, ‘是’, ‘不’, ‘是’, ‘傻’], [‘真’, ‘不’, ‘知’, ‘道’, ‘怎’, ‘么’, ‘说’, ‘你’, ‘了’], [‘你’, ‘傻’, ‘B’, ‘了’, ‘吧’, ‘这’, ‘么’, ‘怂’]] # 1 是侮辱性的;0不是 classVec = [0, 1, 0, 1, 0, 1] # 1 is abusive, 0 not return postingList, classVec
def createVocabList(dataSet): # 创建空集合 vocabSet = set([]) # 遍历数据集的每一行.把整个文档的进行去重 for document in dataSet: vocabSet = vocabSet | set(document) return list(vocabSet)
def setOfWords2Vec(vocabList, inputSet): # 创建空的新向量. returnVec = [0] * len(vocabList) # 遍历输入文本(是个单词向量).如果文本向量中的单词 for word in inputSet: if word in vocabList: returnVec[vocabList.index(word)] = 1 else: print “单词: %s 不在词库中!” % word return returnVec
def trainNB0(trainMatrix, trainCategory): # 文本数(即向量数) numTrainDocs = len(trainMatrix) # 去重后,向量的长度,即单词总数 numWords = len(trainMatrix[0]) # 侮辱性的文本数除以总文本数。即侮辱性文本的概率。 # 因为侮辱性的值是 1,所以 sub(trainCategory) 即侮辱性文本的个数 pAbusive = sum(trainCategory) / float(numTrainDocs) # 初始化一个正常文本的向量 p0Num = zeros(numWords) # 初始化一个侮辱文本的向量 p1Num = zeros(numWords) # 正常属性的单词数 p0Denom = 0.0 # 侮辱性的单词数 p1Denom = 0.0 # 遍历每个文本.如果文本是侮辱性的,就把当前文本向量加到侮辱性向量上。最终得到的侮辱性向量是所有侮辱性文本的累加 # 通过遍历该向量就知道哪个词出现的频率最高 # 同理,如果文本不是侮辱性的,就把文本加到正常向量上 for i in range(numTrainDocs): if trainCategory[i] == 1: p1Num += trainMatrix[i] p1Denom += sum(trainMatrix[i]) else: p0Num += trainMatrix[i] p0Denom += sum(trainMatrix[i])
p1Vect = p1Num / p1Denom
p0Vect = p0Num / p0Denom
return p0Vect, p1Vect, pAbusive
def classifyNB(vec2Classify, p0Vec, p1Vec, pClass1): p1 = sum(vec2Classify * p1Vec) + log(pClass1) p0 = sum(vec2Classify * p0Vec) + log(1.0 - pClass1) if p1 > p0: return 1 else: return 0
def testingNB(): listOPosts, listClasses = loadDataSet() myVocabList = createVocabList(listOPosts) trainMat = [] # 把测试数据的每一行都转换成向量.并合并在一起 for postinDoc in listOPosts: trainMat.append(setOfWords2Vec(myVocabList, postinDoc)) # 训练算法 p0V, p1V, pAb = trainNB0(array(trainMat), array(listClasses)) # 输出词库 print “输出词库: “ for loopWord in myVocabList: print loopWord # 正常内容的向量 print “正常内容的向量: “ print p0V # 侮辱性内容的向量 print “侮辱性内容的向量” print p1V
testEntry = ['爱', '你', '哟']
thisDoc = array(setOfWords2Vec(myVocabList, testEntry))
print "".join(testEntry), '的类型是: ', classifyNB(thisDoc, p0V, p1V, pAb)
testEntry = ['狗', '日']
thisDoc = array(setOfWords2Vec(myVocabList, testEntry))
print "".join(testEntry), '的类型是: ', classifyNB(thisDoc, p0V, p1V, pAb)
testingNB()