Quellcode durchsuchen

report v2_v3 receive function

huangyifan vor 1 Jahr
Ursprung
Commit
6247ed716e
2 geänderte Dateien mit 241 neuen und 45 gelöschten Zeilen
  1. 46 3
      sync_amz_data/DataTransform/Data_ETL.py
  2. 195 42
      sync_amz_data/public/amz_ad_client.py

+ 46 - 3
sync_amz_data/DataTransform/Data_ETL.py

@@ -6,7 +6,7 @@ pd.set_option('display.max_columns', None)
 import warnings
 warnings.filterwarnings('ignore')
 pd.set_option('expand_frame_repr', False)
-from datetime import datetime,timezone
+from datetime import datetime,timezone,timedelta
 
 class Common_ETLMethod:
     def columnsName_modify(self,df):
@@ -104,6 +104,34 @@ class SP_ETL(SPClient,Common_ETLMethod):
         return self.columnsName_modify(df_budget)
 
 class SB_ETL(SBClient,Common_ETLMethod):
+    reportMetrics = [
+        'applicableBudgetRuleId',
+        'applicableBudgetRuleName',
+        'attributedConversions14d',
+        'attributedConversions14dSameSKU',
+        'attributedDetailPageViewsClicks14d',
+        'attributedOrderRateNewToBrand14d',
+        'attributedOrdersNewToBrand14d',
+        'attributedOrdersNewToBrandPercentage14d',
+        'attributedSales14d',
+        'attributedSales14dSameSKU',
+        'attributedSalesNewToBrand14d',
+        'attributedSalesNewToBrandPercentage14d',
+        'attributedUnitsOrderedNewToBrand14d',
+        'attributedUnitsOrderedNewToBrandPercentage14d',
+        'campaignBudget',
+        'campaignBudgetType',
+        'campaignId',
+        'campaignName',
+        'campaignRuleBasedBudget',
+        'campaignStatus',
+        'clicks',
+        'cost',
+        'dpv14d',
+        'impressions',
+        'unitsSold14d',
+        'attributedBrandedSearches14d',
+        'topOfSearchImpressionShare']
     def campaigns_ETL(self):
         list_campaign_SB = list(self.iter_campaigns(**{"includeExtendedDataFields":True}))
         df_campaign = pd.json_normalize(list_campaign_SB)
@@ -144,6 +172,19 @@ class SB_ETL(SBClient,Common_ETLMethod):
         df_budget = self.TZ_Deal(df_budget,["usageUpdatedTimestamp"])
         return self.columnsName_modify(df_budget)
 
+    def report_campaignsRecord_ETL(self):
+        today = datetime.today()
+        date = (datetime(today.year,today.month,today.day,tzinfo=timezone.utc)-timedelta(days=1)).strftime("%Y%m%d")
+        print(date)
+        need_removedList = []
+        if need_removedList is not None:
+            [SB_ETL.reportMetrics.remove(i) for i in need_removedList]
+        list_campaigns_report = self.get_v3_report(record_type="campaigns",metrics=SB_ETL.reportMetrics,report_date=date)
+        # print(list_campaigns_report)
+        df_campaign_report = pd.json_normalize(list_campaigns_report)
+        return df_campaign_report
+
+
 class SD_ETL(SDClient,Common_ETLMethod):
     def campaigns_ETL(self):
         list_campaign_SD = self.get_campaigns()
@@ -176,6 +217,8 @@ class SD_ETL(SDClient,Common_ETLMethod):
         df_budget = self.TZ_Deal(df_budget,["usageUpdatedTimestamp"])
         return self.columnsName_modify(df_budget)
 
+
+
 if __name__ == '__main__':
     AWS_CREDENTIALS = {
         'lwa_client_id': 'amzn1.application-oa2-client.ebd701cd07854fb38c37ee49ec4ba109',
@@ -183,6 +226,6 @@ if __name__ == '__main__':
         'lwa_client_secret': 'cbf0514186db4df91e04a8905f0a91b605eae4201254ced879d8bb90df4b474d',
         'profile_id': "3006125408623189"
     }
-    ac_etl = SD_ETL(**AWS_CREDENTIALS)
+    ac_etl = SB_ETL(**AWS_CREDENTIALS)
     # print(ac_etl.budget_ETL(campaign_ids=["126327624499318"]))
