Ver código fonte

add portfolios

guojing_wu 1 ano atrás
pai
commit
91577d424d

+ 1 - 0
.gitignore

@@ -8,6 +8,7 @@ __pycache__/
 
 # C extensions
 *.so
+Pipfile.lock
 
 # Distribution / packaging
 .Python

+ 3 - 3
Pipfile

@@ -4,9 +4,9 @@ verify_ssl = true
 name = "pypi.tuna.tsinghua.edu.cn"
 
 [packages]
-retry = "*"
-requests = "*"
-cachetools = "*"
+retry = "==0.9.2"
+requests = "==2.31.0"
+cachetools = "==5.3.1"
 
 [dev-packages]
 

+ 9 - 0
start_sync_amz.py

@@ -0,0 +1,9 @@
+from sync_amz_data.tasks.account import AccountTask
+from sync_amz_data.settings import LOG_CONF
+
+import logging.config
+
+logging.config.dictConfig(LOG_CONF)
+
+if __name__ == '__main__':
+    AccountTask("3006125408623189").do({"record": "portfolios"})

+ 2 - 0
sync_amz_data/public/__init__.py

@@ -0,0 +1,2 @@
+from .amz_ad_client import SPClient, SBClient, SDClient, BaseClient, AccountClient
+from .asj_client import asj_api

+ 106 - 63
sync_amz_data/public/amz_ad_client.py

@@ -1,4 +1,6 @@
 import requests
+from urllib3.util.retry import Retry
+from requests.adapters import HTTPAdapter
 import json
 import time
 from cachetools import TTLCache
@@ -6,14 +8,20 @@ from urllib.parse import urljoin
 from typing import List, Literal, Iterable, Iterator
 import gzip
 from pathlib import Path
-from logging import getLogger
+
+import logging
 
 URL_AUTH = "https://api.amazon.com/auth/o2/token"
 URL_AD_API = "https://advertising-api.amazon.com"
 
 cache = TTLCache(maxsize=10, ttl=3200)
 
-logger = getLogger(__name__)
+logger = logging.getLogger(__name__)
+
+
+class RateLimitError(Exception):
+    def __init__(self, retry_after: str = None):
+        self.retry_after = retry_after
 
 
 def gz_decompress(file_path: str, chunk_size: int = 1024 * 1024):
@@ -41,6 +49,19 @@ class BaseClient:
         if not self.data_path.exists():
             self.data_path.mkdir(parents=True)
 
+        retry_strategy = Retry(
+            total=5,  # 重试次数
+            allowed_methods=["GET", "POST"],
+            # 强制重试的状态码,在method_whitelist中的请求方法才会重试
+            status_forcelist=[429, 500, 502, 503, 504],
+            raise_on_status=False,  # 在status_forcelist中的状态码达到重试次数后是否抛出异常
+            # backoff_factor * (2 ** (retry_time-1)), 即间隔1s, 2s, 4s, 8s, ...
+            backoff_factor=1,
+        )
+        adapter = HTTPAdapter(max_retries=retry_strategy)
+        self.session = requests.session()
+        self.session.mount("https://", adapter)
+
     @property
     def access_token(self) -> str:
         try:
@@ -72,15 +93,19 @@ class BaseClient:
         head = self.auth_headers
         if headers:
             head.update(headers)
-        resp = requests.request(
+        resp = self.session.request(
             method=method,
             url=urljoin(URL_AD_API, url_path),
             headers=head,
             params=params,
             json=body,
         )
-        js = resp.json()
-        return js
+        if resp.status_code == 429:
+            raise RateLimitError(resp.headers.get("Retry-After"))
+        if resp.status_code >= 400:
+            raise Exception(resp.text)
+
+        return resp.json()
 
 
 class SPClient(BaseClient):
@@ -149,6 +174,7 @@ class SPClient(BaseClient):
             "Content-Type": "application/vnd.spKeyword.v3+json"
         }
         return self._request(url_path, method="POST", body=body, headers=headers)
+
     def iter_keywords(self, **body) -> Iterator[dict]:
         if "maxResults" not in body:
             body["maxResults"] = 100
