Browse Source

insert To Clickhouse

huangyifan 1 year ago
parent
commit
daa07ce2e9
1 changed file with 172 additions and 22 deletions
  1. 172 22
      sync_amz_data/DataTransform/Data_ETL.py

+ 172 - 22
sync_amz_data/DataTransform/Data_ETL.py

@@ -139,6 +139,7 @@ class SP_ETL(SPClient,Common_ETLMethod):
         df_report['campaignId'] = df_report['campaignId'].astype("string")
         df_report['date'] = df_report['date'].astype("datetime64")
         df_report[df_report.select_dtypes('O').columns] = df_report[df_report.select_dtypes('O').columns].astype('string')
+        print(df_report)
         params['columns'].append('profileId')
         conn = self.clickhouse_connect()
         conn.insert_df("AllReport.SP_spCampaigns_campaignV3",df_report[params['columns']])
@@ -437,12 +438,21 @@ class SB_ETL(SBClient,Common_ETLMethod):
             df_report[df_needManualAdd] = None
         df_report['profileId'] = self.profile_id
         params['columns'].append('profileId')
+
+        df_report[['campaignId','adGroupId']] = df_report[['campaignId','adGroupId']].astype("string")
+        df_report['date'] = df_report['date'].astype("datetime64")
+        df_report[df_report.select_dtypes('O').columns] = df_report[df_report.select_dtypes('O').columns].astype('string')
+        conn = self.clickhouse_connect()
+        conn.insert_df("AllReport.SB_sbPurchasedProduct_asinV3",df_report[params['columns']])
+        print("插入完成")
+
         return df_report[params['columns']]
 
     def reportV2_campaignsRecord_ETL(self,**params):
         today = datetime.today()
         if params.get("date")==None:
             params["date"] = (datetime(today.year,today.month,today.day,tzinfo=timezone.utc)-timedelta(days=1)).strftime("%Y%m%d")
+
         params['record_type']='campaigns'
         if params.get('metrics')==None:
             params['metrics'] = ['campaignId',
@@ -467,8 +477,8 @@ class SB_ETL(SBClient,Common_ETLMethod):
                                  'campaignRuleBasedBudget',
                                  'campaignStatus',
                                  'dpv14d',
-                                 'topOfSearchImpressionShare'
-                                 'unitsSold14d',
+                                 'topOfSearchImpressionShare',
+                                 'unitsSold14d'
 
             ]
         list_report = self.get_v2_report(record_type=params['record_type'],report_date=params["date"],metrics=params['metrics'])
@@ -477,8 +487,19 @@ class SB_ETL(SBClient,Common_ETLMethod):
         df_needManualAdd = [i for i in params['metrics'] if i not in df_report.columns]
         if len(df_needManualAdd)>0:
             df_report[df_needManualAdd] = None
+        date = datetime.strptime(params['date'], '%Y%m%d')
+        df_report['date'] = date
+        params['metrics'].append('date')
         df_report['profileId'] = self.profile_id
         params['metrics'].append('profileId')
+
+        df_report[['campaignId']] = df_report[['campaignId']].astype("string")
+        df_report['date'] = df_report['date'].astype("datetime64")
+        df_report[df_report.select_dtypes('O').columns] = df_report[df_report.select_dtypes('O').columns].astype('string')
+        conn = self.clickhouse_connect()
+        conn.insert_df("AllReport.SB_campaignsV2",df_report[params['metrics']])
+        print("插入完成")
+
         return df_report[params['metrics']]
 
     def reportV2_campaignsVideo_ETL(self,**params):
@@ -528,8 +549,19 @@ class SB_ETL(SBClient,Common_ETLMethod):
         df_needManualAdd = [i for i in params['metrics'] if i not in df_report.columns]
         if len(df_needManualAdd)>0:
             df_report[df_needManualAdd] = None
+        date = datetime.strptime(params['date'], '%Y%m%d')
+        df_report['date'] = date
+        params['metrics'].append('date')
         df_report['profileId'] = self.profile_id
         params['metrics'].append('profileId')
