huangyifan před 1 rokem
rodič
revize
55141b09e5

+ 330 - 25
sync_amz_data/DataTransform/Data_ETL.py

@@ -210,13 +210,13 @@ class SP_ETL(SPClient, Common_ETLMethod):
         df_targets = pd.json_normalize(list_targets)
         df_targets = self.TZ_Deal(df_targets, ["extendedData.creationDateTime", "extendedData.lastUpdateDateTime"])
         df_targets['resolvedExpressions_type'] = df_targets['resolvedExpression'].map(
-            lambda x: Common_ETLMethod.get_keyOvalue(x, 'type'))
+            lambda x: self.get_keyOvalue(x, 'type'))
         df_targets['resolvedExpressions_value'] = df_targets['resolvedExpression'].map(
-            lambda x: Common_ETLMethod.get_keyOvalue(x, 'value'))
+            lambda x: self.get_keyOvalue(x, 'value'))
         df_targets['expression_type'] = df_targets['expression'].map(
-            lambda x: Common_ETLMethod.get_keyOvalue(x, 'type'))
+            lambda x: self.get_keyOvalue(x, 'type'))
         df_targets['expression_value'] = df_targets['expression'].map(
-            lambda x: Common_ETLMethod.get_keyOvalue(x, 'value'))
+            lambda x: self.get_keyOvalue(x, 'value'))
         return self.columnsName_modify(df_targets)
 
     def negative_targets_ETL(self):
@@ -224,13 +224,13 @@ class SP_ETL(SPClient, Common_ETLMethod):
         df_targets = pd.json_normalize(list_targets)
         df_targets = self.TZ_Deal(df_targets, ["extendedData.creationDateTime", "extendedData.lastUpdateDateTime"])
         df_targets['resolvedExpressions_type'] = df_targets['resolvedExpression'].map(
-            lambda x: Common_ETLMethod.get_keyOvalue(x, 'type'))
+            lambda x: self.get_keyOvalue(x, 'type'))
         df_targets['resolvedExpressions_value'] = df_targets['resolvedExpression'].map(
-            lambda x: Common_ETLMethod.get_keyOvalue(x, 'value'))
+            lambda x: self.get_keyOvalue(x, 'value'))
         df_targets['expression_type'] = df_targets['expression'].map(
-            lambda x: Common_ETLMethod.get_keyOvalue(x, 'type'))
+            lambda x: self.get_keyOvalue(x, 'type'))
         df_targets['expression_value'] = df_targets['expression'].map(
-            lambda x: Common_ETLMethod.get_keyOvalue(x, 'value'))
+            lambda x: self.get_keyOvalue(x, 'value'))
         return self.columnsName_modify(df_targets)
 
     def budget_ETL(self, campaign_ids: list):
