Processing Hive data with Python and loading it into MySQL

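This example works like most jobs that aggregate Hive data and load it into MySQL. The main addition is an hour-fill step: every hour between two timestamps (switch_time and updated_time) is expanded into its own row. Otherwise there is little difference.
1. Table definitions and sample data
Hive source table and data: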

hive> show create table xxxxx_liuwp0628_live;
OK
CREATE TABLE `xxxxx_liuwp0628_live`(
  `pt_day` string, 
  `room_id` bigint, 
  `game_id` int, 
  `game_name` string, 
  `switch_time` timestamp, 
  `updated_time` timestamp)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://emr-cluster/user/hive/warehouse/xxxxx_liuwp0628_live'
TBLPROPERTIES (
  'COLUMN_STATS_ACCURATE'='{\"BASIC_STATS\":\"true\"}', 
  'numFiles'='1', 
  'numRows'='585848', 
  'rawDataSize'='45030719', 
  'totalSize'='45616567', 
  'transient_lastDdlTime'='1530165106')
Time taken: 0.1 seconds, Fetched: 22 row(s)
hive> select * from xxxxx_liuwp0628_live limit 10;
OK
2018-06-18      71706880        1147    穿越火线        2018-06-17 23:59:50     2018-06-18 00:02:01
2018-06-18      71706880        1147    穿越火线        2018-06-18 00:02:04     2018-06-18 00:05:40
2018-06-18      79441664        1159    王者荣耀        2018-06-17 23:49:38     2018-06-18 00:08:39
2018-06-18      58938112        1013    球球大作战      2018-06-17 20:43:29     2018-06-18 00:11:37
2018-06-18      8477696 1541    荒野行动        2018-06-17 23:50:36     2018-06-18 00:14:45
2018-06-18      4583168 1575    全军出击        2018-06-18 00:09:44     2018-06-18 00:15:20
2018-06-18      38969600        1409    QQ飞车手游      2018-06-17 22:33:48     2018-06-18 00:34:11
2018-06-18      72643840        1575    全军出击        2018-06-17 22:45:05     2018-06-18 00:49:04
2018-06-18      81295872        1576    刺激战场        2018-06-18 01:06:29     2018-06-18 01:08:03
2018-06-18      26325760        1159    王者荣耀        2018-06-18 00:53:08     2018-06-18 01:09:00
Time taken: 0.075 seconds, Fetched: 10 row(s)
MySQL target table and result data:
CREATE TABLE `xxxxx_wp0628` (
  `room_id` varchar(80) DEFAULT NULL,
  `game_id` varchar(80) DEFAULT NULL,
  `game_name` varchar(80) DEFAULT NULL,
  `switch_hour` varchar(80) DEFAULT NULL,
  `updated_hour` varchar(80) DEFAULT NULL,
  `live_hour` varchar(80) DEFAULT NULL,
  `etl_time` datetime DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8
mysql> select * from xxxxx_wp0628 limit 20;
+----------+---------+-----------------+---------------+---------------+---------------+---------------------+
| room_id  | game_id | game_name       | switch_hour   | updated_hour  | live_hour     | etl_time            |
+----------+---------+-----------------+---------------+---------------+---------------+---------------------+
| 71706880 | 1147    | 穿越火线    | 2018-06-17 23 | 2018-06-18 00 | 2018-06-17 23 | 2018-06-28 19:22:09 |
| 71706880 | 1147    | 穿越火线    | 2018-06-17 23 | 2018-06-18 00 | 2018-06-18 00 | 2018-06-28 19:22:09 |
| 71706880 | 1147    | 穿越火线    | 2018-06-18 00 | 2018-06-18 00 | 2018-06-18 00 | 2018-06-28 19:22:09 |
| 79441664 | 1159    | 王者荣耀    | 2018-06-17 23 | 2018-06-18 00 | 2018-06-17 23 | 2018-06-28 19:22:09 |
| 79441664 | 1159    | 王者荣耀    | 2018-06-17 23 | 2018-06-18 00 | 2018-06-18 00 | 2018-06-28 19:22:09 |
| 58938112 | 1013    | 球球大作战 | 2018-06-17 20 | 2018-06-18 00 | 2018-06-17 20 | 2018-06-28 19:22:09 |
| 58938112 | 1013    | 球球大作战 | 2018-06-17 20 | 2018-06-18 00 | 2018-06-17 21 | 2018-06-28 19:22:09 |
| 58938112 | 1013    | 球球大作战 | 2018-06-17 20 | 2018-06-18 00 | 2018-06-17 22 | 2018-06-28 19:22:09 |
| 58938112 | 1013    | 球球大作战 | 2018-06-17 20 | 2018-06-18 00 | 2018-06-17 23 | 2018-06-28 19:22:09 |
| 58938112 | 1013    | 球球大作战 | 2018-06-17 20 | 2018-06-18 00 | 2018-06-18 00 | 2018-06-28 19:22:09 |
| 8477696  | 1541    | 荒野行动    | 2018-06-17 23 | 2018-06-18 00 | 2018-06-17 23 | 2018-06-28 19:22:09 |
| 8477696  | 1541    | 荒野行动    | 2018-06-17 23 | 2018-06-18 00 | 2018-06-18 00 | 2018-06-28 19:22:09 |
| 4583168  | 1575    | 全军出击    | 2018-06-18 00 | 2018-06-18 00 | 2018-06-18 00 | 2018-06-28 19:22:09 |
| 38969600 | 1409    | QQ飞车手游  | 2018-06-17 22 | 2018-06-18 00 | 2018-06-17 22 | 2018-06-28 19:22:09 |
| 38969600 | 1409    | QQ飞车手游  | 2018-06-17 22 | 2018-06-18 00 | 2018-06-17 23 | 2018-06-28 19:22:09 |
| 38969600 | 1409    | QQ飞车手游  | 2018-06-17 22 | 2018-06-18 00 | 2018-06-18 00 | 2018-06-28 19:22:09 |
| 72643840 | 1575    | 全军出击    | 2018-06-17 22 | 2018-06-18 00 | 2018-06-17 22 | 2018-06-28 19:22:09 |
| 72643840 | 1575    | 全军出击    | 2018-06-17 22 | 2018-06-18 00 | 2018-06-17 23 | 2018-06-28 19:22:09 |
| 72643840 | 1575    | 全军出击    | 2018-06-17 22 | 2018-06-18 00 | 2018-06-18 00 | 2018-06-28 19:22:09 |
| 81295872 | 1576    | 刺激战场    | 2018-06-18 01 | 2018-06-18 01 | 2018-06-18 01 | 2018-06-28 19:22:09 |
+----------+---------+-----------------+---------------+---------------+---------------+---------------------+
20 rows in set (0.00 sec)
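
The hour fill visible in the result above (one output row per hour from switch_hour through updated_hour, inclusive) can be sketched on its own as follows. This is a minimal illustration rather than the script used in this post: the name `expand_hours` is hypothetical, and it compares datetime objects instead of strings.

```python
from datetime import datetime, timedelta

HOUR_FMT = "%Y-%m-%d %H"  # same hour format as switch_hour / updated_hour


def expand_hours(begin_hour, end_hour):
    """Return every hour string from begin_hour to end_hour, inclusive."""
    current = datetime.strptime(begin_hour, HOUR_FMT)
    end = datetime.strptime(end_hour, HOUR_FMT)
    hours = []
    while current <= end:
        hours.append(current.strftime(HOUR_FMT))
        current += timedelta(hours=1)
    return hours


# Room 58938112 in the sample data switched at 2018-06-17 20 and was
# last updated at 2018-06-18 00, so it expands to five hourly rows.
room_id, switch_hour, updated_hour = "58938112", "2018-06-17 20", "2018-06-18 00"
expanded = [(room_id, h) for h in expand_hours(switch_hour, updated_hour)]
for item in expanded:
    print(item[0], item[1])
```

A row whose two hours are equal yields exactly one output row, which matches room 4583168 in the result table.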
2. Directory layout
MacBook-Pro-Nisj:HiveDataProc2mysql nisj$ pwd
/Users/nisj/PycharmProjects/BiDataProc/love/HiveDataProc2mysql
MacBook-Pro-Nisj:HiveDataProc2mysql nisj$ find . -print | sed -e 's;[^/]*/;|____;g;s;____|; |;g'
.
|____ParProc.pyc
|____HiveDataProc2mysql.py
|____ParProc.py
MacBook-Pro-Nisj:HiveDataProc2mysql nisj$ 
3. Parameter script
/Users/nisj/PycharmProjects/BiDataProc/love/HiveDataProc2mysql/ParProc.py
# -*- coding=utf-8 -*-
import warnings
import datetime

warnings.filterwarnings("ignore")


def getNowDay():
    DayNow = datetime.datetime.today().strftime('%Y-%m-%d')
    return DayNow


def getYesterDay():
    YesterDay = (datetime.datetime.today() - datetime.timedelta(1)).strftime('%Y-%m-%d')
    return YesterDay


def dateRange(beginDate, endDate):
    dates = []
    dt = datetime.datetime.strptime(beginDate, "%Y-%m-%d")
    date = beginDate[:]
    while date <= endDate:
        dates.append(date)
        dt = dt + datetime.timedelta(1)
        date = dt.strftime("%Y-%m-%d")
    return dates


def monthRange(beginDate, endDate):
    monthSet = set()
    for date in dateRange(beginDate, endDate):
        monthSet.add(date[0:7])
    monthList = []
    for month in monthSet:
        monthList.append(month)
    return sorted(monthList)


def dateHourRange(beginDateHour, endDateHour):
    dhours = []
    dhour = datetime.datetime.strptime(beginDateHour, "%Y-%m-%d %H")
    date = beginDateHour[:]
    while date <= endDateHour:
        dhours.append(date)
        dhour = dhour + datetime.timedelta(hours=1)
        date = dhour.strftime("%Y-%m-%d %H")
    return dhours


def getSrcMysqlConfig():
    host = "MysqlHost"
    port = 6603
    user = "MysqlUser"
    pawd = "MysqlPass"
    dbnm = "funnyai_data"

    return (host, port, user, pawd, dbnm)
4. Main script
/Users/nisj/PycharmProjects/BiDataProc/love/HiveDataProc2mysql/HiveDataProc2mysql.py
# -*- coding=utf-8 -*-
import os
import re
import time
import sys
from ParProc import *

reload(sys)
sys.setdefaultencoding('utf8')

warnings.filterwarnings("ignore")


def hiveDataProc2mysql(runDay):
    # Initialize the MySQL connection parameters
    host, port, user, passwd, db = getSrcMysqlConfig()

    os.system("""source /etc/profile; \
             /usr/bin/mysql -h{host} -P{port} -u{user} -p{passwd} -e "use {db}; \
             truncate table xxxxx_wp0628; " """.format(host=host, port=port, user=user, passwd=passwd, db=db
                                                       ))

    srcDetail = os.popen("""source /etc/profile; \
                /usr/lib/hive-current/bin/hive -e " \
                select room_id,game_id,game_name,substr(switch_time,1,13) switch_hour,substr(updated_time,1,13) updated_hour \
                from xxxxx_liuwp0628_live \
                ; \
                " """).readlines()  # the query has no placeholders; runDay is not used here
    srcDetail_list = []
    for src_list in srcDetail:
        src = re.split('\t', src_list.replace('\n', ''))
        srcDetail_list.append(src)

    Proc_Data_list = []
    for srcD in srcDetail_list:
        for dh in dateHourRange(beginDateHour=srcD[3], endDateHour=srcD[4]):
            # print srcD[0], srcD[1], srcD[2], srcD[3], srcD[4], dh
            Proc_Data_list.append((srcD[0], srcD[1], srcD[2], srcD[3], srcD[4], dh))

    # for x in Proc_Data_list:
    #     print x[0], x[1], x[2], x[3], x[4], x[5]

    i = 0
    insert_mysql_sql = """/usr/bin/mysql -h{host} -P{port} -u{user} -p{passwd} -e "use {db}; \
           insert into xxxxx_wp0628(room_id, game_id,game_name, switch_hour,updated_hour,live_hour, etl_time) \
           values """.format(host=host, port=port, user=user, passwd=passwd, db=db)
    for procd in Proc_Data_list:
        room_id = procd[0]
        game_id = procd[1]
        game_name = procd[2].replace(chr(10), '').replace(chr(39), '').replace('(', '').replace(')', '').replace(
            chr(96),
            '').replace(
            chr(34), '').replace(chr(92), '')
        switch_hour = procd[3]
        updated_hour = procd[4]
        live_hour = procd[5]
        etl_time = time.strftime('%Y-%m-%d %X', time.localtime())

        i += 1
        insert_mysql_sql = insert_mysql_sql + """('{room_id}','{game_id}','{game_name}','{switch_hour}','{updated_hour}','{live_hour}','{etl_time}'),""".format(
            host=host, port=port, user=user, passwd=passwd, db=db, room_id=room_id, game_id=game_id,
            game_name=game_name, switch_hour=switch_hour, updated_hour=updated_hour, live_hour=live_hour,
            etl_time=etl_time)
        if (i % 500 == 0):
            insert_mysql_sql = insert_mysql_sql.rstrip(',') + """ ;" """
            os.system(insert_mysql_sql)

            insert_mysql_sql = """/usr/bin/mysql -h{host} -P{port} -u{user} -p{passwd} -e "use {db}; \
                   insert into xxxxx_wp0628(room_id, game_id,game_name, switch_hour,updated_hour,live_hour, etl_time) \
                   values """.format(host=host, port=port, user=user, passwd=passwd, db=db)

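    # Flush the remaining rows. Skip when nothing is pending (row count is 0 or
    # a multiple of 500), otherwise an INSERT with an empty VALUES list is sent.
    if i % 500 != 0:
        insert_mysql_sql = insert_mysql_sql.rstrip(',') + """ ;" """
        os.system(insert_mysql_sql)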


hiveDataProc2mysql(runDay=getYesterDay())
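
The batching and name clean-up in the script above can also be separated into small pure functions, which makes them easy to check without a database. The following is a sketch under assumptions, not a drop-in replacement: the helper names `sanitize`, `batches` and `build_insert` are hypothetical, and the statements are only built, not executed.

```python
def sanitize(text):
    """Drop the same characters the script strips from game_name."""
    for ch in ("\n", "'", "(", ")", "`", '"', "\\"):
        text = text.replace(ch, "")
    return text


def batches(rows, size=500):
    """Yield consecutive slices of rows; never yields an empty batch."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def build_insert(rows, etl_time):
    """Build one multi-row INSERT statement for a non-empty batch."""
    values = ",".join(
        "('{0}','{1}','{2}','{3}','{4}','{5}','{6}')".format(
            r[0], r[1], sanitize(r[2]), r[3], r[4], r[5], etl_time)
        for r in rows)
    return ("insert into xxxxx_wp0628(room_id, game_id, game_name, "
            "switch_hour, updated_hour, live_hour, etl_time) values "
            + values + ";")


# 1000 identical demo rows: exactly two statements, no empty trailing one.
demo_row = ("1", "1013", "a'b", "2018-06-18 00", "2018-06-18 00", "2018-06-18 00")
statements = [build_insert(b, "2018-06-28 19:22:09")
              for b in batches([demo_row] * 1000)]
print(len(statements))
```

Slicing by index means a row count that is an exact multiple of the batch size produces no empty final batch. Stripping characters is a coarse safeguard; where a MySQL client library is available, parameterized queries are the more robust choice.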
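5. Additional notes
Requirement details:
The content review department is scheduling staff for review work and wants to know the current live-streaming volume of the platform's popular games. The following data is required:
X-axis: time, from June 18 to June 24, at hourly granularity
Y-axis: number of live rooms
This data is needed for each popular game (popular games: 绝地求生:刺激战场, 绝地求生:全军出击, 单机游戏, 第五人格, 一起来飞车, 异次元, 火影忍者, 全民枪战, 天天酷跑, 球球大作战, 王者荣耀, 穿越火线, 我的世界, 荒野行动, 绝地求生, QQ飞车手游, 终结者2:审判日, 迷你世界)
18 data tables in total
Statement that creates the intermediate Hive table: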
drop table if exists xxxxx_liuwp0628_live;
create table xxxxx_liuwp0628_live as
select pt_day,room_id,game_id,game_name,switch_time,updated_time
from honeycomb_all_live_history_status
where pt_day between '2018-06-18' and '2018-06-24'
  and game_id in (1576,1575,1187,1606,1363,1353,1175,1009,1008,1013,1159,1147,1014,1541,1515,1409,1547,1269);
Query that exports the final result:
select a1.game_name,a1.live_hour,count(distinct a1.room_id) online_room_cnt
from (select case
when a1.game_id=1008 then '天天酷跑  '
when a1.game_id=1009 then '全民枪战  '
when a1.game_id=1013 then '球球大作战 '
when a1.game_id=1014 then '我的世界  '
when a1.game_id=1147 then '穿越火线  '
when a1.game_id=1159 then '王者荣耀  '
when a1.game_id=1175 then '火影忍者  '
when a1.game_id=1187 then '单机游戏  '
when a1.game_id=1269 then '迷你世界  '
when a1.game_id=1353 then '异次元   '
when a1.game_id=1363 then '一起来飞车 '
when a1.game_id=1409 then 'QQ飞车手游'
when a1.game_id=1515 then '绝地求生  '
when a1.game_id=1541 then '荒野行动  '
when a1.game_id=1547 then '终结者2  '
when a1.game_id=1575 then '全军出击  '
when a1.game_id=1576 then '刺激战场  '
when a1.game_id=1606 then '第5人格  '
end game_name,a1.live_hour,a1.room_id from xxxxx_wp0628 a1 where substr(a1.live_hour,1,10) between '2018-06-18' and '2018-06-24') a1
group by a1.game_name,a1.live_hour
order by a1.game_name,a1.live_hour;
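
The query uses count(distinct room_id) because one room can contribute several rows to the same hour: in the sample result, room 71706880 appears twice for 2018-06-18 00. The same aggregation can be cross-checked in Python with a set per group. This is a small sketch; the function name `rooms_per_game_hour` is hypothetical, and the date filter from the query is left out.

```python
from collections import defaultdict


def rooms_per_game_hour(rows):
    """Count distinct room_id values per (game_id, live_hour)."""
    rooms = defaultdict(set)
    for room_id, game_id, live_hour in rows:
        rooms[(game_id, live_hour)].add(room_id)
    return {key: len(ids) for key, ids in rooms.items()}


# (room_id, game_id, live_hour) taken from the sample result rows in section 1.
sample = [
    ("71706880", "1147", "2018-06-17 23"),
    ("71706880", "1147", "2018-06-18 00"),
    ("71706880", "1147", "2018-06-18 00"),  # same room and hour, second session
    ("79441664", "1159", "2018-06-17 23"),
    ("79441664", "1159", "2018-06-18 00"),
]
counts = rooms_per_game_hour(sample)
for key in sorted(counts):
    print(key[0], key[1], counts[key])
```

A plain row count would report 2 for game 1147 at 2018-06-18 00, while the distinct count reports 1.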