-    print(ac_etl.budget_ETL(["257424912382921"]))
+    print(ac_etl.report_targetsRecord_ETL())

+ 195 - 42
sync_amz_data/public/amz_ad_client.py

@@ -8,6 +8,8 @@ from urllib.parse import urljoin
 from typing import List, Literal, Iterable, Iterator
 import gzip
 from pathlib import Path
+import s3fs
+from s3fs import S3FileSystem
 
 import logging
 
@@ -234,7 +236,77 @@ class SPClient(BaseClient):
         body = {"adGroupId": adGroupId,
                 "keywords": keywords}
         return self._request(url_path, method="POST", body=body)
+    def get_v3_report(self,
+                      groupby:list,
+                      columns:list,
+                      startDate:str,
+                      endDate:str,
+                      reportType: Literal['spCampaigns','spAdvertisedProduct' ,'spPurchasedProduct', 'spTargeting', 'spSearchTerm'],
+                      timeUnit="DAILY",
+                      download=True):
+        """
+        groupby: 聚合条件
+        columns: 需要获取的字段[campaign,adGroup, searchTerm,purchasedAsin,campaignPlacement,targeting,searchTerm,advertiser,asin]
+        """
+        url_path = "/reporting/reports"
+        headers = {
+            "Content-Type":"application/vnd.createasyncreportrequest.v3+json"
+            }
+        body = {
+                "name":"SP campaigns report",
+                "startDate":startDate,
+                "endDate":endDate,
+                "configuration":{
+                    "adProduct":"SPONSORED_PRODUCTS",
+                    "groupBy":groupby,
+                    "columns":columns,
+                    "reportTypeId":reportType,
+                    "timeUnit":timeUnit,
+                    "format":"GZIP_JSON"
+                }
+            }
+        ret = self._request(url_path,method="POST",headers=headers,body=body)
+        # print(ret)
+        report_id = ret["reportId"]
+        status = ret["status"]
+        if status == "FAILURE":
+            raise Exception(ret)
+        logger.info(f"创建报告成功:{ret}")
+        while status in ["PROCESSING","PENDING"]:
+            logger.debug(f"报告{report_id}正在处理中...")
+            time.sleep(3)
+            ret = self._request(f"/reporting/reports/{report_id}")
+            print(ret)
+            status = ret["status"]
+            if status == "FAILURE":
+                raise Exception(ret)
+        logger.info(f"报告处理完成:{ret}")
+        if download:
+            pid = self.profile_id
+            reportrel= self.download_v3_report(ret['url'],f"s3://reportforspsbsd/zosi/us/sp/{str(groupby)}_{startDate}_{endDate}_{reportType}_{str(pid)}.json.gz")
+            return reportrel
+        else:
+            return ret
 
+    def download_v3_report(self, url, file_path: str, decompress: bool = True) -> str:
+        resp = requests.get(url, stream=True, allow_redirects=True)
+        print(resp)
+        kwargs = {'region_name': 'us-east-1', 'endpoint_url': "https://s3.amazonaws.com",
+                  'aws_access_key_id': 'AKIARBAGHTGORIFN44VQ',
+                  'aws_secret_access_key': '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'}
+        s3_ = S3FileSystem(client_kwargs=kwargs)
+        # print()
+        with s3_.open(file_path, 'wb') as f:
+            for data in resp.iter_content(chunk_size=10 * 1024):
+                f.write(data)
+        if not decompress:
+            return file_path
+        with s3_.open(file_path, 'rb') as f:  # 读取s3数据
+            data = gzip.GzipFile(fileobj=f, mode='rb')
+            de_file = json.load(data)
+        logger.info(f"解压完成:{de_file}")
+        # print(de_file)
+        return de_file
 
 class SBClient(BaseClient):
     def get_campaigns(self, **body):
@@ -334,7 +406,79 @@ class SBClient(BaseClient):
         url_path = "/sb/recommendations/bids"
         return self._request(url_path, method="POST", body=body)
 
