数据处理 —— 出租车gps提取订单数据

1. 需求目的:
约5千万条出租车gps数据存储在mysql中,数据按照一定时间频率记录,想要获取出其中的订单数量 。
(数据示例)
2. 思路分析
是车牌号,state=1表示车上有人 。数据先按照车辆分组,然后按时间排序,最后选出连续的序列state=1作为一个订单
3. 实践 3.1 sql合并数据
由于数据在mysql中被分成了200+个表,有两种处理方式:
分别处理每个表获得每个车牌号数据,再按车牌号合并先把表合并成大表,直接整体处理获得每个车牌号数据(used in my )
2021.10.30
需求:一天的出租车轨迹表被成了几百个表,需要将这些表合并
【数据处理 —— 出租车gps提取订单数据】思路:读出这个数据库中所有的表名,循环将所有表合并到一个新表中(使用存储过程)
学习过程:
两个表垂直拼接的方法
# 两个表之间的连接查询,但是会删除两个表中的重复数据`select * from table union select * from table2两个表之间的连接查询,不删除两个表中的重复数据`select * from table union all select * from table2
:这样子写200+个句子不不现实,应该使用循环或者直接获取数据库中的表名
存储过程
DELIMITER $$USE `taxigps`$$DROP PROCEDURE IF EXISTS `test11`$$CREATE DEFINER=`root`@`%` PROCEDURE `test11`()BEGINDECLARE stopflag INT DEFAULT 0;DECLARE tablename VARCHAR(64);-- 创建一个游标变量,declare 变量名 cursor ...DECLARE tablename_cur CURSOR FOR SELECT table_name FROM information_schema.tables WHERE table_schema='taxigps' AND table_name != 'taxi';-- 游标是保存查询结果的临时区域-- 游标变量username_cur保存了查询的临时结果,实际上就是结果集-- 当游标变量中保存的结果都查询一遍(遍历),到达结尾,将变量stopflag设置为1,用于循环中判断是否结束DECLARE CONTINUE HANDLER FOR NOT FOUND SET stopflag=1;OPEN tablename_cur; -- 打卡游标FETCH tablename_cur INTO tablename; -- 游标向前走一步,取出一条记录放到变量username中#SELECT tablename,000;WHILE(stopflag=0) DO -- 如果游标还没有结尾,就继续BEGIN -- 在用户名前门拼接 '_cur' 字符串SET @sql = CONCAT('insert into taxi SELECT * FROM ',tablename);PREPARE ss FROM @sql;EXECUTE ss;#INSERT INTO taxi SELECT * FROM trablename;FETCH tablename_cur INTO tablename;END;END WHILE; -- 结束循环CLOSE tablename_cur; -- 关闭游标END$$DELIMITER ;
3.2 按车牌号分组
"""@author:HY@time:2021/10/31:14:51"""from threading import Threadimport threadingfrom time import sleep, ctimeimport pandas as pdimport timeimport pickledef solve_file(name, file):for index, row in file.iterrows():date, taxi_time, _, plate_number, lng, lat, _, _, state, _ = rowif plate_number in taxi_dic.keys():taxi_dic[plate_number][taxi_time] = stateelse:taxi_dic[plate_number] = {taxi_time: state}df = pd.read_csv('test.csv', header=None, sep='\t')# df = pd.read_csv('20160920_taxigps.csv', header=None, sep='\t')line_num = len(df)taxi_dic = {}time1 = time.time()thread_num = 1for i in range(thread_num):i = i + 1exec('divide_file' + str(i) + '= int(line_num * ' + str(i) + '/ thread_num)')if i == 1:exec('df' + str(i) + ' = df[divide_file1:]')elif i == thread_num:exec('df' + str(i) + ' = df[:divide_file' + str(thread_num) + ']')else:exec('df' + str(i) + ' = df[divide_file' + str(i-1) + ':divide_file' + str(i) + ']')for i in range(thread_num):exec('t' + str(i) + ' = threading.Thread(target=solve_file, args=('+ str(i) + ', df'+ str(i) + '))')exec('t' + str(i) + '.start()')exec('t' + str(i) + '.join()')time2 = time.time()with open('多线程-车辆数据字典.pickle', 'wb') as f:pickle.dump(taxi_dic, f)taxi_dic.clear()time3 = time.time()solve_file('all',df)time4 = time.time()with open('单线程-车辆数据字典.pickle', 'wb') as f:pickle.dump(taxi_dic, f)print(f'多线程聚合耗时:time={time2-time1},单线程聚合耗时:time={time4-time3}')