@@ -186,26 +212,30 @@ class SPClient(BaseClient):
         }
         return self._request(url_path, method="POST", body=body)
 
-    def get_adgroup_bidrecommendation(self,campaignId:str,adGroupId:str,targetingExpressions:list,recommendationType:str="BIDS_FOR_EXISTING_AD_GROUP"):
+    def get_adgroup_bidrecommendation(
+            self, campaignId: str, adGroupId: str, targetingExpressions: list,
+            recommendationType: str = "BIDS_FOR_EXISTING_AD_GROUP"):
         url_path = "/sp/targets/bid/recommendations"
         headers = {
             "Accept": "application/vnd.spthemebasedbidrecommendation.v3+json",
             "Content-Type": "application/vnd.spthemebasedbidrecommendation.v3+json"
         }
-        body = {"campaignId":campaignId,
-                "adGroupId":adGroupId,
-                "recommendationType":recommendationType,
-                "targetingExpressions":targetingExpressions}
+        body = {
+            "campaignId": campaignId,
+            "adGroupId": adGroupId,
+            "recommendationType": recommendationType,
+            "targetingExpressions": targetingExpressions
+        }
         return self._request(url_path, method="POST", body=body, headers=headers)
 
-    def get_keyword_bidrecommendation(self,adGroupId:str,keyword:list,matchType:list):
-        keywords = list(map(lambda x:{"keyword":x[0],"matchType":x[1]},list(zip(keyword,matchType))))
-        print(keywords)
+    def get_keyword_bidrecommendation(self, adGroupId: str, keyword: list, matchType: list):
+        keywords = list(map(lambda x: {"keyword": x[0], "matchType": x[1]}, list(zip(keyword, matchType))))
         url_path = "/v2/sp/keywords/bidRecommendations"
-        body = {"adGroupId":adGroupId,
-                "keywords":keywords}
+        body = {"adGroupId": adGroupId,
+                "keywords": keywords}
         return self._request(url_path, method="POST", body=body)
 
+
 class SBClient(BaseClient):
     def get_campaigns(self, **body):
         url_path = "/sb/v4/campaigns/list"
@@ -226,33 +256,34 @@ class SBClient(BaseClient):
             body["nextToken"] = info["nextToken"]
         # logger.info(f"总共数量:{info['totalResults']}")
 
-    def get_ad_groups(self,**body):
+    def get_ad_groups(self, **body):
         url_path = "/sb/v4/adGroups/list"
         headers = {
             'Content-Type': "application/vnd.sbadgroupresource.v4+json",
             'Accept': "application/vnd.sbadgroupresource.v4+json"
         }
-        return self._request(url_path, method="POST", headers=headers,body=body)
+        return self._request(url_path, method="POST", headers=headers, body=body)
 
     def iter_adGroups(self, **body) -> Iterator[dict]:
         if "maxResults" not in body:
             body["maxResults"] = 100
         while True:
             info: dict = self.get_ad_groups(**body)
-            print(info)
+            # print(info)
             yield from info["adGroups"]
             if not info.get("nextToken"):
                 break
             body["nextToken"] = info["nextToken"]
 
-    def get_ads(self,**body):
+    def get_ads(self, **body):
         url_path = "/sb/v4/ads/list"
-        headers = {'Content-Type': "application/vnd.sbadresource.v4+json",
-                  'Accept': "application/vnd.sbadresource.v4+json"
-                  }
-        return self._request(url_path, method="POST", headers=headers,body=body)
+        headers = {
+            'Content-Type': "application/vnd.sbadresource.v4+json",
+            'Accept': "application/vnd.sbadresource.v4+json"
+        }
+        return self._request(url_path, method="POST", headers=headers, body=body)
 
-    def iter_ads(self,**body):
+    def iter_ads(self, **body):
         if "maxResults" not in body:
             body["maxResults"] = 100
         while True:
@@ -262,15 +293,16 @@ class SBClient(BaseClient):
             if not info.get("nextToken"):
                 break
             body["nextToken"] = info["nextToken"]
+
     def get_keywords(self):
         url_path = "/sb/keywords"
         return self._request(url_path, method="GET")
 