-    def get_report(
+    def get_v3_report(self,
+                      groupby:list,
+                      columns:list,
+                      startDate:str,
+                      endDate:str,
+                      reportType: Literal['sbCampaigns', 'sbPurchasedProduct', 'sbTargeting', 'sbSearchTerm'],
+                      timeUnit="DAILY",
+                      download=True):
+        """
+        groupby: 聚合条件
+        columns: 需要获取的字段[campaign,targeting,searchTerm,purchasedAsin]
+        """
+        url_path = "/reporting/reports"
+        headers = {
+            "Content-Type":"application/vnd.createasyncreportrequest.v3+json"
+            }
+        body = {
+                "name":"SB campaigns report",
+                "startDate":startDate,
+                "endDate":endDate,
+                "configuration":{
+                    "adProduct":"SPONSORED_BRANDS",
+                    "groupBy":groupby,
+                    "columns":columns,
+                    "reportTypeId":reportType,
+                    "timeUnit":timeUnit,
+                    "format":"GZIP_JSON"
+                }
+            }
+        ret = self._request(url_path,method="POST",headers=headers,body=body)
+        # print(ret)
+        report_id = ret["reportId"]
+        status = ret["status"]
+        if status == "FAILURE":
+            raise Exception(ret)
+        logger.info(f"创建报告成功:{ret}")
+        while status in ["PROCESSING","PENDING"]:
+            logger.debug(f"报告{report_id}正在处理中...")
+            time.sleep(3)
+            ret = self._request(f"/reporting/reports/{report_id}")
+            print(ret)
+            status = ret["status"]
+            if status == "FAILURE":
+                raise Exception(ret)
+        logger.info(f"报告处理完成:{ret}")
+        if download:
+            pid = self.profile_id
+            reportrel= self.download_v3_report(ret['url'],f"s3://reportforspsbsd/zosi/us/sb/{startDate}_{endDate}_{reportType}_{str(pid)}.json.gz")
+            return reportrel
+        else:
+            return ret
+
+    def download_v3_report(self, url, file_path: str, decompress: bool = True) -> str:
+        resp = requests.get(url, stream=True, allow_redirects=True)
+        print(resp)
+        kwargs = {'region_name': 'us-east-1', 'endpoint_url': "https://s3.amazonaws.com",
+                  'aws_access_key_id': 'AKIARBAGHTGORIFN44VQ',
+                  'aws_secret_access_key': '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'}
+        s3_ = S3FileSystem(client_kwargs=kwargs)
+        # print()
+        with s3_.open(file_path, 'wb') as f:
+            for data in resp.iter_content(chunk_size=10 * 1024):
+                f.write(data)
+        if not decompress:
+            return file_path
+        with s3_.open(file_path, 'rb') as f:  # 读取s3数据
+            data = gzip.GzipFile(fileobj=f, mode='rb')
+            de_file = json.load(data)
+        logger.info(f"解压完成:{de_file}")
+        # print(de_file)
+        return de_file
+
+    def get_v2_report(
             self,
             record_type: Literal['campaigns', 'adGroups', 'ads', 'targets', 'keywords'],
             report_date: str,
@@ -379,22 +523,33 @@ class SBClient(BaseClient):
                 raise Exception(ret)
         logger.info(f"报告处理完成:{ret}")
         if download:
-            self.download_report(report_id, str(self.data_path / f"sb_{record_type}.json.gz"))
+            pid = self.profile_id
+            reportrel= self.download_v2_report(report_id, f"s3://reportforspsbsd/zosi/us/sb/{str(report_date)}_{record_type}_{str(pid)}.gz")
+            return reportrel
         else:
             return ret
 
-    def download_report(self, report_id: str, file_path: str, decompress: bool = True) -> str:
+    def download_v2_report(self, report_id: str, file_path: str, decompress: bool = True) -> str:
         url = urljoin(URL_AD_API, f"/v2/reports/{report_id}/download")
         resp = requests.get(url, headers=self.auth_headers, stream=True, allow_redirects=True)
         logger.info(f"开始下载报告:{report_id}")
-        with open(file_path, "wb") as file:
+        kwargs = {'region_name': 'us-east-1', 'endpoint_url': "https://s3.amazonaws.com",
+                  'aws_access_key_id': 'AKIARBAGHTGORIFN44VQ',
+                  'aws_secret_access_key': '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'}
+        s3_ = S3FileSystem(client_kwargs=kwargs)
+        # print()
+        with s3_.open(file_path, 'wb') as f:
             for data in resp.iter_content(chunk_size=10 * 1024):
-                file.write(data)
+                f.write(data)
+
         logger.info(f"报告{report_id}下载完成:{file_path}")
         if not decompress:
             return file_path
-        de_file = gz_decompress(file_path)
+        with s3_.open(file_path, 'rb') as f:  # 读取s3数据
+            data = gzip.GzipFile(fileobj=f, mode='rb')
+            de_file = json.load(data)
         logger.info(f"解压完成:{de_file}")
+        # print(de_file)
         return de_file
 
 class SDClient(BaseClient):
@@ -405,6 +560,7 @@ class SDClient(BaseClient):
     def get_adGroups(self,**params):
         url_path = '/sd/adGroups'
         return self._request(url_path, params=params)
+
     def iter_adGroups(self,**param):
         if "startIndex" not in param:
             param["startIndex"] = 0
@@ -448,6 +604,7 @@ class SDClient(BaseClient):
                 break
             param["startIndex"] += 5000
             yield info
+
     def get_budget(self, campaignIds: list):
         url_path = "/sd/campaigns/budget/usage"
         body = {"campaignIds": campaignIds}
@@ -497,44 +654,40 @@ if __name__ == '__main__':
     # keyword=["8mp security camera system","8mp security camera system"],
     # matchType=["broad","exact"]))
 
-    sd = SDClient(**AWS_CREDENTIALS)
+    sd = SPClient(**AWS_CREDENTIALS)
 
     # print(sb.get_keyword_bidrecommendation(**{'campaignId': 27333596383941, 'keywords': [
     #     {"matchType": 'broad', "keywordText": "4k security camera system"}]}))
-    a = list(sd.iter_targets(**{"campaignIdFilter":"257424912382921"})) #list(sd.iter_targets())#
-    print(a,len(a))
+    # a = list(sd.iter_targets(**{"campaignIdFilter":"257424912382921"})) #list(sd.iter_targets())#
+    # print(a,len(a))
 
     # sb = SBClient(**AWS_CREDENTIALS)
-    # metrics = [
-    #     'applicableBudgetRuleId',
-    #     'applicableBudgetRuleName',
-    #     'attributedConversions14d',
-    #     'attributedConversions14dSameSKU',
-    #     'attributedDetailPageViewsClicks14d',
-    #     'attributedOrderRateNewToBrand14d',
-    #     'attributedOrdersNewToBrand14d',
-    #     'attributedOrdersNewToBrandPercentage14d',
-    #     'attributedSales14d',
-    #     'attributedSales14dSameSKU',
-    #     'attributedSalesNewToBrand14d',
-    #     'attributedSalesNewToBrandPercentage14d',
-    #     'attributedUnitsOrderedNewToBrand14d',
-    #     'attributedUnitsOrderedNewToBrandPercentage14d',
-    #     'campaignBudget',
-    #     'campaignBudgetType',
-    #     'campaignId',
-    #     'campaignName',
-    #     'campaignRuleBasedBudget',
-    #     'campaignStatus',
-    #     'clicks',
-    #     'cost',
-    #     'dpv14d',
-    #     'impressions',
-    #     'unitsSold14d',
-    #     'attributedBrandedSearches14d',
-    #     'topOfSearchImpressionShare']
-    # sb.get_report(
-    #     record_type="campaigns",
-    #     report_date="20231008",
-    #     metrics=metrics
-    # )
+    metrics = [
+        'applicableBudgetRuleId',
+        'applicableBudgetRuleName',
+        'attributedConversions14d',
+        'attributedConversions14dSameSKU',
+        'attributedDetailPageViewsClicks14d',
+        'attributedOrderRateNewToBrand14d',
+        'attributedOrdersNewToBrand14d',
+        'attributedOrdersNewToBrandPercentage14d',
+        'attributedSales14d',
+        'attributedSales14dSameSKU',
+        'attributedSalesNewToBrand14d',
+        'attributedSalesNewToBrandPercentage14d',
+        'attributedUnitsOrderedNewToBrand14d',
+        'attributedUnitsOrderedNewToBrandPercentage14d',
+        'campaignBudget',
+        'campaignBudgetType',
+        'campaignId',
+        'campaignName',
+        'campaignRuleBasedBudget',
+        'campaignStatus',
+        'clicks',
+        'cost',
+        'dpv14d',
+        'impressions',
+        'unitsSold14d',
+        'attributedBrandedSearches14d',
+        'topOfSearchImpressionShare']
+    print(sd.get_v3_report(groupby=['campaign','campaignPlacement'],columns=['impressions'],startDate="2023-10-01",endDate="2023-10-01",reportType="spCampaigns"))