Bläddra i källkod

modify spCampaigns Report ETL

huangyifan 1 år sedan
förälder
incheckning
765352d47f
3 ändrade filer med 165 tillägg och 6 borttagningar
  1. 1 0
      start_sync_amz.py
  2. 72 2
      sync_amz_data/DataTransform/Data_ETL.py
  3. 92 4
      sync_amz_data/public/amz_ad_client.py

+ 1 - 0
start_sync_amz.py

@@ -6,6 +6,7 @@ import logging.config
 from apscheduler.schedulers.blocking import BlockingScheduler
 
 
+
 logging.config.dictConfig(LOG_CONF)
 
 if __name__ == '__main__':

+ 72 - 2
sync_amz_data/DataTransform/Data_ETL.py

@@ -486,6 +486,46 @@ class SB_ETL(SBClient, Common_ETLMethod):
         df_budget = self.TZ_Deal(df_budget, ["usageUpdatedTimestamp"])
         return self.columnsName_modify(df_budget)
 
+    def reportV3_campaign_sbCampaigns_ETL(self, conn, params={}):
+        timeZone_ = self.timeZone()
+        today = datetime.now(tz=pytz.timezone(timeZone_))
+        if params.get("endDate") == None:
+            params["endDate"] = (datetime(today.year, today.month, today.day) - timedelta(days=1)).strftime("%Y-%m-%d")
+        if params.get("startDate") == None:
+            params["startDate"] = (datetime(today.year, today.month, today.day) - timedelta(days=1)).strftime(
+                "%Y-%m-%d")
+        params['reportType'] = "sbCampaigns" #sbCampaigns
+        params['columns'] = ['campaignId',
+                             'campaignName','campaignBudgetAmount', 'campaignBudgetCurrencyCode', 'campaignBudgetType', 'topOfSearchImpressionShare',
+                             'addToCart', 'addToCartClicks', 'addToCartRate', 'brandedSearches', 'brandedSearchesClicks',
+                             'campaignBudgetAmount', 'campaignBudgetCurrencyCode', 'campaignBudgetType',  'campaignStatus', 'clicks', 'cost',
+                             'costType', 'date', 'detailPageViews','detailPageViewsClicks', 'eCPAddToCart', 'endDate', 'impressions', 'newToBrandDetailPageViewRate',
+                             'newToBrandDetailPageViews', 'newToBrandDetailPageViewsClicks', 'newToBrandECPDetailPageView',
+                             'newToBrandPurchases', 'newToBrandPurchasesClicks', 'newToBrandPurchasesPercentage',
+                             'newToBrandPurchasesRate', 'newToBrandSales', 'newToBrandSalesClicks', 'newToBrandSalesPercentage',
+                             'newToBrandUnitsSold', 'newToBrandUnitsSoldClicks', 'newToBrandUnitsSoldPercentage', 'purchases',
+                             'purchasesClicks', 'purchasesPromoted', 'sales', 'salesClicks', 'salesPromoted', 'startDate',
+                             'topOfSearchImpressionShare', 'unitsSold', 'unitsSoldClicks', 'video5SecondViewRate',
+                             'video5SecondViews', 'videoCompleteViews', 'videoFirstQuartileViews', 'videoMidpointViews',
+                             'videoThirdQuartileViews', 'videoUnmutes', 'viewabilityRate', 'viewableImpressions',
+                             'viewClickThroughRate'
+        ]  # 'startDate', 'endDate',
+        params['groupby'] = ['campaign']
+        params['timeUnit'] = 'DAILY'
+        list_report = self.get_v3_report(timeUnit=params['timeUnit'], groupby=params['groupby'],
+                                         columns=params['columns'], startDate=params['startDate'],
+                                         endDate=params['endDate'], reportType=params['reportType'])
+        # print(list_report)
+        df_report = pd.json_normalize(list_report)
+        df_report = self.type_trans(df_report, params['columns'], timeZone_, extra_columns=[])
+
+        print(df_report)
+        # conn.insert_df("AmazonReport.SB_sbPurchasedProduct_asinV3", df_report[params['columns']])
+        # print("插入完成SB_sbPurchasedProduct_asinV3")
+
+        return df_report[params['columns']]
+
+
     def reportV3_purchasedAsinRecord_ETL(self, conn, params={}):
         timeZone_ = self.timeZone()
         today = datetime.now(tz=pytz.timezone(timeZone_))
@@ -511,6 +551,7 @@ class SB_ETL(SBClient, Common_ETLMethod):
         df_report = pd.json_normalize(list_report)
         df_report = self.type_trans(df_report, params['columns'], timeZone_, extra_columns=[])
 