-    def get_targets(self,**body):
+    def get_targets(self, **body):
         url_path = "/sb/targets/list"
         return self._request(url_path, method="POST", body=body)
 
-    def iter_targets(self,**body):
+    def iter_targets(self, **body):
         if "maxResults" not in body:
             body["maxResults"] = 100
         while True:
@@ -281,11 +313,12 @@ class SBClient(BaseClient):
                 break
             body["nextToken"] = info["nextToken"]
 
-    def get_budget(self,campaignIds:list):
+    def get_budget(self, campaignIds: list):
         url_path = "/sb/campaigns/budget/usage"
-        body = {"campaignIds":campaignIds}
-        return self._request(url_path, method="POST",body=body)
-    def get_keyword_bidrecommendation(self,**body):
+        body = {"campaignIds": campaignIds}
+        return self._request(url_path, method="POST", body=body)
+
+    def get_keyword_bidrecommendation(self, **body):
         url_path = "/sb/recommendations/bids"
         return self._request(url_path, method="POST", body=body)
 
@@ -340,7 +373,7 @@ class SBClient(BaseClient):
 
     def download_report(self, report_id: str, file_path: str, decompress: bool = True) -> str:
         url = urljoin(URL_AD_API, f"/v2/reports/{report_id}/download")
-        resp = requests.get(url, headers=self.auth_headers, stream=True)
+        resp = requests.get(url, headers=self.auth_headers, stream=True, allow_redirects=True)
         logger.info(f"开始下载报告:{report_id}")
         with open(file_path, "wb") as file:
             for data in resp.iter_content(chunk_size=10 * 1024):
@@ -358,11 +391,17 @@ class SDClient(BaseClient):
         url_path = "/sd/campaigns"
         return self._request(url_path, params=params)
 
+
 class Account(BaseClient):
-    def get_portfolio(self):
+    def get_portfolios(self):
         url_path = "/v2/portfolios/extended"
         return self._request(url_path)
 
+    def iter_portfolios(self):
+        yield from self.get_portfolios()
+
+
+AccountClient = Account
 
 if __name__ == '__main__':
     AWS_CREDENTIALS = {
@@ -372,43 +411,47 @@ if __name__ == '__main__':
         'profile_id': "3006125408623189"
     }
     # sp = SPClient(**AWS_CREDENTIALS)
-    # print(sp.get_keyword_bidrecommendation(adGroupId="119753215871672",keyword=["8mp security camera system","8mp security camera system"],matchType=["broad","exact"]))
+    # print(sp.get_keyword_bidrecommendation(
+    # adGroupId="119753215871672",
+    # keyword=["8mp security camera system","8mp security camera system"],
+    # matchType=["broad","exact"]))
     sb = SBClient(**AWS_CREDENTIALS)
     # print(list(sb.iter_targets()))
-    print(sb.get_keyword_bidrecommendation(**{'campaignId': 27333596383941,'keywords':[{"matchType":'broad',"keywordText":"4k security camera system"}]}))
+    print(sb.get_keyword_bidrecommendation(**{'campaignId': 27333596383941, 'keywords': [
+        {"matchType": 'broad', "keywordText": "4k security camera system"}]}))
     print(sb.get_budget([27333596383941]))
     # sd = SDClient(**AWS_CREDENTIALS)
     # print(sd.get_campaigns(startIndex=10, count=10))
 
