Analyzing Memory Usage from Redis RDB Files
1. Generate the RDB file
Generate an RDB file with the bgsave command; if an RDB file already exists, skip this step.
redis-cli -h xxx -p xxx bgsave
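If you prefer to script this step, here is a minimal sketch using redis-py that triggers BGSAVE and waits for it to finish; the host and port are placeholders, and polling LASTSAVE is just one way to detect completion.

    import time
    import redis

    r = redis.StrictRedis(host="xxx", port=6379)   # placeholder host/port
    before = r.lastsave()              # time of the last successful save
    r.bgsave()                         # fork and dump the RDB in the background
    while r.lastsave() == before:      # LASTSAVE advances once the dump completes
        time.sleep(1)
    print("RDB dump finished")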
2. Analyze the RDB file
This step requires redis-rdb-tools to be installed (see the redis-rdb-tools project).
rdb -c memory /var/redis/6379/dump.rdb > memory.csv
Analyzing the dump with the tool above produces results like the following:
database,type,key,size_in_bytes,encoding,num_elements,len_largest_element
0,set,"uv_filter_151020_10005138312",1440523,hashtable,13975,32
0,set,"uv_filter_151021_100117706",1360332,hashtable,13074,32
0,set,"uv_filter_151020_1000329580",1284149,hashtable,12218,32
0,set,"uv_filter_151020_100030831",1228394,hashtable,11591,32
0,set,"uv_filter_151020_10007780250",1194260,hashtable,11208,32
The aggregation is done on the 4th column (size_in_bytes). As the example above shows, these keys all share the same prefix, so they need to be merged. The production service has 6 shards in total, and each RDB file produces a 1 GB+ CSV after analysis, so the files are uploaded to AWS S3.
3. Analyze the data with MapReduce
Put the following programs (mapper.py, reducer.py, run.sh) on the Hadoop server (emr-data-dev-00).
The map program parses the CSV files above and emits key-value pairs.
mapper.py
#coding:utf8
import sys
import re

if __name__ == "__main__":
    qps_dic = {}
    for line in sys.stdin:
        try:
            infs = line[:-1].split(',')
            # column 3 is the key name, column 4 is size_in_bytes
            _key = infs[2]
            _key = _key.replace("\"", '')
            _key = _key.split(':')[0]                # keep the part before any colon
            _key = '_'.join(_key.split('_')[:-1])    # drop the trailing id segment
            _key = re.sub(r"\d+", "N", _key)         # collapse digit runs into N
            _value = int(infs[3])
            if _key not in qps_dic:
                qps_dic[_key] = 0
            qps_dic[_key] += _value
        except Exception:
            pass
    for k, v in qps_dic.items():
        print "%s\t%d" % (k, v)
Processing notes:
- Split each line on commas, then normalize the key prefix by replacing numeric parts with the placeholder N (see the quick check after this list)
- Totals are aggregated inside the mapper, so a dictionary is used as the accumulator
- Output fields are tab-separated
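As a quick check of the normalization described above (a standalone snippet, not part of mapper.py), the sample keys from the CSV all collapse to the same prefix:

    import re

    def normalize(key):
        key = key.replace('"', '')
        key = key.split(':')[0]                  # keep the part before any colon
        key = '_'.join(key.split('_')[:-1])      # drop the trailing id segment
        return re.sub(r"\d+", "N", key)          # collapse digit runs into N

    for k in ['"uv_filter_151020_10005138312"', '"uv_filter_151021_100117706"']:
        print(normalize(k))   # both print: uv_filter_N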
The reduce program merges the results.
reducer.py
#coding:utf8
import sys

def emit(_key, _value):
    print _key + "\t" + "\t".join(map(str, _value))

if __name__ == "__main__":
    current_key = None
    current_vals = []
    for line in sys.stdin:
        infs = line[:-1].split('\t', 1)
        if len(infs) == 1:
            (_key, _value) = (infs[0], '0')
        else:
            (_key, _value) = infs
        # input arrives sorted by key, so flush the running totals when the key changes
        if _key != current_key and current_key != None:
            emit(current_key, current_vals)
            current_vals = []
        current_key = _key
        _values = map(float, _value.split('\t'))
        if len(current_vals) == 0:
            current_vals = _values
        else:
            current_vals = [x + y for x, y in zip(_values, current_vals)]
    if current_key != None:
        emit(current_key, current_vals)
Run the analysis with Hadoop Streaming:
run.sh
#!/bin/sh
# -input:  S3 path of the uploaded CSV files (change to your own upload location)
# -output: S3 path where the analysis result will be stored
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -input s3://redis.data.backup/redis_analysis/sc-cluster_20160705/*.csv \
    -output s3://redis.data.backup/redis_analysis/sc-cluster_20160705/result \
    -mapper "python mapper.py" \
    -reducer "python reducer.py" \
    -file "mapper.py" \
    -file "reducer.py"
4. Download and analyze the results
aws s3 cp s3://xx.xx ./ --recursive
The job output comes in 5 part files (aggregate them yourself):
part-00000 part-00001 part-00002 part-00003 part-00004 _SUCCESS
Sort by memory to get the ranking:
sort -rn -k2 part-0000* | awk '{print $1,$2/1024/1024,"M"}'
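If you would rather do the final aggregation in Python than with sort/awk, here is a minimal sketch that merges all part files, sums the totals per key, and prints the sizes in MB:

    import glob
    from collections import defaultdict

    totals = defaultdict(float)
    for path in glob.glob("part-0000*"):
        with open(path) as f:
            for line in f:
                parts = line.rstrip("\n").split("\t")
                if len(parts) < 2:
                    continue
                totals[parts[0]] += float(parts[1])

    for key, size in sorted(totals.items(), key=lambda kv: kv[1], reverse=True):
        print("%s %.2f M" % (key, size / 1024 / 1024))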
Statistics for the shopping cart
1. Requirement
Based on the number of items in each user's shopping cart, produce the following statistics in buckets of 100:
[ec2-user@dev-00 ~]$ aws s3 ls s3://forall/guming/redis_analysis/users/
2016-05-03 12:01:30 0
2016-05-03 12:03:28 627442 user_100_200.log
2016-05-03 12:03:48 136347 user_200_300.log
2016-05-03 12:03:55 50394 user_300_400.log
2016-05-03 12:04:01 23317 user_400_500.log
2016-05-03 12:04:07 12995 user_500_600.log
2016-05-03 12:04:13 7649 user_600_700.log
2016-05-03 12:04:19 4831 user_700_800.log
2016-05-03 12:04:25 3305 user_800_900.log
2016-05-03 12:04:43 2318 user_900_1000.log
user_100_200 contains the users whose carts hold more than 100 and fewer than 200 items, and so on for the other files; one possible way to produce such bucket files is sketched below.
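The original steps do not show how these bucket files were generated. As one possible approach, assuming a tab-separated input file carts.tsv of (user_id, item_count) pairs (that input is an assumption, not part of the original write-up), a sketch could look like this:

    STEP = 100

    buckets = {}   # (low, high) -> open file handle
    with open("carts.tsv") as f:
        for line in f:
            user_id, count = line.split("\t")
            count = int(count)
            if count <= STEP:
                continue                      # only carts with more than 100 items are reported
            low = (count // STEP) * STEP
            high = low + STEP
            if (low, high) not in buckets:
                buckets[(low, high)] = open("user_%d_%d.log" % (low, high), "w")
            buckets[(low, high)].write(user_id + "\n")

    for handle in buckets.values():
        handle.close()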
2. Run the computation with Hive
Create the table of user ids:
hive> show create table temp_user_id;
OK
CREATE EXTERNAL TABLE `temp_user_id`(
`user_id` bigint)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
's3://xxxx@forall/mgu/u/m';
The table data lives on S3.
Create the mapping table from user_id to memory size:
hive> show create table temp_user_memory;
OK
CREATE EXTERNAL TABLE `temp_user_memory`(
`user_id` bigint,
`msize` double)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
's3://xxxx@forall/mgu/u/i';
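Both external tables read plain tab-separated text from their S3 LOCATION (FIELDS TERMINATED BY '\t', one row per line). As a reference for the expected layout, a minimal sketch that writes temp_user_memory rows locally before uploading; the sample data here is made up.

    # hypothetical (user_id, msize) pairs, just to illustrate the row format
    user_mem = [(10001, 2048.0), (10002, 512.5)]

    with open("temp_user_memory.tsv", "w") as out:
        for user_id, msize in user_mem:
            out.write("%d\t%.1f\n" % (user_id, msize))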
Write the HiveQL. The query joins the distinct user ids against the memory mapping table and sums msize:
[hadoop@bd-master-00 mgu]$ more sum.sql
use tmp;
select
sum(t2.msize)
from
(select
distinct
t1.user_id
from
temp_user_id t1) t3
inner join
temp_user_memory t2
on
t3.user_id = t2.user_id;
Run the HiveQL:
hive -f sum.sql
Hive output:
[hadoop@bd-master-00 mgu]$ hive -f sum.sql
Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j.properties
OK
Time taken: 0.043 seconds
Query ID = hadoop_20160503181212_571d477d-2610-4a50-abf6-779eeb7b026a
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1460542469374_69993,
Kill Command = /usr/lib/hadoop/bin/hadoop job -kill job_1460542469374_69993
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2016-05-03 18:12:46,948 Stage-1 map = 0%, reduce = 0%
2016-05-03 18:12:59,349 Stage-1 map = 13%, reduce = 0%, Cumulative CPU 8.03 sec
2016-05-03 18:13:02,455 Stage-1 map = 27%, reduce = 0%, Cumulative CPU 11.29 sec
2016-05-03 18:13:05,600 Stage-1 map = 40%, reduce = 0%, Cumulative CPU 14.51 sec
2016-05-03 18:13:07,666 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 16.76 sec
2016-05-03 18:13:15,947 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 18.58 sec
MapReduce Total cumulative CPU time: 18 seconds 580 msec
Ended Job = job_1460542469374_69993
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 18.58 sec HDFS Read: 692 HDFS Write: 16 SUCCESS
Total MapReduce CPU Time Spent: 18 seconds 580 msec
OK
9.8070625201E10
Time taken: 39.05 seconds, Fetched: 1 row(s)
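For reference, if msize is stored in bytes, the result 9.8070625201E10 works out to roughly 91.3 GB (9.8070625201e10 / 1024^3).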