@@ -243,6 +243,11 @@ class SP_ETL(SPClient, Common_ETLMethod):
         print(params)
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:","reportV3_campaign_spCampaignsETL",'\n',"table_name:","SP_spCampaigns_campaignV3")
+        if len(conn.query_df(f"select * from AmazonReport.SP_spCampaigns_campaignV3 where date='{params['startDate']}'"))>0:
+            logging.info("数据已存在...")
+            return 'Pass'
+
         params['reportType'] = "spCampaigns"
         params['columns'] = [
             'campaignName', 'campaignId', 'campaignStatus', 'campaignBudgetAmount', 'campaignBudgetType',
@@ -279,6 +284,11 @@ class SP_ETL(SPClient, Common_ETLMethod):
         print(params)
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV3_adGroup_spCampaignsETL", '\n', "table_name:", "SP_spCampaigns_adGroupV3")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SP_spCampaigns_adGroupV3 where date='{params['startDate']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['reportType'] = "spCampaigns"
         params['columns'] = [
             'adGroupName', 'adGroupId', 'adStatus','campaignName', 'campaignId', 'campaignStatus', 'campaignBudgetAmount', 'campaignBudgetType',
@@ -316,6 +326,11 @@ class SP_ETL(SPClient, Common_ETLMethod):
         timeZone_,today = self.today_()
         params = self.config_params(params)
         params['reportType'] = "spCampaigns"
+        print("func_name:", "reportV3_campaignPlacement_spCampaignsETL", '\n', "table_name:", "SP_spCampaigns_placementV3")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SP_spCampaigns_placementV3 where date='{params['startDate']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['columns'] = [
             'placementClassification','campaignName', 'campaignId','campaignStatus',  'campaignBudgetAmount', 'campaignBudgetType',
             'campaignRuleBasedBudgetAmount', 'campaignApplicableBudgetRuleId', 'campaignApplicableBudgetRuleName',
@@ -354,6 +369,12 @@ class SP_ETL(SPClient, Common_ETLMethod):
         timeZone_,today = self.today_()
         params = self.config_params(params)
         params['reportType'] = "spTargeting"
+        print("func_name:", "reportV3_targeting_spTargetingETL", '\n', "table_name:",
+              "SP_spTargeting_targetingV3")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SP_spTargeting_targetingV3 where date='{params['startDate']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['columns'] = [
             'adKeywordStatus',
             'impressions', 'clicks', 'costPerClick', 'clickThroughRate', 'cost', 'purchases1d', 'purchases7d',
@@ -392,6 +413,12 @@ class SP_ETL(SPClient, Common_ETLMethod):
     def reportV3_searchTerm_spSearchTermETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV3_searchTerm_spSearchTermETL", '\n', "table_name:",
+              "SP_spSearchTerm_searchTermV3")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SP_spSearchTerm_searchTermV3 where date='{params['startDate']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['reportType'] = "spSearchTerm"
         params['columns'] = [
             'adKeywordStatus',
@@ -431,6 +458,12 @@ class SP_ETL(SPClient, Common_ETLMethod):
     def reportV3_advertiser_spAdvertisedProductETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV3_advertiser_spAdvertisedProductETL", '\n', "table_name:",
+              "SP_spAdvertisedProduct_advertiserV3")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SP_spAdvertisedProduct_advertiserV3 where date='{params['startDate']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['reportType'] = "spAdvertisedProduct"
         params['columns'] = [
             'date', 'campaignName', 'campaignId', 'adGroupName', 'adGroupId', 'adId', 'portfolioId', 'impressions',
@@ -467,6 +500,12 @@ class SP_ETL(SPClient, Common_ETLMethod):
     def reportV3_asin_spPurchasedProductETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV3_asin_spPurchasedProductETL", '\n', "table_name:",
+              "SP_spPurchasedProduct_asinV3")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SP_spPurchasedProduct_asinV3 where date='{params['startDate']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['reportType'] = "spPurchasedProduct"
         params['columns'] = [
             'date', 'portfolioId', 'campaignName', 'campaignId', 'adGroupName', 'adGroupId', 'keywordId', 'keyword',
@@ -532,8 +571,8 @@ class SB_ETL(SBClient, Common_ETLMethod):
         # df_targets = self.TZ_Deal(df_targets, ["extendedData.creationDateTime", "extendedData.lastUpdateDateTime"])
         # df_targets = self.expression_split(df_targets, "resolvedExpressions")
         df_targets = self.id_type_trans(df_targets)
-        df_targets['resolvedExpressions_type'] = df_targets['resolvedExpressions'].map(lambda x:Common_ETLMethod.get_keyOvalue(x,'type'))
-        df_targets['resolvedExpressions_value'] = df_targets['resolvedExpressions'].map(lambda x:Common_ETLMethod.get_keyOvalue(x,'value'))
+        df_targets['resolvedExpressions_type'] = df_targets['resolvedExpressions'].map(lambda x:self.get_keyOvalue(x,'type'))
+        df_targets['resolvedExpressions_value'] = df_targets['resolvedExpressions'].map(lambda x:self.get_keyOvalue(x,'value'))
         return self.columnsName_modify(df_targets)
 
     def budget_ETL(self, campaign_ids: list):
@@ -546,6 +585,12 @@ class SB_ETL(SBClient, Common_ETLMethod):
         print(params)
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV3_campaign_sbCampaigns_ETL", '\n', "table_name:",
+              "SB_sbCampaigns_campaignV3")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SB_sbCampaigns_campaignV3 where date='{params['startDate']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['reportType'] = "sbCampaigns" #sbCampaigns
         params['columns'] = ['campaignName','campaignId','campaignStatus',
                              'campaignBudgetAmount', 'campaignBudgetCurrencyCode', 'campaignBudgetType','impressions',
@@ -581,6 +626,12 @@ class SB_ETL(SBClient, Common_ETLMethod):
         print(params)
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV3_adGroup_sbAdGroup_ETL", '\n', "table_name:",
+              "SB_sbAdGroup_adGroupV3")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SB_sbAdGroup_adGroupV3 where date='{params['startDate']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['reportType'] = "sbAdGroup" #sbCampaigns
         params['columns'] = ['campaignName','campaignId','campaignBudgetCurrencyCode','adGroupName','adGroupId', 'impressions',  'clicks', 'cost',
                                 'addToCartRate',  'brandedSearches', 'brandedSearchesClicks','detailPageViews', 'detailPageViewsClicks',
@@ -611,6 +662,12 @@ class SB_ETL(SBClient, Common_ETLMethod):
         print(params)
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV3_sbCampaignPlacement_ETL", '\n', "table_name:",
+              "SB_sbCampaigns_placementV3")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SB_sbCampaigns_placementV3 where date='{params['startDate']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['reportType'] = "sbCampaignPlacement" #sbCampaigns
         params['columns'] = ['placementClassification','campaignName','campaignId','campaignStatus','campaignBudgetAmount','campaignBudgetType','campaignBudgetCurrencyCode',
                             'impressions','clicks', 'cost','addToCart', 'addToCartClicks', 'addToCartRate', 'brandedSearches',
@@ -642,6 +699,12 @@ class SB_ETL(SBClient, Common_ETLMethod):
         print(params)
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV3_sbTargeting_ETL", '\n', "table_name:",
+              "SB_sbTargeting_targetingV3")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SB_sbTargeting_targetingV3 where date='{params['startDate']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['reportType'] = "sbTargeting" #sbCampaigns
         params['columns'] = [
             'campaignName','campaignId','campaignStatus','campaignBudgetAmount','campaignBudgetType','campaignBudgetCurrencyCode',
@@ -675,6 +738,12 @@ class SB_ETL(SBClient, Common_ETLMethod):
         print(params)
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV3_sbSearchTerm_ETL", '\n', "table_name:",
+              "SB_sbSearchTerm_searchTermV3")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SB_sbSearchTerm_searchTermV3 where date='{params['startDate']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['reportType'] = "sbSearchTerm" #sbCampaigns
         params['columns'] = [
             'campaignName','campaignId','campaignStatus','campaignBudgetAmount','campaignBudgetType', 'campaignBudgetCurrencyCode', 'adGroupName',
@@ -703,6 +772,12 @@ class SB_ETL(SBClient, Common_ETLMethod):
         print(params)
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV3_sbAds_ETL", '\n', "table_name:",
+              "SB_sbAds_adsV3")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SB_sbAds_adsV3 where date='{params['startDate']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['reportType'] = "sbAds" #sbCampaigns
         params['columns'] = [
             'campaignName', 'campaignId','campaignStatus','campaignBudgetAmount','campaignBudgetCurrencyCode', 'campaignBudgetType',
@@ -739,6 +814,12 @@ class SB_ETL(SBClient, Common_ETLMethod):
         print(params)
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV3_purchasedAsinRecord_ETL", '\n', "table_name:",
+              "SB_sbPurchasedProduct_asinV3")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SB_sbPurchasedProduct_asinV3 where date='{params['startDate']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['reportType'] = "sbPurchasedProduct"
         params['columns'] = [
             'campaignId', 'adGroupId', 'date', 'campaignBudgetCurrencyCode', 'campaignName', 'adGroupName',
@@ -769,7 +850,12 @@ class SB_ETL(SBClient, Common_ETLMethod):
         print(params)
         timeZone_,today = self.today_()
         params = self.config_params(params)
-
+        print("func_name:", "reportV2_campaignsRecord_ETL", '\n', "table_name:",
+              "SB_campaignsV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SB_campaignsV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['record_type'] = 'campaigns'
 
         metric = ['campaignId','campaignName', 'impressions', 'clicks', 'cost',
@@ -803,6 +889,12 @@ class SB_ETL(SBClient, Common_ETLMethod):
         print(params)
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV2_campaignsVideo_ETL", '\n', "table_name:",
+              "SB_campaignsVideoV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SB_campaignsVideoV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['record_type'] = 'campaigns'
 
         metric = [
@@ -838,6 +930,12 @@ class SB_ETL(SBClient, Common_ETLMethod):
         print(params)
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV2_placementRecord_ETL", '\n', "table_name:",
+              "SB_campaignsPlacementV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SB_campaignsPlacementV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['record_type'] = 'campaigns'
 
         metric = ['campaignId','campaignName','impressions','clicks','cost',
@@ -868,6 +966,12 @@ class SB_ETL(SBClient, Common_ETLMethod):
     def reportV2_placementVideo_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV2_placementVideo_ETL", '\n', "table_name:",
+              "SB_campaignsPlacementVideoV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SB_campaignsPlacementVideoV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['record_type'] = 'campaigns'
 
         metric = [
@@ -901,6 +1005,12 @@ class SB_ETL(SBClient, Common_ETLMethod):
     def reportV2_adGroupsRecord_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV2_adGroupsRecord_ETL", '\n', "table_name:",
+              "SB_adGroupsV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SB_adGroupsV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['record_type'] = 'adGroups'
 
         metric = [
@@ -934,7 +1044,12 @@ class SB_ETL(SBClient, Common_ETLMethod):
         timeZone_,today = self.today_()
         params = self.config_params(params)
         params['record_type'] = 'adGroups'
-
+        print("func_name:", "reportV2_adGroupsVideo_ETL", '\n', "table_name:",
+              "SB_adGroupsVideoV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SB_adGroupsVideoV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         metric = [
                 'campaignId','campaignName','adGroupId','adGroupName','impressions','clicks','cost',
                 'attributedBrandedSearches14d','attributedConversions14d','attributedConversions14dSameSKU',
@@ -965,6 +1080,12 @@ class SB_ETL(SBClient, Common_ETLMethod):
     def reportV2_targetsRecord_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV2_targetsRecord_ETL", '\n', "table_name:",
+              "SB_targetsV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SB_targetsV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['record_type'] = 'targets'
 
         metric = [
@@ -997,6 +1118,12 @@ class SB_ETL(SBClient, Common_ETLMethod):
     def reportV2_targetsVideo_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV2_targetsVideo_ETL", '\n', "table_name:",
+              "SB_targetsVideoV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SB_targetsVideoV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['record_type'] = 'targets'
 
         metric = [
@@ -1030,6 +1157,12 @@ class SB_ETL(SBClient, Common_ETLMethod):
     def reportV2_keywordsRecord_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV2_keywordsRecord_ETL", '\n', "table_name:",
+              "SB_keywordsV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SB_keywordsV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['record_type'] = 'keywords'
 
         metric = [
@@ -1062,6 +1195,12 @@ class SB_ETL(SBClient, Common_ETLMethod):
     def reportV2_keywordsVideo_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV2_keywordsVideo_ETL", '\n', "table_name:",
+              "SB_keywordsVideoV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SB_keywordsVideoV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['record_type'] = 'keywords'
 
         metric = [
@@ -1095,6 +1234,12 @@ class SB_ETL(SBClient, Common_ETLMethod):
     def reportV2_searchtermsRecord_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV2_searchtermsRecord_ETL", '\n', "table_name:",
+              "SB_keywordsQueryV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SB_keywordsQueryV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['record_type'] = 'keywords'
 
         metric = [
@@ -1122,6 +1267,12 @@ class SB_ETL(SBClient, Common_ETLMethod):
     def reportV2_searchtermsVideo_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV2_searchtermsVideo_ETL", '\n', "table_name:",
+              "SB_keywordsQueryVideoV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SB_keywordsQueryVideoV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['record_type'] = 'keywords'
 
         metric = [
@@ -1152,6 +1303,12 @@ class SB_ETL(SBClient, Common_ETLMethod):
     def reportV2_adsRecord_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV2_adsRecord_ETL", '\n', "table_name:",
+              "SB_adsV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SB_adsV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['record_type'] = 'ads'
 
         metric = [
@@ -1182,6 +1339,12 @@ class SB_ETL(SBClient, Common_ETLMethod):
     def reportV2_adsVideo_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV2_adsVideo_ETL", '\n', "table_name:",
+              "SB_adsVideoV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SB_adsVideoV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['record_type'] = 'ads'
 
         metric = [
@@ -1240,8 +1403,8 @@ class SD_ETL(SDClient, Common_ETLMethod):
         df_targets = pd.json_normalize(list_targets)
         # df_targets = self.expression_split(df_targets, "resolvedExpression")
         df_targets = self.id_type_trans(df_targets)
-        df_targets['resolvedExpressions_type'] = df_targets['resolvedExpression'].map(lambda x:Common_ETLMethod.get_keyOvalue(x,'type'))
-        df_targets['resolvedExpressions_value'] = df_targets['resolvedExpression'].map(lambda x:Common_ETLMethod.get_keyOvalue(x,'value'))
+        df_targets['resolvedExpressions_type'] = df_targets['resolvedExpression'].map(lambda x:self.get_keyOvalue(x,'type'))
+        df_targets['resolvedExpressions_value'] = df_targets['resolvedExpression'].map(lambda x:self.get_keyOvalue(x,'value'))
         return self.columnsName_modify(df_targets)
 
     def budget_ETL(self, campaignsIds: list):
@@ -1253,6 +1416,13 @@ class SD_ETL(SDClient, Common_ETLMethod):
     def reportV3_campaign_sdCampaigns_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        params = self.config_params(params)
+        print("func_name:", "reportV3_campaign_sdCampaigns_ETL", '\n', "table_name:",
+              "SD_sdCampaigns_campaignV3")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SD_sdCampaigns_campaignV3 where date='{params['startDate']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['reportType'] = "sdCampaigns"
         params['columns'] = [ 'campaignName', 'campaignId','campaignStatus','campaignBudgetAmount', 'impressions','clicks', 'cost',
                              'addToCart', 'addToCartClicks', 'addToCartRate', 'addToCartViews',
@@ -1288,6 +1458,12 @@ class SD_ETL(SDClient, Common_ETLMethod):
     def reportV3_campaignMT_sdCampaigns_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV3_campaignMT_sdCampaigns_ETL", '\n', "table_name:",
+              "SD_sdCampaigns_campaignMatchedTargetV3")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SD_sdCampaigns_campaignMatchedTargetV3 where date='{params['startDate']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['reportType'] = "sdCampaigns"
         params['columns'] = [
             'matchedTargetAsin','campaignName', 'campaignId','campaignStatus','campaignBudgetAmount', 'impressions','clicks', 'cost',
@@ -1323,6 +1499,12 @@ class SD_ETL(SDClient, Common_ETLMethod):
     def reportV3_adgroup_sdAdGroup_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV3_adgroup_sdAdGroup_ETL", '\n', "table_name:",
+              "SD_sdAdGroup_adGroupV3")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SD_sdAdGroup_adGroupV3 where date='{params['startDate']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['reportType'] = "sdAdGroup"
         params['columns'] = ['campaignName','campaignId', 'adGroupName', 'adGroupId', 'impressions','clicks', 'cost',
                              'addToCart', 'addToCartClicks', 'addToCartRate', 'addToCartViews',
@@ -1359,6 +1541,12 @@ class SD_ETL(SDClient, Common_ETLMethod):
     def reportV3_adgroupMT_sdAdGroup_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV3_adgroupMT_sdAdGroup_ETL", '\n', "table_name:",
+              "SD_sdAdGroup_adGroupMatchedTargetV3")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SD_sdAdGroup_adGroupMatchedTargetV3 where date='{params['startDate']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['reportType'] = "sdAdGroup"
         params['columns'] = [
             'matchedTargetAsin','campaignName','campaignId', 'adGroupName', 'adGroupId', 'impressions','clicks', 'cost',
@@ -1395,6 +1583,12 @@ class SD_ETL(SDClient, Common_ETLMethod):
     def reportV3_targeting_sdTargeting_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV3_targeting_sdTargeting_ETL", '\n', "table_name:",
+              "SD_targeting_sdTargetingV3")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SD_targeting_sdTargetingV3 where date='{params['startDate']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['reportType'] = "sdTargeting"
         params['columns'] = ['campaignName', 'campaignId','adGroupName','adGroupId', 'targetingText','targetingId','impressions','clicks', 'cost',
             'adKeywordStatus', 'addToCart', 'addToCartClicks', 'addToCartRate', 'addToCartViews',
@@ -1435,6 +1629,12 @@ class SD_ETL(SDClient, Common_ETLMethod):
     def reportV3_targetingMT_sdTargeting_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV3_targetingMT_sdTargeting_ETL", '\n', "table_name:",
+              "SD_targeting_sdTargetingMatchedTargetV3")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SD_targeting_sdTargetingMatchedTargetV3 where date='{params['startDate']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['reportType'] = "sdTargeting"
         params['columns'] = [
             'matchedTargetAsin',
@@ -1476,6 +1676,12 @@ class SD_ETL(SDClient, Common_ETLMethod):
     def reportV3_advertiser_sdAdvertisedProduct_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV3_advertiser_sdAdvertisedProduct_ETL", '\n', "table_name:",
+              "SD_advertiser_sdAdvertisedProductV3")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SD_advertiser_sdAdvertisedProductV3 where date='{params['startDate']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['reportType'] = "sdAdvertisedProduct"
         params['columns'] = ['campaignName','campaignId','adGroupName','adGroupId','adId','impressions','clicks', 'cost',
             'addToCart', 'addToCartClicks', 'addToCartRate', 'addToCartViews',    'bidOptimization',
@@ -1514,6 +1720,12 @@ class SD_ETL(SDClient, Common_ETLMethod):
     def reportV3_asin_sdPurchasedProduct_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV3_asin_sdPurchasedProduct_ETL", '\n', "table_name:",
+              "SD_asin_sdPurchasedProductV3")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SD_asin_sdPurchasedProductV3 where date='{params['startDate']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['reportType'] = "sdPurchasedProduct"
         params['columns'] = ['campaignName','campaignId', 'adGroupName','adGroupId', 'promotedAsin', 'promotedSku',
               'asinBrandHalo', 'campaignBudgetCurrencyCode',
@@ -1561,6 +1773,12 @@ class SD_ETL(SDClient, Common_ETLMethod):
     def reportV2_campaignsRecord_t2_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV2_campaignsRecord_t2_ETL", '\n', "table_name:",
+              "SD_campaignsV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SD_campaignsV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['record_type'] = 'campaigns'
 
         metric = self.campaigns_metrics
@@ -1584,6 +1802,12 @@ class SD_ETL(SDClient, Common_ETLMethod):
     def reportV2_campaignsRecord_t3_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV2_campaignsRecord_t3_ETL", '\n', "table_name:",
+              "SD_campaignsV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SD_campaignsV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['record_type'] = 'campaigns'
 
         metric = self.campaigns_metrics
@@ -1624,6 +1848,12 @@ class SD_ETL(SDClient, Common_ETLMethod):
     def reportV2_adGroupsRecord_t2_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV2_adGroupsRecord_t2_ETL", '\n', "table_name:",
+              "SD_adGroupsV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SD_adGroupsV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['record_type'] = 'adGroups'
 
         metric = self.adGroups_metrics
@@ -1645,6 +1875,13 @@ class SD_ETL(SDClient, Common_ETLMethod):
     def reportV2_adGroupsRecord_t3_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV2_adGroupsRecord_t3_ETL", '\n', "table_name:",
+              "SD_adGroupsV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SD_adGroupsV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
+
         params['record_type'] = 'adGroups'
 
         metric = self.adGroups_metrics
@@ -1682,6 +1919,12 @@ class SD_ETL(SDClient, Common_ETLMethod):
     def reportV2_productAds_t2_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV2_productAds_t2_ETL", '\n', "table_name:",
+              "SD_adsV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SD_adsV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['record_type'] = 'productAds'
 
         metric = self.productAds_metrics
@@ -1704,6 +1947,12 @@ class SD_ETL(SDClient, Common_ETLMethod):
     def reportV2_productAds_t3_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV2_productAds_t3_ETL", '\n', "table_name:",
+              "SD_adsV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SD_adsV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['record_type'] = 'productAds'
 
         metric = self.productAds_metrics
@@ -1742,6 +1991,12 @@ class SD_ETL(SDClient, Common_ETLMethod):
     def reportV2_targets_t2_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV2_targets_t2_ETL", '\n', "table_name:",
+              "SD_targetsV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SD_targetsV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['record_type'] = 'targets'
 
         metric = self.targets_metrics
@@ -1764,6 +2019,12 @@ class SD_ETL(SDClient, Common_ETLMethod):
     def reportV2_targets_t3_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV2_targets_t3_ETL", '\n', "table_name:",
+              "SD_targetsV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SD_targetsV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['record_type'] = 'targets'
 
         metric = self.targets_metrics
@@ -1799,6 +2060,12 @@ class SD_ETL(SDClient, Common_ETLMethod):
     def reportV2_asins_t2_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV2_asins_t2_ETL", '\n', "table_name:",
+              "SD_asinsV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SD_asinsV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['record_type'] = 'asins'
 
         metric = self.asins_metrics
@@ -1860,6 +2127,13 @@ class SD_ETL(SDClient, Common_ETLMethod):
     def reportV2_campaign_matchedTarget_t2_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV2_campaign_matchedTarget_t2_ETL", '\n', "table_name:",
+              "SD_campaignsMatchedTargetV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SD_campaignsMatchedTargetV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
+
         params['record_type'] = 'campaigns'
 
         metric = self.campaigns_MT_metrics
@@ -1884,6 +2158,12 @@ class SD_ETL(SDClient, Common_ETLMethod):
         timeZone_,today = self.today_()
 
         params = self.config_params(params)
+        print("func_name:", "reportV2_campaign_matchedTarget_t3_ETL", '\n', "table_name:",
+              "SD_campaignsMatchedTargetV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SD_campaignsMatchedTargetV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['record_type'] = 'campaigns'
 
         metric = self.campaigns_MT_metrics
@@ -1921,6 +2201,12 @@ class SD_ETL(SDClient, Common_ETLMethod):
     def reportV2_adGroups_matchedTarget_t2_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV2_adGroups_matchedTarget_t2_ETL", '\n', "table_name:",
+              "SD_adGroupsMatchedTargetV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SD_adGroupsMatchedTargetV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['record_type'] = 'adGroups'
 
         metric = self.adGroups_MT_metrics
@@ -1943,6 +2229,12 @@ class SD_ETL(SDClient, Common_ETLMethod):
     def reportV2_adGroups_matchedTarget_t3_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV2_adGroups_matchedTarget_t3_ETL", '\n', "table_name:",
+              "SD_adGroupsMatchedTargetV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SD_adGroupsMatchedTargetV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['record_type'] = 'adGroups'
 
         metric = self.adGroups_MT_metrics
@@ -1978,6 +2270,12 @@ class SD_ETL(SDClient, Common_ETLMethod):
     def reportV2_targets_matchedTarget_t2_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV2_targets_matchedTarget_t2_ETL", '\n', "table_name:",
+              "SD_targetsMatchedTargetV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SD_targetsMatchedTargetV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['record_type'] = 'targets'
 
         metric = self.targets_MT_metrics
@@ -1999,6 +2297,12 @@ class SD_ETL(SDClient, Common_ETLMethod):
     def reportV2_targets_matchedTarget_t3_ETL(self, conn, params:dict=None):
         timeZone_,today = self.today_()
         params = self.config_params(params)
+        print("func_name:", "reportV2_targets_matchedTarget_t3_ETL", '\n', "table_name:",
+              "SD_targetsMatchedTargetV2")
+        if len(conn.query_df(
+                f"select * from AmazonReport.SD_targetsMatchedTargetV2 where date='{params['date']}'")) > 0:
+            logging.info("数据已存在...")
+            return 'Pass'
         params['record_type'] = 'targets'
 
         metric = self.targets_MT_metrics
@@ -2027,15 +2331,16 @@ if __name__ == '__main__':
     }
     conn = Common_ETLMethod(**AWS_CREDENTIALS).clickhouse_connect()
 
-    sb_ = SP_ETL(**AWS_CREDENTIALS)
-    print(sb_.targets_ETL())
+    sb_ = SB_ETL(**AWS_CREDENTIALS)
+    conn = sb_.clickhouse_connect()
+    # print(sb_.reportV2_adGroupsRecord_ETL(conn))
     # list_date = ['2023-11-20']
-    # list_date = [f'2023-11-{"0" + str(i) if len(str(i)) == 1 else i}' for i in range(1, 22)]
-    # for date_ in list_date:
-    #     print(date_)
-    #     print(date_.replace("-",""))
-    #     sb_ = SB_ETL(**AWS_CREDENTIALS)
-    #     rel = sb_.reportV3_sbAds_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
-    #     print(rel)
-    #     print(rel.info())
-    #     print(rel.columns)
+    list_date = [f'2023-11-{"0" + str(i) if len(str(i)) == 1 else i}' for i in range(28, 30)]
+    for date_ in list_date:
+        print(date_)
+        print(date_.replace("-",""))
+        sb_ = SB_ETL(**AWS_CREDENTIALS)
+        rel = sb_.reportV3_sbAds_ETL(conn,params={"startDate":date_,"endDate":date_,"date":date_.replace("-","")})
+        # print(rel)
+        # print(rel.info())
+        # print(rel.columns)

+ 25 - 5
sync_amz_data/public/sp_api_client.py

@@ -1,6 +1,7 @@
 import clickhouse_connect
 import time
-
+import warnings
+warnings.filterwarnings('ignore')
 import numpy as np
 from pymysql import Timestamp
 from sp_api.util import throttle_retry, load_all_pages
@@ -78,8 +79,8 @@ class SpApiRequest:
 
         report = Reports(credentials=self.credentials, marketplace=self.marketplace)
         rel = report.create_report(
-                        reportType=reportType,marketplaceIds=[self.marketplace.marketplace_id],reportOptions=reportOptions,
-                        dataStartTime=dataStartTime,dataEndTime=dataEndTime
+            reportType=reportType,marketplaceIds=[kwargs['marketpalceids'] if kwargs.get('marketpalceids') is not None else self.marketplace.marketplace_id],
+            reportOptions=reportOptions,dataStartTime=dataStartTime,dataEndTime=dataEndTime
                         )
         reportId = rel.payload.get("reportId")
         print(reportId)
@@ -238,6 +239,21 @@ class SpApiRequest:
             img_url = None if img is None else img.get("link")
         return img_url
 
+    def GET_FLAT_FILE_OPEN_LISTINGS_DATA(self):
+        start = time.time()
+        para = {"reportType": ReportType.GET_MERCHANT_LISTINGS_ALL_DATA}
+        reportid = self.create_report(**para)
+        df = self.decompression(reportid)
+        df['sellerid'] = ''
+        df['marketplace_id'] = self.marketplace.marketplace_id
+        df['country_code'] = str(self.marketplace)[-2:]
+        df['fulfillment_channel'] = df['fulfillment-channel'].map({"DEFAULT":"FBM","AMAZON_NA":"FBA"})
+
+        reserve_columns = ['listing_id','seller_id','asin','sku','country_code','marketplace_id',
+                        'quantity','fulfillment_channel','price','status']
+        print(df.columns)
+        print(df)
+
     def GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL(self):
         timezone_ = pytz.timezone(self.timezone)
         shopReportday = (datetime.now(tz=timezone_) + timedelta(days=-1)).strftime("%Y-%m-%d")
@@ -443,8 +459,12 @@ if __name__ == '__main__':
         'aws_secret_key': 'OSbkKKjShvDoWGBwRORSUqDryBtKWs8AckzwNMzR',
         'role_arn': 'arn:aws:iam::070880041373:role/Amazon_SP_API_ROLE'
     }
-    sp_ = SpApiRequest(aws_credentials,Marketplaces.US,'3006125408623189')
-    sp_.GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL()
+    sp_ = SpApiRequest(aws_credentials,Marketplaces.CA,'3006125408623189')
+    # print(sp_.shopInfo)
+    print(str(Marketplaces.CA)[-2:])
+    # for i in Marketplaces:
+    #     print(i.marketplace_id)
+    sp_.GET_FLAT_FILE_OPEN_LISTINGS_DATA()
     """
     create database amz_sp_api;
     """