-    # sb = SBClient(**AWS_CREDENTIALS)
-    # metrics = [
-    #     'applicableBudgetRuleId',
-    #     'applicableBudgetRuleName',
-    #     'attributedConversions14d',
-    #     'attributedConversions14dSameSKU',
-    #     'attributedDetailPageViewsClicks14d',
-    #     'attributedOrderRateNewToBrand14d',
-    #     'attributedOrdersNewToBrand14d',
-    #     'attributedOrdersNewToBrandPercentage14d',
-    #     'attributedSales14d',
-    #     'attributedSales14dSameSKU',
-    #     'attributedSalesNewToBrand14d',
-    #     'attributedSalesNewToBrandPercentage14d',
-    #     'attributedUnitsOrderedNewToBrand14d',
-    #     'attributedUnitsOrderedNewToBrandPercentage14d',
-    #     'campaignBudget',
-    #     'campaignBudgetType',
-    #     'campaignId',
-    #     'campaignName',
-    #     'campaignRuleBasedBudget',
-    #     'campaignStatus',
-    #     'clicks',
-    #     'cost',
-    #     'dpv14d',
-    #     'impressions',
-    #     'unitsSold14d',
-    #     'attributedBrandedSearches14d',
-    #     'topOfSearchImpressionShare']
+    sb = SBClient(**AWS_CREDENTIALS)
+    metrics = [
+        'applicableBudgetRuleId',
+        'applicableBudgetRuleName',
+        'attributedConversions14d',
+        'attributedConversions14dSameSKU',
+        'attributedDetailPageViewsClicks14d',
+        'attributedOrderRateNewToBrand14d',
+        'attributedOrdersNewToBrand14d',
+        'attributedOrdersNewToBrandPercentage14d',
+        'attributedSales14d',
+        'attributedSales14dSameSKU',
+        'attributedSalesNewToBrand14d',
+        'attributedSalesNewToBrandPercentage14d',
+        'attributedUnitsOrderedNewToBrand14d',
+        'attributedUnitsOrderedNewToBrandPercentage14d',
+        'campaignBudget',
+        'campaignBudgetType',
+        'campaignId',
+        'campaignName',
+        'campaignRuleBasedBudget',
+        'campaignStatus',
+        'clicks',
+        'cost',
+        'dpv14d',
+        'impressions',
+        'unitsSold14d',
+        'attributedBrandedSearches14d',
+        'topOfSearchImpressionShare']
     # sb.get_report(
     #     record_type="campaigns",
     #     report_date="20231008",

+ 42 - 0
sync_amz_data/public/asj_client.py

@@ -0,0 +1,42 @@
+import requests
+from urllib3.util.retry import Retry
+from requests.adapters import HTTPAdapter
+import logging
+from typing import Union, List
+
+logger = logging.getLogger(__name__)
+
+
+class AsjHttpClient:
+    """广告后端API客户端"""
+
+    def __init__(self):
+        retry_strategy = Retry(
+            total=3,  # 重试次数
+            # 强制重试的状态码,在method_whitelist中的请求方法才会重试
+            method_whitelist=["GET", "POST"],
+            status_forcelist=[429, 500, 502, 503, 504],
+            raise_on_status=False,  # 在status_forcelist中的状态码达到重试次数后是否抛出异常
+            # backoff_factor * (2 ** (retry_time-1)), 即间隔1s, 2s, 4s, 8s, ...
+            backoff_factor=1,
+        )
+        adapter = HTTPAdapter(max_retries=retry_strategy)
+        self.session = requests.session()
+        self.session.mount("http://", adapter)
+
+    def _request(self, method, url, *, params=None, body=None):
+        headers = {"X-Token": ""}
+        resp = self.session.request(method, url, params=params, headers=headers, json=body)
+        if resp.status_code >= 400:
+            raise Exception(resp.text)
+        js = resp.json()
+        if js.get("code") != 2000:
+            raise Exception(js)
+
+        return js
+
+    def create(self, url: str, data: [dict, List[dict]]):
+        return self._request("POST", url, body=data)
+
+
+asj_api = AsjHttpClient()

+ 7 - 2
sync_amz_data/settings/__init__.py

@@ -1,4 +1,5 @@
 import os
+import logging
 
 env = os.getenv("ANS_ENV")
 if not env:
@@ -22,7 +23,7 @@ REDIS_CONF = {
     "db": 0
 }
 
