Data_ETL.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. from sync_amz_data.public.amz_ad_client import SPClient,Account,SBClient,SDClient
  2. import pandas as pd
  3. import numpy as np
  4. from dateutil.parser import parse
  5. pd.set_option('display.max_columns', None)
  6. import warnings
  7. warnings.filterwarnings('ignore')
  8. pd.set_option('expand_frame_repr', False)
  9. from datetime import datetime,timezone,timedelta
  10. class Common_ETLMethod:
  11. def columnsName_modify(self,df):
  12. """
  13. 列名.换_,设置全部小写
  14. """
  15. df.columns = [i.replace(".","_").lower() for i in df.columns]
  16. return df
  17. def time_stamp_convert(self,df,time_columns:list):
  18. """
  19. 时间戳转换为utc
  20. """
  21. for time_column in time_columns:
  22. df[time_column] = pd.to_datetime(df[time_column]*1000000).map(lambda x: x.strftime("%Y-%m-%d %H:%M:%S"))
  23. df[time_columns] = df[time_columns].astype("datetime64")
  24. return df
  25. def TZ_Deal(self,df, time_columns):
  26. """
  27. TZ时间格式转换为utc
  28. """
  29. for time_column in time_columns:
  30. df[time_column] = df[time_column].map(lambda x: parse(x).strftime("%Y-%m-%d %H:%M:%S"))
  31. df[time_columns] = df[time_columns].astype("datetime64")
  32. return df
  33. def placement_segmentsplit(self,df,segment):
  34. """
  35. 拆分placement与percentage列
  36. """
  37. df[segment] = df[segment].astype("string")
  38. df[segment+str("_percentage")] = df[segment].str.extract("'percentage':.+([\d\.]{1,}),").astype('float32')
  39. df[segment+str("_placement")] = df[segment].str.extract("'placement':.+'(.+)'")
  40. df.replace(['nan','Nan','NaN'],np.nan,inplace=True)
  41. df.drop(columns=[segment],inplace=True)
  42. return df
  43. def expression_split(self,df,segment):
  44. """
  45. 拆分type,value列
  46. """
  47. df[segment] = df[segment].astype("string")
  48. df[segment+str("_type")] = df[segment].str.extract(r"'type':\s{0,1}'(.+?)',")
  49. df[segment+str("_value")] = df[segment].str.extract(r"'value':\s{0,1}[',[,{](.+)'")
  50. df[segment+str("_value")] = df[segment+str("_value")].map(lambda x: x if pd.isna(x) or "," not in x else "["+x+"'}]")
  51. df.replace(['nan','Nan','NaN'],np.nan,inplace=True)
  52. df.drop(columns=[segment],inplace=True)
  53. return df
  54. class Acount_ETL(Account,Common_ETLMethod):
  55. def portfolio_ETL(self):
  56. list_portfolio = self.get_portfolios()
  57. df_portfolio = pd.json_normalize(list_portfolio)
  58. # print(self.columnsName_modify(df_portfolio))
  59. return self.columnsName_modify(df_portfolio)
  60. class SP_ETL(SPClient,Common_ETLMethod):
  61. def campaigns_ETL(self):
  62. list_campaign_SP = list(self.iter_campaigns(**{"includeExtendedDataFields":True}))
  63. df_campaign = pd.json_normalize(list_campaign_SP)
  64. df_campaign = self.placement_segmentsplit(df_campaign, "dynamicBidding.placementBidding")
  65. df_campaign = self.TZ_Deal(df_campaign,["extendedData.creationDateTime","extendedData.lastUpdateDateTime"])
  66. # print(df_campaign)
  67. return self.columnsName_modify(df_campaign)
  68. def adGroups_ETL(self):
  69. list_adGroup_SP = list(self.iter_adGroups(**{"includeExtendedDataFields":True}))
  70. df_adGroup_SP = pd.json_normalize(list_adGroup_SP)
  71. df_adGroup_SP = self.TZ_Deal(df_adGroup_SP,["extendedData.creationDateTime","extendedData.lastUpdateDateTime"])
  72. return self.columnsName_modify(df_adGroup_SP)
  73. def ads_ETL(self):
  74. list_adId_SP = list(self.iter_ads(**{"includeExtendedDataFields":True}))
  75. df_adId_SP = pd.json_normalize(list_adId_SP)
  76. df_adId_SP = self.TZ_Deal(df_adId_SP,["extendedData.creationDateTime", "extendedData.lastUpdateDateTime"])
  77. return self.columnsName_modify(df_adId_SP)
  78. def keywords_ETL(self):
  79. list_keywords_SP = list(self.iter_keywords(**{"includeExtendedDataFields":True}))
  80. df_keywords_SP = pd.json_normalize(list_keywords_SP)
  81. df_keywords_SP = self.TZ_Deal(df_keywords_SP, ["extendedData.creationDateTime", "extendedData.lastUpdateDateTime"])
  82. return self.columnsName_modify(df_keywords_SP)
  83. def targets_ETL(self):
  84. list_targets = list(self.iter_targets())
  85. df_targets = pd.json_normalize(list_targets)
  86. df_targets = self.TZ_Deal(df_targets, ["extendedData.creationDateTime", "extendedData.lastUpdateDateTime"])
  87. return self.columnsName_modify(df_targets)
  88. def budget_ETL(self,campaign_ids:list):
  89. list_budget = self.get_budget(campaign_ids = campaign_ids)['success']
  90. df_budget = pd.json_normalize(list_budget)
  91. df_budget = self.TZ_Deal(df_budget,["usageUpdatedTimestamp"])
  92. return self.columnsName_modify(df_budget)
  93. class SB_ETL(SBClient,Common_ETLMethod):
  94. reportMetrics = [
  95. 'applicableBudgetRuleId',
  96. 'applicableBudgetRuleName',
  97. 'attributedConversions14d',
  98. 'attributedConversions14dSameSKU',
  99. 'attributedDetailPageViewsClicks14d',
  100. 'attributedOrderRateNewToBrand14d',
  101. 'attributedOrdersNewToBrand14d',
  102. 'attributedOrdersNewToBrandPercentage14d',
  103. 'attributedSales14d',
  104. 'attributedSales14dSameSKU',
  105. 'attributedSalesNewToBrand14d',
  106. 'attributedSalesNewToBrandPercentage14d',
  107. 'attributedUnitsOrderedNewToBrand14d',
  108. 'attributedUnitsOrderedNewToBrandPercentage14d',
  109. 'campaignBudget',
  110. 'campaignBudgetType',
  111. 'campaignId',
  112. 'campaignName',
  113. 'campaignRuleBasedBudget',
  114. 'campaignStatus',
  115. 'clicks',
  116. 'cost',
  117. 'dpv14d',
  118. 'impressions',
  119. 'unitsSold14d',
  120. 'attributedBrandedSearches14d',
  121. 'topOfSearchImpressionShare']
  122. def campaigns_ETL(self):
  123. list_campaign_SB = list(self.iter_campaigns(**{"includeExtendedDataFields":True}))
  124. df_campaign = pd.json_normalize(list_campaign_SB)
  125. df_campaign = self.placement_segmentsplit(df_campaign, "bidding.bidAdjustmentsByPlacement")
  126. df_campaign = self.time_stamp_convert(df_campaign,["extendedData.creationDate","extendedData.lastUpdateDate"])
  127. # print(df_campaign)
  128. return self.columnsName_modify(df_campaign)
  129. def adGroups_ETL(self):
  130. list_adGroup_SB = list(self.iter_adGroups(**{"includeExtendedDataFields":True}))
  131. df_adGroup_SP = pd.json_normalize(list_adGroup_SB)
  132. df_adGroup_SP = self.time_stamp_convert(df_adGroup_SP,["extendedData.creationDate","extendedData.lastUpdateDate"])
  133. return self.columnsName_modify(df_adGroup_SP)
  134. def ads_ETL(self):
  135. list_adId_SB = list(self.iter_ads(**{"includeExtendedDataFields":True}))
  136. df_adId_SP = pd.json_normalize(list_adId_SB)
  137. df_adId_SP = self.time_stamp_convert(df_adId_SP,["extendedData.creationDate","extendedData.lastUpdateDate"])
  138. return self.columnsName_modify(df_adId_SP)
  139. def keywords_ETL(self):
  140. list_keywords_SB = [row for _ in list(self.iter_keywords()) for row in _]
  141. df_keywords_SP = pd.json_normalize(list_keywords_SB)
  142. return self.columnsName_modify(df_keywords_SP)
  143. def targets_ETL(self):
  144. list_targets = list(self.iter_targets())
  145. df_targets = pd.json_normalize(list_targets)
  146. # df_targets = self.TZ_Deal(df_targets, ["extendedData.creationDateTime", "extendedData.lastUpdateDateTime"])
  147. df_targets = self.expression_split(df_targets,"resolvedExpressions")
  148. return self.columnsName_modify(df_targets)
  149. def budget_ETL(self,campaign_ids:list):
  150. list_budget = self.get_budget(campaignIds = campaign_ids)['success']
  151. df_budget = pd.json_normalize(list_budget)
  152. df_budget = self.TZ_Deal(df_budget,["usageUpdatedTimestamp"])
  153. return self.columnsName_modify(df_budget)
  154. def report_campaignsRecord_ETL(self):
  155. today = datetime.today()
  156. date = (datetime(today.year,today.month,today.day,tzinfo=timezone.utc)-timedelta(days=1)).strftime("%Y%m%d")
  157. print(date)
  158. need_removedList = []
  159. if need_removedList is not None:
  160. [SB_ETL.reportMetrics.remove(i) for i in need_removedList]
  161. list_campaigns_report = self.get_v3_report(record_type="campaigns",metrics=SB_ETL.reportMetrics,report_date=date)
  162. # print(list_campaigns_report)
  163. df_campaign_report = pd.json_normalize(list_campaigns_report)
  164. return df_campaign_report
  165. class SD_ETL(SDClient,Common_ETLMethod):
  166. def campaigns_ETL(self):
  167. list_campaign_SD = self.get_campaigns()
  168. df_campaign = pd.json_normalize(list_campaign_SD)
  169. df_campaign['startDate'] = df_campaign['startDate'].map(lambda x: datetime.strptime(x,"%Y%m%d").date()) # 转换为标准时间格式
  170. df_campaign['portfolioId'] = df_campaign['portfolioId'].fillna(-1).astype("int64") # 将portfolio列为空的填充为-1
  171. return self.columnsName_modify(df_campaign)
  172. def adGroups_ETL(self,**param):
  173. list_adGroups_SD = [row for _ in list(self.iter_adGroups(**param)) for row in _]
  174. df_adGroups_SD = pd.json_normalize(list_adGroups_SD)
  175. tactic = {"T00020":"Contextual targeting","T00030":"Audiences targeting"}
  176. df_adGroups_SD["tactic_type"] = df_adGroups_SD['tactic'].map(tactic) # T00020、T00030解释字段
  177. return self.columnsName_modify(df_adGroups_SD)
  178. def ads_ETL(self):
  179. list_ads_SD = [row for _ in list(self.iter_ads()) for row in _]
  180. df_ads_SD = pd.json_normalize(list_ads_SD)
  181. return self.columnsName_modify(df_ads_SD)
  182. def targets_ETL(self,**param):
  183. list_targets = [row for _ in list(self.iter_targets(**param)) for row in _]
  184. df_targets = pd.json_normalize(list_targets)
  185. df_targets = self.expression_split(df_targets, "resolvedExpression")
  186. return self.columnsName_modify(df_targets)
  187. def budget_ETL(self,campaignsIds:list):
  188. list_budget = self.get_budget(campaignIds=campaignsIds)['success']
  189. df_budget = pd.json_normalize(list_budget)
  190. df_budget = self.TZ_Deal(df_budget,["usageUpdatedTimestamp"])
  191. return self.columnsName_modify(df_budget)
  192. if __name__ == '__main__':
  193. AWS_CREDENTIALS = {
  194. 'lwa_client_id': 'amzn1.application-oa2-client.ebd701cd07854fb38c37ee49ec4ba109',
  195. 'refresh_token': "Atzr|IwEBIL4ur8kbcwRyxVu_srprAAoTYzujnBvA6jU-0SMxkRgOhGjYJSUNGKvw24EQwJa1jG5RM76mQD2P22AKSq8qSD94LddoXGdKDO74eQVYl0RhuqOMFqdrEZpp1p4bIR6_N8VeSJDHr7UCuo8FiabkSHrkq7tsNvRP-yI-bnpQv4EayPBh7YwHVX3hYdRbhxaBvgJENgCuiEPb35Q2-Z6w6ujjiKUAK2VSbCFpENlEfcHNsjDeY7RCvFlwlCoHj1IeiNIaFTE9yXFu3aEWlExe3LzHv6PZyunEi88QJSXKSh56Um0e0eEg05rMv-VBM83cAqc5POmZnTP1vUdZO8fQv3NFLZ-xU6e1WQVxVPi5Cyqk4jYhGf1Y9t98N654y0tVvw74qNIsTrB-8bGS0Uhfe24oBEWmzObvBY3zhtT1d42myGUJv4pMTU6yPoS83zhPKm3LbUDEpBA1hvvc_09jHk7vUEAuFB-UAZzlht2C1yklzQ",
  196. 'lwa_client_secret': 'cbf0514186db4df91e04a8905f0a91b605eae4201254ced879d8bb90df4b474d',
  197. 'profile_id': "3006125408623189"
  198. }
  199. ac_etl = SB_ETL(**AWS_CREDENTIALS)
  200. # print(ac_etl.budget_ETL(campaign_ids=["126327624499318"]))
  201. print(ac_etl.report_targetsRecord_ETL())