+
+        df_report[['campaignId']] = df_report[['campaignId']].astype("string")
+        df_report['date'] = df_report['date'].astype("datetime64")
+        df_report[df_report.select_dtypes('O').columns] = df_report[df_report.select_dtypes('O').columns].astype('string')
+        conn = self.clickhouse_connect()
+        conn.insert_df("AllReport.SB_campaignsVideoV2",df_report[params['metrics']])
+        print("插入完成")
+        # print(df_report[params['metrics']].info())
         return df_report[params['metrics']]
 
     def reportV2_placementRecord_ETL(self,**params):
@@ -563,17 +595,26 @@ class SB_ETL(SBClient,Common_ETLMethod):
                                  'campaignRuleBasedBudget',
                                  'campaignStatus',
                                  'dpv14d',
-                                 'unitsSold14d',
+                                 'unitsSold14d'
                 ] #'placement'
         # print(date)
         list_report = self.get_v2_report(record_type=params['record_type'],report_date=params["date"],metrics=params['metrics'],segment='placement')
-        # print(list_report)
         df_report = pd.json_normalize(list_report)
         df_needManualAdd = [i for i in params['metrics'] if i not in df_report.columns]
         if len(df_needManualAdd)>0:
             df_report[df_needManualAdd] = None
+
+        date = datetime.strptime(params['date'], '%Y%m%d')
+        df_report['date'] = date
         df_report['profileId'] = self.profile_id
-        params['metrics'].append('profileId')
+        params['metrics'].extend(['placement','date','profileId'])
+        df_report[['campaignId']] = df_report[['campaignId']].astype("string")
+        df_report['date'] = df_report['date'].astype("datetime64")
+        df_report[df_report.select_dtypes('O').columns] = df_report[df_report.select_dtypes('O').columns].astype('string')
+        conn = self.clickhouse_connect()
+        conn.insert_df("AllReport.SB_campaignsPlacementV2",df_report[params['metrics']])
+        print("插入完成")
+        # print(df_report[params['metrics']].info())
         return df_report[params['metrics']]
 
     def reportV2_placementVideo_ETL(self,**params):
@@ -607,7 +648,7 @@ class SB_ETL(SBClient,Common_ETLMethod):
                 'currency',
                 'dpv14d',
                 'vctr',
-                'vtr'
+                'vtr',
                 'video5SecondViewRate',
                 'video5SecondViews',
                 'videoCompleteViews',
@@ -624,8 +665,19 @@ class SB_ETL(SBClient,Common_ETLMethod):
         df_needManualAdd = [i for i in params['metrics'] if i not in df_report.columns]
         if len(df_needManualAdd)>0:
             df_report[df_needManualAdd] = None
+
+        date = datetime.strptime(params['date'], '%Y%m%d')
+        df_report['date'] = date
         df_report['profileId'] = self.profile_id
-        params['metrics'].append('profileId')
+        params['metrics'].extend(['placement','date','profileId'])
+        df_report[['campaignId']] = df_report[['campaignId']].astype("string")
+        df_report['date'] = df_report['date'].astype("datetime64")
+        df_report[df_report.select_dtypes('O').columns] = df_report[df_report.select_dtypes('O').columns].astype('string')
+        # print(df_report.info())
+        conn = self.clickhouse_connect()
+        conn.insert_df("AllReport.SB_campaignsPlacementVideoV2",df_report[params['metrics']])
+        print("插入完成")
+
         return df_report[params['metrics']]
 
     def reportV2_adGroupsRecord_ETL(self,**params):
@@ -640,7 +692,7 @@ class SB_ETL(SBClient,Common_ETLMethod):
                 'adGroupId',
                 'adGroupName',
                 'impressions','clicks', 'cost',
-                'attributedBrandedSearches14d'
+                'attributedBrandedSearches14d',
                 'attributedConversions14d',
                 'attributedConversions14dSameSKU',
                 'attributedDetailPageViewsClicks14d',
@@ -667,8 +719,19 @@ class SB_ETL(SBClient,Common_ETLMethod):
         df_needManualAdd = [i for i in params['metrics'] if i not in df_report.columns]
         if len(df_needManualAdd)>0:
             df_report[df_needManualAdd] = None
+
+        date = datetime.strptime(params['date'], '%Y%m%d')
+        df_report['date'] = date
         df_report['profileId'] = self.profile_id