-AWS_CREDENTIALS = {
+AWS_LWA_CLIENT = {
     'lwa_client_id': 'amzn1.application-oa2-client.ebd701cd07854fb38c37ee49ec4ba109',
     'lwa_client_secret': 'cbf0514186db4df91e04a8905f0a91b605eae4201254ced879d8bb90df4b474d',
 }
@@ -50,12 +51,16 @@ LOG_CONF = {
             'toaddrs': [
                 '2607929016@qq.com',
             ],
-            'subject': 'sync_amz_ad',
+            'subject': 'sync_amz_data',
             'credentials': ('AKIARBAGHTGOTVILWLB7', 'BEVt2GkzivI0xhnLWgwJCT2k7lDSg8HW+APD3mc4uHVg'),
             'secure': {}
         }
     },
     "loggers": {
+        "": {
+            "handlers": LOG_HANDLERS,
+            "level": "DEBUG",
+        },
         "sync_amz_ad": {
             "handlers": LOG_HANDLERS,
             "level": LOG_LEVEL,

+ 1 - 1
sync_amz_data/settings/dev.py

@@ -9,7 +9,7 @@ DB_CONF = {
     "database": "amzn_retail_ad",
 }
 
-LOG_LEVEL = "DEBUG"
+LOG_LEVEL = 10
 LOG_HANDLERS = ["console"]
 
 DATA_PATH = "/tmp"

+ 2 - 0
sync_amz_data/tasks/__init__.py

@@ -0,0 +1,2 @@
+from .sb import SBTask
+

+ 66 - 0
sync_amz_data/tasks/_base.py

@@ -0,0 +1,66 @@
+from functools import lru_cache
+import logging
+from typing import List
+from urllib.parse import urljoin
+
+from sync_amz_data.public import BaseClient, asj_api
+from sync_amz_data.settings import AWS_LWA_CLIENT, DATA_PATH
+
+logger = logging.getLogger(__name__)
+ASJ_URL_BASE = "http://127.0.0.1:8000/api/ad_manage/"
+
+
+@lru_cache(maxsize=100)
+def query_shop_info(profile_id: str) -> dict:
+    # todo 临时
+    if profile_id == "3006125408623189":
+        return {
+            "profile_id": profile_id,
+            "id": 1,
+            "shop_name": "ZosiDirect",
+            "region": "NA",
+            "access_token": "Atza|IwEBION12cAeJLW1CrhirYbH8ianuTfh1JQtC2nbnZjWcCk_J0v9XVF00Pm7AYzZPHXAuFgdul0vuQt4XUAcwVdzQF9AzDfk5wKAUXe9fuGZhb0nq0mdJxg08u2BR_rUggxKWSd1sg6OW7szIEq8xQzok9hcr-Ai-WupyaA-CCznOt7STmyEZltNsK8VuJb7ySxTlxwf-DbuX2Tn9JdsEta7DwQIsHcYv2QwsfLYGnk2LcLUUXG-6TtwCWFGHMxHfoScOvN92hOHiPl3CsdTs5RmKO5eVFdf0XUu8OU5Z9icnjuP1tYBy7_e9s2oTL9fVLLfH_ATUJplPsfm1MhtW6ioX9IXxfJSmOJ0ntEv45ndb9t-wHE_vLukuy_4jwmy_50NU_TOU_9pbFJLQhVyAB0f4HVcb5fgPn--feAX89ANhRJnIn5zibVk_rY_rte7Xu7JMJNNW41PoCUvfVdxjnorhGTIGh2u2JHQPqdww2xLcZ93SQ",
+            "status": 1,
+            "refresh_token": "Atzr|IwEBIL4ur8kbcwRyxVu_srprAAoTYzujnBvA6jU-0SMxkRgOhGjYJSUNGKvw24EQwJa1jG5RM76mQD2P22AKSq8qSD94LddoXGdKDO74eQVYl0RhuqOMFqdrEZpp1p4bIR6_N8VeSJDHr7UCuo8FiabkSHrkq7tsNvRP-yI-bnpQv4EayPBh7YwHVX3hYdRbhxaBvgJENgCuiEPb35Q2-Z6w6ujjiKUAK2VSbCFpENlEfcHNsjDeY7RCvFlwlCoHj1IeiNIaFTE9yXFu3aEWlExe3LzHv6PZyunEi88QJSXKSh56Um0e0eEg05rMv-VBM83cAqc5POmZnTP1vUdZO8fQv3NFLZ-xU6e1WQVxVPi5Cyqk4jYhGf1Y9t98N654y0tVvw74qNIsTrB-8bGS0Uhfe24oBEWmzObvBY3zhtT1d42myGUJv4pMTU6yPoS83zhPKm3LbUDEpBA1hvvc_09jHk7vUEAuFB-UAZzlht2C1yklzQ",
+            "update_time": 1688351991,
+            "token_expires_time": 1688355491,
+            "create_time": 1683702488
+        }
+    return {}
+
+
+class BaseTask:
+    AmzAdClientClass = BaseClient
+
+    def __init__(self, profile_id: str):
+        self.shop_info = query_shop_info(profile_id)
+        self.ad_cil = self.AmzAdClientClass(
+            profile_id=profile_id,
+            refresh_token=self.shop_info["refresh_token"],
+            data_path=DATA_PATH, **AWS_LWA_CLIENT)
+
+    def do(self, task_info: dict):
+        record = task_info["record"]
+        iter_records = getattr(self.ad_cil, f"iter_{record}", None)
+        change_func = getattr(self, f"change_{record}", None)
+        ad_api_params = task_info.get("params")
+        if ad_api_params is None:
+            records_iterator = iter_records()
+        else:
+            records_iterator = iter_records(ad_api_params)
+        for data in records_iterator:
+            if change_func:
+                data = change_func(data)
+            self.to_mysql(record, data)
+            logger.info(data)
+
+    def batch_do(self, task_info: dict):
+        """
+        适用于使用pandas等进行批处理
+        @param task_info:
+        @return:
+        """
+        pass
+
+    def to_mysql(self, record: str, data: [dict, List[dict]]):
+        asj_api.create(urljoin(ASJ_URL_BASE, record), data)

+ 19 - 0
sync_amz_data/tasks/account.py

@@ -0,0 +1,19 @@
+from ._base import BaseTask
+from sync_amz_data.public import AccountClient
+from sync_amz_data.tools import timestamp2utc_dt
+
+
+class AccountTask(BaseTask):
+    AmzAdClientClass = AccountClient
+
+    def change_portfolios(self, data: dict):
+        data["shop"] = self.shop_info["profile_id"]
+        for key in ["lastUpdatedDate", "creationDate"]:
+            if key in data:
+                data[key] = timestamp2utc_dt(data[key])
+
+        budget: dict = data.pop("budget", None)
+        if budget:
+            for key, val in budget.items():
+                data[f"budget_{key}"] = val
+        return data

+ 16 - 0
sync_amz_data/tasks/sb.py

@@ -0,0 +1,16 @@
+from sync_amz_data.public import SBClient
+from ._base import BaseTask
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class SBTask(BaseTask):
+    AmzAdClientClass = SBClient
+
+    def change_campaign(self, data: dict):
+        return data
+
+    def change_ad_group(self, data: dict):
+        return data

+ 14 - 0
sync_amz_data/tasks/sd.py

@@ -0,0 +1,14 @@
+from sync_amz_data.public import SDClient
+from sync_amz_data.settings import AWS_LWA_CLIENT, DATA_PATH
+from ._base import query_shop_info, BaseTask
+
+import logging
+from pydantic import BaseModel
+from typing import Literal, List, Dict
+from datetime import date
+
+logger = logging.getLogger(__name__)
+
+
+class SDTask(BaseTask):
+    AmzAdClientClass = SDClient

+ 14 - 0
sync_amz_data/tasks/sp.py

@@ -0,0 +1,14 @@
+from sync_amz_data.public import SPClient
+
+
+class SpTask:
+    def __init__(self):
+        pass
+
+    def consume(self, data: dict):
+        pass
+
+    def work(self, task_info: dict):
+        sp_cli = SPClient(**task_info)
+        for data in sp_cli.iter_campaigns():
+            self.consume(data)

+ 15 - 0
sync_amz_data/tools.py

@@ -0,0 +1,15 @@
+# from dateutil.parser import isoparse
+from datetime import date, datetime, timezone
+
+
+def timestamp2utc_dt(timestamp: int, _format: str = "%Y-%m-%d %H:%M:%S") -> [str, datetime]:
+    if len(str(timestamp)) == 13:  # _ms
+        timestamp = int(timestamp / 1000)
+    utc_dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
+    if _format:
+        return utc_dt.strftime(_format)
+    return utc_dt
+
+
+if __name__ == '__main__':
+    print(timestamp2utc_dt(1574232571377))