#ELK—>logstash—>ruby·plugin—>实现精彩的功能(term模板抽取)
##效果展示
抽取之前这样出图
抽取之后这样出图
甚至我们之后还想出更花哨的图~唯一的方式就是我们去抽取索引(当然如果日志本身是json格式的就能直接使用了)
##抽取流程图
##日志源 日志源是这样的————只列出几条,作为演示 2016-04-27 00:00:13,152 [INFO ] com.xxxxxxx.UserController - 用户登录成功, loginFrom:1, phoneNo:138 3838 4388
2016-04-27 00:00:14,791 [INFO ] com.xxxxxxx.UserController - 用户注册成功, phoneNo:181 6777 8888, verificationCode:0001, registerFrom: 2
##需求 loginFrom 和 registerFrom 意思是从哪个app端登录的,后面的数字1和2 代表不同的app端。(注意,这里我们增加了难度,在registerFrom: 2 中加了几个空格:到底几个不重要,甚至数字字母也行) 比如在mysql中,表结构是这样的
table_name: channel id name 1 量化派 2 阿里巴巴 ... ...我们要从日志中,将loginFrom:1抽成 channelName:量化派; 将registerFrom:2抽成channelName:阿里巴巴;同时增加难度,抽取“用户登录成功”和“用户注册成功”字段,我们称之为user_behavior(用户行为) 不知道大家能不能看透这样做的好处———— 本来在kibana中可能要手工的去匹配每一个filters, 现在我们只需要根据term一个简单操作就搞定。
##0:precondition——ruby安装 jdk ———— 1.8 logstash ———— 我的是2.0.0 ruby ———— 版本 > 1.9.3
ruby建议直接源码安装,[点这儿下载](https://www.ruby-lang.org/en/downloads/) 安装步骤: ①tar zxvf xxx.tar.gz ②cd xxx————进入到解压后的目录 ③./configure ④make && make install 安装完ruby,接着安装ruby的包管理工具 ⑤yum install rubygems 然后安装ruby的mysql连工具(先切换源,才能下载;ruby版本如果低于1.9.3,会安装失败) ⑥gem sources --add https://ruby.taobao.org/ --remove https://rubygems.org/ ⑦gem install mysql2
##1:抽取——核心代码 input { kafka { ... } }
filter { ... ruby{ init => '[@kname](http://my.oschina.net/u/1384245) = ["channelName","userBehavior"];@pattern_behavior=eval(File.readlines("/opt/logstash-2.2.0/confs/rb/pattern.rb")[1]);@pattern_channel=eval(File.readlines("/opt/logstash-2.2.0/confs/rb/pattern.rb")[0]);@hashChannel=eval(File.readlines("/opt/logstash-2.2.0/confs/rb/hash_channel.rb")[0]);@hashUserBehavior=eval(File.readlines("/opt/logstash-2.2.0/confs/rb/hash_user_behavior.rb")[0]);' code => "event.append(Hash[@kname.zip(Array[@hashChannel[@pattern_channel.match(event['message']).to_a.compact[1]],@hashUserBehavior[@pattern_behavior.match(event['message']).to_a.compact[0]] ])]) if @pattern_channel.match(event['message'])" } } output { ... }
##2:自动化读取crontab+ruby+mysql
大家应该注意到,上面的hash是去读取文件的(最简版@hashChannel={"1"=>"量化派","2"=>"阿里巴巴"}),为什么这样呢?————假想 后期如果增长到一万个你要手工 去每台机器上去添加吗。。。①ruby 脚本
所以我写了一个ruby脚本(为什么写ruby脚本,因为logstash是用ruby写的,这样能保证一致性)脚本很简单,如下:(read_channel_from_mysql.rb) #导入需要的包 require 'rubygems' require 'mysql2' #连接mysql client = Mysql2::Client.new(:host=>"127.0.0.1",:username=>"root",:password=>"123456",:database=>"database_name") results = client.query("select id,name from table_name"); #计数 count = results.count num = File.readlines("/opt/logstash-2.2.0/confs/rb/mysql_count.rb"); _count = num[0].to_i if count != _count then #生成hash h = Hash.new results.collect do |row| h[row['id'].to_s]=row['name'] end begin testFile = File.open("/opt/logstash-2.2.0/confs/rb/mysql_count.rb","w:UTF-8") testFile.write(count) #写到文件,供logstash读取 file = File.open("/opt/logstash-2.2.0/confs/rb/hash_channel.rb","w:UTF-8") file.write(h) rescue IOError => e ensure testFile.close unless testFile.nil? file.close unless file.nil? end #重启受影响的进程,我习惯配置supervisor的后台管理,觉得好用 `supervisorctl -c /opt/supervisor/conf/supervisord.conf restart progromA` `supervisorctl -c /opt/supervisor/conf/supervisord.conf restart progromB` `supervisorctl -c /opt/supervisor/conf/supervisord.conf restart progromC` end
###②pattern配置 #/opt/logstash-2.2.0/confs/rb/pattern.rb /registerFrom:(\d+)|loginFrom:.+?(\d+)/ /用户注册成功|用户登录成功|send.+?sms.+?(succeed)/ ###③hash配置 /opt/logstash-2.2.0/confs/rb/hash_user_behavior.rb————手工配置 {nil=>"其他","用户注册成功"=>"用户注册成功","用户登录成功"=>"用户登录成功",""succeed"=>"终审通过"} /opt/logstash-2.2.0/confs/rb/hash_channel.rb 这个是mysql直接读出来了,就不贴了,格式和上面的一样 ###④crontab 设置——crontab -e crontab是linux定时执行任务 基本格式 : * * * * * command 分 时 日 月 周 命令 第1列表示分钟1~59 每分钟用*或者 */1表示 第2列表示小时1~23(0表示0点) 第3列表示日期1~31 第4列表示月份1~12 第5列标识号星期0~6(0表示星期天) 第6列要运行的命令
#每小时启动一次~ 根据自己需要来配置哦 * 1 * * * /usr/local/bin/ruby /your/path/to/rb/read_channel_from_mysql.rb
###donation: 账户名:杨春炼
###ask for help: 如需帮助,请加QQ:1028750558或微信:lian-ye