時間:2023-03-16 00:34:02 | 來源:電子商務
時間:2023-03-16 00:34:02 來源:電子商務
# Build the dw_customer_order aggregate (date x region x product) from the ODS
# layer and load it into the data warehouse.

# Import the required libraries.
import datetime

import numpy as np
import pandas as pd
import pymysql
from sqlalchemy import create_engine, text

# Let SQLAlchemy's default "mysql://" (MySQLdb) dialect run on top of PyMySQL.
pymysql.install_as_MySQLdb()

# Connect to the source layer (ODS) and the data warehouse (DW).
# NOTE(review): credentials are inlined placeholders; consider reading them
# from the environment or a config file instead.
AdventureOds = create_engine("mysql://UserName_1:Password_1@IP_Address_1/ods?charset=gbk")
AdventureDw = create_engine("mysql://UserName_2:Password_2@IP_Address_2/dw?charset=gbk")

# Order lines from ods_sales_orders, restricted to the latest create_date
# listed in dim_date_df.
SQLquery1 = """
select sales_order_key, create_date, customer_key, english_product_name,
       cpzl_zw, cplb_zw, unit_price
from ods_sales_orders
where create_date = (
    select create_date from dim_date_df order by create_date desc limit 1
)
"""
OdsSaleOrder = pd.read_sql_query(SQLquery1, con=AdventureOds)

# Customer -> territory / province / city lookup (ods_customer).
SQLquery2 = """
select customer_key, chinese_territory, chinese_province, chinese_city
from ods_customer
"""
OdsCustomer = pd.read_sql_query(SQLquery2, con=AdventureOds)

# Date-dimension flags (dim_date_df).
SQLquery3 = """
select create_date, is_current_year, is_last_year, is_yesterday, is_today,
       is_current_month, is_current_quarter
from dim_date_df
"""
OdsDimDate = pd.read_sql_query(SQLquery3, con=AdventureOds)

# Attach each order line's customer location.
SaleOrderCustomer = pd.merge(OdsSaleOrder, OdsCustomer, how='left', on='customer_key')

# Per (date, product, region) group: distinct orders, distinct customers and
# the total of unit_price.
# NOTE(review): pivot_table drops groups whose index columns contain NaN, so
# order lines whose customer_key has no match in ods_customer are left out of
# the aggregate -- confirm this is intended.
SaleOrderCustomerP = SaleOrderCustomer.pivot_table(
    index=['create_date', 'english_product_name', 'cpzl_zw', 'cplb_zw',
           'chinese_territory', 'chinese_province', 'chinese_city'],
    values=['sales_order_key', 'customer_key', 'unit_price'],
    aggfunc={
        'sales_order_key': pd.Series.nunique,
        'customer_key': pd.Series.nunique,
        # 'sum' names pandas' own groupby sum explicitly; the builtin `sum`
        # was already dispatched to it (pandas 2.1+ warns about the builtin).
        'unit_price': 'sum',
    },
)
SaleOrderCustomerP = SaleOrderCustomerP.reset_index().rename(
    columns={'customer_key': 'SumCustomer',
             'sales_order_key': 'SumOrder',
             'unit_price': 'SumAmount'}
)

# Add the date-dimension flags to every aggregated row.
dw_customer_order = pd.merge(SaleOrderCustomerP, OdsDimDate, how='left', on='create_date')

# Export dw_customer_order (date_region_product aggregate) to the warehouse:
# empty the existing table, then append the fresh rows.
# TRUNCATE returns no rows, so it must be executed on a connection;
# pd.read_sql_query expects a row-returning statement and raises on it.
try:
    with AdventureDw.begin() as conn:
        conn.execute(text('Truncate table dw_customer_order'))
except Exception as e:
    # Most likely the table does not exist yet (first run); to_sql creates it.
    print('舊表刪除Error: %s' % e)
# 'append' keeps the existing table definition (column types, indexes)
# instead of dropping and re-creating it the way 'replace' does.
dw_customer_order.to_sql('dw_customer_order', con=AdventureDw, if_exists='append', index=False)
# Before creating the date index: fetch the order lines dated '2019-03-01'.
# (Kept on its own line: a MySQL '#' comment runs to the end of the line and
# would otherwise swallow the statement.)
select * from ods_sales_orders where create_date='2019-03-01';
# After a date index is added to ods_sales_orders, the query below takes
# 0.168 ms, 21.65x faster than before. Adding indexes to a table can greatly
# speed up queries and therefore the ETL.
# After creating the date index: fetch the order lines dated '2019-03-01'.
# create_date(7) is a prefix index on the first 7 characters of create_date.
# NOTE(review): a prefix length is only valid on string columns, so this
# assumes create_date is stored as 'YYYY-MM-DD' text -- confirm. In that case
# the prefix covers only the year-month; a full-length index would be more
# selective for equality lookups on a complete date.
create index date_index on ods_sales_orders(create_date(7));
select * from ods_sales_orders where create_date='2019-03-01';
# How the index works: when a table has an index, the query no longer scans
# every value of the column. It first searches the index and then examines
# only the rows whose prefix matches the index entry, which lowers the cost
# and speeds up the query. According to the EXPLAIN output (not reproduced
# here), once date_index exists the query goes through the index first.
# Use EXPLAIN to show how the query is executed.
explain select * from ods_sales_orders where create_date='2019-03-01';
# 3.2 Adding multiprocessing: run the DW ETL scripts concurrently in a process pool.
import multiprocessing  # process pools
import subprocess
import sys

