huangyifan 1 éve
szülő
commit
7bcef9ee88

+ 1 - 0
.gitignore

@@ -8,6 +8,7 @@ __pycache__/
 
 # C extensions
 *.so
+Pipfile.lock
 
 # Distribution / packaging
 .Python

+ 3 - 3
Pipfile

@@ -4,9 +4,9 @@ verify_ssl = true
 name = "pypi.tuna.tsinghua.edu.cn"
 
 [packages]
-retry = "*"
-requests = "*"
-cachetools = "*"
+retry = "==0.9.2"
+requests = "==2.31.0"
+cachetools = "==5.3.1"
 
 [dev-packages]
 

+ 9 - 0
start_sync_amz.py

@@ -0,0 +1,9 @@
+from sync_amz_data.tasks.account import AccountTask
+from sync_amz_data.settings import LOG_CONF
+
+import logging.config
+
+logging.config.dictConfig(LOG_CONF)
+
+if __name__ == '__main__':
+    AccountTask("3006125408623189").do({"record": "portfolios"})

+ 8 - 2
sync_amz_data/DataTransform/Data_ETL.py

@@ -9,7 +9,7 @@ pd.set_option('expand_frame_repr', False)
 
 class Common_ETLMethod:
     def columnsName_modify(self,df):
-        df.columns = [i.replace(".","_") for i in df.columns]
+        df.columns = [i.replace(".","_").lower() for i in df.columns]
         return df
 
     def time_stamp_convert(self,df,time_columns:list):
@@ -120,6 +120,12 @@ class SB_ETL(SBClient,Common_ETLMethod):
 
         return self.columnsName_modify(df_targets)
 
+    def budget_ETL(self,campaign_ids:list):
+        list_budget = self.get_budget(campaignIds = campaign_ids)['success']
+        df_budget = pd.json_normalize(list_budget)
+        df_budget = self.TZ_Deal(df_budget,["usageUpdatedTimestamp"])
+        print(df_budget)
+
 if __name__ == '__main__':
     AWS_CREDENTIALS = {
         'lwa_client_id': 'amzn1.application-oa2-client.ebd701cd07854fb38c37ee49ec4ba109',
@@ -129,4 +135,4 @@ if __name__ == '__main__':
     }
     ac_etl = SB_ETL(**AWS_CREDENTIALS)
     # print(ac_etl.budget_ETL(campaign_ids=["126327624499318"]))
-    print(ac_etl.targets_ETL())
+    print(ac_etl.get_budget(["144123082741012379"]))

+ 2 - 0
sync_amz_data/public/__init__.py

@@ -0,0 +1,2 @@
+from .amz_ad_client import SPClient, SBClient, SDClient, BaseClient, AccountClient
+from .asj_client import asj_api

+ 2 - 0
sync_amz_data/public/amz_ad_client.py

@@ -265,6 +265,7 @@ class SBClient(BaseClient):
     def get_keywords(self,**param):
         url_path = "/sb/keywords"
         return self._request(url_path, method="GET",params=param)
+
     def iter_keywords(self,**param):
         if "startIndex" not in param:
             param["startIndex"] = 0
@@ -296,6 +297,7 @@ class SBClient(BaseClient):
         url_path = "/sb/campaigns/budget/usage"
         body = {"campaignIds":campaignIds}
         return self._request(url_path, method="POST",body=body)
+
     def get_keyword_bidrecommendation(self,**body):
         url_path = "/sb/recommendations/bids"
         return self._request(url_path, method="POST", body=body)

+ 42 - 0
sync_amz_data/public/asj_client.py

@@ -0,0 +1,42 @@
+import requests
+from urllib3.util.retry import Retry
+from requests.adapters import HTTPAdapter
+import logging
+from typing import Union, List
+
+logger = logging.getLogger(__name__)
+
+
+class AsjHttpClient:
+    """广告后端API客户端"""
+
+    def __init__(self):
+        retry_strategy = Retry(
+            total=3,  # 重试次数
+            # 强制重试的状态码,在method_whitelist中的请求方法才会重试
+            method_whitelist=["GET", "POST"],
+            status_forcelist=[429, 500, 502, 503, 504],
+            raise_on_status=False,  # 在status_forcelist中的状态码达到重试次数后是否抛出异常
+            # backoff_factor * (2 ** (retry_time-1)), 即间隔1s, 2s, 4s, 8s, ...
+            backoff_factor=1,
+        )
+        adapter = HTTPAdapter(max_retries=retry_strategy)
+        self.session = requests.session()
+        self.session.mount("http://", adapter)
+
+    def _request(self, method, url, *, params=None, body=None):
+        headers = {"X-Token": "da4ab6bc5cbf1dfa"}
+        resp = self.session.request(method, url, params=params, headers=headers, json=body)
+        if resp.status_code >= 400:
+            raise Exception(resp.text)
+        js = resp.json()
+        if js.get("code") != 2000:
+            raise Exception(js)
+
+        return js
+
+    def create(self, url: str, data: [dict, List[dict]]):
+        return self._request("POST", url, body=data)
+
+
+asj_api = AsjHttpClient()

