1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336 |
- import requests
- from urllib3.util.retry import Retry
- from requests.adapters import HTTPAdapter
- import json
- import time
- from cachetools import TTLCache
- from urllib.parse import urljoin
- from typing import List, Literal, Iterable, Iterator
- import gzip
- from pathlib import Path
- import s3fs
- from s3fs import S3FileSystem
- import logging
- URL_AUTH = "https://api.amazon.com/auth/o2/token"
- URL_AD_API = "https://advertising-api.amazon.com"
- cache = TTLCache(maxsize=10, ttl=3200)
- logger = logging.getLogger(__name__)
- def shop_infos(profile_id):
- resp_rel = requests.get("http://192.168.1.225/api/ad_manage/profiles/",headers={"X-Token": "da4ab6bc5cbf1dfa"})
- data = resp_rel.json().get("data")
- profile_info = {}
- for info in data:
- if info.get("profile_id") in [int(profile_id),str(profile_id)]:
- profile_info["profile_id"] = profile_id
- profile_info["refresh_token"] = info.get("refresh_token")
- profile_info["account_name"] = info.get("account_name")
- profile_info["advertiser_id"] = info.get("advertiser_id")
- profile_info["country_code"] = info.get("country_code")
- profile_info["marketplace_str_id"] =info.get("marketplace_str_id")
- profile_info["time_zone"] = info.get("time_zone")
- return profile_info
- return resp_rel.text
- class RateLimitError(Exception):
- def __init__(self, retry_after: str = None):
- self.retry_after = retry_after
- def gz_decompress(file_path: str, chunk_size: int = 1024 * 1024):
- decompressed_file = file_path.rstrip(".gz")
- with open(decompressed_file, "wb") as pw:
- zf = gzip.open(file_path, mode='rb')
- while True:
- chunk = zf.read(size=chunk_size)
- if not chunk:
- break
- pw.write(chunk)
- return decompressed_file
- class BaseClient:
- def __init__(
- self, lwa_client_id: str, lwa_client_secret: str, refresh_token: str = None, profile_id: str = None,
- data_path: str = "./"
- ):
- self.lwa_client_id = lwa_client_id
- self.lwa_client_secret = lwa_client_secret
- self.refresh_token = refresh_token
- self.profile_id = profile_id
- self.data_path = Path(data_path)
- if not self.data_path.exists():
- self.data_path.mkdir(parents=True)
- retry_strategy = Retry(
- total=5, # 重试次数
- allowed_methods=["GET", "POST"],
- # 强制重试的状态码,在method_whitelist中的请求方法才会重试
- status_forcelist=[429, 500, 502, 503, 504],
- raise_on_status=False, # 在status_forcelist中的状态码达到重试次数后是否抛出异常
- # backoff_factor * (2 ** (retry_time-1)), 即间隔1s, 2s, 4s, 8s, ...
- backoff_factor=2,
- )
- adapter = HTTPAdapter(max_retries=retry_strategy)
- self.session = requests.session()
- self.session.mount("https://", adapter)
- @property
- def access_token(self) -> str:
- try:
- return cache[self.refresh_token]
- except KeyError:
- resp = requests.post(URL_AUTH, data={
- "grant_type": "refresh_token",
- "client_id": self.lwa_client_id,
- "refresh_token": self.refresh_token,
- "client_secret": self.lwa_client_secret,
- })
- if resp.status_code != 200:
- raise Exception(resp.text)
- js = resp.json()
- cache[self.refresh_token] = js["access_token"]
- self.refresh_token = js["refresh_token"]
- return js["access_token"]
- @property
- def auth_headers(self):
- return {
- "Amazon-Advertising-API-ClientId": self.lwa_client_id,
- "Amazon-Advertising-API-Scope": self.profile_id,
- "Authorization": f"Bearer {self.access_token}",
- }
- def _request(self, url_path: str, method: str = "GET", headers: dict = None, params: dict = None,
- body: dict = None):
- head = self.auth_headers
- if headers:
- head.update(headers)
- resp = self.session.request(
- method=method,
- url=urljoin(URL_AD_API, url_path),
- headers=head,
- params=params,
- json=body,
- )
- if resp.status_code == 429:
- raise RateLimitError(resp.headers.get("Retry-After"))
- if resp.status_code >= 400:
- raise Exception(resp.text)
- return resp.json()
- def get_profilesInfo(self):
- url_path = "/v2/profiles"
- return self._request(url_path)
- class SPClient(BaseClient):
- def get_campaigns(self, **body):
- url_path = "/sp/campaigns/list"
- headers = {
- "Accept": "application/vnd.spcampaign.v3+json",
- "Content-Type": "application/vnd.spcampaign.v3+json"
- }
- return self._request(url_path, method="POST", headers=headers, body=body)
- def iter_campaigns(self, **body) -> Iterator[dict]:
- if "maxResults" not in body:
- body["maxResults"] = 100
- while True:
- info: dict = self.get_campaigns(**body)
- yield from info["campaigns"]
- if not info.get("nextToken"):
- break
- body["nextToken"] = info["nextToken"]
- logger.info(f"总共数量:{info['totalResults']}")
- def get_budgetrecommendation(self, campaign_ids):
- url_path = "/sp/campaigns/budgetRecommendations"
- body = {
- "campaignIds": campaign_ids
- }
- headers = {
- "Accept": "application/vnd.budgetrecommendation.v3+json",
- "Content-Type": "application/vnd.budgetrecommendation.v3+json"
- }
- return self._request(url_path, method="POST", headers=headers, body=body)
- def iter_budgetrecommendation(self,campaign_ids):
- for i in range(0,len(campaign_ids),100):
- campaign_id = campaign_ids[i:i+100]
- info: list = self.get_budgetrecommendation(campaign_id)
- yield from info["budgetRecommendationsSuccessResults"]
- def get_ad_groups(self, **body):
- url_path = "/sp/adGroups/list"
- headers = {
- "Accept": "application/vnd.spadGroup.v3+json",
- "Content-Type": "application/vnd.spadGroup.v3+json"
- }
- return self._request(url_path, method="POST", body=body, headers=headers)
- def iter_adGroups(self, **body) -> Iterator[dict]:
- if "maxResults" not in body:
- body["maxResults"] = 100
- while True:
- info: dict = self.get_ad_groups(**body)
- yield from info["adGroups"]
- if not info.get("nextToken"):
- break
- body["nextToken"] = info["nextToken"]
- logger.info(f"总共数量:{info['totalResults']}")
- def get_ads(self, **body):
- url_path = "/sp/productAds/list"
- headers = {
- "Accept": "application/vnd.spproductAd.v3+json",
- "Content-Type": "application/vnd.spproductAd.v3+json"
- }
- return self._request(url_path, method="POST", body=body, headers=headers)
- def iter_ads(self, **body) -> Iterator[dict]:
- if "maxResults" not in body:
- body["maxResults"] = 100
- while True:
- info: dict = self.get_ads(**body)
- yield from info["productAds"]
- if not info.get("nextToken"):
- break
- body["nextToken"] = info["nextToken"]
- logger.info(f"总共数量:{info['totalResults']}")
- def get_keywords(self, **body):
- url_path = "/sp/keywords/list"
- headers = {
- "Accept": "application/vnd.spKeyword.v3+json",
- "Content-Type": "application/vnd.spKeyword.v3+json"
- }
- return self._request(url_path, method="POST", body=body, headers=headers)
- def iter_keywords(self, **body) -> Iterator[dict]:
- if "maxResults" not in body:
- body["maxResults"] = 100
- while True:
- info: dict = self.get_keywords(**body)
- yield from info["keywords"]
- if not info.get("nextToken"):
- break
- body["nextToken"] = info["nextToken"]
- logger.info(f"总共数量:{info['totalResults']}")
- def get_targets(self, **body):
- url_path = "/sp/targets/list"
- headers = {
- "Accept": "application/vnd.sptargetingClause.v3+json",
- "Content-Type": "application/vnd.sptargetingClause.v3+json"
- }
- return self._request(url_path, method="POST", body=body, headers=headers)
- def iter_targets(self, **body) -> Iterator[dict]:
- if "maxResults" not in body:
- body["maxResults"] = 100
- while True:
- info: dict = self.get_targets(**body)
- yield from info["targetingClauses"]
- if not info.get("nextToken"):
- break
- body["nextToken"] = info["nextToken"]
- logger.info(f"总共数量:{info['totalResults']}")
- def get_budget(self, campaign_ids: list):
- url_path = "/sp/campaigns/budget/usage"
- body = {
- "campaignIds": campaign_ids
- }
- return self._request(url_path, method="POST", body=body)
- def get_adgroup_bidrecommendation(
- self, campaignId: str, adGroupId: str, targetingExpressions: list,
- recommendationType: str = "BIDS_FOR_EXISTING_AD_GROUP"):
- url_path = "/sp/targets/bid/recommendations"
- headers = {
- "Accept": "application/vnd.spthemebasedbidrecommendation.v3+json",
- "Content-Type": "application/vnd.spthemebasedbidrecommendation.v3+json"
- }
- body = {
- "campaignId": campaignId,
- "adGroupId": adGroupId,
- "recommendationType": recommendationType,
- "targetingExpressions": targetingExpressions
- }
- return self._request(url_path, method="POST", body=body, headers=headers)
- def iter_adgroup_bidrecommendation(self,campaignId: str, adGroupId: str, targetingExpressions: list,
- recommendationType: str = "BIDS_FOR_EXISTING_AD_GROUP"):
- for i in range(0,len(targetingExpressions),100):
- try:
- info = self.get_adgroup_bidrecommendation(campaignId,adGroupId,targetingExpressions[i:i+100],recommendationType)
- yield from info['bidRecommendations']
- except:
- # print("空值")
- return iter([])
- def get_keyword_bidrecommendation(self, adGroupId: str, keyword: list, matchType: list):
- keywords = list(map(lambda x: {"keyword": x[0], "matchType": x[1]}, list(zip(keyword, matchType))))
- url_path = "/v2/sp/keywords/bidRecommendations"
- body = {"adGroupId": adGroupId,
- "keywords": keywords}
- return self._request(url_path, method="POST", body=body)
- def get_bidrecommendationList(self,adGroupId,expressions):
- url_path = "/v2/sp/targets/bidRecommendations"
- body = {
- "adGroupId": adGroupId,
- "expressions": expressions
- }
- return self._request(url_path,method="POST",body=body)
- def iter_bidrecommendationList(self, adGroupId, expressions):
- for i in range(0,len(expressions), 10):
- try:
- info = self.get_bidrecommendationList(adGroupId, expressions[i:i+10])
- out = info['recommendations']
- for i in out:
- i['adGroupId'] = adGroupId
- yield from out
- except:
- # print("空值")
- return iter([])
- def get_campaignNegativekeyword(self,**body):
- url_path = '/sp/campaignNegativeKeywords/list'
- headers = {
- "Accept": "application/vnd.spCampaignNegativeKeyword.v3+json",
- "Content-Type": "application/vnd.spCampaignNegativeKeyword.v3+json"
- }
- return self._request(url_path,method="POST",body=body,headers=headers)
- def iter_campaignNegativekeyword(self,**body):
- if "maxResults" not in body:
- body["maxResults"] = 100
- while True:
- info: dict = self.get_campaignNegativekeyword(**body)
- yield from info["campaignNegativeKeywords"]
- if not info.get("nextToken"):
- break
- body["nextToken"] = info["nextToken"]
- def get_negativekeyword(self,**body):
- url_path = '/sp/negativeKeywords/list'
- headers = {
- "Accept": "application/vnd.spNegativeKeyword.v3+json",
- "Content-Type": "application/vnd.spNegativeKeyword.v3+json"
- }
- return self._request(url_path,method="POST",body=body,headers=headers)
- def iter_negativekeyword(self,**body):
- if "maxResults" not in body:
- body["maxResults"] = 100
- while True:
- info: dict = self.get_negativekeyword(**body)
- yield from info["negativeKeywords"]
- if not info.get("nextToken"):
- break
- body["nextToken"] = info["nextToken"]
- def get_campaignNegativetargeting(self, **body):
- url_path = '/sp/campaignNegativeTargets/list'
- headers = {
- "Accept": "application/vnd.spCampaignNegativeTargetingClause.v3+json",
- "Content-Type": "application/vnd.spCampaignNegativeTargetingClause.v3+json"
- }
- return self._request(url_path, method="POST", body=body, headers=headers)
- def iter_campaignNegativetargeting(self, **body):
- if "maxResults" not in body:
- body["maxResults"] = 100
- while True:
- info: dict = self.get_campaignNegativetargeting(**body)
- yield from info["campaignNegativeTargetingClauses"]
- if not info.get("nextToken"):
- break
- body["nextToken"] = info["nextToken"]
- def get_negativetargeting(self, **body):
- url_path = '/sp/negativeTargets/list'
- headers = {
- "Accept": "application/vnd.spNegativeTargetingClause.v3+json",
- "Content-Type": "application/vnd.spNegativeTargetingClause.v3+json"
- }
- return self._request(url_path, method="POST", body=body, headers=headers)
- def iter_negativetargeting(self, **body):
- if "maxResults" not in body:
- body["maxResults"] = 100
- while True:
- info: dict = self.get_negativetargeting(**body)
- yield from info["negativeTargetingClauses"]
- if not info.get("nextToken"):
- break
- body["nextToken"] = info["nextToken"]
- def get_targets_bid_recommendations(self,campaignId:str=None,
- adGroupId:str=None,
- asins:list=None,
- bid:float=None,
- keyword:str=None,
- userSelectedKeyword:bool=False,
- matchType:Literal["BROAD","EXACT","PHRASE"]="BROAD",
- recommendationType:Literal['KEYWORDS_FOR_ASINS','KEYWORDS_FOR_ADGROUP']="KEYWORDS_FOR_ASINS",
- sortDimension:Literal["CLICKS","CONVERSIONS","DEFAULT"]="DEFAULT",
- locale:Literal["ar_EG" ,"de_DE", "en_AE", "en_AU", "en_CA", "en_GB", "en_IN", "en_SA", "en_SG", "en_US",
- "es_ES", "es_MX", "fr_FR", "it_IT", "ja_JP", "nl_NL", "pl_PL", "pt_BR", "sv_SE", "tr_TR", "zh_CN"]="en_US"):
- """
- adGroupId与asins只能选择一个参数填写。
- @param campaignId: pass
- @param adGroupId: 如果recommendationType为KEYWORDS_FOR_ADGROUP时,为必填项
- @param asins:如果recommendationType为KEYWORDS_FOR_ASINS时,为必填项
- @param userSelectedKeyword:是否参考选择的keyword
- @param recommendationType:类型选择,必填
- @param locale:pass
- """
- headers = {"Accept":"application/vnd.spkeywordsrecommendation.v3+json",
- "Content-Type":"application/vnd.spkeywordsrecommendation.v3+json"}
- url_path = "/sp/targets/keywords/recommendations"
- body = {
- "recommendationType": recommendationType,
- "targets": [
- {
- "matchType": matchType,
- "keyword": keyword,
- "bid": bid,
- "userSelectedKeyword": userSelectedKeyword
- }
- ],
- "maxRecommendations": "200",
- "sortDimension": sortDimension,
- "locale": locale
- }
- if adGroupId is not None:
- body["campaignId"]=campaignId
- body["adGroupId"]= adGroupId
- else:
- body['asins'] = asins
- return self._request(url_path, method="POST", body=body,headers=headers)
- def get_v3_report(self,
- groupby:list,
- columns:list,
- startDate:str,
- endDate:str,
- reportType: Literal['spCampaigns','spAdvertisedProduct' ,'spPurchasedProduct', 'spTargeting', 'spSearchTerm'],
- timeUnit="DAILY",
- download=True,
- Rewrite=False):
- """
- @param groupby: 聚合条件,[campaign,adGroup, searchTerm,purchasedAsin,campaignPlacement,targeting,searchTerm,advertiser,asin]
- columns: 需要获取的字段
- """
- #######################################################################################################################################################
- if Rewrite:
- pid = self.profile_id
- report_info = {'groupby': groupby,
- 'columns': columns,
- 'startDate': startDate,
- 'endDate': endDate,
- 'reportType': reportType,
- 'timeUnit': timeUnit,
- 'download': download}
- reportrel = self.download_v3_report(report_info, None,
- f"s3://reportforspsbsd/zosi/us/sp/{str(groupby)}_{startDate}_{endDate}_{reportType}_{str(pid)}.json.gz")
- return reportrel
- ##############################################################################################################################################
- url_path = "/reporting/reports"
- headers = {
- "Content-Type":"application/vnd.createasyncreportrequest.v3+json"
- }
- body = {
- "name":"SP campaigns report",
- "startDate":startDate,
- "endDate":endDate,
- "configuration":{
- "adProduct":"SPONSORED_PRODUCTS",
- "groupBy":groupby,
- "columns":columns,
- "reportTypeId":reportType,
- "timeUnit":timeUnit,
- "format":"GZIP_JSON"
- }
- }
- ret = self._request(url_path,method="POST",headers=headers,body=body)
- # print(ret)
- report_id = ret["reportId"]
- status = ret["status"]
- if status == "FAILURE":
- raise Exception(ret)
- logger.info(f"创建报告成功:{ret}")
- while status in ["PROCESSING","PENDING"]:
- logger.debug(f"报告{report_id}正在处理中...")
- time.sleep(4)
- try:
- ret = self._request(f"/reporting/reports/{report_id}")
- except:
- time.sleep(15)
- ret = self._request(f"/reporting/reports/{report_id}")
- print(ret)
- status = ret["status"]
- if status == "FAILURE":
- raise Exception(ret)
- logger.info(f"报告处理完成:{ret}")
- if download:
- pid = self.profile_id
- report_info = {'groupby': groupby,
- 'columns': columns,
- 'startDate': startDate,
- 'endDate': endDate,
- 'reportType': reportType,
- 'timeUnit': timeUnit,
- 'download': download}
- reportrel= self.download_v3_report(report_info,ret['url'],f"s3://reportforspsbsd/zosi/us/sp/{str(groupby)}_{startDate}_{endDate}_{reportType}_{str(pid)}.json.gz")
- return reportrel
- else:
- return ret
- def download_v3_report(self,report_info, url, file_path: str, decompress: bool = True,Rewrite=False) -> str:
- ##################################################################################
- if Rewrite:
- kwargs = {'region_name': 'us-east-1', 'endpoint_url': "https://s3.amazonaws.com",
- 'aws_access_key_id': 'AKIARBAGHTGORIFN44VQ',
- 'aws_secret_access_key': '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'}
- s3_ = S3FileSystem(client_kwargs=kwargs)
- with s3_.open(file_path, 'rb') as f: # 读取s3数据
- data = gzip.GzipFile(fileobj=f, mode='rb')
- de_file = json.load(data)
- # logger.info(f"解压完成:{de_file}")
- # print(de_file)
- return de_file
- ##################################################################################
- resp = requests.get(url, stream=True, allow_redirects=True)
- # print(resp)
- if resp.status_code in [200, 207]:
- try:
- kwargs = {'region_name': 'us-east-1', 'endpoint_url': "https://s3.amazonaws.com",
- 'aws_access_key_id': 'AKIARBAGHTGORIFN44VQ',
- 'aws_secret_access_key': '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'}
- s3_ = S3FileSystem(client_kwargs=kwargs)
- # print()
- with s3_.open(file_path, 'wb') as f:
- for data in resp.iter_content(chunk_size=10 * 1024):
- f.write(data)
- if not decompress:
- return file_path
- with s3_.open(file_path, 'rb') as f: # 读取s3数据
- data = gzip.GzipFile(fileobj=f, mode='rb')
- de_file = json.load(data)
- # logger.info(f"解压完成:{de_file}")
- # print(de_file)
- return de_file
- except:
- try:
- with open(f"{report_info['groupby']}_{report_info['startDate']}_{report_info['endDate']}_{report_info['reportType']}_{str(self.profile_id)}.json.gz", 'wb') as f:
- for data in resp.iter_content(chunk_size=10 * 1024):
- f.write(data)
- if not decompress:
- return file_path
- with open(f"{report_info['groupby']}_{report_info['startDate']}_{report_info['endDate']}_{report_info['reportType']}_{str(self.profile_id)}.json.gz", 'rb') as f: # 读取s3数据
- data = gzip.GzipFile(fileobj=f, mode='rb')
- de_file = json.load(data)
- # logger.info(f"解压完成:{de_file}")
- # print(de_file)
- return de_file
- except:
- logger.info(f"过期开始重试")
- self.get_v3_report(report_info['groupby'], report_info['columns'], report_info['startDate'],
- report_info['endDate'],
- report_info['reportType'], report_info['timeUnit'], report_info['download'])
- else:
- logger.info(f"状态码{resp.status_code},开始重试")
- self.get_v3_report(report_info['groupby'], report_info['columns'], report_info['startDate'],
- report_info['endDate'],
- report_info['reportType'], report_info['timeUnit'], report_info['download'])
- class SBClient(BaseClient):
- def get_campaigns(self, **body):
- url_path = "/sb/v4/campaigns/list"
- headers = {
- "Accept": "application/vnd.sbcampaignresouce.v4+json",
- "Content-Type": "application/vnd.sbcampaignresouce.v4+json"
- }
- return self._request(url_path, method="POST", body=body, headers=headers)
- def get_campaign_v3(self, campaignId):
- if campaignId is None:
- url_path = f'/sb/campaigns'
- else:
- url_path = f'/sb/campaigns/{campaignId}'
- return self._request(url_path, method="GET")
- def iter_campaigns(self, **body) -> Iterator[dict]:
- if "maxResults" not in body:
- body["maxResults"] = 100
- while True:
- info: dict = self.get_campaigns(**body)
- yield from info["campaigns"]
- if not info.get("nextToken"):
- break
- body["nextToken"] = info["nextToken"]
- # logger.info(f"总共数量:{info['totalResults']}")
- def get_ad_groups(self, **body):
- url_path = "/sb/v4/adGroups/list"
- headers = {
- 'Content-Type': "application/vnd.sbadgroupresource.v4+json",
- 'Accept': "application/vnd.sbadgroupresource.v4+json"
- }
- return self._request(url_path, method="POST", headers=headers, body=body)
- def iter_adGroups(self, **body) -> Iterator[dict]:
- if "maxResults" not in body:
- body["maxResults"] = 100
- while True:
- info: dict = self.get_ad_groups(**body)
- # print(info)
- yield from info["adGroups"]
- if not info.get("nextToken"):
- break
- body["nextToken"] = info["nextToken"]
- def get_ads(self, **body):
- url_path = "/sb/v4/ads/list"
- headers = {
- 'Content-Type': "application/vnd.sbadresource.v4+json",
- 'Accept': "application/vnd.sbadresource.v4+json"
- }
- return self._request(url_path, method="POST", headers=headers, body=body)
- def iter_ads(self, **body):
- if "maxResults" not in body:
- body["maxResults"] = 100
- while True:
- info: dict = self.get_ads(**body)
- # print(info)
- yield from info["ads"]
- if not info.get("nextToken"):
- break
- body["nextToken"] = info["nextToken"]
- def get_keywords(self,**param):
- url_path = "/sb/keywords"
- return self._request(url_path, method="GET",params=param)
- def get_keyword(self,keywordid):
- url_path = f'/sb/keywords/{keywordid}'
- return self._request(url_path,method="GET")
- def iter_keywords(self,**param):
- if "startIndex" not in param:
- param["startIndex"] = 0
- param["count"] = 5000
- while True:
- info:list = self.get_keywords(**param)
- # print(info)
- if len(info) == 0:
- break
- param["startIndex"] += 5000
- yield info
- def get_targets(self, **body):
- url_path = "/sb/targets/list"
- return self._request(url_path, method="POST", body=body)
- def iter_targets(self, **body):
- if "maxResults" not in body:
- body["maxResults"] = 100
- while True:
- info: dict = self.get_targets(**body)
- # print(info)
- yield from info["targets"]
- if not info.get("nextToken"):
- break
- body["nextToken"] = info["nextToken"]
- def get_budget(self, campaignIds: list):
- url_path = "/sb/campaigns/budget/usage"
- body = {"campaignIds": campaignIds}
- return self._request(url_path, method="POST", body=body)
- def get_keyword_bidrecommendation(self, campaignId: str, keywords: list,):
- url_path = "/sb/recommendations/bids"
- headers = {
- "Content-Type": "application/json"
- }
- body = {
- "campaignId": campaignId,
- "keywords": keywords
- }
- return self._request(url_path, method="POST", body=body, headers=headers)
- def iter_keyword_bidrecommendation(self,campaignId: str, keywords: list):
- for i in range(0, len(keywords), 100):
- try:
- info = self.get_keyword_bidrecommendation(campaignId, keywords[i:i+100])
- yield from info['keywordsBidsRecommendationSuccessResults']
- except:
- # print("空值")
- return iter([])
- def get_v3_report(self,
- groupby:list,
- columns:list,
- startDate:str,
- endDate:str,
- reportType: Literal['sbCampaigns', 'sbPurchasedProduct', 'sbTargeting', 'sbSearchTerm'],
- timeUnit="DAILY",
- download=True,
- Rewrite=False):
- """
- Now about reportType is only sbPurchasedProduct available.
- @param groupby: 聚合条件
- @param columns: 需要获取的字段[campaign,purchasedAsin,targeting,searchTerm]
- @param startDate: 请求开始的日期
- @param endDate: 请求结束的日期
- @param reportType: 广告类型
- @param timeUnit: 时间指标-[DAILY, SUMMARY]
- @param download: 下载报告
- """
- ##############################################################################################
- if Rewrite:
- pid = self.profile_id
- report_info = {'groupby': groupby,
- 'columns': columns,
- 'startDate': startDate,
- 'endDate': endDate,
- 'reportType': reportType,
- 'timeUnit': timeUnit,
- 'download': download}
- reportrel = self.download_v3_report(report_info, None,
- f"s3://reportforspsbsd/zosi/us/sb/{startDate}_{endDate}_{reportType}_{str(groupby)}_{str(pid)}.json.gz")
- return reportrel
- ###############################################################################################
- url_path = "/reporting/reports"
- headers = {
- "Content-Type":"application/vnd.createasyncreportrequest.v3+json"
- }
- body = {
- "name":"SB campaigns report",
- "startDate":startDate,
- "endDate":endDate,
- "configuration":{
- "adProduct":"SPONSORED_BRANDS",
- "groupBy":groupby,
- "columns":columns,
- "reportTypeId":reportType,
- "timeUnit":timeUnit,
- "format":"GZIP_JSON"
- }
- }
- ret = self._request(url_path,method="POST",headers=headers,body=body)
- # print(ret)
- report_id = ret["reportId"]
- status = ret["status"]
- if status == "FAILURE":
- raise Exception(ret)
- logger.info(f"创建报告成功:{ret}")
- while status in ["PROCESSING","PENDING"]:
- logger.debug(f"报告{report_id}正在处理中...")
- time.sleep(4)
- try:
- ret = self._request(f"/reporting/reports/{report_id}")
- except:
- time.sleep(15)
- ret = self._request(f"/reporting/reports/{report_id}")
- print(ret)
- status = ret["status"]
- if status == "FAILURE":
- raise Exception(ret)
- logger.info(f"报告处理完成:{ret}")
- if download:
- pid = self.profile_id
- report_info = {'groupby': groupby,
- 'columns': columns,
- 'startDate': startDate,
- 'endDate': endDate,
- 'reportType': reportType,
- 'timeUnit': timeUnit,
- 'download': download}
- reportrel= self.download_v3_report(report_info,ret['url'],f"s3://reportforspsbsd/zosi/us/sb/{startDate}_{endDate}_{reportType}_{str(groupby)}_{str(pid)}.json.gz")
- return reportrel
- else:
- return ret
- def download_v3_report(self, report_info, url, file_path: str, decompress: bool = True,Rewrite=False) -> str:
- ###########################################################################################
- if Rewrite:
- kwargs = {'region_name': 'us-east-1', 'endpoint_url': "https://s3.amazonaws.com",
- 'aws_access_key_id': 'AKIARBAGHTGORIFN44VQ',
- 'aws_secret_access_key': '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'}
- s3_ = S3FileSystem(client_kwargs=kwargs)
- with s3_.open(file_path, 'rb') as f: # 读取s3数据
- data = gzip.GzipFile(fileobj=f, mode='rb')
- de_file = json.load(data)
- # logger.info(f"解压完成:{de_file}")
- # print(de_file)
- return de_file
- #############################################################################################
- resp = requests.get(url, stream=True, allow_redirects=True)
- # print(resp)
- if resp.status_code in [200, 207]:
- try:
- kwargs = {'region_name': 'us-east-1', 'endpoint_url': "https://s3.amazonaws.com",
- 'aws_access_key_id': 'AKIARBAGHTGORIFN44VQ',
- 'aws_secret_access_key': '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'}
- s3_ = S3FileSystem(client_kwargs=kwargs)
- # print()
- with s3_.open(file_path, 'wb') as f:
- for data in resp.iter_content(chunk_size=10 * 1024):
- f.write(data)
- if not decompress:
- return file_path
- with s3_.open(file_path, 'rb') as f: # 读取s3数据
- data = gzip.GzipFile(fileobj=f, mode='rb')
- de_file = json.load(data)
- # logger.info(f"解压完成:{de_file}")
- # print(de_file)
- return de_file
- except:
- try:
- with open(
- f"{report_info['groupby']}_{report_info['startDate']}_{report_info['endDate']}_{report_info['reportType']}_{str(self.profile_id)}.json.gz",
- 'wb') as f:
- for data in resp.iter_content(chunk_size=10 * 1024):
- f.write(data)
- if not decompress:
- return file_path
- with open(
- f"{report_info['groupby']}_{report_info['startDate']}_{report_info['endDate']}_{report_info['reportType']}_{str(self.profile_id)}.json.gz",
- 'rb') as f: # 读取s3数据
- data = gzip.GzipFile(fileobj=f, mode='rb')
- de_file = json.load(data)
- # logger.info(f"解压完成:{de_file}")
- # print(de_file)
- return de_file
- except:
- logger.info(f"过期开始重试")
- self.get_v3_report(report_info['groupby'], report_info['columns'], report_info['startDate'],
- report_info['endDate'],
- report_info['reportType'], report_info['timeUnit'], report_info['download'])
- else:
- logger.info(f"状态码{resp.status_code},开始重试")
- self.get_v3_report(report_info['groupby'], report_info['columns'], report_info['startDate'],
- report_info['endDate'],
- report_info['reportType'], report_info['timeUnit'], report_info['download'])
- def get_v2_report(
- self,
- record_type: Literal['campaigns', 'adGroups', 'ads', 'targets', 'keywords'],
- report_date: str,
- metrics: List[str],
- segment: Literal['placement', 'query'] = None,
- creative_type: Literal['video', 'all'] = "all",
- download: bool = True,
- Rewrite=False
- ):
- """
- @param download: 是否下载文件
- @param record_type:
- @param report_date: 格式为YYYYMMDD,以请求的卖家市场所对应的时区为准,超过60天的报告不可用
- @param metrics:
- @param segment:
- @param creative_type:
- None:仅包含非视频广告
- 'video':仅包含视频广告
- 'all':包含视频和非视频广告
- @return:
- """
- ############################################################################################
- if Rewrite:
- pid = self.profile_id
- report_info = {"record_type": record_type, "report_date": report_date, "metrics": metrics,
- "segment": segment, "creative_type": creative_type, "download": download}
- reportrel = self.download_v2_report(report_info, None,
- f"s3://reportforspsbsd/zosi/us/sb/{str(report_date)}_{record_type}_{creative_type}_{segment}_{str(pid)}.gz"
- )
- return reportrel
- #############################################################################################
- url = f"/v2/hsa/{record_type}/report"
- body = {
- "reportDate": report_date,
- "metrics": ",".join(metrics),
- "creativeType": creative_type,
- "segment": segment
- }
- if record_type == "ads":
- body["creativeType"] = "all"
- ret = self._request(url, method="POST", body=body)
- report_id = ret["reportId"]
- status = ret["status"]
- if status == "FAILURE":
- raise Exception(ret)
- logger.info(f"创建报告成功:{ret}")
- while status == "IN_PROGRESS":
- logger.debug(f"报告{report_id}正在处理中...")
- time.sleep(4)
- try:
- ret = self._request(f"/v2/reports/{report_id}")
- except:
- time.sleep(15)
- ret = self._request(f"/v2/reports/{report_id}")
- print(ret)
- status = ret["status"]
- if status == "FAILURE":
- raise Exception(ret)
- logger.info(f"报告处理完成:{ret}")
- if download:
- pid = self.profile_id
- report_info = {"record_type": record_type, "report_date": report_date, "metrics": metrics,
- "segment": segment, "creative_type": creative_type, "download": download}
- reportrel= self.download_v2_report(report_info,report_id, f"s3://reportforspsbsd/zosi/us/sb/{str(report_date)}_{record_type}_{creative_type}_{segment}_{str(pid)}.gz")
- return reportrel
- else:
- return ret
- def download_v2_report(self, report_info, report_id: str, file_path: str, decompress: bool = True,Rewrite=False) -> str:
- ################################################################################
- if Rewrite:
- kwargs = {'region_name': 'us-east-1', 'endpoint_url': "https://s3.amazonaws.com",
- 'aws_access_key_id': 'AKIARBAGHTGORIFN44VQ',
- 'aws_secret_access_key': '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'}
- s3_ = S3FileSystem(client_kwargs=kwargs)
- # print()
- with s3_.open(file_path, 'rb') as f: # 读取s3数据
- data = gzip.GzipFile(fileobj=f, mode='rb')
- de_file = json.load(data)
- # logger.info(f"解压完成:{de_file}")
- # print(de_file)
- return de_file
- ################################################################################
- url = urljoin(URL_AD_API, f"/v2/reports/{report_id}/download")
- resp = requests.get(url, headers=self.auth_headers, stream=True, allow_redirects=True)
- # print(resp.status_code)
- if resp.status_code in [200, 207]:
- try:
- logger.info(f"开始下载报告:{report_id}")
- kwargs = {'region_name': 'us-east-1', 'endpoint_url': "https://s3.amazonaws.com",
- 'aws_access_key_id': 'AKIARBAGHTGORIFN44VQ',
- 'aws_secret_access_key': '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'}
- s3_ = S3FileSystem(client_kwargs=kwargs)
- # print()
- with s3_.open(file_path, 'wb') as f:
- for data in resp.iter_content(chunk_size=10 * 1024):
- # print(resp.text)
- f.write(data)
- logger.info(f"报告{report_id}下载完成:{file_path}")
- if not decompress:
- return file_path
- with s3_.open(file_path, 'rb') as f: # 读取s3数据
- data = gzip.GzipFile(fileobj=f, mode='rb')
- de_file = json.load(data)
- # logger.info(f"解压完成:{de_file}")
- # print(de_file)
- return de_file
- except:
- try:
- with open(
- f"{report_info['record_type']}_{report_info['report_date']}_{report_info['metrics']}_{report_info['segment']}_{report_info['creative_type']}_{str(self.profile_id)}.json.gz",
- 'wb') as f:
- for data in resp.iter_content(chunk_size=10 * 1024):
- f.write(data)
- if not decompress:
- return file_path
- with open(
- f"{report_info['record_type']}_{report_info['report_date']}_{report_info['metrics']}_{report_info['segment']}_{report_info['creative_type']}_{str(self.profile_id)}.json.gz",
- 'rb') as f: # 读取s3数据
- data = gzip.GzipFile(fileobj=f, mode='rb')
- de_file = json.load(data)
- # logger.info(f"解压完成:{de_file}")
- # print(de_file)
- return de_file
- except:
- logger.info(f"过期开始重试")
- self.get_v2_report(report_info['record_type'], report_info['report_date'], report_info['metrics'],
- report_info['segment'], report_info['creative_type'], report_info['download'])
- else:
- logger.info(f"状态码{resp.status_code},开始重试")
- self.get_v2_report(report_info['record_type'], report_info['report_date'], report_info['metrics'],
- report_info['segment'], report_info['creative_type'], report_info['download'])
- class SDClient(BaseClient):
- def get_campaigns(self, **params) -> List[dict]:
- url_path = "/sd/campaigns"
- return self._request(url_path, params=params)
- def get_campaigns_extended(self, **params) -> List[dict]:
- url_path = "/sd/campaigns/extended"
- return self._request(url_path, params=params)
- def get_adGroups(self,**params):
- url_path = '/sd/adGroups'
- return self._request(url_path, params=params)
- def iter_adGroups(self,**param):
- if "startIndex" not in param:
- param["startIndex"] = 0
- param["count"] = 5000
- while True:
- info:list = self.get_adGroups(**param)
- # print(info)
- if len(info) == 0:
- break
- param["startIndex"] += 5000
- yield info
- def get_ads(self,**params):
- url_path = '/sd/productAds'
- return self._request(url_path, params=params)
- def iter_ads(self,**param):
- if "startIndex" not in param:
- param["startIndex"] = 0
- param["count"] = 5000
- while True:
- info:list = self.get_ads(**param)
- # print(info)
- if len(info) == 0:
- break
- param["startIndex"] += 5000
- yield info
- def get_targets(self,**params):
- url_path = '/sd/targets'
- return self._request(url_path, params=params)
- def iter_targets(self,**param):
- if "startIndex" not in param:
- param["startIndex"] = 0
- param["count"] = 5000
- while True:
- info:list = self.get_targets(**param)
- # print(info)
- if len(info) == 0:
- break
- param["startIndex"] += 5000
- yield info
- def get_budget(self, campaignIds: list):
- url_path = "/sd/campaigns/budget/usage"
- body = {"campaignIds": campaignIds}
- return self._request(url_path, method="POST", body=body)
- def get_target_bidrecommendation(self,tactic:str,products:list,typeFilter:list,themes:dict,locale:str='en_US'):#
- url_path = '/sd/targets/recommendations'
- headers ={
- 'Content-Type':"application/vnd.sdtargetingrecommendations.v3.3+json",
- 'Accept':"application/vnd.sdtargetingrecommendations.v3.3+json"
- }
- # "tactic":"T00020",
- # "products":[{"asin":"B00MP57IOY"}],
- # "typeFilter":["PRODUCT"],
- # "themes":{"product":[{"name":"TEST","expression":[{"type":"asinBrandSameAs"}]}]}
- body = {
- "tactic":tactic,
- "products":products,
- "typeFilter":typeFilter,
- "themes":themes
- }
- return self._request(url_path, method="POST", headers=headers,body=body,params={"locale":locale})
- def get_v3_report(self,
- groupby:list,
- columns:list,
- startDate:str,
- endDate:str,
- reportType: Literal['sdCampaigns', 'sdPurchasedProduct', 'sdTargeting', 'sdSearchTerm'],
- timeUnit="DAILY",
- download=True,
- Rewrite=False):
- """
- Now about reportType is only sbPurchasedProduct available.
- @param groupby: 聚合条件
- @param columns: 需要获取的字段[campaign,purchasedAsin,targeting,searchTerm]
- @param startDate: 请求开始的日期
- @param endDate: 请求结束的日期
- @param reportType: 广告类型
- @param timeUnit: 时间指标-[DAILY, SUMMARY]
- @param download: 下载报告
- """
- ###############################################################################################
- if Rewrite:
- pid = self.profile_id
- report_info = {'groupby': groupby,
- 'columns': columns,
- 'startDate': startDate,
- 'endDate': endDate,
- 'reportType': reportType,
- 'timeUnit': timeUnit,
- 'download': download}
- reportrel = self.download_v3_report(report_info, None,
- f"s3://reportforspsbsd/zosi/us/sd/{startDate}_{endDate}_{reportType}_{str(groupby)}_{str(pid)}.json.gz")
- return reportrel
- ##################################################################################################
- url_path = "/reporting/reports"
- headers = {
- "Content-Type":"application/vnd.createasyncreportrequest.v3+json"
- }
- body = {
- "name":"SD campaigns report",
- "startDate":startDate,
- "endDate":endDate,
- "configuration":{
- "adProduct":"SPONSORED_DISPLAY",
- "groupBy":groupby,
- "columns":columns,
- "reportTypeId":reportType,
- "timeUnit":timeUnit,
- "format":"GZIP_JSON"
- }
- }
- ret = self._request(url_path,method="POST",headers=headers,body=body)
- # print(ret)
- report_id = ret["reportId"]
- status = ret["status"]
- if status == "FAILURE":
- raise Exception(ret)
- logger.info(f"创建报告成功:{ret}")
- while status in ["PROCESSING","PENDING"]:
- logger.debug(f"报告{report_id}正在处理中...")
- time.sleep(4)
- try:
- ret = self._request(f"/reporting/reports/{report_id}")
- except:
- time.sleep(15)
- ret = self._request(f"/reporting/reports/{report_id}")
- print(ret)
- status = ret["status"]
- if status == "FAILURE":
- raise Exception(ret)
- logger.info(f"报告处理完成:{ret}")
- if download:
- pid = self.profile_id
- report_info = {'groupby':groupby,
- 'columns':columns,
- 'startDate':startDate,
- 'endDate':endDate,
- 'reportType': reportType,
- 'timeUnit':timeUnit,
- 'download':download}
- reportrel= self.download_v3_report(report_info,ret['url'],f"s3://reportforspsbsd/zosi/us/sd/{startDate}_{endDate}_{reportType}_{str(groupby)}_{str(pid)}.json.gz")
- return reportrel
- else:
- return ret
- def download_v3_report(self, report_info, url, file_path: str, decompress: bool = True,Rewrite=False) -> str:
- #######################################################################
- if Rewrite:
- kwargs = {'region_name': 'us-east-1', 'endpoint_url': "https://s3.amazonaws.com",
- 'aws_access_key_id': 'AKIARBAGHTGORIFN44VQ',
- 'aws_secret_access_key': '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'}
- s3_ = S3FileSystem(client_kwargs=kwargs)
- with s3_.open(file_path, 'rb') as f: # 读取s3数据
- data = gzip.GzipFile(fileobj=f, mode='rb')
- de_file = json.load(data)
- # logger.info(f"解压完成:{de_file}")
- # print(de_file)
- return de_file
- # logger.info(f"解压完成:{de_file}")
- # print(de_file)
- #########################################################################
- resp = requests.get(url, stream=True, allow_redirects=True)
- # print(resp)
- if resp.status_code in [200, 207]:
- try:
- kwargs = {'region_name': 'us-east-1', 'endpoint_url': "https://s3.amazonaws.com",
- 'aws_access_key_id': 'AKIARBAGHTGORIFN44VQ',
- 'aws_secret_access_key': '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'}
- s3_ = S3FileSystem(client_kwargs=kwargs)
- # print()
- with s3_.open(file_path, 'wb') as f:
- for data in resp.iter_content(chunk_size=10 * 1024):
- f.write(data)
- if not decompress:
- return file_path
- with s3_.open(file_path, 'rb') as f: # 读取s3数据
- data = gzip.GzipFile(fileobj=f, mode='rb')
- de_file = json.load(data)
- # logger.info(f"解压完成:{de_file}")
- # print(de_file)
- return de_file
- except:
- try:
- with open(
- f"{report_info['groupby']}_{report_info['startDate']}_{report_info['endDate']}_{report_info['reportType']}_{str(self.profile_id)}.json.gz",
- 'wb') as f:
- for data in resp.iter_content(chunk_size=10 * 1024):
- f.write(data)
- if not decompress:
- return file_path
- with open(
- f"{report_info['groupby']}_{report_info['startDate']}_{report_info['endDate']}_{report_info['reportType']}_{str(self.profile_id)}.json.gz",
- 'rb') as f: # 读取s3数据
- data = gzip.GzipFile(fileobj=f, mode='rb')
- de_file = json.load(data)
- # logger.info(f"解压完成:{de_file}")
- # print(de_file)
- return de_file
- except:
- logger.info(f"过期开始重试")
- self.get_v3_report(report_info['groupby'], report_info['columns'], report_info['startDate'],
- report_info['endDate'],
- report_info['reportType'], report_info['timeUnit'], report_info['download'])
- else:
- logger.info(f"状态码{resp.status_code},开始重试")
- self.get_v3_report(report_info['groupby'], report_info['columns'], report_info['startDate'],
- report_info['endDate'],
- report_info['reportType'], report_info['timeUnit'], report_info['download'])
- def get_v2_report(
- self,
- record_type: Literal['campaigns', 'adGroups', 'productAds', 'targets', 'asins'],
- report_date: str,
- metrics: List[str],
- segment: Literal['matchedTarget'] = None,
- tactic: Literal['T00020', 'T00030'] = None,
- download: bool = True,
- Rewrite=False
- ):
- """
- @param download: 是否下载文件
- @param record_type:
- @param report_date: 格式为YYYYMMDD,以请求的卖家市场所对应的时区为准,超过60天的报告不可用
- @param metrics:
- @param segment:
- @param tactic:
- T00020: contextual targeting
- T00030: audience targeting
- @return:
- """
- url = f"/sd/{record_type}/report"
- body = {
- "reportDate": report_date,
- "metrics": ",".join(metrics),
- "tactic": tactic,
- "segment": segment
- }
- ret = self._request(url, method="POST", body=body)
- report_id = ret["reportId"]
- status = ret["status"]
- print(ret)
- if status == "FAILURE":
- raise Exception(ret)
- logger.info(f"创建报告成功:{ret}")
- while status == "IN_PROGRESS":
- logger.debug(f"报告{report_id}正在处理中...")
- time.sleep(4)
- try:
- ret = self._request(f"/v2/reports/{report_id}")
- except:
- time.sleep(15)
- ret = self._request(f"/v2/reports/{report_id}")
- print(ret)
- status = ret["status"]
- if status == "FAILURE":
- raise Exception(ret)
- logger.info(f"报告处理完成:{ret}")
- if download:
- pid = self.profile_id
- report_info = {"record_type":record_type,"report_date":report_date,"metrics":metrics,"segment":segment,"tactic":tactic,"download":download}
- reportrel= self.download_v2_report(report_info,report_id, f"s3://reportforspsbsd/zosi/us/sd/{str(report_date)}_{record_type}_{tactic}_{segment}_{str(pid)}.gz")
- return reportrel
- else:
- return ret
- def download_v2_report(self, report_info,report_id: str, file_path: str, decompress: bool = True) -> str:
- url = urljoin(URL_AD_API, f"/v2/reports/{report_id}/download")
- resp = requests.get(url, headers=self.auth_headers, stream=True, allow_redirects=True)
- # print(resp.status_code)
- if resp.status_code in [200,207]:
- try:
- logger.info(f"开始下载报告:{report_id}")
- kwargs = {'region_name': 'us-east-1', 'endpoint_url': "https://s3.amazonaws.com",
- 'aws_access_key_id': 'AKIARBAGHTGORIFN44VQ',
- 'aws_secret_access_key': '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'}
- s3_ = S3FileSystem(client_kwargs=kwargs)
- # print()
- with s3_.open(file_path, 'wb') as f:
- for data in resp.iter_content(chunk_size=10 * 1024):
- # print(resp.text)
- f.write(data)
- logger.info(f"报告{report_id}下载完成:{file_path}")
- if not decompress:
- return file_path
- with s3_.open(file_path, 'rb') as f: # 读取s3数据
- data = gzip.GzipFile(fileobj=f, mode='rb')
- de_file = json.load(data)
- # logger.info(f"解压完成:{de_file}")
- # print(de_file)
- return de_file
- except:
- try:
- with open(
- f"{report_info['record_type']}_{report_info['report_date']}_{report_info['metrics']}_{report_info['segment']}_{report_info['tactic']}_{str(self.profile_id)}.json.gz",
- 'wb') as f:
- for data in resp.iter_content(chunk_size=10 * 1024):
- f.write(data)
- if not decompress:
- return file_path
- with open(
- f"{report_info['record_type']}_{report_info['report_date']}_{report_info['metrics']}_{report_info['segment']}_{report_info['tactic']}_{str(self.profile_id)}.json.gz",
- 'rb') as f: # 读取s3数据
- data = gzip.GzipFile(fileobj=f, mode='rb')
- de_file = json.load(data)
- # logger.info(f"解压完成:{de_file}")
- # print(de_file)
- return de_file
- except:
- logger.info(f"过期开始重试")
- self.get_v2_report(report_info['record_type'], report_info['report_date'], report_info['metrics'],
- report_info['segment'], report_info['tactic'], report_info['download'])
- else:
- logger.info(f"状态码{resp.status_code},开始重试")
- self.get_v2_report(report_info['record_type'],report_info['report_date'],report_info['metrics'],
- report_info['segment'],report_info['tactic'],report_info['download'])
- class Account(BaseClient):
- def get_portfolios(self):
- url_path = "/v2/portfolios/extended"
- return self._request(url_path)
- def iter_portfolios(self):
- yield from self.get_portfolios()
- AccountClient = Account
- if __name__ == '__main__':
- AWS_CREDENTIALS = {
- 'lwa_client_id': 'amzn1.application-oa2-client.ebd701cd07854fb38c37ee49ec4ba109',
- 'refresh_token': "Atzr|IwEBIL4ur8kbcwRyxVu_srprAAoTYzujnBvA6jU-0SMxkRgOhGjYJSUNGKvw24EQwJa1jG5RM76mQD2P22AKSq8qSD94LddoXGdKDO74eQVYl0RhuqOMFqdrEZpp1p4bIR6_N8VeSJDHr7UCuo8FiabkSHrkq7tsNvRP-yI-bnpQv4EayPBh7YwHVX3hYdRbhxaBvgJENgCuiEPb35Q2-Z6w6ujjiKUAK2VSbCFpENlEfcHNsjDeY7RCvFlwlCoHj1IeiNIaFTE9yXFu3aEWlExe3LzHv6PZyunEi88QJSXKSh56Um0e0eEg05rMv-VBM83cAqc5POmZnTP1vUdZO8fQv3NFLZ-xU6e1WQVxVPi5Cyqk4jYhGf1Y9t98N654y0tVvw74qNIsTrB-8bGS0Uhfe24oBEWmzObvBY3zhtT1d42myGUJv4pMTU6yPoS83zhPKm3LbUDEpBA1hvvc_09jHk7vUEAuFB-UAZzlht2C1yklzQ",
- 'lwa_client_secret': 'cbf0514186db4df91e04a8905f0a91b605eae4201254ced879d8bb90df4b474d',
- 'profile_id': "3006125408623189"
- }
- sp = SPClient(**AWS_CREDENTIALS)
- rel = sp.iter_negativekeyword()
- print(list(rel))
- # print(rel)
|