+        print(df_report)
         conn.insert_df("AmazonReport.SB_sbPurchasedProduct_asinV3", df_report[params['columns']])
         print("插入完成SB_sbPurchasedProduct_asinV3")
 
@@ -982,6 +1023,35 @@ class SD_ETL(SDClient, Common_ETLMethod):
         df_budget = self.TZ_Deal(df_budget, ["usageUpdatedTimestamp"])
         return self.columnsName_modify(df_budget)
 
+    def reportV3_campaign_sdCampaigns_ETL(self, conn, params={}):
+        timeZone_ = self.timeZone()
+        today = datetime.now(tz=pytz.timezone(timeZone_))
+        if params.get("endDate") == None:
+            params["endDate"] = (datetime(today.year, today.month, today.day) - timedelta(days=1)).strftime("%Y-%m-%d")
+        if params.get("startDate") == None:
+            params["startDate"] = (datetime(today.year, today.month, today.day) - timedelta(days=1)).strftime(
+                "%Y-%m-%d")
+        params['reportType'] = "sdCampaigns"
+        params['columns'] = [
+            "impressions","clicks","cost","campaignId",'date'
+        ]  # 'startDate', 'endDate',
+
+
+        params['groupby'] = ['campaign']
+        params['timeUnit'] = 'DAILY'
+        list_report = self.get_v3_report(timeUnit=params['timeUnit'], groupby=params['groupby'],
+                                         columns=params['columns'], startDate=params['startDate'],
+                                         endDate=params['endDate'], reportType=params['reportType'])
+        # print(list_report)
+        df_report = pd.json_normalize(list_report)
+        df_report = self.type_trans(df_report, params['columns'], timeZone_, extra_columns=[])
+
+        print(df_report)
+        # conn.insert_df("AmazonReport.SB_sbPurchasedProduct_asinV3", df_report[params['columns']])
+        # print("插入完成SB_sbPurchasedProduct_asinV3")
+
+        return df_report[params['columns']]
+
     campaigns_metrics = [
         'campaignId','campaignName','impressions','clicks','cost','attributedBrandedSearches14d',
         'attributedConversions1d','attributedConversions1dSameSKU','attributedConversions7d',
@@ -1469,8 +1539,8 @@ if __name__ == '__main__':
     conn = Common_ETLMethod(**AWS_CREDENTIALS).clickhouse_connect()
 
     # SD
-    ac_etl = SP_ETL(**AWS_CREDENTIALS)
-    ls = ac_etl.reportV3_campaignPlacement_spCampaignsETL(conn,params={"startDate":"2023-10-28","endDate":"2023-10-28"})
+    ac_etl = SD_ETL(**AWS_CREDENTIALS)
+    ls = ac_etl.reportV3_campaign_sdCampaigns_ETL(conn)
     print(ls)
     # print(ls.to_excel('obse11.xlsx'))
     # ac_etl.reportV2_campaignsRecord_t2_ETL(conn)

+ 92 - 4
sync_amz_data/public/amz_ad_client.py

@@ -134,7 +134,7 @@ class SPClient(BaseClient):
             body["nextToken"] = info["nextToken"]
         logger.info(f"总共数量:{info['totalResults']}")
 
-    def get_spbudgetrecommendation(self, campaign_ids):
+    def get_budgetrecommendation(self, campaign_ids):
         url_path = "/sp/campaigns/budgetRecommendations"
         body = {
             "campaignIds": campaign_ids
@@ -145,6 +145,13 @@ class SPClient(BaseClient):
         }
         return self._request(url_path, method="POST", headers=headers, body=body)
 
+    def iter_budgetrecommendation(self,campaign_ids):
+        for i in range(0,len(campaign_ids),100):
+            campaign_ids = campaign_ids[i:i+100]
+            info: dict = self.get_budgetrecommendation(campaign_ids)
+            yield from info["budgetRecommendationsSuccessResults"]
+
+
     def get_ad_groups(self, **body):
         url_path = "/sp/adGroups/list"
         headers = {
@@ -210,6 +217,7 @@ class SPClient(BaseClient):
         }
         return self._request(url_path, method="POST", body=body, headers=headers)
 
+
     def iter_targets(self, **body) -> Iterator[dict]:
         if "maxResults" not in body:
             body["maxResults"] = 100
@@ -250,6 +258,7 @@ class SPClient(BaseClient):
         body = {"adGroupId": adGroupId,
                 "keywords": keywords}
         return self._request(url_path, method="POST", body=body)
+
     def get_v3_report(self,
                       groupby:list,
                       columns:list,
@@ -664,6 +673,83 @@ class SDClient(BaseClient):
                  }
 
         return self._request(url_path, method="POST", headers=headers,body=body,params={"locale":locale})
+    def get_v3_report(self,
+                      groupby:list,
+                      columns:list,
+                      startDate:str,
+                      endDate:str,
+                      reportType: Literal['sdCampaigns', 'sdPurchasedProduct', 'sdTargeting', 'sdSearchTerm'],
+                      timeUnit="DAILY",
+                      download=True):
+        """
+        Now about reportType is only sbPurchasedProduct available.
+        @param groupby: 聚合条件
+        @param columns: 需要获取的字段[campaign,purchasedAsin,targeting,searchTerm]
+        @param startDate: 请求开始的日期
+        @param endDate: 请求结束的日期
+        @param reportType: 广告类型
+        @param timeUnit: 时间指标-[DAILY, SUMMARY]
+        @param download: 下载报告
+        """
+        url_path = "/reporting/reports"
+        headers = {
+            "Content-Type":"application/vnd.createasyncreportrequest.v3+json"
+            }
+        body = {
+                "name":"SD campaigns report",
+                "startDate":startDate,
+                "endDate":endDate,
+                "configuration":{
+                    "adProduct":"SPONSORED_DISPLAY",
+                    "groupBy":groupby,
+                    "columns":columns,
+                    "reportTypeId":reportType,
+                    "timeUnit":timeUnit,
+                    "format":"GZIP_JSON"
+                }
+            }
+        ret = self._request(url_path,method="POST",headers=headers,body=body)
+        # print(ret)
+        report_id = ret["reportId"]
+        status = ret["status"]
+        if status == "FAILURE":
+            raise Exception(ret)
+        logger.info(f"创建报告成功:{ret}")
+        while status in ["PROCESSING","PENDING"]:
+            logger.debug(f"报告{report_id}正在处理中...")
+            time.sleep(3)
+            ret = self._request(f"/reporting/reports/{report_id}")
+            print(ret)
+            status = ret["status"]
+            if status == "FAILURE":
+                raise Exception(ret)
+        logger.info(f"报告处理完成:{ret}")
+        if download:
+            pid = self.profile_id
+            reportrel= self.download_v3_report(ret['url'],f"s3://reportforspsbsd/zosi/us/sd/{startDate}_{endDate}_{reportType}_{str(groupby)}_{str(pid)}.json.gz")
+            return reportrel
+        else:
+            return ret
+
+    def download_v3_report(self, url, file_path: str, decompress: bool = True) -> str:
+        resp = requests.get(url, stream=True, allow_redirects=True)
+        print(resp)
+        kwargs = {'region_name': 'us-east-1', 'endpoint_url': "https://s3.amazonaws.com",
+                  'aws_access_key_id': 'AKIARBAGHTGORIFN44VQ',
+                  'aws_secret_access_key': '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'}
+        s3_ = S3FileSystem(client_kwargs=kwargs)
+        # print()
+        with s3_.open(file_path, 'wb') as f:
+            for data in resp.iter_content(chunk_size=10 * 1024):
+                f.write(data)
+        if not decompress:
+            return file_path
+        with s3_.open(file_path, 'rb') as f:  # 读取s3数据
+            data = gzip.GzipFile(fileobj=f, mode='rb')
+            de_file = json.load(data)
+        logger.info(f"解压完成:{de_file}")
+        # print(de_file)
+        return de_file
 
     def get_v2_report(
             self,
@@ -764,9 +850,11 @@ if __name__ == '__main__':
     # keyword=["8mp security camera system","8mp security camera system"],
     # matchType=["broad","exact"]))
 
-    sd = SBClient(**AWS_CREDENTIALS)
-    # print(sd.get_keywords(**{"campaignIdFilter":"144215934259340472"}))
-    print(sd.get_campaign_v3("144215934259340472"))
+    sb = SPClient(**AWS_CREDENTIALS)
+    print(list(sb.iter_budgetrecommendation(["171417956384778"])))
+    # print(list(sb.iter_campaigns(**{"stateFilter": {
+    #                     "include": ["ARCHIVED","PAUSED","ENABLED"]
+    #                     }})))
     # print(sb.get_keyword_bidrecommendation(**{'campaignId': 27333596383941, 'keywords': [
     #     {"matchType": 'broad', "keywordText": "4k security camera system"}]}))
     # a = list(sd.iter_targets(**{"campaignIdFilter":"257424912382921"})) #list(sd.iter_targets())#