Analyzing Memory Usage from the Redis RDB File

A walkthrough of analyzing Redis memory usage based on the RDB dump file.

1. Generate the RDB file

Generate the RDB file with the bgsave command. If an RDB file already exists, skip this step.

redis-cli -h xxx -p xxx bgsave
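Note that bgsave only triggers a background save and returns immediately, so the dump file may not be complete when the command comes back. A minimal sketch for waiting on it by polling LASTSAVE (the host/port placeholders and the wait_bgsave.py name are mine):

wait_bgsave.py

#coding:utf8
# Trigger BGSAVE and wait for it to finish by polling LASTSAVE, which
# returns the Unix timestamp of the last successful save.
# HOST and PORT are placeholders; fill in your own instance.
import subprocess
import time

HOST, PORT = "xxx", "xxx"

def redis_cli(*args):
    cmd = ["redis-cli", "-h", HOST, "-p", PORT] + list(args)
    return subprocess.check_output(cmd).strip().decode("utf8")

before = int(redis_cli("LASTSAVE"))
redis_cli("BGSAVE")
while int(redis_cli("LASTSAVE")) == before:
    time.sleep(1)
print("background save finished, dump.rdb is ready")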

2. Analyze the RDB file

This requires the rdb tool from the redis-rdb-tools project to be installed first.

    rdb -c memory /var/redis/6379/dump.rdb > memory.csv

Analyze the dump with the tool above; the output looks like this:

database,type,key,size_in_bytes,encoding,num_elements,len_largest_element
0,set,"uv_filter_151020_10005138312",1440523,hashtable,13975,32
0,set,"uv_filter_151021_100117706",1360332,hashtable,13074,32
0,set,"uv_filter_151020_1000329580",1284149,hashtable,12218,32
0,set,"uv_filter_151020_100030831",1228394,hashtable,11591,32
0,set,"uv_filter_151020_10007780250",1194260,hashtable,11208,32

The aggregation is done on the 4th column (size_in_bytes). In the example above the key prefixes are all the same, so they should be merged into a single entry. The production service has 6 shards in total, and each RDB file converts to a CSV of 1 GB+, so these files are uploaded to AWS S3 for processing.
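As a quick sanity check, the five sample rows above collapse to one normalized prefix once the trailing id is dropped and digit runs are replaced; a small sketch of that merge rule (the same normalization mapper.py applies in the next step):

#coding:utf8
# Illustrate the merge rule on the sample rows: drop anything after ':',
# drop the trailing "_<id>" segment, replace digit runs with "N", then
# sum size_in_bytes for the merged prefix.
import re

rows = [
    ("uv_filter_151020_10005138312", 1440523),
    ("uv_filter_151021_100117706",   1360332),
    ("uv_filter_151020_1000329580",  1284149),
    ("uv_filter_151020_100030831",   1228394),
    ("uv_filter_151020_10007780250", 1194260),
]

def normalize(key):
    key = key.split(':')[0]
    key = '_'.join(key.split('_')[:-1])
    return re.sub(r"\d+", "N", key)

total = sum(size for key, size in rows)
print("%s\t%d" % (normalize(rows[0][0]), total))   # -> uv_filter_N   6507658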

3. Analyze the data with a MapReduce job

Put the following programs (mapper.py, reducer.py, run.sh) on the Hadoop server (emr-data-dev-00).

The map program parses the CSV files above and emits key-value pairs.

mapper.py

#coding:utf8
# mapper.py: read the rdb CSV from stdin, normalize each key to its prefix,
# and sum size_in_bytes per prefix. Output format: "prefix<TAB>total_bytes".
import sys
import re

if __name__ == "__main__":
    qps_dic = {}

    for line in sys.stdin:
        try:
            infs = line[:-1].split(',')
            _key = infs[2]                          # key column
            _key = _key.replace("\"", '')           # strip the surrounding quotes
            _key = _key.split(':')[0]               # keep the part before ':'
            _key = '_'.join(_key.split('_')[:-1])   # drop the trailing "_<id>" segment
            _key = re.sub(r"\d+", "N", _key)        # replace digit runs with "N"
            _value = int(infs[3])                   # size_in_bytes column
            if _key not in qps_dic:
                qps_dic[_key] = 0
            qps_dic[_key] += _value
        except Exception:
            pass                                    # skip the header and malformed lines

    for k, v in qps_dic.items():
        print "%s\t%d" % (k, v)

Processing notes:

  • Split each line on commas, normalize the key prefix, and replace numeric segments with the placeholder character N
  • Aggregation already happens inside the mapper, so a dictionary is used to accumulate the totals
  • Output fields are tab-separated

The reduce program merges the mapper results.

reducer.py

#coding:utf8
# reducer.py: input lines arrive sorted by key, so values for the same key
# are adjacent. Sum them and emit one "key<TAB>total" line per key.
import sys

def emit(_key, _value):
    print _key + "\t" + "\t".join(map(str, _value))

if __name__ == "__main__":
    current_key = None
    current_vals = []
    for line in sys.stdin:
        infs = line[:-1].split('\t', 1)
        if len(infs) == 1:
            (_key, _value) = (infs[0], '0')
        else:
            (_key, _value) = infs
        if _key != current_key and current_key != None:
            # key changed: flush the totals accumulated for the previous key
            emit(current_key, current_vals)
            current_vals = []
        current_key = _key
        _values = map(float, _value.split('\t'))
        if len(current_vals) == 0:
            current_vals = _values
        else:
            current_vals = [x + y for x, y in zip(_values, current_vals)]

    # flush the last key
    if current_key != None:
        emit(current_key, current_vals)
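Hadoop Streaming is essentially mapper | sort | reducer over stdin/stdout, so the two scripts can be smoke-tested locally on one CSV before submitting the job. A minimal sketch (memory.csv is the file produced in step 2; run it in the same directory as the two scripts):

local_test.py

#coding:utf8
# Reproduce the streaming pipeline without Hadoop. The plain `sort` stands
# in for the shuffle/sort Hadoop performs between the map and reduce stages.
import subprocess
import sys

pipeline = "cat memory.csv | python mapper.py | sort | python reducer.py"
sys.stdout.write(subprocess.check_output(pipeline, shell=True).decode("utf8"))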

Run the analysis with Hadoop Streaming:

run.sh

#!/bin/sh
# -input:  S3 path of the uploaded CSV files (change it to your own upload path)
# -output: S3 path where the analysis result is saved
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -input s3://redis.data.backup/redis_analysis/sc-cluster_20160705/*.csv \
    -output s3://redis.data.backup/redis_analysis/sc-cluster_20160705/result \
    -mapper "python mapper.py" \
    -reducer "python reducer.py" \
    -file "mapper.py" \
    -file "reducer.py"

4. Download and analyze the results

aws s3 cp s3://xx.xx ./ --recursive

The finished job produces 5 part files (aggregate them yourself):

part-00000  part-00001  part-00002  part-00003  part-00004  _SUCCESS

Sort by memory size to get the final ranking:

sort -rn -k2 part-0000* | awk '{print $1, $2/1024/1024, "M"}'
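Each normalized key lands in exactly one part file, so the one-liner above is sufficient; if you prefer to stay in Python, a roughly equivalent sketch run from the download directory:

#coding:utf8
# Merge the downloaded part files and print prefixes sorted by size in MB.
import glob

totals = {}
for path in glob.glob("part-0000*"):
    with open(path) as f:
        for line in f:
            fields = line.rstrip("\n").split("\t")
            if len(fields) < 2:
                continue
            totals[fields[0]] = totals.get(fields[0], 0.0) + float(fields[1])

for key, size in sorted(totals.items(), key=lambda kv: kv[1], reverse=True):
    print("%s %.2f M" % (key, size / 1024 / 1024))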

Statistics for the shopping cart

1. Requirement

Based on the number of items in each shopping cart, produce the following statistics with a bucket size of 100:

[ec2-user@dev-00 ~]$ aws s3 ls s3://forall/guming/redis_analysis/users/
2016-05-03 12:01:30          0 
2016-05-03 12:03:28     627442 user_100_200.log
2016-05-03 12:03:48     136347 user_200_300.log
2016-05-03 12:03:55      50394 user_300_400.log
2016-05-03 12:04:01      23317 user_400_500.log
2016-05-03 12:04:07      12995 user_500_600.log
2016-05-03 12:04:13       7649 user_600_700.log
2016-05-03 12:04:19       4831 user_700_800.log
2016-05-03 12:04:25       3305 user_800_900.log
2016-05-03 12:04:43       2318 user_900_1000.log

user_100_200 means carts where the item count is greater than 100 and less than 200.
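The pipeline that produced these bucket files is not shown here; below is a hedged sketch of the bucketing step, assuming a tab-separated input of user_id and item count (the input file name and format are assumptions, not part of the original pipeline).

bucket_users.py

#coding:utf8
# Split users into user_<lo>_<hi>.log files by cart size, with a step of 100.
# Input format "user_id<TAB>item_count" is assumed; boundary handling
# (whether a count of exactly 100 goes into 0_100 or 100_200) may differ
# from the original job.
STEP = 100
handles = {}

with open("user_item_count.tsv") as f:
    for line in f:
        user_id, count = line.rstrip("\n").split("\t")
        lo = (int(count) // STEP) * STEP
        name = "user_%d_%d.log" % (lo, lo + STEP)
        if name not in handles:
            handles[name] = open(name, "w")
        handles[name].write(user_id + "\n")

for h in handles.values():
    h.close()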

2. Do the computation with Hive

Create the table of user ids:

hive> show create table temp_user_id;    
OK
CREATE EXTERNAL TABLE `temp_user_id`(
  `user_id` bigint)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY '\t' 
  LINES TERMINATED BY '\n' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://xxxx@forall/mgu/u/m';

The data is stored on S3.

Create the mapping table between user_id and memory usage:

hive> show create table temp_user_memory;
OK
CREATE EXTERNAL TABLE `temp_user_memory`(
  `user_id` bigint, 
  `msize` double)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY '\t' 
  LINES TERMINATED BY '\n' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://xxxx@forall/mgu/u/i';

Write the HiveQL:

[hadoop@bd-master-00 mgu]$ more sum.sql 
use tmp;
select
  sum(t2.msize)
from
  (select
    distinct
    t1.user_id
  from
    temp_user_id t1) t3
inner join
  temp_user_memory t2
on
  t3.user_id = t2.user_id;
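The inner subquery deduplicates user_id with DISTINCT before the join, so a user that appears more than once in temp_user_id does not have its memory counted twice.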

Run the HiveQL:

hive -f sum.sql 

Hive output:

[hadoop@bd-master-00 mgu]$ hive -f sum.sql  

Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j.properties
OK
Time taken: 0.043 seconds
Query ID = hadoop_20160503181212_571d477d-2610-4a50-abf6-779eeb7b026a
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1460542469374_69993,
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1460542469374_69993
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2016-05-03 18:12:46,948 Stage-1 map = 0%,  reduce = 0%
2016-05-03 18:12:59,349 Stage-1 map = 13%,  reduce = 0%, Cumulative CPU 8.03 sec
2016-05-03 18:13:02,455 Stage-1 map = 27%,  reduce = 0%, Cumulative CPU 11.29 sec
2016-05-03 18:13:05,600 Stage-1 map = 40%,  reduce = 0%, Cumulative CPU 14.51 sec
2016-05-03 18:13:07,666 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 16.76 sec
2016-05-03 18:13:15,947 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 18.58 sec
MapReduce Total cumulative CPU time: 18 seconds 580 msec
Ended Job = job_1460542469374_69993
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 18.58 sec   HDFS Read: 692 HDFS Write: 16 SUCCESS
Total MapReduce CPU Time Spent: 18 seconds 580 msec
OK
9.8070625201E10
Time taken: 39.05 seconds, Fetched: 1 row(s)
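Assuming msize is in bytes, the result 9.8070625201E10 corresponds to roughly 91.3 GB of cart memory held by these users.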