数据处理 —— 出租车gps提取订单数据( 二 )


3.3 生成订单数据
需求:3.2获得了每个车辆的数据,这里需要对每个车辆的记录按时间排序,然后从中提取出该车的全部订单
思路:多线程遍历每个车,寻找连续的state=1序列(多线程没单线程快…)
学习过程:
线程的暂停与恢复
关于锁:一把锁保护一份共享资源;持有锁的线程释放锁之后,其他线程才能获得这把锁
锁的描述
字典按照key排序:
sort_date = sorted(data.items(), key=lambda item: item[0])
"""Extract taxi orders (trips) from per-vehicle GPS records using worker threads.

Each vehicle's records are sorted by time, and every maximal run of
``state == 1`` records that is followed by a ``state == 0`` record becomes one
``Request``.

@author:HY
@time:2021/11/1:10:34
"""
import pickle
import threading
import time

# pandas and tqdm are not referenced anywhere in this script. The names are
# kept available for code that relies on them, but a missing package must not
# prevent the extraction from running.
try:
    import pandas as pd
except ImportError:
    pd = None
try:
    from tqdm import tqdm
except ImportError:
    tqdm = None

# Guards ``req_list``; every worker appends its per-vehicle results under it.
lock = threading.Lock()
# Result set shared by all workers: the Request objects of every vehicle.
req_list = []

# How long an idle worker sleeps before polling its queue again, so that it
# does not spin and compete with the producer thread for the interpreter.
_IDLE_SLEEP_SECONDS = 0.001


class Node:
    """Singly linked list node that carries one queue item."""

    def __init__(self, value):
        self.value = value
        self.next = None


class Queue:
    """FIFO linked-list queue holding the data one worker thread has to process.

    ``push`` and ``pop`` are serialised by an internal lock because the main
    thread pushes while a worker thread pops.
    """

    def __init__(self):
        self.head = None
        self.tail = None
        self.length = 0
        self._lock = threading.Lock()

    def pop(self):
        """Remove and return the oldest node, or ``None`` when the queue is empty."""
        with self._lock:
            node = self.head
            if node is None:
                return None
            if node.next is None:
                # That was the only node: the queue is empty again.
                self.head = self.tail = None
            else:
                self.head = node.next
            self.length -= 1
            return node

    def push(self, node):
        """Append ``node`` (a ``Node``) at the tail of the queue."""
        with self._lock:
            if self.head is None:
                self.head = self.tail = node
            else:
                self.tail.next = node
                self.tail = node
            self.length += 1


class Request:
    """One order: pick-up time/position and, once known, drop-off time/position."""

    def __init__(self, start_time, s_lng, s_lat):
        self.start_time = start_time
        self.s_lng = s_lng
        self.s_lat = s_lat
        # The end fields stay None until the trip is closed.
        self.e_lng = None
        self.e_lat = None
        self.end_time = None


class MY_Server(threading.Thread):
    """Worker thread that turns queued per-vehicle records into ``Request`` objects.

    Args:
        name: Identifier of the worker, stored as ``mName``.
        value: Queue the worker consumes. When omitted, the worker gets its
            own fresh ``Queue`` (a shared default instance would make all
            workers consume from the same queue).
    """

    def __init__(self, name, value=None):
        super().__init__()
        self.mName = name
        self.mEvent = threading.Event()
        self.data_queue = Queue() if value is None else value
        self.is_running = False
        # Spelling kept as-is: the producer sets this attribute to False once
        # it has pushed the last vehicle.
        self.recieving = True

    def run(self):
        """Process vehicles until the producer is done and the queue is drained.

        Each popped node's ``value`` is expected to be an iterable of
        ``[time, state, lng, lat]`` records. The resulting requests are
        appended to the module-level ``req_list`` under ``lock``.
        """
        while True:
            # Read the flag BEFORE popping: the producer clears it only after
            # its last push, so "flag already cleared and queue empty" really
            # means there is nothing left to process.
            still_receiving = self.recieving
            node = self.data_queue.pop()
            if node is None:
                self.is_running = False
                if not still_receiving:
                    break
                time.sleep(_IDLE_SLEEP_SECONDS)
                continue

            self.is_running = True
            # Order this vehicle's records by time.
            sort_date = sorted(node.value, key=lambda item: item[0])
            vehicle_req_list = self.get_request(sort_date)
            # Add this vehicle's orders to the shared result list.
            with lock:
                req_list.extend(vehicle_req_list)

    def get_request(self, sort_data):
        """Build the list of completed trips from time-ordered records.

        Args:
            sort_data: Iterable of ``(now, state, lng, lat)`` records, already
                sorted by time. ``state == 1`` means occupied, ``state == 0``
                means vacant; records with any other state are ignored.

        Returns:
            A list of ``Request`` objects, one per occupied run. A trip starts
            at the first occupied record of a run and ends at the last
            occupied record before the next vacant one. A run that is still
            open when the data ends is not returned, because its end is
            unknown.
        """
        request_list = []
        one_request = None  # the trip currently in progress, if any
        last_now = last_lng = last_lat = None  # most recent occupied record

        for now, state, lng, lat in sort_data:
            if state == 1:
                if one_request is None:
                    # First occupied record after a vacant stretch: new trip.
                    one_request = Request(now, lng, lat)
                last_now, last_lng, last_lat = now, lng, lat
            elif state == 0 and one_request is not None:
                # The vehicle became vacant: close the trip at the last
                # occupied record and go back to looking for a new start.
                one_request.end_time = last_now
                one_request.e_lng = last_lng
                one_request.e_lat = last_lat
                request_list.append(one_request)
                one_request = None
        return request_list

    def pause(self):
        """Clear the worker's event flag."""
        self.mEvent.clear()

    def resume(self):
        """Set the worker's event flag."""
        self.mEvent.set()


def main(thread_num=1):
    """Load the vehicle dictionary, extract all orders and pickle the result.

    Args:
        thread_num: Number of worker threads to start.
    """
    time1 = time.time()

    # Load the data.
    # NOTE: pickle can execute arbitrary code while loading; only open files
    # produced by a trusted step of this pipeline.
    with open('多线程-车辆数据字典.pickle', 'rb') as f:
        all_dic = pickle.load(f)

    # Start the workers.
    workers = [MY_Server(i) for i in range(thread_num)]
    for worker in workers:
        worker.start()

    # Hand the vehicles out to the workers in round-robin order.
    for taxi_num, value in enumerate(all_dic.values()):
        workers[taxi_num % thread_num].data_queue.push(Node(value))

    # Tell every worker that no more data will arrive.
    for worker in workers:
        worker.recieving = False

    for worker in workers:
        worker.join()

    time2 = time.time()
    print(len(req_list))
    print('耗时', time2 - time1)

    # Save the order data.
    with open('shenzhen_req.pickle', 'wb') as f:
        pickle.dump(req_list, f)


if __name__ == "__main__":
    main()