Ver código fonte

first commit

guojing_wu 1 ano atrás
commit
dc0c593d22

+ 62 - 0
.gitignore

@@ -0,0 +1,62 @@
+.idea/
+.vscode/
+# ---> Python
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+env/
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+*.egg-info/
+.installed.cfg
+*.egg
+
+# PyInstaller
+#  Usually these files are written by a python script from a template
+#  before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*,cover
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+

+ 14 - 0
Pipfile

@@ -0,0 +1,14 @@
+[[source]]
+url = "https://pypi.tuna.tsinghua.edu.cn/simple"
+verify_ssl = true
+name = "pypi.tuna.tsinghua.edu.cn"
+
+[packages]
+retry = "*"
+requests = "*"
+cachetools = "*"
+
+[dev-packages]
+
+[requires]
+python_version = "3.10"

+ 0 - 0
sync_amz_data/__init__.py


+ 0 - 0
sync_amz_data/public/__init__.py


+ 273 - 0
sync_amz_data/public/amz_ad_client.py

@@ -0,0 +1,273 @@
+import requests
+import json
+import time
+from cachetools import TTLCache
+from urllib.parse import urljoin
+from typing import List, Literal, Iterable, Iterator
+import gzip
+from pathlib import Path
+from nb_log import get_logger
+
+URL_AUTH = "https://api.amazon.com/auth/o2/token"
+URL_AD_API = "https://advertising-api.amazon.com"
+
+cache = TTLCache(maxsize=10, ttl=3200)
+
+logger = get_logger(__name__)
+
+
+def gz_decompress(file_path: str, chunk_size: int = 1024 * 1024):
+    decompressed_file = file_path.rstrip(".gz")
+    with open(decompressed_file, "wb") as pw:
+        zf = gzip.open(file_path, mode='rb')
+        while True:
+            chunk = zf.read(size=chunk_size)
+            if not chunk:
+                break
+            pw.write(chunk)
+    return decompressed_file
+
+
+class BaseClient:
+    def __init__(
+            self, lwa_client_id: str, lwa_client_secret: str, refresh_token: str = None, profile_id: str = None,
+            data_path: str = "./"
+    ):
+        self.lwa_client_id = lwa_client_id
+        self.lwa_client_secret = lwa_client_secret
+        self.refresh_token = refresh_token
+        self.profile_id = profile_id
+        self.data_path = Path(data_path)
+        if not self.data_path.exists():
+            self.data_path.mkdir(parents=True)
+
+    @property
+    def access_token(self) -> str:
+        try:
+            return cache[self.refresh_token]
+        except KeyError:
+            resp = requests.post(URL_AUTH, data={
+                "grant_type": "refresh_token",
+                "client_id": self.lwa_client_id,
+                "refresh_token": self.refresh_token,
+                "client_secret": self.lwa_client_secret,
+            })
+            if resp.status_code != 200:
+                raise Exception(resp.text)
+            js = resp.json()
+            cache[self.refresh_token] = js["access_token"]
+            self.refresh_token = js["refresh_token"]
+            return js["access_token"]
+
+    @property
+    def auth_headers(self):
+        return {
+            "Amazon-Advertising-API-ClientId": self.lwa_client_id,
+            "Amazon-Advertising-API-Scope": self.profile_id,
+            "Authorization": f"Bearer {self.access_token}",
+        }
+
+    def _request(self, url_path: str, method: str = "GET", headers: dict = None, params: dict = None,
+                 body: dict = None):
+        head = self.auth_headers
+        if headers:
+            head.update(headers)
+        resp = requests.request(
+            method=method,
+            url=urljoin(URL_AD_API, url_path),
+            headers=head,
+            params=params,
+            json=body,
+        )
+        js = resp.json()
+        return js
+
+
+class SPClient(BaseClient):
+
+    def get_campaigns(self, **body):
+        url_path = "/sp/campaigns/list"
+        headers = {
+            "Accept": "application/vnd.spcampaign.v3+json",
+            "Content-Type": "application/vnd.spcampaign.v3+json"
+        }
+        return self._request(url_path, method="POST", headers=headers, body=body)
+
+    def get_ad_groups(self, **body):
+        url_path = "/sp/adGroups/list"
+        headers = {
+            "Accept": "application/vnd.spadGroup.v3+json",
+            "Content-Type": "application/vnd.spadGroup.v3+json"
+        }
+        return self._request(url_path, method="POST", body=body, headers=headers)
+
+    def get_ads(self, **body):
+        url_path = "/sp/productAds/list"
+        headers = {
+            "Accept": "application/vnd.spproductAd.v3+json",
+            "Content-Type": "application/vnd.spproductAd.v3+json"
+        }
+        return self._request(url_path, method="POST", body=body, headers=headers)
+
+    def get_keywords(self, **body):
+        url_path = "/sp/keywords/list"
+        headers = {
+            "Accept": "application/vnd.spKeyword.v3+json",
+            "Content-Type": "application/vnd.spKeyword.v3+json"
+        }
+        return self._request(url_path, method="POST", body=body, headers=headers)
+
+    def get_targets(self, **body):
+        url_path = "/sp/targets/list"
+        headers = {
+            "Accept": "application/vnd.sptargetingClause.v3+json",
+            "Content-Type": "application/vnd.sptargetingClause.v3+json"
+        }
+        return self._request(url_path, method="POST", body=body, headers=headers)
+
+    def get_budget(self, campaign_ids: list):
+        url_path = "/sp/campaigns/budget/usage"
+        body = {
+            "campaignIds": campaign_ids
+        }
+        ret_data = self._request(url_path, method="POST", body=body)
+        print(json.dumps(ret_data, ensure_ascii=False))
+
+
+class SBClient(BaseClient):
+    def get_campaigns(self, **body):
+        url_path = "/sb/v4/campaigns/list"
+        headers = {
+            "Accept": "application/vnd.sbcampaignresouce.v4+json",
+            "Content-Type": "application/vnd.sbcampaignresouce.v4+json"
+        }
+        return self._request(url_path, method="POST", body=body, headers=headers)
+
+    def iter_campaigns(self, **body) -> Iterator[dict]:
+        if "maxResults" not in body:
+            body["maxResults"] = 100
+        while True:
+            info: dict = self.get_campaigns(**body)
+            yield from info["campaigns"]
+            if not info.get("nextToken"):
+                break
+            body["nextToken"] = info["nextToken"]
+        logger.info(f"总共数量:{info['totalResults']}")
+
+    def get_ad_groups(self):
+        pass
+
+    def get_report(
+            self,
+            record_type: Literal['campaigns', 'adGroups', 'ads', 'targets', 'keywords'],
+            report_date: str,
+            metrics: List[str],
+            segment: Literal['placement', 'query'] = None,
+            creative_type: Literal['video', 'all'] = "all",
+            download: bool = True
+    ):
+        """
+        @param download: 是否下载文件
+        @param record_type:
+        @param report_date: 格式为YYYYMMDD,以请求的卖家市场所对应的时区为准,超过60天的报告不可用
+        @param metrics:
+        @param segment:
+        @param creative_type:
+            None:仅包含非视频广告
+            'video':仅包含视频广告
+            'all':包含视频和非视频广告
+        @return:
+        """
+        url = f"/v2/hsa/{record_type}/report"
+        body = {
+            "reportDate": report_date,
+            "metrics": ",".join(metrics),
+            "creativeType": creative_type,
+            "segment": segment
+        }
+        if record_type == "ads":
+            body["creativeType"] = "all"
+        ret = self._request(url, method="POST", body=body)
+        report_id = ret["reportId"]
+        status = ret["status"]
+        if status == "FAILURE":
+            raise Exception(ret)
+        logger.info(f"创建报告成功:{ret}")
+        while status == "IN_PROGRESS":
+            logger.debug(f"报告{report_id}正在处理中...")
+            time.sleep(3)
+            ret = self._request(f"/v2/reports/{report_id}")
+            status = ret["status"]
+            if status == "FAILURE":
+                raise Exception(ret)
+        logger.info(f"报告处理完成:{ret}")
+        if download:
+            self.download_report(report_id, str(self.data_path / f"sb_{record_type}.json.gz"))
+        else:
+            return ret
+
+    def download_report(self, report_id: str, file_path: str, decompress: bool = True) -> str:
+        url = urljoin(URL_AD_API, f"/v2/reports/{report_id}/download")
+        resp = requests.get(url, headers=self.auth_headers, stream=True)
+        logger.info(f"开始下载报告:{report_id}")
+        with open(file_path, "wb") as file:
+            for data in resp.iter_content(chunk_size=10 * 1024):
+                file.write(data)
+        logger.info(f"报告{report_id}下载完成:{file_path}")
+        if not decompress:
+            return file_path
+        de_file = gz_decompress(file_path)
+        logger.info(f"解压完成:{de_file}")
+        return de_file
+
+
+class SDClient(BaseClient):
+    def get_campaigns(self, **params) -> List[dict]:
+        url_path = "/sd/campaigns"
+        return self._request(url_path, params=params)
+
+
+if __name__ == '__main__':
+    AWS_CREDENTIALS = {
+        'lwa_client_id': 'amzn1.application-oa2-client.ebd701cd07854fb38c37ee49ec4ba109',
+        'refresh_token': "Atzr|IwEBIL4ur8kbcwRyxVu_srprAAoTYzujnBvA6jU-0SMxkRgOhGjYJSUNGKvw24EQwJa1jG5RM76mQD2P22AKSq8qSD94LddoXGdKDO74eQVYl0RhuqOMFqdrEZpp1p4bIR6_N8VeSJDHr7UCuo8FiabkSHrkq7tsNvRP-yI-bnpQv4EayPBh7YwHVX3hYdRbhxaBvgJENgCuiEPb35Q2-Z6w6ujjiKUAK2VSbCFpENlEfcHNsjDeY7RCvFlwlCoHj1IeiNIaFTE9yXFu3aEWlExe3LzHv6PZyunEi88QJSXKSh56Um0e0eEg05rMv-VBM83cAqc5POmZnTP1vUdZO8fQv3NFLZ-xU6e1WQVxVPi5Cyqk4jYhGf1Y9t98N654y0tVvw74qNIsTrB-8bGS0Uhfe24oBEWmzObvBY3zhtT1d42myGUJv4pMTU6yPoS83zhPKm3LbUDEpBA1hvvc_09jHk7vUEAuFB-UAZzlht2C1yklzQ",
+        'lwa_client_secret': 'cbf0514186db4df91e04a8905f0a91b605eae4201254ced879d8bb90df4b474d',
+        'profile_id': "3006125408623189"
+    }
+    # sd = SDClient(**AWS_CREDENTIALS)
+    # print(sd.get_campaigns(startIndex=10, count=10))
+
+    sb = SBClient(**AWS_CREDENTIALS)
+    metrics = [
+        'applicableBudgetRuleId',
+        'applicableBudgetRuleName',
+        'attributedConversions14d',
+        'attributedConversions14dSameSKU',
+        'attributedDetailPageViewsClicks14d',
+        'attributedOrderRateNewToBrand14d',
+        'attributedOrdersNewToBrand14d',
+        'attributedOrdersNewToBrandPercentage14d',
+        'attributedSales14d',
+        'attributedSales14dSameSKU',
+        'attributedSalesNewToBrand14d',
+        'attributedSalesNewToBrandPercentage14d',
+        'attributedUnitsOrderedNewToBrand14d',
+        'attributedUnitsOrderedNewToBrandPercentage14d',
+        'campaignBudget',
+        'campaignBudgetType',
+        'campaignId',
+        'campaignName',
+        'campaignRuleBasedBudget',
+        'campaignStatus',
+        'clicks',
+        'cost',
+        'dpv14d',
+        'impressions',
+        'unitsSold14d',
+        'attributedBrandedSearches14d',
+        'topOfSearchImpressionShare']
+    sb.get_report(
+        record_type="campaigns",
+        report_date="20231008",
+        metrics=metrics
+    )