# DW build scripts; each one rebuilds one warehouse table.
ETL_SCRIPTS = ('dw_order_by_day.py', 'dw_amount_diff.py', 'dw_customer_order.py')


def runtask(script=None):
    """Run ETL script(s) in a child Python interpreter and wait for them.

    Args:
        script: path of a single script to run. When None, every entry of
            ETL_SCRIPTS is run, one after another. Relative paths resolve
            against the current working directory.

    Returns:
        dict mapping each script path that was run to its exit code.
    """
    scripts = ETL_SCRIPTS if script is None else (script,)
    # sys.executable runs the child on the same interpreter as the parent.
    return {path: subprocess.run([sys.executable, path]).returncode for path in scripts}


def callBackTask(arg):
    """Success callback: called in the parent process with runtask's return value.

    Pool callbacks must accept exactly one positional argument; otherwise
    calling them raises an error.
    """
    print("執行回調函數", arg)


def errorCallBackTask(exc):
    """Failure callback: called in the parent process with the exception a task raised.

    Without an error_callback (or a .get() on the AsyncResult), exceptions
    raised inside workers are silently discarded.
    """
    print("task failed:", exc)


if __name__ == "__main__":
    pool = multiprocessing.Pool(5)  # at most 5 worker processes run at the same time
    for script in ETL_SCRIPTS:
        # apply_async submits without blocking, so the tasks run in parallel and
        # report through the callbacks; pool.apply would run them serially.
        pool.apply_async(func=runtask, args=(script,), callback=callBackTask,
                         error_callback=errorCallBackTask)
    pool.close()  # no new tasks may be submitted; must be called before join()
    pool.join()  # wait for every worker process to finish
# The Pool in the multiprocessing library is a process pool: it manages a
# fixed number of worker processes and hands each pending task to an idle
# worker until every task has finished.

# Daily scheduler for the dw_order_by_day ETL script.
import datetime
import os  # operating-system interaction (paths, directories)
import subprocess
import time

import requests  # NOTE(review): not used by this script
import schedule  # periodic job scheduling

PYTHON_BIN = "/home/anaconda3/bin/python3"
ADVENTURE_DIR = "/home/******/adventure"


def _now():
    """Return the current local time formatted for the log lines."""
    return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')


def job1():
    """Run dw_order_by_day.py (the daily period-over-period table) once.

    The script's stdout and stderr are appended to
    logs/dw_order_by_day_schedule.log under ADVENTURE_DIR. The call waits for
    the script to exit, so the printed end time is its real completion time
    (the old "&" + sleep(20) only measured an arbitrary 20 seconds).
    """
    print('Job1:每天8:00執行一次')
    print('Job1-startTime:%s' % _now())
    log_path = os.path.join(ADVENTURE_DIR, 'logs', 'dw_order_by_day_schedule.log')
    # Create the log directory up front. Otherwise open() could raise, and the
    # exception would propagate out of schedule.run_pending() and stop the loop.
    os.makedirs(os.path.dirname(log_path), exist_ok=True)
    with open(log_path, 'ab') as log:
        result = subprocess.run(
            [PYTHON_BIN, os.path.join(ADVENTURE_DIR, 'dw_order_by_day.py')],
            stdout=log,
            stderr=subprocess.STDOUT,
        )
    print('Job1-exitCode:%s' % result.returncode)
    print('Job1-endTime:%s' % _now())
    print('------------------------------------------------------------------------')


if __name__ == '__main__':
    schedule.every().day.at('08:00').do(job1)
    while True:
        schedule.run_pending()
        time.sleep(10)
        print("wait", _now())
將這個文件掛在Linux系統的後臺運行,每天早上8點自動執行,定時更新數據;同時將代碼運行情況寫入dw_order_by_day_schedule.log文件。
關鍵詞:數據,分析,銷售
客戶&案例
營銷資訊
關(guān)于我們
微信公眾號
版權(quán)所有? 億企邦 1997-2025 保留一切法律許可權(quán)利。