Kaynağa Gözat

Merge branch 'yifan' of ASJ_ADS/sync_amz_data into master

yifan_huang96 1 yıl önce
ebeveyn
işleme
15d43b93cb

Dosya farkı çok büyük olduğundan ihmal edildi
+ 1135 - 2
sync_amz_data/DataTransform/Data_ETL.py


+ 248 - 42
sync_amz_data/public/amz_ad_client.py

@@ -8,6 +8,8 @@ from urllib.parse import urljoin
 from typing import List, Literal, Iterable, Iterator
 import gzip
 from pathlib import Path
+import s3fs
+from s3fs import S3FileSystem
 
 import logging
 
@@ -56,7 +58,7 @@ class BaseClient:
             status_forcelist=[429, 500, 502, 503, 504],
             raise_on_status=False,  # 在status_forcelist中的状态码达到重试次数后是否抛出异常
             # backoff_factor * (2 ** (retry_time-1)), 即间隔1s, 2s, 4s, 8s, ...
-            backoff_factor=1,
+            backoff_factor=2,
         )
         adapter = HTTPAdapter(max_retries=retry_strategy)
         self.session = requests.session()
@@ -234,7 +236,77 @@ class SPClient(BaseClient):
         body = {"adGroupId": adGroupId,
                 "keywords": keywords}
         return self._request(url_path, method="POST", body=body)
+    def get_v3_report(self,
+                      groupby:list,
+                      columns:list,
+                      startDate:str,
+                      endDate:str,
+                      reportType: Literal['spCampaigns','spAdvertisedProduct' ,'spPurchasedProduct', 'spTargeting', 'spSearchTerm'],
+                      timeUnit="DAILY",
+                      download=True):
+        """
+        Create a Sponsored Products report through ``POST /reporting/reports``,
+        poll it every 3 seconds while its status is PENDING/PROCESSING, and
+        optionally download the result.
+
+        @param groupby: aggregation dimensions, e.g. campaign, adGroup, searchTerm,
+            purchasedAsin, campaignPlacement, targeting, advertiser, asin
+        @param columns: fields to include in the report
+        @param startDate: sent as ``startDate`` in the request body
+        @param endDate: sent as ``endDate`` in the request body
+        @param reportType: sent as ``configuration.reportTypeId``
+        @param timeUnit: sent as ``configuration.timeUnit``
+        @param download: when True, pass the report ``url`` to
+            ``download_v3_report`` and return its result; when False, return
+            the last status response
+        @raise Exception: carrying the API response when the report status is
+            FAILED (or FAILURE)
+        """
+        url_path = "/reporting/reports"
+        headers = {
+            "Content-Type":"application/vnd.createasyncreportrequest.v3+json"
+            }
+        body = {
+                "name":"SP campaigns report",
+                "startDate":startDate,
+                "endDate":endDate,
+                "configuration":{
+                    "adProduct":"SPONSORED_PRODUCTS",
+                    "groupBy":groupby,
+                    "columns":columns,
+                    "reportTypeId":reportType,
+                    "timeUnit":timeUnit,
+                    "format":"GZIP_JSON"
+                }
+            }
+        # Per the Amazon Ads v3 reporting documentation the terminal error
+        # status is "FAILED". The previous code only tested "FAILURE" (the v2
+        # spelling), so a failed report fell through to the download step.
+        # "FAILURE" is still accepted so nothing that used to raise stops raising.
+        failed_statuses = ("FAILED", "FAILURE")
+        ret = self._request(url_path,method="POST",headers=headers,body=body)
+        report_id = ret["reportId"]
+        status = ret["status"]
+        if status in failed_statuses:
+            raise Exception(ret)
+        logger.info(f"创建报告成功:{ret}")
+        while status in ["PROCESSING","PENDING"]:
+            logger.debug(f"报告{report_id}正在处理中...")
+            time.sleep(3)
+            ret = self._request(f"/reporting/reports/{report_id}")
+            # Was a bare print(); routed through the module logger instead.
+            logger.debug(ret)
+            status = ret["status"]
+            if status in failed_statuses:
+                raise Exception(ret)
+        logger.info(f"报告处理完成:{ret}")
+        if not download:
+            return ret
+        pid = self.profile_id
+        # NOTE(review): str(groupby) puts brackets, quotes and spaces into the
+        # S3 key; left unchanged because downstream readers may rely on it.
+        reportrel= self.download_v3_report(ret['url'],f"s3://reportforspsbsd/zosi/us/sp/{str(groupby)}_{startDate}_{endDate}_{reportType}_{str(pid)}.json.gz")
+        return reportrel
 
