Browse Source

Merge branch 'master' into yifan

# Conflicts:
#	sync_amz_data/DataTransform/Data_ETL.py
huangyifan 1 year ago
parent
commit
8445c8b78a

+ 0 - 1
start_sync_amz.py

@@ -1,4 +1,3 @@
-from sync_amz_data.tasks.account import AccountTask
 from sync_amz_data.settings import LOG_CONF
 import logging.config
 logging.config.dictConfig(LOG_CONF)

+ 20 - 2
sync_amz_data/public/amz_ad_client.py

@@ -665,9 +665,27 @@ class SBClient(BaseClient):
         body = {"campaignIds": campaignIds}
         return self._request(url_path, method="POST", body=body)
 
-    def get_keyword_bidrecommendation(self, **body):
+
+    def get_keyword_bidrecommendation(self, campaignId: str, keywords: list,):
         url_path = "/sb/recommendations/bids"
-        return self._request(url_path, method="POST", body=body)
+        headers = {
+            "Content-Type": "application/json"
+        }
+        body = {
+            "campaignId": campaignId,
+            "keywords": keywords
+        }
+        return self._request(url_path, method="POST", body=body, headers=headers)
+
+    def iter_keyword_bidrecommendation(self,campaignId: str, keywords: list):
+        for i in range(0, len(keywords), 100):
+            try:
+                info = self.get_keyword_bidrecommendation(campaignId, keywords[i:i+100])
+                yield from info['keywordsBidsRecommendationSuccessResults']
+            except:
+                # print("空值")
+                return iter([])
+
 
     def get_v3_report(self,
                       groupby:list,

+ 0 - 65
sync_amz_data/tasks/_base.py

@@ -1,65 +0,0 @@
-from functools import lru_cache
-import logging
-from typing import List
-
-from sync_amz_data.public import BaseClient, asj_api
-from sync_amz_data.settings import AWS_LWA_CLIENT, DATA_PATH
-
-logger = logging.getLogger(__name__)
-ASJ_URL_BASE = "http://127.0.0.1:8000/api/ad_manage/"
-
-
-@lru_cache(maxsize=100)
-def query_shop_info(profile_id: str) -> dict:
-    # todo 临时
-    if profile_id == "3006125408623189":
-        return {
-            "profile_id": profile_id,
-            "id": 1,
-            "shop_name": "ZosiDirect",
-            "region": "NA",
-            "access_token": "Atza|IwEBION12cAeJLW1CrhirYbH8ianuTfh1JQtC2nbnZjWcCk_J0v9XVF00Pm7AYzZPHXAuFgdul0vuQt4XUAcwVdzQF9AzDfk5wKAUXe9fuGZhb0nq0mdJxg08u2BR_rUggxKWSd1sg6OW7szIEq8xQzok9hcr-Ai-WupyaA-CCznOt7STmyEZltNsK8VuJb7ySxTlxwf-DbuX2Tn9JdsEta7DwQIsHcYv2QwsfLYGnk2LcLUUXG-6TtwCWFGHMxHfoScOvN92hOHiPl3CsdTs5RmKO5eVFdf0XUu8OU5Z9icnjuP1tYBy7_e9s2oTL9fVLLfH_ATUJplPsfm1MhtW6ioX9IXxfJSmOJ0ntEv45ndb9t-wHE_vLukuy_4jwmy_50NU_TOU_9pbFJLQhVyAB0f4HVcb5fgPn--feAX89ANhRJnIn5zibVk_rY_rte7Xu7JMJNNW41PoCUvfVdxjnorhGTIGh2u2JHQPqdww2xLcZ93SQ",
-            "status": 1,
-            "refresh_token": "Atzr|IwEBIL4ur8kbcwRyxVu_srprAAoTYzujnBvA6jU-0SMxkRgOhGjYJSUNGKvw24EQwJa1jG5RM76mQD2P22AKSq8qSD94LddoXGdKDO74eQVYl0RhuqOMFqdrEZpp1p4bIR6_N8VeSJDHr7UCuo8FiabkSHrkq7tsNvRP-yI-bnpQv4EayPBh7YwHVX3hYdRbhxaBvgJENgCuiEPb35Q2-Z6w6ujjiKUAK2VSbCFpENlEfcHNsjDeY7RCvFlwlCoHj1IeiNIaFTE9yXFu3aEWlExe3LzHv6PZyunEi88QJSXKSh56Um0e0eEg05rMv-VBM83cAqc5POmZnTP1vUdZO8fQv3NFLZ-xU6e1WQVxVPi5Cyqk4jYhGf1Y9t98N654y0tVvw74qNIsTrB-8bGS0Uhfe24oBEWmzObvBY3zhtT1d42myGUJv4pMTU6yPoS83zhPKm3LbUDEpBA1hvvc_09jHk7vUEAuFB-UAZzlht2C1yklzQ",
-            "update_time": 1688351991,
-            "token_expires_time": 1688355491,
-            "create_time": 1683702488
-        }
-    return {}
-
-
-class BaseTask:
-    AmzAdClientClass = BaseClient
-
-    def __init__(self, profile_id: str):
-        self.shop_info = query_shop_info(profile_id)
-        self.ad_cil = self.AmzAdClientClass(
-            profile_id=profile_id,
-            refresh_token=self.shop_info["refresh_token"],
-            data_path=DATA_PATH, **AWS_LWA_CLIENT)
-
-    def do(self, task_info: dict):
-        record = task_info["record"]
-        iter_records = getattr(self.ad_cil, f"iter_{record}", None)
-        change_func = getattr(self, f"change_{record}", None)
-        ad_api_params = task_info.get("params")
-        if ad_api_params is None:
-            records_iterator = iter_records()
-        else:
-            records_iterator = iter_records(ad_api_params)
-        for data in records_iterator:
-            if change_func:
-                data = change_func(data)
-            self.to_mysql(record, data)
-            logger.info(data)
-
-    def batch_do(self, task_info: dict):
-        """
-        适用于使用pandas等进行批处理
-        @param task_info:
-        @return:
-        """
-        pass
-
-    def to_mysql(self, record: str, data: [dict, List[dict]]):
-        asj_api.create(f"{ASJ_URL_BASE}{record}/", data)

+ 0 - 19
sync_amz_data/tasks/account.py

@@ -1,19 +0,0 @@
-from ._base import BaseTask
-from sync_amz_data.public import AccountClient
-from sync_amz_data.tools import timestamp2utc_dt
-
-
-class AccountTask(BaseTask):
-    AmzAdClientClass = AccountClient
-
-    def change_portfolios(self, data: dict):
-        data["shop"] = self.shop_info["profile_id"]
-        for key in ["lastUpdatedDate", "creationDate"]:
-            if key in data:
-                data[key] = timestamp2utc_dt(data[key])
-
-        budget: dict = data.pop("budget", None)
-        if budget:
-            for key, val in budget.items():
-                data[f"budget_{key}"] = val
-        return data

+ 113 - 0
sync_amz_data/tasks/datainsert/SB/mysql_datainsert_sb_keywordsbid_recommendations.py

@@ -0,0 +1,113 @@
+import requests
+from urllib.parse import urljoin
+from sync_amz_data.public.amz_ad_client import SBClient
+from sync_amz_data.settings import AWS_LWA_CLIENT
+import pandas as pd
+import json
+
+pd.set_option('display.max_columns', None)
+# 显示所有行
+pd.set_option('display.max_rows', None)
+
+
+class RateLimitError(Exception):
+    def __init__(self, retry_after: str = None):
+        self.retry_after = retry_after
+
+
+def request(url_path: str, method: str = "GET", head: dict = None, params: dict = None, body: dict = None):
+    ADS = "http://192.168.1.23:8001/"
+    resp = requests.session().request(
+        method=method,
+        url=urljoin(ADS, url_path),
+        headers=head,
+        params=params,
+        json=body,
+    )
+    if resp.status_code == 429:
+        raise RateLimitError(resp.headers.get("Retry-After"))
+    if resp.status_code >= 400:
+        raise Exception(resp.text)
+    return resp.json()
+
+
+class SbkeywordsBidRecommendations:
+    def __init__(self, profile_id):
+        self.profile_id = profile_id
+        self.re_url_path = "api/ad_manage/profiles/"
+        self.sbk_url_path = "api/ad_manage/sbkbrkeywords/"
+        self.upcreate_url_path = "api/ad_manage/sbkeywordsbidrecommendation/updata/"
+        self.heads = {'X-Token': "da4ab6bc5cbf1dfa"}
+        self.refresh_token = self.get_refresh_token()
+        self.lwa_client_id = AWS_LWA_CLIENT['lwa_client_id']
+        self.lwa_client_secret = AWS_LWA_CLIENT['lwa_client_secret']
+        self.AWS_CREDENTIALS = {
+            'lwa_client_id': self.lwa_client_id,
+            'lwa_client_secret': self.lwa_client_secret,
+            'refresh_token': self.refresh_token,
+            'profile_id': self.profile_id
+        }
+
+    def get_refresh_token(self):
+        params = {'profile_id': self.profile_id}
+        heads = self.heads
+        url_path = self.re_url_path
+        tem = request(url_path=url_path, head=heads, params=params)
+        if tem.get('data') is not None:
+            _ = tem.get('data')
+            out = _[0].get('refresh_token')
+        else:
+            out = None
+        return out
+
+    def get_arg(self):
+        heads = self.heads
+        url_path = self.sbk_url_path
+        data = []
+        page = 1
+        params = {'profile_id': self.profile_id, 'limit': 999, 'page': page}
+        tem = request(url_path=url_path, head=heads, params=params)
+        data.extend(tem.get('data'))
+        while tem.get('is_next') is True:
+            page += 1
+            params = {'profile_id': self.profile_id, 'limit': 999, 'page': page}
+            tem = request(url_path=url_path, head=heads, params=params)
+            data.extend(tem.get('data'))
+        _ = pd.json_normalize(data)
+        df = _.copy()
+        df['keywords'] = df[['matchType', 'keywordText', 'keywordId']].apply(lambda x: x.to_dict(), axis=1)
+        df_grouped = df.groupby(['campaignId']).agg({'keywords': list}).reset_index()
+        return df_grouped
+
+    def get_sbkeywordsbidrecommendation_data(self):
+        tem = SBClient(**self.AWS_CREDENTIALS)
+        df_arg = self.get_arg()
+        data_json = df_arg.to_json(orient='records')
+        list_arg = json.loads(data_json)
+        list_outdata = []
+        for i in list_arg:
+            list_sptbr = tem.iter_keyword_bidrecommendation(**i)
+            list_outdata.extend(list(list_sptbr))
+        _ = pd.json_normalize(list_outdata)
+        df_out = _[['keyword.keywordId', 'recommendedBid.rangeStart', 'recommendedBid.rangeEnd',
+                    'recommendedBid.recommended']].copy()
+        df_out.rename(columns={'keyword.keywordId': 'keyword',
+                               'recommendedBid.rangeStart': 'suggestedBid_lower',
+                               'recommendedBid.rangeEnd': 'suggestedBid_upper',
+                               'recommendedBid.recommended': 'suggestedBid'
+                               }, inplace=True)
+        json_data = json.loads(df_out.to_json(orient='records', force_ascii=False))
+        return json_data
+
+    def updata_create(self):
+        body = self.get_sbkeywordsbidrecommendation_data()
+        heads = self.heads
+        url_path = self.upcreate_url_path
+        tem = request(url_path=url_path, head=heads, body=body, method="POST")
+        return tem
+
+if __name__ == '__main__':
+    a = SbkeywordsBidRecommendations(profile_id="3006125408623189")
+    # out = a.get_sptargetsbidrecommendation_data()
+    out = a.updata_create()
+    print(out)

+ 2 - 2
sync_amz_data/tasks/datainsert/SB/mysql_datainsert_sbkeyword_v3.py

@@ -27,7 +27,7 @@ def request(url_path: str, method: str = "GET", head: dict = None, params: dict
     return resp.json()
 
 
-class SpKeyword:
+class SbKeyword:
     def __init__(self, profile_id, campaignId: list = None):
         self.profile_id = profile_id
         self.portfolioId = campaignId
@@ -91,7 +91,7 @@ class SpKeyword:
 
 
 if __name__ == '__main__':
-    a = SpKeyword(profile_id="3006125408623189")
+    a = SbKeyword(profile_id="3006125408623189")
     out = a.updata_create()
     # out = a.dataconvert()
     print(out)

+ 88 - 0
sync_amz_data/tasks/datainsert/SB/mysql_datainsert_sbtarget.py

@@ -0,0 +1,88 @@
+from sync_amz_data.DataTransform.Data_ETL import SB_ETL
+import requests
+from urllib.parse import urljoin
+from sync_amz_data.settings import AWS_LWA_CLIENT
+import pandas as pd
+import json
+
+
+class RateLimitError(Exception):
+    def __init__(self, retry_after: str = None):
+        self.retry_after = retry_after
+
+
+def request(url_path: str, method: str = "GET", head: dict = None, params: dict = None, body: dict = None):
+    ADS = "http://192.168.1.23:8001/"
+    resp = requests.session().request(
+        method=method,
+        url=urljoin(ADS, url_path),
+        headers=head,
+        params=params,
+        json=body,
+    )
+    if resp.status_code == 429:
+        raise RateLimitError(resp.headers.get("Retry-After"))
+    if resp.status_code >= 400:
+        raise Exception(resp.text)
+    return resp.json()
+
+
+class SbTargets:
+    def __init__(self, profile_id):
+        self.profile_id = profile_id
+        self.re_url_path = "api/ad_manage/profiles/"
+        self.upcreate_url_path = "api/ad_manage/sbtargets/updata/"
+        self.heads = {'X-Token': "da4ab6bc5cbf1dfa"}
+        self.refresh_token = self.get_refresh_token()
+        self.lwa_client_id = AWS_LWA_CLIENT['lwa_client_id']
+        self.lwa_client_secret = AWS_LWA_CLIENT['lwa_client_secret']
+        self.AWS_CREDENTIALS = {
+            'lwa_client_id': self.lwa_client_id,
+            'lwa_client_secret': self.lwa_client_secret,
+            'refresh_token': self.refresh_token,
+            'profile_id': self.profile_id
+        }
+
+    def get_refresh_token(self):
+        params = {'profile_id': self.profile_id}
+        heads = self.heads
+        url_path = self.re_url_path
+        tem = request(url_path=url_path, head=heads, params=params)
+        if tem.get('data') is not None:
+            _ = tem.get('data')
+            out = _[0].get('refresh_token')
+        else:
+            out = None
+        return out
+
+    def get_sbtargets_data(self):
+        tem = SB_ETL(**self.AWS_CREDENTIALS)
+        df = tem.targets_ETL()
+        if len(df) > 0:
+            df.rename(columns={'adgroupid': 'adGroup',
+                               'campaignid': 'campaign',
+                               'targetid': 'targetId',
+                               'expressions_type': 'expression_type',
+                               'expressions_value': 'expression_value',
+                               'resolvedexpressions_type': 'resolvedExpression_type',
+                               'resolvedexpressions_value': 'resolvedExpression_value'}, inplace=True)
+            df.drop(columns=['expressions', 'resolvedexpressions'], inplace=True)
+            df['profile'] = self.profile_id
+        return df
+
+    def updata_create(self):
+        df_data = self.get_sbtargets_data()
+        _ = df_data.to_json(orient='records', date_format='iso')
+        body = json.loads(_)
+        heads = self.heads
+        url_path = self.upcreate_url_path
+        tem = request(url_path=url_path, head=heads, body=body, method="POST")
+        return tem
+
+
+
+if __name__ == '__main__':
+    a = SbTargets(profile_id="3006125408623189")
+    # out = a.get_sbtargets_data()
+    out = a.updata_create()
+    print(out)

+ 0 - 4
sync_amz_data/tasks/datainsert/SP/mysql_datainsert_sp_targetsbid_recommendations.py

@@ -5,10 +5,6 @@ from sync_amz_data.settings import AWS_LWA_CLIENT
 import pandas as pd
 import json
 
-pd.set_option('display.max_columns', None)
-# 显示所有行
-pd.set_option('display.max_rows', None)
-
 
 class RateLimitError(Exception):
     def __init__(self, retry_after: str = None):

+ 31 - 0
sync_amz_data/tasks/datainsert/alldata_insert.py

@@ -10,6 +10,12 @@ from SP.mysql_datainsert_sp_targetsbid_recommendations_v2 import SpTargetsBidRec
 from SP.mysql_datainsert_spnegativekeyword import SpNegativeKeyword
 from SP.mysql_datainsert_spnegativetarget import SpNegativeTarget
 
+from SB.mysql_datainsert_sbcampaign import SbCampaign
+from SB.mysql_datainsert_sbgroup import SbGroup
+from SB.mysql_datainsert_sbkeyword_v3 import SbKeyword
+from SB.mysql_datainsert_sbads import SbAds
+from SB.mysql_datainsert_sb_keywordsbid_recommendations import SbkeywordsBidRecommendations
+from SB.mysql_datainsert_sbtarget import SbTargets
 
 pf = Portfolios("3006125408623189")
 pfo = pf.updata_create()
@@ -56,3 +62,28 @@ spnko = spnk.updata_create()
 print('SpNegativeKeyword', spnko)
 
 
+sbc = SbCampaign(profile_id="3006125408623189")
+sbco = sbc.updata_create()
+print("SbCampaign", sbco)
+
+sbg = SbGroup(profile_id="3006125408623189")
+sbgo = sbg.updata_create()
+print("SbGroup", sbgo)
+
+sbk = SbKeyword(profile_id="3006125408623189")
+sbko = sbk.updata_create()
+print("SbKeyword", sbko)
+
+sba = SbAds(profile_id="3006125408623189")
+sbao = sba.updata_create()
+print("SbAds", sbao)
+
+sbkbr = SbkeywordsBidRecommendations(profile_id="3006125408623189")
+sbkbro = sbkbr.updata_create()
+print("SbkeywordsBidRecommendations", sbkbro)
+
+sbt = SbTargets(profile_id="3006125408623189")
+sbto = sbt.updata_create()
+print("SbTargets", sbto)
+
+

+ 0 - 16
sync_amz_data/tasks/sb.py

@@ -1,16 +0,0 @@
-from sync_amz_data.public import SBClient
-from sync_amz_data.tasks._base import BaseTask
-
-import logging
-
-logger = logging.getLogger(__name__)
-
-
-class SBTask(BaseTask):
-    AmzAdClientClass = SBClient
-
-    def change_campaigns(self, data: dict):
-        return data
-
-    def change_groups(self, data: dict):
-        return data

+ 0 - 10
sync_amz_data/tasks/sd.py

@@ -1,10 +0,0 @@
-from sync_amz_data.public import SDClient
-from sync_amz_data.tasks._base import BaseTask
-
-import logging
-
-logger = logging.getLogger(__name__)
-
-
-class SDTask(BaseTask):
-    AmzAdClientClass = SDClient

+ 0 - 9
sync_amz_data/tasks/sp.py

@@ -1,9 +0,0 @@
-from sync_amz_data.public import SPClient
-from sync_amz_data.tasks._base import BaseTask
-
-
-class SpTask(BaseTask):
-    AmzAdClientClass = SPClient
-
-    def change_campaigns(self, data: dict):
-        return data