-        params['metrics'].append('profileId')
+        params['metrics'].extend(['date','profileId'])
+        df_report[['campaignId','adGroupId']] = df_report[['campaignId','adGroupId']].astype("string")
+        df_report['date'] = df_report['date'].astype("datetime64")
+        df_report[df_report.select_dtypes('O').columns] = df_report[df_report.select_dtypes('O').columns].astype('string')
+        print(df_report.info())
+        conn = self.clickhouse_connect()
+        conn.insert_df("AllReport.SB_adGroupsV2",df_report[params['metrics']])
+        print("插入完成")
+
         return df_report[params['metrics']]
 
     def reportV2_adGroupsVideo_ETL(self,**params):
@@ -721,8 +784,17 @@ class SB_ETL(SBClient,Common_ETLMethod):
         df_needManualAdd = [i for i in params['metrics'] if i not in df_report.columns]
         if len(df_needManualAdd)>0:
             df_report[df_needManualAdd] = None
+        date = datetime.strptime(params['date'], '%Y%m%d')
+        df_report['date'] = date
         df_report['profileId'] = self.profile_id
-        params['metrics'].append('profileId')
+        params['metrics'].extend(['date','profileId'])
+        df_report[['campaignId','adGroupId']] = df_report[['campaignId','adGroupId']].astype("string")
+        df_report['date'] = df_report['date'].astype("datetime64")
+        df_report[df_report.select_dtypes('O').columns] = df_report[df_report.select_dtypes('O').columns].astype('string')
+        # print(df_report.info())
+        conn = self.clickhouse_connect()
+        conn.insert_df("AllReport.SB_adGroupsVideoV2",df_report[params['metrics']])
+        print("插入完成")
         return df_report[params['metrics']]
 
 
@@ -769,8 +841,19 @@ class SB_ETL(SBClient,Common_ETLMethod):
         df_needManualAdd = [i for i in params['metrics'] if i not in df_report.columns]
         if len(df_needManualAdd)>0:
             df_report[df_needManualAdd] = None
+
+        date = datetime.strptime(params['date'], '%Y%m%d')
+        df_report['date'] = date
         df_report['profileId'] = self.profile_id
-        params['metrics'].append('profileId')
+        params['metrics'].extend(['date','profileId'])
+        df_report[['campaignId','adGroupId','targetId']] = df_report[['campaignId','adGroupId','targetId']].astype("string")
+        df_report['date'] = df_report['date'].astype("datetime64")
+        df_report[df_report.select_dtypes('O').columns] = df_report[df_report.select_dtypes('O').columns].astype('string')
+        print(df_report.info())
+        conn = self.clickhouse_connect()
+        conn.insert_df("AllReport.SB_targetsV2",df_report[params['metrics']])
+        print("插入完成")
+
         return df_report[params['metrics']]
 
     def reportV2_targetsVideo_ETL(self,**params):
@@ -828,8 +911,17 @@ class SB_ETL(SBClient,Common_ETLMethod):
         df_needManualAdd = [i for i in params['metrics'] if i not in df_report.columns]
         if len(df_needManualAdd)>0:
             df_report[df_needManualAdd] = None
+        date = datetime.strptime(params['date'], '%Y%m%d')
+        df_report['date'] = date
         df_report['profileId'] = self.profile_id
-        params['metrics'].append('profileId')
+        params['metrics'].extend(['date','profileId'])
+        df_report[['campaignId','adGroupId','targetId']] = df_report[['campaignId','adGroupId','targetId']].astype("string")
+        df_report['date'] = df_report['date'].astype("datetime64")
+        df_report[df_report.select_dtypes('O').columns] = df_report[df_report.select_dtypes('O').columns].astype('string')
+        # print(df_report.info())
+        conn = self.clickhouse_connect()
+        conn.insert_df("AllReport.SB_targetsVideoV2",df_report[params['metrics']])
+        print("插入完成")
         return df_report[params['metrics']]
 
     def reportV2_keywordsRecord_ETL(self,**params):
@@ -881,8 +973,17 @@ class SB_ETL(SBClient,Common_ETLMethod):
         df_needManualAdd = [i for i in params['metrics'] if i not in df_report.columns]
         if len(df_needManualAdd)>0:
             df_report[df_needManualAdd] = None
+        date = datetime.strptime(params['date'], '%Y%m%d')
+        df_report['date'] = date
         df_report['profileId'] = self.profile_id