+    def download_v3_report(self, url, file_path: str, decompress: bool = True) -> str:
+        """
+        Stream a finished v3 report from ``url`` into ``file_path`` on S3 and,
+        unless told otherwise, read it back and parse it.
+
+        @param url: download location of the report (``get_v3_report`` passes
+            the ``url`` field of the status response)
+        @param file_path: destination passed to ``S3FileSystem.open``
+            (``get_v3_report`` passes an ``s3://`` URI)
+        @param decompress: when False, return ``file_path`` right after the upload
+        @return: ``file_path`` when ``decompress`` is False; otherwise the object
+            produced by gunzipping the uploaded file and ``json.load``-ing it
+            (so not a ``str``, despite the annotation)
+        @raise requests.HTTPError: when the download answers with a 4xx/5xx status
+        """
+        # SECURITY: an AWS access key pair used to be hard-coded here. It has
+        # been committed to version control and must be treated as leaked:
+        # rotate it. No keys are passed any more, so s3fs/botocore resolve
+        # credentials from the standard chain (AWS_ACCESS_KEY_ID /
+        # AWS_SECRET_ACCESS_KEY, shared config files, or an attached IAM role).
+        client_kwargs = {'region_name': 'us-east-1', 'endpoint_url': "https://s3.amazonaws.com"}
+        s3_ = S3FileSystem(client_kwargs=client_kwargs)
+        # The context manager releases the streamed connection even on error.
+        with requests.get(url, stream=True, allow_redirects=True) as resp:
+            logger.debug(resp)
+            # Fail before an HTTP error body is stored on S3 as if it were a report.
+            resp.raise_for_status()
+            with s3_.open(file_path, 'wb') as f:
+                for data in resp.iter_content(chunk_size=10 * 1024):
+                    f.write(data)
+        if not decompress:
+            return file_path
+        # Read the object back from S3 and decode the gzip-compressed JSON.
+        with s3_.open(file_path, 'rb') as f:
+            with gzip.GzipFile(fileobj=f, mode='rb') as data:
+                de_file = json.load(data)
+        # Log the location rather than the whole parsed report.
+        logger.info(f"解压完成:{file_path}")
+        return de_file
 
 class SBClient(BaseClient):
     def get_campaigns(self, **body):
@@ -334,7 +406,85 @@ class SBClient(BaseClient):
         url_path = "/sb/recommendations/bids"
         return self._request(url_path, method="POST", body=body)
 