+ 64 - 0
sync_amz_data/settings/__init__.py

@@ -0,0 +1,64 @@
+import os
+
+env = os.getenv("ANS_ENV")
+if not env:
+    raise Exception("get ANS_ENV failed, set it IN CAUTION!!!")
+elif env == "online":
+    from .online import *
+elif env == "test":
+    from .test import *
+elif env == "dev":
+    from .dev import *
+else:
+    raise Exception("ANS_ENV not in (online, test, dev)")
+
+REDIS_CONF = {
+    "host": REDIS_HOST,
+    "port": 6379,
+    "password": REDIS_PASSWORD,
+    "decode_responses": False,
+    "retry_on_timeout": True,
+    "max_connections": 50,
+    "db": 0
+}
+
+AWS_CREDENTIALS = {
+    'lwa_client_id': 'amzn1.application-oa2-client.ebd701cd07854fb38c37ee49ec4ba109',
+    'lwa_client_secret': 'cbf0514186db4df91e04a8905f0a91b605eae4201254ced879d8bb90df4b474d',
+}
+
+LOG_CONF = {
+    "version": 1,
+    "disable_existing_loggers": False,
+    "formatters": {
+        "normal": {
+            "class": "logging.Formatter",
+            "format": "%(asctime)s [%(name)s] %(levelname)s: %(message)s"
+        }
+    },
+    "handlers": {
+        "console": {
+            "level": LOG_LEVEL,
+            "class": "logging.StreamHandler",
+            "formatter": "normal"
+        },
+        "email": {
+            'level': 'WARNING',
+            'class': 'logging.handlers.SMTPHandler',
+            'mailhost': ('email-smtp.us-east-1.amazonaws.com', 587),
+            'fromaddr': 'datalab@ansjer.com',
+            'toaddrs': [
+                '2607929016@qq.com',
+            ],
+            'subject': 'sync_amz_ad',
+            'credentials': ('AKIARBAGHTGOTVILWLB7', 'BEVt2GkzivI0xhnLWgwJCT2k7lDSg8HW+APD3mc4uHVg'),
+            'secure': {}
+        }
+    },
+    "loggers": {
+        "sync_amz_ad": {
+            "handlers": LOG_HANDLERS,
+            "level": LOG_LEVEL,
+        }
+    }
+}

+ 15 - 0
sync_amz_data/settings/dev.py

@@ -0,0 +1,15 @@
+REDIS_HOST = "172.18.0.4"
+REDIS_PASSWORD = "iwP5Zh1J75E5yj"
+
+DB_CONF = {
+    "host": "retail-data.cnrgrbcygoap.us-east-1.rds.amazonaws.com",
+    "port": 3306,
+    "user": "admin",
+    "password": "NSYbBSPbkGQUbOSNOeyy",
+    "database": "amzn_retail_ad",
+}
+
+LOG_LEVEL = "DEBUG"
+LOG_HANDLERS = ["console"]
+
+DATA_PATH = "/tmp"

+ 7 - 0
sync_amz_data/settings/online.py

@@ -0,0 +1,7 @@
+REDIS_HOST = "retail.zafjfx.ng.0001.use1.cache.amazonaws.com"
+REDIS_PASSWORD = None
+
+LOG_HANDLERS = ["console", "email"]  # 使用supervisor重定向日志文件
+
+DATA_PATH = "/var/run/amazon_ad_api"
+

+ 0 - 0
sync_amz_data/settings/test.py


+ 0 - 0
sync_amz_data/tasks/__init__.py


+ 0 - 0
sync_amz_data/tools.py