-        params['metrics'].append('profileId')
+        params['metrics'].extend(['date','profileId'])
+        df_report[['campaignId','adGroupId','keywordId','applicableBudgetRuleId']] = df_report[['campaignId','adGroupId','keywordId','applicableBudgetRuleId']].astype("string")
+        df_report['date'] = df_report['date'].astype("datetime64")
+        df_report[df_report.select_dtypes('O').columns] = df_report[df_report.select_dtypes('O').columns].astype('string')
+        # print(df_report.info())
+        conn = self.clickhouse_connect()
+        conn.insert_df("AllReport.SB_keywordsV2",df_report[params['metrics']])
+        print("插入完成")
         return df_report[params['metrics']]
 
     def reportV2_keywordsVideo_ETL(self,**params):
@@ -938,8 +1039,17 @@ class SB_ETL(SBClient,Common_ETLMethod):
         df_needManualAdd = [i for i in params['metrics'] if i not in df_report.columns]
         if len(df_needManualAdd)>0:
             df_report[df_needManualAdd] = None
+        date = datetime.strptime(params['date'], '%Y%m%d')
+        df_report['date'] = date
         df_report['profileId'] = self.profile_id
-        params['metrics'].append('profileId')
+        params['metrics'].extend(['date','profileId'])
+        df_report[['campaignId','adGroupId','keywordId']] = df_report[['campaignId','adGroupId','keywordId']].astype("string")
+        df_report['date'] = df_report['date'].astype("datetime64")
+        df_report[df_report.select_dtypes('O').columns] = df_report[df_report.select_dtypes('O').columns].astype('string')
+        print(df_report.info())
+        conn = self.clickhouse_connect()
+        conn.insert_df("AllReport.SB_keywordsVideoV2",df_report[params['metrics']])
+        print("插入完成")
         return df_report[params['metrics']]
 
     def reportV2_searchtermsRecord_ETL(self,**params):
@@ -974,8 +1084,17 @@ class SB_ETL(SBClient,Common_ETLMethod):
         df_needManualAdd = [i for i in params['metrics'] if i not in df_report.columns]
         if len(df_needManualAdd)>0:
             df_report[df_needManualAdd] = None
+        date = datetime.strptime(params['date'], '%Y%m%d')
+        df_report['date'] = date
         df_report['profileId'] = self.profile_id
-        params['metrics'].append('profileId')
+        params['metrics'].extend(['query','date','profileId'])
+        df_report[['campaignId','adGroupId','keywordId']] = df_report[['campaignId','adGroupId','keywordId']].astype("string")
+        df_report['date'] = df_report['date'].astype("datetime64")
+        df_report[df_report.select_dtypes('O').columns] = df_report[df_report.select_dtypes('O').columns].astype('string')
+        # print(df_report.info())
+        conn = self.clickhouse_connect()
+        conn.insert_df("AllReport.SB_keywordsQueryV2",df_report[params['metrics']])
+        print("插入完成")
         return df_report[params['metrics']]
 
     def reportV2_searchtermsVideo_ETL(self,**params):
@@ -1021,8 +1140,17 @@ class SB_ETL(SBClient,Common_ETLMethod):
         df_needManualAdd = [i for i in params['metrics'] if i not in df_report.columns]
         if len(df_needManualAdd)>0:
             df_report[df_needManualAdd] = None
+        date = datetime.strptime(params['date'], '%Y%m%d')
+        df_report['date'] = date
         df_report['profileId'] = self.profile_id
-        params['metrics'].append('profileId')
+        params['metrics'].extend(['query','date','profileId'])
+        df_report[['campaignId','adGroupId','keywordId']] = df_report[['campaignId','adGroupId','keywordId']].astype("string")
+        df_report['date'] = df_report['date'].astype("datetime64")
+        df_report[df_report.select_dtypes('O').columns] = df_report[df_report.select_dtypes('O').columns].astype('string')
+        # print(df_report.info())
+        conn = self.clickhouse_connect()
+        conn.insert_df("AllReport.SB_keywordsQueryVideoV2",df_report[params['metrics']])
+        print("插入完成")
         return df_report[params['metrics']]
 
     def reportV2_adsRecord_ETL(self,**params):
