guojing_wu 1 year ago
parent
commit
c06bd76783

+ 1 - 0
.gitignore

@@ -8,6 +8,7 @@ __pycache__/
 
 # C extensions
 *.so
+Pipfile.lock
 
 # Distribution / packaging
 .Python

+ 4 - 3
Pipfile

@@ -4,9 +4,10 @@ verify_ssl = true
 name = "pypi.tuna.tsinghua.edu.cn"
 
 [packages]
-retry = "*"
-requests = "*"
-cachetools = "*"
+retry = "==0.9.2"
+requests = "==2.31.0"
+cachetools = "==5.3.1"
+nb-log = "==10.2"
 
 [dev-packages]
 

+ 38 - 0
start_sync_amz_data.py

@@ -0,0 +1,38 @@
+from sync_amz_data.settings import LOG_CONF, REDIS_CONF
+from sync_amz_data.public import SPClient
+from sync_amz_data.tasks import *
+
+import logging.config
+import dramatiq
+from dramatiq import actor
+from dramatiq.brokers.redis import RedisBroker
+
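+# Use Redis (configured via REDIS_CONF) as the dramatiq message broker for all actors.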
+dramatiq.set_broker(RedisBroker(**REDIS_CONF))
+
+
+# @actor(queue_name="sp")
+# def sp_task():
+#     pass
+
+
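+# Dramatiq actor: consumes Sponsored Brands sync tasks from the "sb" queue.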
+@actor(queue_name="sb")
+def sb(**task_info):
+    SBTask(task_info["profile_id"]).do(task_info)
+
+
+# @actor(queue_name="sd")
+# def sd(**data):
+#     pass
+
+
+if __name__ == '__main__':
+    logging.config.dictConfig(LOG_CONF)
+
+    AWS_CREDENTIALS = {
+        'lwa_client_id': 'amzn1.application-oa2-client.ebd701cd07854fb38c37ee49ec4ba109',
+        'refresh_token': "Atzr|IwEBIL4ur8kbcwRyxVu_srprAAoTYzujnBvA6jU-0SMxkRgOhGjYJSUNGKvw24EQwJa1jG5RM76mQD2P22AKSq8qSD94LddoXGdKDO74eQVYl0RhuqOMFqdrEZpp1p4bIR6_N8VeSJDHr7UCuo8FiabkSHrkq7tsNvRP-yI-bnpQv4EayPBh7YwHVX3hYdRbhxaBvgJENgCuiEPb35Q2-Z6w6ujjiKUAK2VSbCFpENlEfcHNsjDeY7RCvFlwlCoHj1IeiNIaFTE9yXFu3aEWlExe3LzHv6PZyunEi88QJSXKSh56Um0e0eEg05rMv-VBM83cAqc5POmZnTP1vUdZO8fQv3NFLZ-xU6e1WQVxVPi5Cyqk4jYhGf1Y9t98N654y0tVvw74qNIsTrB-8bGS0Uhfe24oBEWmzObvBY3zhtT1d42myGUJv4pMTU6yPoS83zhPKm3LbUDEpBA1hvvc_09jHk7vUEAuFB-UAZzlht2C1yklzQ",
+        'lwa_client_secret': 'cbf0514186db4df91e04a8905f0a91b605eae4201254ced879d8bb90df4b474d',
+        'profile_id': "3006125408623189"
+    }
+    sp = SPClient(**AWS_CREDENTIALS)
+    sp.get_campaigns(count=10)

+ 1 - 0
sync_amz_data/public/__init__.py

@@ -0,0 +1 @@
+from .amz_ad_client import SPClient, SBClient, SDClient, BaseClient

+ 29 - 8
sync_amz_data/public/amz_ad_client.py

@@ -6,14 +6,19 @@ from urllib.parse import urljoin
 from typing import List, Literal, Iterable, Iterator
 import gzip
 from pathlib import Path
-from nb_log import get_logger
+import logging
 
 URL_AUTH = "https://api.amazon.com/auth/o2/token"
 URL_AD_API = "https://advertising-api.amazon.com"
 
 cache = TTLCache(maxsize=10, ttl=3200)
 
-logger = get_logger(__name__)
+logger = logging.getLogger(__name__)
+
+
+class RateLimitError(Exception):
+    def __init__(self, retry_after: str = None):
+        super().__init__(f"rate limited, Retry-After: {retry_after}")
+        self.retry_after = retry_after
 
 
 def gz_decompress(file_path: str, chunk_size: int = 1024 * 1024):
@@ -79,7 +84,11 @@ class BaseClient:
             params=params,
             json=body,
         )
+        if resp.status_code == 429:
+            # Raise the throttling error before the generic one so callers can honor Retry-After.
+            raise RateLimitError(resp.headers.get("Retry-After"))
+        if resp.status_code >= 400:
+            raise Exception(resp.text)
         js = resp.json()
         return js
 
 
@@ -93,6 +102,17 @@ class SPClient(BaseClient):
         }
         return self._request(url_path, method="POST", headers=headers, body=body)
 
+    def iter_campaigns(self, **body) -> Iterator[dict]:
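+        # Paginate get_campaigns with nextToken; default page size is 100.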
+        if "maxResults" not in body:
+            body["maxResults"] = 100
+        while True:
+            info: dict = self.get_campaigns(**body)
+            yield from info["campaigns"]
+            if not info.get("nextToken"):
+                break
+            body["nextToken"] = info["nextToken"]
+        logger.info(f"Total count: {info['totalResults']}")
+
     def get_ad_groups(self, **body):
         url_path = "/sp/adGroups/list"
         headers = {
@@ -208,7 +228,7 @@ class SBClient(BaseClient):
 
     def download_report(self, report_id: str, file_path: str, decompress: bool = True) -> str:
         url = urljoin(URL_AD_API, f"/v2/reports/{report_id}/download")
-        resp = requests.get(url, headers=self.auth_headers, stream=True)
+        resp = requests.get(url, headers=self.auth_headers, stream=True, allow_redirects=True)
         logger.info(f"开始下载报告:{report_id}")
         with open(file_path, "wb") as file:
             for data in resp.iter_content(chunk_size=10 * 1024):
@@ -266,8 +286,9 @@ if __name__ == '__main__':
         'unitsSold14d',
         'attributedBrandedSearches14d',
         'topOfSearchImpressionShare']
-    sb.get_report(
-        record_type="campaigns",
-        report_date="20231008",
-        metrics=metrics
-    )
+    # sb.get_report(
+    #     record_type="campaigns",
+    #     report_date="20231008",
+    #     metrics=metrics
+    # )
+    sb.iter_ad_groups()

+ 7 - 2
sync_amz_data/settings/__init__.py

@@ -1,4 +1,5 @@
 import os
+import logging
 
 env = os.getenv("ANS_ENV")
 if not env:
@@ -22,7 +23,7 @@ REDIS_CONF = {
     "db": 0
 }
 
-AWS_CREDENTIALS = {
+AWS_LWA_CLIENT = {
     'lwa_client_id': 'amzn1.application-oa2-client.ebd701cd07854fb38c37ee49ec4ba109',
     'lwa_client_secret': 'cbf0514186db4df91e04a8905f0a91b605eae4201254ced879d8bb90df4b474d',
 }
@@ -50,12 +51,16 @@ LOG_CONF = {
             'toaddrs': [
                 '2607929016@qq.com',
             ],
-            'subject': 'sync_amz_ad',
+            'subject': 'sync_amz_data',
             'credentials': ('AKIARBAGHTGOTVILWLB7', 'BEVt2GkzivI0xhnLWgwJCT2k7lDSg8HW+APD3mc4uHVg'),
             'secure': {}
         }
     },
     "loggers": {
+        "": {
+            "handlers": LOG_HANDLERS,
+            "level": "DEBUG",
+        },
         "sync_amz_ad": {
             "handlers": LOG_HANDLERS,
             "level": LOG_LEVEL,

+ 1 - 1
sync_amz_data/settings/dev.py

@@ -9,7 +9,7 @@ DB_CONF = {
     "database": "amzn_retail_ad",
 }
 
-LOG_LEVEL = "DEBUG"
+LOG_LEVEL = 10  # numeric value of logging.DEBUG
 LOG_HANDLERS = ["console"]
 
 DATA_PATH = "/tmp"

+ 2 - 0
sync_amz_data/tasks/__init__.py

@@ -0,0 +1,2 @@
+from .sb import SBTask
+

+ 36 - 0
sync_amz_data/tasks/_base.py

@@ -0,0 +1,36 @@
+from functools import lru_cache
+import logging
+
+from sync_amz_data.public import BaseClient
+from sync_amz_data.settings import AWS_LWA_CLIENT, DATA_PATH
+
+logger = logging.getLogger(__name__)
+
+
+@lru_cache(maxsize=100)
+def query_shop_info(profile_id: str) -> dict:
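+    # Placeholder: should return the shop's credentials (at least refresh_token) for profile_id.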
+    return {}
+
+
+class BaseTask:
+    AmzAdClientClass = BaseClient
+
+    def __init__(self, profile_id: str):
+        self.shop_info = query_shop_info(profile_id)
+        self.ad_cli = self.AmzAdClientClass(
+            profile_id=profile_id,
+            refresh_token=self.shop_info["refresh_token"],
+            data_path=DATA_PATH, **AWS_LWA_CLIENT)
+
+    def do(self, task_info: dict):
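+        # Look up iter_<record> on the ad client and an optional change_<record> hook on this task.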
+        record = task_info["record"]
+        iter_records = getattr(self.ad_cli, f"iter_{record}")
+        change_func = getattr(self, f"change_{record}", None)
+        for data in iter_records(**task_info["params"]):
+            if change_func:
+                data = change_func(data)
+            self.to_mysql(record, data)
+            logger.info(data)
+
+    def to_mysql(self, record: str, data: dict):
+        pass

+ 41 - 0
sync_amz_data/tasks/sb.py

@@ -0,0 +1,41 @@
+from sync_amz_data.public import SBClient
+from sync_amz_data.settings import AWS_LWA_CLIENT, DATA_PATH
+from ._base import query_shop_info, BaseTask
+
+import logging
+from pydantic import BaseModel
+from typing import Literal, List, Dict, Optional
+from datetime import date
+
+logger = logging.getLogger(__name__)
+
+
+class SBCampaign(BaseModel):
+    portfolioId: Optional[str] = None
+    campaignId: str
+    name: str
+    budgetType: Literal['DAILY', 'LIFETIME']
+    state: Literal['ENABLED', 'PAUSED', 'ARCHIVED']
+    budget: float
+    ruleBasedBudget: Optional[dict] = None
+    brandEntityId: Optional[str] = None
+    isMultiAdGroupsEnabled: Optional[bool] = None
+    goal: Optional[str] = None
+    bidding: Optional[dict] = None
+    startDate: Optional[date] = None
+    endDate: Optional[date] = None
+    productLocation: Optional[str] = None
+    tags: Optional[dict] = None
+    costType: Optional[str] = None
+    smartDefault: Optional[List[str]] = None
+    extendedData: Optional[dict] = None
+
+
+class SBTask(BaseTask):
+    AmzAdClientClass = SBClient
+
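+    # Optional transform hooks resolved by BaseTask.do as change_<record>; currently pass-through.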
+    def change_campaigns(self, data: dict):
+        return data
+
+    def change_ad_groups(self, data: dict):
+        return data

+ 14 - 0
sync_amz_data/tasks/sd.py

@@ -0,0 +1,14 @@
+from sync_amz_data.public import SDClient
+from sync_amz_data.settings import AWS_LWA_CLIENT, DATA_PATH
+from ._base import query_shop_info, BaseTask
+
+import logging
+from pydantic import BaseModel
+from typing import Literal, List, Dict
+from datetime import date
+
+logger = logging.getLogger(__name__)
+
+
+class SDTask(BaseTask):
+    AmzAdClientClass = SDClient

+ 14 - 0
sync_amz_data/tasks/sp.py

@@ -0,0 +1,14 @@
+from sync_amz_data.public import SPClient
+
+
+class SpTask:
+    def __init__(self):
+        pass
+
+    def consume(self, data: dict):
+        pass
+
+    def work(self, task_info: dict):
+        sp_cli = SPClient(**task_info)
+        for data in sp_cli.iter_campaigns():
+            self.consume(data)