+ 7 - 2
sync_amz_data/settings/__init__.py

@@ -1,4 +1,5 @@
 import os
+import logging
 
 env = os.getenv("ANS_ENV")
 if not env:
@@ -22,7 +23,7 @@ REDIS_CONF = {
     "db": 0
 }
 
-AWS_CREDENTIALS = {
+AWS_LWA_CLIENT = {
     'lwa_client_id': 'amzn1.application-oa2-client.ebd701cd07854fb38c37ee49ec4ba109',
     'lwa_client_secret': 'cbf0514186db4df91e04a8905f0a91b605eae4201254ced879d8bb90df4b474d',
 }
@@ -50,12 +51,16 @@ LOG_CONF = {
             'toaddrs': [
                 '2607929016@qq.com',
             ],
-            'subject': 'sync_amz_ad',
+            'subject': 'sync_amz_data',
             'credentials': ('AKIARBAGHTGOTVILWLB7', 'BEVt2GkzivI0xhnLWgwJCT2k7lDSg8HW+APD3mc4uHVg'),
             'secure': {}
         }
     },
     "loggers": {
+        "": {
+            "handlers": LOG_HANDLERS,
+            "level": "DEBUG",
+        },
         "sync_amz_ad": {
             "handlers": LOG_HANDLERS,
             "level": LOG_LEVEL,

+ 1 - 1
sync_amz_data/settings/dev.py

@@ -9,7 +9,7 @@ DB_CONF = {
     "database": "amzn_retail_ad",
 }
 
-LOG_LEVEL = "DEBUG"
+LOG_LEVEL = 10
 LOG_HANDLERS = ["console"]
 
 DATA_PATH = "/tmp"

+ 2 - 0
sync_amz_data/tasks/__init__.py

@@ -0,0 +1,2 @@
+from .sb import SBTask
+

+ 65 - 0
sync_amz_data/tasks/_base.py

@@ -0,0 +1,65 @@
+from functools import lru_cache
+import logging
+from typing import List
+
+from sync_amz_data.public import BaseClient, asj_api
+from sync_amz_data.settings import AWS_LWA_CLIENT, DATA_PATH
+
+logger = logging.getLogger(__name__)
+ASJ_URL_BASE = "http://127.0.0.1:8000/api/ad_manage/"
+
+
+@lru_cache(maxsize=100)
+def query_shop_info(profile_id: str) -> dict:
+    # todo 临时
+    if profile_id == "3006125408623189":
+        return {
+            "profile_id": profile_id,
+            "id": 1,
+            "shop_name": "ZosiDirect",
+            "region": "NA",
+            "access_token": "Atza|IwEBION12cAeJLW1CrhirYbH8ianuTfh1JQtC2nbnZjWcCk_J0v9XVF00Pm7AYzZPHXAuFgdul0vuQt4XUAcwVdzQF9AzDfk5wKAUXe9fuGZhb0nq0mdJxg08u2BR_rUggxKWSd1sg6OW7szIEq8xQzok9hcr-Ai-WupyaA-CCznOt7STmyEZltNsK8VuJb7ySxTlxwf-DbuX2Tn9JdsEta7DwQIsHcYv2QwsfLYGnk2LcLUUXG-6TtwCWFGHMxHfoScOvN92hOHiPl3CsdTs5RmKO5eVFdf0XUu8OU5Z9icnjuP1tYBy7_e9s2oTL9fVLLfH_ATUJplPsfm1MhtW6ioX9IXxfJSmOJ0ntEv45ndb9t-wHE_vLukuy_4jwmy_50NU_TOU_9pbFJLQhVyAB0f4HVcb5fgPn--feAX89ANhRJnIn5zibVk_rY_rte7Xu7JMJNNW41PoCUvfVdxjnorhGTIGh2u2JHQPqdww2xLcZ93SQ",
+            "status": 1,
+            "refresh_token": "Atzr|IwEBIL4ur8kbcwRyxVu_srprAAoTYzujnBvA6jU-0SMxkRgOhGjYJSUNGKvw24EQwJa1jG5RM76mQD2P22AKSq8qSD94LddoXGdKDO74eQVYl0RhuqOMFqdrEZpp1p4bIR6_N8VeSJDHr7UCuo8FiabkSHrkq7tsNvRP-yI-bnpQv4EayPBh7YwHVX3hYdRbhxaBvgJENgCuiEPb35Q2-Z6w6ujjiKUAK2VSbCFpENlEfcHNsjDeY7RCvFlwlCoHj1IeiNIaFTE9yXFu3aEWlExe3LzHv6PZyunEi88QJSXKSh56Um0e0eEg05rMv-VBM83cAqc5POmZnTP1vUdZO8fQv3NFLZ-xU6e1WQVxVPi5Cyqk4jYhGf1Y9t98N654y0tVvw74qNIsTrB-8bGS0Uhfe24oBEWmzObvBY3zhtT1d42myGUJv4pMTU6yPoS83zhPKm3LbUDEpBA1hvvc_09jHk7vUEAuFB-UAZzlht2C1yklzQ",
+            "update_time": 1688351991,
+            "token_expires_time": 1688355491,
+            "create_time": 1683702488
+        }
+    return {}
+
+
+class BaseTask:
+    AmzAdClientClass = BaseClient
+
+    def __init__(self, profile_id: str):
+        self.shop_info = query_shop_info(profile_id)
+        self.ad_cil = self.AmzAdClientClass(
+            profile_id=profile_id,
+            refresh_token=self.shop_info["refresh_token"],
+            data_path=DATA_PATH, **AWS_LWA_CLIENT)
+
+    def do(self, task_info: dict):
+        record = task_info["record"]
+        iter_records = getattr(self.ad_cil, f"iter_{record}", None)
+        change_func = getattr(self, f"change_{record}", None)
+        ad_api_params = task_info.get("params")
+        if ad_api_params is None:
+            records_iterator = iter_records()
+        else:
+            records_iterator = iter_records(ad_api_params)
+        for data in records_iterator:
+            if change_func:
+                data = change_func(data)
+            self.to_mysql(record, data)
+            logger.info(data)
+
+    def batch_do(self, task_info: dict):
+        """
+        适用于使用pandas等进行批处理
+        @param task_info:
+        @return:
+        """
+        pass
+
+    def to_mysql(self, record: str, data: [dict, List[dict]]):
+        asj_api.create(f"{ASJ_URL_BASE}{record}/", data)

+ 19 - 0
sync_amz_data/tasks/account.py

@@ -0,0 +1,19 @@
+from ._base import BaseTask
+from sync_amz_data.public import AccountClient
+from sync_amz_data.tools import timestamp2utc_dt
+
+
+class AccountTask(BaseTask):
+    AmzAdClientClass = AccountClient
+
+    def change_portfolios(self, data: dict):
+        data["shop"] = self.shop_info["profile_id"]
+        for key in ["lastUpdatedDate", "creationDate"]:
+            if key in data:
+                data[key] = timestamp2utc_dt(data[key])
+
+        budget: dict = data.pop("budget", None)
+        if budget:
+            for key, val in budget.items():
+                data[f"budget_{key}"] = val
+        return data

+ 16 - 0
sync_amz_data/tasks/sb.py

@@ -0,0 +1,16 @@
+from sync_amz_data.public import SBClient
+from sync_amz_data.tasks._base import BaseTask
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class SBTask(BaseTask):
+    AmzAdClientClass = SBClient
+
+    def change_campaigns(self, data: dict):
+        return data
+
+    def change_groups(self, data: dict):
+        return data

+ 10 - 0
sync_amz_data/tasks/sd.py

@@ -0,0 +1,10 @@
+from sync_amz_data.public import SDClient
+from sync_amz_data.tasks._base import BaseTask
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class SDTask(BaseTask):
+    AmzAdClientClass = SDClient

+ 9 - 0
sync_amz_data/tasks/sp.py

@@ -0,0 +1,9 @@
+from sync_amz_data.public import SPClient
+from sync_amz_data.tasks._base import BaseTask
+
+
+class SpTask(BaseTask):
+    AmzAdClientClass = SPClient
+
+    def change_campaigns(self, data: dict):
+        return data

+ 15 - 0
sync_amz_data/tools.py

@@ -0,0 +1,15 @@
+# from dateutil.parser import isoparse
+from datetime import date, datetime, timezone
+
+
+def timestamp2utc_dt(timestamp: int, _format: str = "%Y-%m-%d %H:%M:%S") -> [str, datetime]:
+    if len(str(timestamp)) == 13:  # _ms
+        timestamp = int(timestamp / 1000)
+    utc_dt = datetime.utcfromtimestamp(timestamp)
+    if _format:
+        return utc_dt.strftime(_format)
+    return utc_dt
+
+
+if __name__ == '__main__':
+    print(timestamp2utc_dt(1574232571377))