123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416 |
- import requests
- import json
- import time
- from cachetools import TTLCache
- from urllib.parse import urljoin
- from typing import List, Literal, Iterable, Iterator
- import gzip
- from pathlib import Path
- from logging import getLogger
- URL_AUTH = "https://api.amazon.com/auth/o2/token"
- URL_AD_API = "https://advertising-api.amazon.com"
- cache = TTLCache(maxsize=10, ttl=3200)
- logger = getLogger(__name__)
- def gz_decompress(file_path: str, chunk_size: int = 1024 * 1024):
- decompressed_file = file_path.rstrip(".gz")
- with open(decompressed_file, "wb") as pw:
- zf = gzip.open(file_path, mode='rb')
- while True:
- chunk = zf.read(size=chunk_size)
- if not chunk:
- break
- pw.write(chunk)
- return decompressed_file
- class BaseClient:
- def __init__(
- self, lwa_client_id: str, lwa_client_secret: str, refresh_token: str = None, profile_id: str = None,
- data_path: str = "./"
- ):
- self.lwa_client_id = lwa_client_id
- self.lwa_client_secret = lwa_client_secret
- self.refresh_token = refresh_token
- self.profile_id = profile_id
- self.data_path = Path(data_path)
- if not self.data_path.exists():
- self.data_path.mkdir(parents=True)
- @property
- def access_token(self) -> str:
- try:
- return cache[self.refresh_token]
- except KeyError:
- resp = requests.post(URL_AUTH, data={
- "grant_type": "refresh_token",
- "client_id": self.lwa_client_id,
- "refresh_token": self.refresh_token,
- "client_secret": self.lwa_client_secret,
- })
- if resp.status_code != 200:
- raise Exception(resp.text)
- js = resp.json()
- cache[self.refresh_token] = js["access_token"]
- self.refresh_token = js["refresh_token"]
- return js["access_token"]
- @property
- def auth_headers(self):
- return {
- "Amazon-Advertising-API-ClientId": self.lwa_client_id,
- "Amazon-Advertising-API-Scope": self.profile_id,
- "Authorization": f"Bearer {self.access_token}",
- }
- def _request(self, url_path: str, method: str = "GET", headers: dict = None, params: dict = None,
- body: dict = None):
- head = self.auth_headers
- if headers:
- head.update(headers)
- resp = requests.request(
- method=method,
- url=urljoin(URL_AD_API, url_path),
- headers=head,
- params=params,
- json=body,
- )
- js = resp.json()
- return js
- class SPClient(BaseClient):
- def get_campaigns(self, **body):
- url_path = "/sp/campaigns/list"
- headers = {
- "Accept": "application/vnd.spcampaign.v3+json",
- "Content-Type": "application/vnd.spcampaign.v3+json"
- }
- return self._request(url_path, method="POST", headers=headers, body=body)
- def iter_campaigns(self, **body) -> Iterator[dict]:
- if "maxResults" not in body:
- body["maxResults"] = 100
- while True:
- info: dict = self.get_campaigns(**body)
- yield from info["campaigns"]
- if not info.get("nextToken"):
- break
- body["nextToken"] = info["nextToken"]
- logger.info(f"总共数量:{info['totalResults']}")
- def get_ad_groups(self, **body):
- url_path = "/sp/adGroups/list"
- headers = {
- "Accept": "application/vnd.spadGroup.v3+json",
- "Content-Type": "application/vnd.spadGroup.v3+json"
- }
- return self._request(url_path, method="POST", body=body, headers=headers)
- def iter_adGroups(self, **body) -> Iterator[dict]:
- if "maxResults" not in body:
- body["maxResults"] = 100
- while True:
- info: dict = self.get_ad_groups(**body)
- yield from info["adGroups"]
- if not info.get("nextToken"):
- break
- body["nextToken"] = info["nextToken"]
- logger.info(f"总共数量:{info['totalResults']}")
- def get_ads(self, **body):
- url_path = "/sp/productAds/list"
- headers = {
- "Accept": "application/vnd.spproductAd.v3+json",
- "Content-Type": "application/vnd.spproductAd.v3+json"
- }
- return self._request(url_path, method="POST", body=body, headers=headers)
- def iter_ads(self, **body) -> Iterator[dict]:
- if "maxResults" not in body:
- body["maxResults"] = 100
- while True:
- info: dict = self.get_ads(**body)
- yield from info["productAds"]
- if not info.get("nextToken"):
- break
- body["nextToken"] = info["nextToken"]
- logger.info(f"总共数量:{info['totalResults']}")
- def get_keywords(self, **body):
- url_path = "/sp/keywords/list"
- headers = {
- "Accept": "application/vnd.spKeyword.v3+json",
- "Content-Type": "application/vnd.spKeyword.v3+json"
- }
- return self._request(url_path, method="POST", body=body, headers=headers)
- def iter_keywords(self, **body) -> Iterator[dict]:
- if "maxResults" not in body:
- body["maxResults"] = 100
- while True:
- info: dict = self.get_keywords(**body)
- yield from info["keywords"]
- if not info.get("nextToken"):
- break
- body["nextToken"] = info["nextToken"]
- logger.info(f"总共数量:{info['totalResults']}")
- def get_targets(self, **body):
- url_path = "/sp/targets/list"
- headers = {
- "Accept": "application/vnd.sptargetingClause.v3+json",
- "Content-Type": "application/vnd.sptargetingClause.v3+json"
- }
- return self._request(url_path, method="POST", body=body, headers=headers)
- def iter_targets(self, **body) -> Iterator[dict]:
- if "maxResults" not in body:
- body["maxResults"] = 100
- while True:
- info: dict = self.get_targets(**body)
- yield from info["targetingClauses"]
- if not info.get("nextToken"):
- break
- body["nextToken"] = info["nextToken"]
- logger.info(f"总共数量:{info['totalResults']}")
- def get_budget(self, campaign_ids: list):
- url_path = "/sp/campaigns/budget/usage"
- body = {
- "campaignIds": campaign_ids
- }
- return self._request(url_path, method="POST", body=body)
- def get_adgroup_bidrecommendation(self,campaignId:str,adGroupId:str,targetingExpressions:list,recommendationType:str="BIDS_FOR_EXISTING_AD_GROUP"):
- url_path = "/sp/targets/bid/recommendations"
- headers = {
- "Accept": "application/vnd.spthemebasedbidrecommendation.v3+json",
- "Content-Type": "application/vnd.spthemebasedbidrecommendation.v3+json"
- }
- body = {"campaignId":campaignId,
- "adGroupId":adGroupId,
- "recommendationType":recommendationType,
- "targetingExpressions":targetingExpressions}
- return self._request(url_path, method="POST", body=body, headers=headers)
- def get_keyword_bidrecommendation(self,adGroupId:str,keyword:list,matchType:list):
- keywords = list(map(lambda x:{"keyword":x[0],"matchType":x[1]},list(zip(keyword,matchType))))
- print(keywords)
- url_path = "/v2/sp/keywords/bidRecommendations"
- body = {"adGroupId":adGroupId,
- "keywords":keywords}
- return self._request(url_path, method="POST", body=body)
- class SBClient(BaseClient):
- def get_campaigns(self, **body):
- url_path = "/sb/v4/campaigns/list"
- headers = {
- "Accept": "application/vnd.sbcampaignresouce.v4+json",
- "Content-Type": "application/vnd.sbcampaignresouce.v4+json"
- }
- return self._request(url_path, method="POST", body=body, headers=headers)
- def iter_campaigns(self, **body) -> Iterator[dict]:
- if "maxResults" not in body:
- body["maxResults"] = 100
- while True:
- info: dict = self.get_campaigns(**body)
- yield from info["campaigns"]
- if not info.get("nextToken"):
- break
- body["nextToken"] = info["nextToken"]
- # logger.info(f"总共数量:{info['totalResults']}")
- def get_ad_groups(self,**body):
- url_path = "/sb/v4/adGroups/list"
- headers = {
- 'Content-Type': "application/vnd.sbadgroupresource.v4+json",
- 'Accept': "application/vnd.sbadgroupresource.v4+json"
- }
- return self._request(url_path, method="POST", headers=headers,body=body)
- def iter_adGroups(self, **body) -> Iterator[dict]:
- if "maxResults" not in body:
- body["maxResults"] = 100
- while True:
- info: dict = self.get_ad_groups(**body)
- print(info)
- yield from info["adGroups"]
- if not info.get("nextToken"):
- break
- body["nextToken"] = info["nextToken"]
- def get_ads(self,**body):
- url_path = "/sb/v4/ads/list"
- headers = {'Content-Type': "application/vnd.sbadresource.v4+json",
- 'Accept': "application/vnd.sbadresource.v4+json"
- }
- return self._request(url_path, method="POST", headers=headers,body=body)
- def iter_ads(self,**body):
- if "maxResults" not in body:
- body["maxResults"] = 100
- while True:
- info: dict = self.get_ads(**body)
- print(info)
- yield from info["ads"]
- if not info.get("nextToken"):
- break
- body["nextToken"] = info["nextToken"]
- def get_keywords(self):
- url_path = "/sb/keywords"
- return self._request(url_path, method="GET")
- def get_targets(self,**body):
- url_path = "/sb/targets/list"
- return self._request(url_path, method="POST", body=body)
- def iter_targets(self,**body):
- if "maxResults" not in body:
- body["maxResults"] = 100
- while True:
- info: dict = self.get_targets(**body)
- # print(info)
- yield from info["targets"]
- if not info.get("nextToken"):
- break
- body["nextToken"] = info["nextToken"]
- def get_budget(self,campaignIds:list):
- url_path = "/sb/campaigns/budget/usage"
- body = {"campaignIds":campaignIds}
- return self._request(url_path, method="POST",body=body)
- def get_keyword_bidrecommendation(self,**body):
- url_path = "/sb/recommendations/bids"
- return self._request(url_path, method="POST", body=body)
- def get_report(
- self,
- record_type: Literal['campaigns', 'adGroups', 'ads', 'targets', 'keywords'],
- report_date: str,
- metrics: List[str],
- segment: Literal['placement', 'query'] = None,
- creative_type: Literal['video', 'all'] = "all",
- download: bool = True
- ):
- """
- @param download: 是否下载文件
- @param record_type:
- @param report_date: 格式为YYYYMMDD,以请求的卖家市场所对应的时区为准,超过60天的报告不可用
- @param metrics:
- @param segment:
- @param creative_type:
- None:仅包含非视频广告
- 'video':仅包含视频广告
- 'all':包含视频和非视频广告
- @return:
- """
- url = f"/v2/hsa/{record_type}/report"
- body = {
- "reportDate": report_date,
- "metrics": ",".join(metrics),
- "creativeType": creative_type,
- "segment": segment
- }
- if record_type == "ads":
- body["creativeType"] = "all"
- ret = self._request(url, method="POST", body=body)
- report_id = ret["reportId"]
- status = ret["status"]
- if status == "FAILURE":
- raise Exception(ret)
- logger.info(f"创建报告成功:{ret}")
- while status == "IN_PROGRESS":
- logger.debug(f"报告{report_id}正在处理中...")
- time.sleep(3)
- ret = self._request(f"/v2/reports/{report_id}")
- status = ret["status"]
- if status == "FAILURE":
- raise Exception(ret)
- logger.info(f"报告处理完成:{ret}")
- if download:
- self.download_report(report_id, str(self.data_path / f"sb_{record_type}.json.gz"))
- else:
- return ret
- def download_report(self, report_id: str, file_path: str, decompress: bool = True) -> str:
- url = urljoin(URL_AD_API, f"/v2/reports/{report_id}/download")
- resp = requests.get(url, headers=self.auth_headers, stream=True)
- logger.info(f"开始下载报告:{report_id}")
- with open(file_path, "wb") as file:
- for data in resp.iter_content(chunk_size=10 * 1024):
- file.write(data)
- logger.info(f"报告{report_id}下载完成:{file_path}")
- if not decompress:
- return file_path
- de_file = gz_decompress(file_path)
- logger.info(f"解压完成:{de_file}")
- return de_file
- class SDClient(BaseClient):
- def get_campaigns(self, **params) -> List[dict]:
- url_path = "/sd/campaigns"
- return self._request(url_path, params=params)
- class Account(BaseClient):
- def get_portfolio(self):
- url_path = "/v2/portfolios/extended"
- return self._request(url_path)
- if __name__ == '__main__':
- AWS_CREDENTIALS = {
- 'lwa_client_id': 'amzn1.application-oa2-client.ebd701cd07854fb38c37ee49ec4ba109',
- 'refresh_token': "Atzr|IwEBIL4ur8kbcwRyxVu_srprAAoTYzujnBvA6jU-0SMxkRgOhGjYJSUNGKvw24EQwJa1jG5RM76mQD2P22AKSq8qSD94LddoXGdKDO74eQVYl0RhuqOMFqdrEZpp1p4bIR6_N8VeSJDHr7UCuo8FiabkSHrkq7tsNvRP-yI-bnpQv4EayPBh7YwHVX3hYdRbhxaBvgJENgCuiEPb35Q2-Z6w6ujjiKUAK2VSbCFpENlEfcHNsjDeY7RCvFlwlCoHj1IeiNIaFTE9yXFu3aEWlExe3LzHv6PZyunEi88QJSXKSh56Um0e0eEg05rMv-VBM83cAqc5POmZnTP1vUdZO8fQv3NFLZ-xU6e1WQVxVPi5Cyqk4jYhGf1Y9t98N654y0tVvw74qNIsTrB-8bGS0Uhfe24oBEWmzObvBY3zhtT1d42myGUJv4pMTU6yPoS83zhPKm3LbUDEpBA1hvvc_09jHk7vUEAuFB-UAZzlht2C1yklzQ",
- 'lwa_client_secret': 'cbf0514186db4df91e04a8905f0a91b605eae4201254ced879d8bb90df4b474d',
- 'profile_id': "3006125408623189"
- }
- # sp = SPClient(**AWS_CREDENTIALS)
- # print(sp.get_keyword_bidrecommendation(adGroupId="119753215871672",keyword=["8mp security camera system","8mp security camera system"],matchType=["broad","exact"]))
- sb = SBClient(**AWS_CREDENTIALS)
- # print(list(sb.iter_targets()))
- print(sb.get_keyword_bidrecommendation(**{'campaignId': 27333596383941,'keywords':[{"matchType":'broad',"keywordText":"4k security camera system"}]}))
- print(sb.get_budget([27333596383941]))
- # sd = SDClient(**AWS_CREDENTIALS)
- # print(sd.get_campaigns(startIndex=10, count=10))
- # sb = SBClient(**AWS_CREDENTIALS)
- # metrics = [
- # 'applicableBudgetRuleId',
- # 'applicableBudgetRuleName',
- # 'attributedConversions14d',
- # 'attributedConversions14dSameSKU',
- # 'attributedDetailPageViewsClicks14d',
- # 'attributedOrderRateNewToBrand14d',
- # 'attributedOrdersNewToBrand14d',
- # 'attributedOrdersNewToBrandPercentage14d',
- # 'attributedSales14d',
- # 'attributedSales14dSameSKU',
- # 'attributedSalesNewToBrand14d',
- # 'attributedSalesNewToBrandPercentage14d',
- # 'attributedUnitsOrderedNewToBrand14d',
- # 'attributedUnitsOrderedNewToBrandPercentage14d',
- # 'campaignBudget',
- # 'campaignBudgetType',
- # 'campaignId',
- # 'campaignName',
- # 'campaignRuleBasedBudget',
- # 'campaignStatus',
- # 'clicks',
- # 'cost',
- # 'dpv14d',
- # 'impressions',
- # 'unitsSold14d',
- # 'attributedBrandedSearches14d',
- # 'topOfSearchImpressionShare']
- # sb.get_report(
- # record_type="campaigns",
- # report_date="20231008",
- # metrics=metrics
- # )
|