from sync_amz_data.DataTransform.Data_ETL import SP_ETL import requests from urllib.parse import urljoin from sync_amz_data.public.amz_ad_client import SPClient from sync_amz_data.settings import AWS_LWA_CLIENT import pandas as pd import json from sync_amz_data.tasks.datainsert.wg import LADS class RateLimitError(Exception): def __init__(self, retry_after: str = None): self.retry_after = retry_after def request(url_path: str, method: str = "GET", head: dict = None, params: dict = None, body: dict = None, AD = LADS): ADS = AD resp = requests.session().request( method=method, url=urljoin(ADS, url_path), headers=head, params=params, json=body, ) if resp.status_code == 429: raise RateLimitError(resp.headers.get("Retry-After")) if resp.status_code >= 400: raise Exception(resp.text) return resp.json() class ProductLineDetail: def __init__(self, profile_id): self.profile_id = profile_id self.re_url_path = "api/ad_manage/profiles/" self.upcreate_url_path = "api/sellers/productlinedetail/update/pasin" self.heads = {'X-Token': "da4ab6bc5cbf1dfa"} self.refresh_token = self.get_refresh_token() self.lwa_client_id = AWS_LWA_CLIENT['lwa_client_id'] self.lwa_client_secret = AWS_LWA_CLIENT['lwa_client_secret'] self.AWS_CREDENTIALS = { 'lwa_client_id': self.lwa_client_id, 'lwa_client_secret': self.lwa_client_secret, 'refresh_token': self.refresh_token, 'profile_id': self.profile_id } def get_refresh_token(self): params = {'profile_id': self.profile_id} heads = self.heads url_path = self.re_url_path tem = request(url_path=url_path, head=heads, params=params) if tem.get('data') is not None: _ = tem.get('data') out = _[0].get('refresh_token') else: out = None return out def updata_create(self): heads = self.heads url_path = self.upcreate_url_path params = {'profileId': self.profile_id} tem = request(url_path=url_path, head=heads, method="GET", params=params) return tem if __name__ == '__main__': a = ProductLineDetail(profile_id="3006125408623189") # out = a.get_sptargets_data() out = a.updata_create() print(out)