-    def get_report(
+    def get_v3_report(self,
+                      groupby:list,
+                      columns:list,
+                      startDate:str,
+                      endDate:str,
+                      reportType: Literal['sbCampaigns', 'sbPurchasedProduct', 'sbTargeting', 'sbSearchTerm'],
+                      timeUnit="DAILY",
+                      download=True):
+        """
+        Create a Sponsored Brands report through ``POST /reporting/reports``,
+        poll it every 3 seconds while its status is PENDING/PROCESSING, and
+        optionally download the result.
+
+        Original author's note: only ``sbPurchasedProduct`` is currently
+        available as ``reportType``.
+
+        @param groupby: aggregation dimensions
+        @param columns: fields to include in the report. The original note
+            listed [campaign, purchasedAsin, targeting, searchTerm] here;
+            NOTE(review): those look like ``groupby`` values - confirm.
+        @param startDate: first date of the requested range
+        @param endDate: last date of the requested range
+        @param reportType: report type, sent as ``configuration.reportTypeId``
+        @param timeUnit: time granularity - DAILY or SUMMARY
+        @param download: when True, pass the report ``url`` to
+            ``download_v3_report`` and return its result; when False, return
+            the last status response
+        @raise Exception: carrying the API response when the report status is
+            FAILED (or FAILURE)
+        """
+        url_path = "/reporting/reports"
+        headers = {
+            "Content-Type":"application/vnd.createasyncreportrequest.v3+json"
+            }
+        body = {
+                "name":"SB campaigns report",
+                "startDate":startDate,
+                "endDate":endDate,
+                "configuration":{
+                    "adProduct":"SPONSORED_BRANDS",
+                    "groupBy":groupby,
+                    "columns":columns,
+                    "reportTypeId":reportType,
+                    "timeUnit":timeUnit,
+                    "format":"GZIP_JSON"
+                }
+            }
+        # Per the Amazon Ads v3 reporting documentation the terminal error
+        # status is "FAILED". The previous code only tested "FAILURE" (the v2
+        # spelling), so a failed report fell through to the download step.
+        # "FAILURE" is still accepted so nothing that used to raise stops raising.
+        failed_statuses = ("FAILED", "FAILURE")
+        ret = self._request(url_path,method="POST",headers=headers,body=body)
+        report_id = ret["reportId"]
+        status = ret["status"]
+        if status in failed_statuses:
+            raise Exception(ret)
+        logger.info(f"创建报告成功:{ret}")
+        while status in ["PROCESSING","PENDING"]:
+            logger.debug(f"报告{report_id}正在处理中...")
+            time.sleep(3)
+            ret = self._request(f"/reporting/reports/{report_id}")
+            # Was a bare print(); routed through the module logger instead.
+            logger.debug(ret)
+            status = ret["status"]
+            if status in failed_statuses:
+                raise Exception(ret)
+        logger.info(f"报告处理完成:{ret}")
+        if not download:
+            return ret
+        pid = self.profile_id
+        # NOTE(review): str(groupby) puts brackets, quotes and spaces into the
+        # S3 key; left unchanged because downstream readers may rely on it.
+        reportrel= self.download_v3_report(ret['url'],f"s3://reportforspsbsd/zosi/us/sb/{startDate}_{endDate}_{reportType}_{str(groupby)}_{str(pid)}.json.gz")
+        return reportrel
+
+    def download_v3_report(self, url, file_path: str, decompress: bool = True) -> str:
+        """
+        Stream a finished v3 report from ``url`` into ``file_path`` on S3 and,
+        unless told otherwise, read it back and parse it.
+
+        @param url: download location of the report (``get_v3_report`` passes
+            the ``url`` field of the status response)
+        @param file_path: destination passed to ``S3FileSystem.open``
+            (``get_v3_report`` passes an ``s3://`` URI)
+        @param decompress: when False, return ``file_path`` right after the upload
+        @return: ``file_path`` when ``decompress`` is False; otherwise the object
+            produced by gunzipping the uploaded file and ``json.load``-ing it
+            (so not a ``str``, despite the annotation)
+        @raise requests.HTTPError: when the download answers with a 4xx/5xx status
+        """
+        # SECURITY: an AWS access key pair used to be hard-coded here. It has
+        # been committed to version control and must be treated as leaked:
+        # rotate it. No keys are passed any more, so s3fs/botocore resolve
+        # credentials from the standard chain (AWS_ACCESS_KEY_ID /
+        # AWS_SECRET_ACCESS_KEY, shared config files, or an attached IAM role).
+        client_kwargs = {'region_name': 'us-east-1', 'endpoint_url': "https://s3.amazonaws.com"}
+        s3_ = S3FileSystem(client_kwargs=client_kwargs)
+        # The context manager releases the streamed connection even on error.
+        with requests.get(url, stream=True, allow_redirects=True) as resp:
+            logger.debug(resp)
+            # Fail before an HTTP error body is stored on S3 as if it were a report.
+            resp.raise_for_status()
+            with s3_.open(file_path, 'wb') as f:
+                for data in resp.iter_content(chunk_size=10 * 1024):
+                    f.write(data)
+        if not decompress:
+            return file_path
+        # Read the object back from S3 and decode the gzip-compressed JSON.
+        with s3_.open(file_path, 'rb') as f:
+            with gzip.GzipFile(fileobj=f, mode='rb') as data:
+                de_file = json.load(data)
+        # Log the location rather than the whole parsed report.
+        logger.info(f"解压完成:{file_path}")
+        return de_file
+
+    def get_v2_report(
             self,
             record_type: Literal['campaigns', 'adGroups', 'ads', 'targets', 'keywords'],
             report_date: str,
@@ -374,27 +524,39 @@ class SBClient(BaseClient):
             logger.debug(f"报告{report_id}正在处理中...")
             time.sleep(3)
             ret = self._request(f"/v2/reports/{report_id}")
+            print(ret)
             status = ret["status"]
             if status == "FAILURE":
                 raise Exception(ret)
         logger.info(f"报告处理完成:{ret}")
         if download:
-            self.download_report(report_id, str(self.data_path / f"sb_{record_type}.json.gz"))
+            pid = self.profile_id
+            reportrel= self.download_v2_report(report_id, f"s3://reportforspsbsd/zosi/us/sb/{str(report_date)}_{record_type}_{creative_type}_{segment}_{str(pid)}.gz")
+            return reportrel
         else:
             return ret
 
-    def download_report(self, report_id: str, file_path: str, decompress: bool = True) -> str:
+    def download_v2_report(self, report_id: str, file_path: str, decompress: bool = True) -> str:
+        """
+        Download v2 report ``report_id`` from ``/v2/reports/{report_id}/download``
+        into ``file_path`` on S3 and, unless told otherwise, read it back and
+        parse it.
+
+        @param report_id: identifier interpolated into the download URL
+        @param file_path: destination passed to ``S3FileSystem.open``
+        @param decompress: when False, return ``file_path`` right after the upload
+        @return: ``file_path`` when ``decompress`` is False; otherwise the object
+            produced by gunzipping the uploaded file and ``json.load``-ing it
+            (so not a ``str``, despite the annotation)
+        @raise requests.HTTPError: when the download answers with a 4xx/5xx status
+        """
         url = urljoin(URL_AD_API, f"/v2/reports/{report_id}/download")
         resp = requests.get(url, headers=self.auth_headers, stream=True, allow_redirects=True)
+        # Fail before an HTTP error body is stored on S3 as if it were a report.
+        resp.raise_for_status()
         logger.info(f"开始下载报告:{report_id}")
-        with open(file_path, "wb") as file:
+        # SECURITY: an AWS access key pair used to be hard-coded here. It has
+        # been committed to version control and must be treated as leaked:
+        # rotate it. No keys are passed any more, so s3fs/botocore resolve
+        # credentials from the standard chain (AWS_ACCESS_KEY_ID /
+        # AWS_SECRET_ACCESS_KEY, shared config files, or an attached IAM role).
+        kwargs = {'region_name': 'us-east-1', 'endpoint_url': "https://s3.amazonaws.com"}
+        s3_ = S3FileSystem(client_kwargs=kwargs)
+        with s3_.open(file_path, 'wb') as f:
             for data in resp.iter_content(chunk_size=10 * 1024):
-                file.write(data)
+                f.write(data)
+
         logger.info(f"报告{report_id}下载完成:{file_path}")
         if not decompress:
             return file_path
-        de_file = gz_decompress(file_path)
+        with s3_.open(file_path, 'rb') as f:  # read the uploaded object back from S3
+            data = gzip.GzipFile(fileobj=f, mode='rb')
+            de_file = json.load(data)
         logger.info(f"解压完成:{de_file}")
         return de_file
 
 class SDClient(BaseClient):
@@ -405,6 +567,7 @@ class SDClient(BaseClient):
     def get_adGroups(self,**params):
+        """
+        Request the ``/sd/adGroups`` endpoint.
+
+        @param params: keyword arguments forwarded unchanged as the ``params``
+            argument of ``self._request``
+        @return: whatever ``self._request`` returns for this call
+        """
         url_path = '/sd/adGroups'
         return self._request(url_path, params=params)
+
     def iter_adGroups(self,**param):
         if "startIndex" not in param:
             param["startIndex"] = 0
@@ -448,6 +611,7 @@ class SDClient(BaseClient):
                 break
             param["startIndex"] += 5000
             yield info
+
     def get_budget(self, campaignIds: list):
         url_path = "/sd/campaigns/budget/usage"
         body = {"campaignIds": campaignIds}
@@ -472,6 +636,79 @@ class SDClient(BaseClient):
 
         return self._request(url_path, method="POST", headers=headers,body=body,params={"locale":locale})
 
+    def get_v2_report(
+            self,
+            record_type: Literal['campaigns', 'adGroups', 'productAds', 'targets', 'asins'],
+            report_date: str,
+            metrics: List[str],
+            segment: Literal['matchedTarget'] = None,
+            tactic: Literal['T00020', 'T00030'] = None,
+            download: bool = True
+    ):
+        """
+        Request a v2 report via ``POST /sd/{record_type}/report``, poll
+        ``/v2/reports/{report_id}`` every 3 seconds while it is IN_PROGRESS,
+        and optionally download the result.
+
+        @param download: whether to download the report file; when False the
+            last status response is returned instead
+        @param record_type: record type, interpolated into the request path
+        @param report_date: formatted YYYYMMDD, in the time zone of the
+            requested seller marketplace; reports older than 60 days are
+            not available
+        @param metrics: metric names, sent comma-joined as ``metrics``
+        @param segment: sent as ``segment`` (``None`` is sent as-is)
+        @param tactic: sent as ``tactic`` (``None`` is sent as-is)
+            T00020: contextual targeting
+            T00030: audience targeting
+        @return: the result of ``download_v2_report`` when ``download`` is True,
+            otherwise the last status response
+        @raise Exception: carrying the API response when the status is FAILURE
+        """
+        url = f"/sd/{record_type}/report"
+        body = {
+            "reportDate": report_date,
+            "metrics": ",".join(metrics),
+            "tactic": tactic,
+            "segment": segment
+        }
+        ret = self._request(url, method="POST", body=body)
+        report_id = ret["reportId"]
+        status = ret["status"]
+        # Was a bare print(); routed through the module logger instead.
+        logger.debug(ret)
+        if status == "FAILURE":
+            raise Exception(ret)
+        logger.info(f"创建报告成功:{ret}")
+        while status == "IN_PROGRESS":
+            logger.debug(f"报告{report_id}正在处理中...")
+            time.sleep(3)
+            ret = self._request(f"/v2/reports/{report_id}")
+            logger.debug(ret)
+            status = ret["status"]
+            if status == "FAILURE":
+                raise Exception(ret)
+        logger.info(f"报告处理完成:{ret}")
+        if not download:
+            return ret
+        pid = self.profile_id
+        # NOTE(review): tactic/segment left at None appear as the text "None"
+        # in the S3 key; left unchanged because downstream readers may rely on it.
+        reportrel= self.download_v2_report(report_id, f"s3://reportforspsbsd/zosi/us/sd/{str(report_date)}_{record_type}_{tactic}_{segment}_{str(pid)}.gz")
+        return reportrel
+
+    def download_v2_report(self, report_id: str, file_path: str, decompress: bool = True) -> str:
+        """
+        Download v2 report ``report_id`` from ``/v2/reports/{report_id}/download``
+        into ``file_path`` on S3 and, unless told otherwise, read it back and
+        parse it.
+
+        @param report_id: identifier interpolated into the download URL
+        @param file_path: destination passed to ``S3FileSystem.open``
+            (``get_v2_report`` passes an ``s3://`` URI)
+        @param decompress: when False, return ``file_path`` right after the upload
+        @return: ``file_path`` when ``decompress`` is False; otherwise the object
+            produced by gunzipping the uploaded file and ``json.load``-ing it
+            (so not a ``str``, despite the annotation)
+        @raise requests.HTTPError: when the download answers with a 4xx/5xx status
+        """
+        url = urljoin(URL_AD_API, f"/v2/reports/{report_id}/download")
+        # SECURITY: an AWS access key pair used to be hard-coded here. It has
+        # been committed to version control and must be treated as leaked:
+        # rotate it. No keys are passed any more, so s3fs/botocore resolve
+        # credentials from the standard chain (AWS_ACCESS_KEY_ID /
+        # AWS_SECRET_ACCESS_KEY, shared config files, or an attached IAM role).
+        client_kwargs = {'region_name': 'us-east-1', 'endpoint_url': "https://s3.amazonaws.com"}
+        s3_ = S3FileSystem(client_kwargs=client_kwargs)
+        # The context manager releases the streamed connection even on error.
+        with requests.get(url, headers=self.auth_headers, stream=True, allow_redirects=True) as resp:
+            # Fail before an HTTP error body is stored on S3 as if it were a report.
+            resp.raise_for_status()
+            logger.info(f"开始下载报告:{report_id}")
+            with s3_.open(file_path, 'wb') as f:
+                for data in resp.iter_content(chunk_size=10 * 1024):
+                    f.write(data)
+
+        logger.info(f"报告{report_id}下载完成:{file_path}")
+        if not decompress:
+            return file_path
+        # Read the object back from S3 and decode the gzip-compressed JSON.
+        with s3_.open(file_path, 'rb') as f:
+            with gzip.GzipFile(fileobj=f, mode='rb') as data:
+                de_file = json.load(data)
+        # Log the location rather than the whole parsed report.
+        logger.info(f"解压完成:{file_path}")
+        return de_file
+
 class Account(BaseClient):
     def get_portfolios(self):
         url_path = "/v2/portfolios/extended"
@@ -501,40 +738,9 @@ if __name__ == '__main__':
 
     # print(sb.get_keyword_bidrecommendation(**{'campaignId': 27333596383941, 'keywords': [
     #     {"matchType": 'broad', "keywordText": "4k security camera system"}]}))
-    a = list(sd.iter_targets(**{"campaignIdFilter":"257424912382921"})) #list(sd.iter_targets())#
-    print(a,len(a))
+    # a = list(sd.iter_targets(**{"campaignIdFilter":"257424912382921"})) #list(sd.iter_targets())#
+    # print(a,len(a))
 
     # sb = SBClient(**AWS_CREDENTIALS)
-    # metrics = [
-    #     'applicableBudgetRuleId',
-    #     'applicableBudgetRuleName',
-    #     'attributedConversions14d',
-    #     'attributedConversions14dSameSKU',
-    #     'attributedDetailPageViewsClicks14d',
-    #     'attributedOrderRateNewToBrand14d',
-    #     'attributedOrdersNewToBrand14d',
-    #     'attributedOrdersNewToBrandPercentage14d',
-    #     'attributedSales14d',
-    #     'attributedSales14dSameSKU',
-    #     'attributedSalesNewToBrand14d',
-    #     'attributedSalesNewToBrandPercentage14d',
-    #     'attributedUnitsOrderedNewToBrand14d',
-    #     'attributedUnitsOrderedNewToBrandPercentage14d',
-    #     'campaignBudget',
-    #     'campaignBudgetType',
-    #     'campaignId',
-    #     'campaignName',
-    #     'campaignRuleBasedBudget',
-    #     'campaignStatus',
-    #     'clicks',
-    #     'cost',
-    #     'dpv14d',
-    #     'impressions',
-    #     'unitsSold14d',
-    #     'attributedBrandedSearches14d',
-    #     'topOfSearchImpressionShare']
-    # sb.get_report(
-    #     record_type="campaigns",
-    #     report_date="20231008",
-    #     metrics=metrics
-    # )
+
+    print(sd.get_v2_report(record_type="campaigns",report_date="20231020",tactic="T00030",metrics=['impressions']))

Bu fark içinde çok fazla dosya değişikliği olduğu için bazı dosyalar gösterilmiyor