@@ -1042,6 +1170,7 @@ class SB_ETL(SBClient,Common_ETLMethod):
                 'cost',
                 'applicableBudgetRuleId',
                 'applicableBudgetRuleName',
+                'attributedBrandedSearches14d',
                 'attributedConversions14d',
                 'attributedConversions14dSameSKU',
                 'attributedDetailPageViewsClicks14d',
@@ -1061,7 +1190,7 @@ class SB_ETL(SBClient,Common_ETLMethod):
                 'dpv14d',
                 'unitsSold14d',
                 'vctr',
-                'attributedBrandedSearches14d'
+
                 ] #
         # print(date)
         list_report = self.get_v2_report(record_type=params['record_type'],report_date=params["date"],metrics=params['metrics'])
@@ -1070,8 +1199,18 @@ class SB_ETL(SBClient,Common_ETLMethod):
         df_needManualAdd = [i for i in params['metrics'] if i not in df_report.columns]
         if len(df_needManualAdd)>0:
             df_report[df_needManualAdd] = None
+        date = datetime.strptime(params['date'], '%Y%m%d')
+        df_report['date'] = date
         df_report['profileId'] = self.profile_id
-        params['metrics'].append('profileId')
+        params['metrics'].extend(['date','profileId'])
+        df_report[['campaignId','adGroupId','adId','applicableBudgetRuleId']] = df_report[['campaignId','adGroupId','adId','applicableBudgetRuleId']].astype("string")
+        df_report['date'] = df_report['date'].astype("datetime64")
+        df_report[df_report.select_dtypes('O').columns] = df_report[df_report.select_dtypes('O').columns].astype('string')
+        # print(df_report.info())
+        print(df_report[params['metrics']].info())
+        conn = self.clickhouse_connect()
+        conn.insert_df("AllReport.SB_adsV2",df_report[params['metrics']])
+        print("插入完成")
         return df_report[params['metrics']]
 
     def reportV2_adsVideo_ETL(self,**params):
@@ -1086,9 +1225,9 @@ class SB_ETL(SBClient,Common_ETLMethod):
                 'adGroupId',
                 'adGroupName',
                 'adId', 'impressions','clicks', 'cost',
-                'attributedBrandedSearches14d',
                 'applicableBudgetRuleId',
                 'applicableBudgetRuleName',
+                'attributedBrandedSearches14d',
                 'attributedConversions14d',
                 'attributedConversions14dSameSKU',
                 'attributedDetailPageViewsClicks14d',
@@ -1125,8 +1264,17 @@ class SB_ETL(SBClient,Common_ETLMethod):
         df_needManualAdd = [i for i in params['metrics'] if i not in df_report.columns]
         if len(df_needManualAdd)>0:
             df_report[df_needManualAdd] = None
+        date = datetime.strptime(params['date'], '%Y%m%d')
+        df_report['date'] = date
         df_report['profileId'] = self.profile_id
-        params['metrics'].append('profileId')
+        params['metrics'].extend(['date','profileId'])
+        df_report[['campaignId','adGroupId','adId','applicableBudgetRuleId']] = df_report[['campaignId','adGroupId','adId','applicableBudgetRuleId']].astype("string")
+        df_report['date'] = df_report['date'].astype("datetime64")
+        df_report[df_report.select_dtypes('O').columns] = df_report[df_report.select_dtypes('O').columns].astype('string')
+        # print(df_report.info())
+        conn = self.clickhouse_connect()
+        conn.insert_df("AllReport.SB_adsVideoV2",df_report[params['metrics']])
+        print("插入完成")
         return df_report[params['metrics']]
 
 class SD_ETL(SDClient,Common_ETLMethod):
@@ -1861,7 +2009,9 @@ if __name__ == '__main__':
         'lwa_client_secret': 'cbf0514186db4df91e04a8905f0a91b605eae4201254ced879d8bb90df4b474d',
         'profile_id': "3006125408623189"
     }
-    ac_etl = SP_ETL(**AWS_CREDENTIALS)
+    ac_etl = Acount_ETL(**AWS_CREDENTIALS)
     # print(ac_etl.budget_ETL(campaign_ids=["126327624499318"]))
-    print(ac_etl.reportV3_campaign_spCampaignsETL(**{}))
+    print(ac_etl.portfolio_ETL(**{}))
+
+    ###