Create a Traffic Source in Python
1
Decode NETOSECRET
Decode the base64-encoded NETOSECRET into its JSON fields
class NetoSecret:
    """Credentials parsed from a base64-encoded NETOSECRET value.

    The decoded JSON object is read for the keys ``appname``, ``appkey``,
    ``sharedsecret``, ``shortname`` and ``url``; each is exposed as an
    attribute and is ``None`` when the key is absent.
    """

    def __init__(self, api_key_base64):
        """Decode *api_key_base64* and populate the credential attributes."""
        self.decode_netosecret(api_key_base64)

    def decode_netosecret(self, api_key_base64):
        """Decode a base64 string holding a JSON object and store its fields.

        Args:
            api_key_base64: base64 text whose decoded bytes are a UTF-8 JSON
                object.

        Raises:
            ValueError: if the decoded JSON document is not an object. The
                base64, UTF-8 and JSON decoders also signal malformed input
                with ``ValueError`` subclasses.
        """
        decoded_str = base64.b64decode(api_key_base64).decode()
        netosecret_json = json.loads(decoded_str)
        # Fail with a clear message instead of an AttributeError on `.get`
        # when the payload is valid JSON but not an object (e.g. a list).
        if not isinstance(netosecret_json, dict):
            raise ValueError("NETOSECRET must decode to a JSON object")

        self.app_name = netosecret_json.get("appname")
        self.app_key = netosecret_json.get("appkey")
        self.shared_secret = netosecret_json.get("sharedsecret")
        self.short_name = netosecret_json.get("shortname")
        self.url = netosecret_json.get("url")
2
Authenticate to Fusion API
Construct a JWT request token and exchange it for an access token
def get_netography_access_token(netosecret: Optional["NetoSecret"] = None) -> Optional[str]:
    """
    Authenticate to the Netography Fusion API and return an access token.

    Builds an HS256-signed JWT from the NETOSECRET fields, posts it to
    ``{netosecret.url}/auth/token`` and returns the ``access_token`` field
    of the JSON response. The call is synchronous.

    Args:
        netosecret: decoded NETOSECRET credentials.

    Returns:
        The access token, or None if no NETOSECRET was supplied, the API
        answered with a status other than 200, or the response carries no
        ``access_token``.
    """
    # The parameter defaults to None, so guard before dereferencing it.
    if netosecret is None:
        logging.error(
            "Cannot authenticate to Netography Fusion API: no NETOSECRET provided"
        )
        return None

    payload = {
        "iat": int(time.time()),
        "jti": random.randint(0, 10000000),
        "appname": netosecret.app_name,
        "appkey": netosecret.app_key,
        "shortname": netosecret.short_name,
    }
    jwt_token = jwt.encode(payload, netosecret.shared_secret, algorithm="HS256")
    headers = {
        "accept": "application/json",
        "content-type": "application/json",
        "authorization": f"Bearer {jwt_token}",
    }
    logging.info(
        f"Authenticating to Netography Fusion API for {netosecret.short_name} at {netosecret.url}"
    )
    url = f"{netosecret.url}/auth/token"
    # Post exactly once; a second identical request would only re-submit
    # the same JWT and throw away the first response.
    response = session.post(url, json={"jwt": jwt_token}, headers=headers)
    if response.status_code != 200:
        # `text` is read as an attribute here, not called.
        logging.error(
            f"Failed to authenticate with Netography Fusion API: "
            f"{response.status_code}, {response.text}"
        )
        return None
    # `.get` keeps the "None on failure" contract when the field is missing.
    return response.json().get("access_token")
3
Create traffic source
Create a traffic source, then list or delete existing ones, through the Fusion API `/vpc` endpoint
def create_flow_source(
    netosecret: NetoSecret,
    role_arn: str,
    log_bucket: str,
    vpc_region: str,
    log_bucket_region: str,
    log_bucket_prefix: str,
    vpc_id: str,
    account_id: str,
):
    """Create an AWS S3 flow-log traffic source via ``POST {url}/vpc``.

    Obtains an access token with ``get_netography_access_token`` and posts
    the source definition. The source is named ``{account_id}-{vpc_id}``
    and tagged with the VPC region and VPC id.

    Args:
        netosecret: decoded NETOSECRET credentials (supplies the API URL).
        role_arn: sent as ``role.arn`` in the payload.
        log_bucket: sent as ``bucket``.
        vpc_region: sent as ``region`` and as a tag.
        log_bucket_region: sent as ``bucketregion``.
        log_bucket_prefix: sent as ``prefix``.
        vpc_id: used in the source name and as a tag.
        account_id: sent as ``accountid`` and used in the source name.

    Returns:
        None. Failures (no token, or an HTTP status of 400 or above) are
        logged rather than raised.
    """
    logging.info(
        f"Creating flow source for {vpc_id} in account {account_id} in region {log_bucket_region}"
    )
    token = get_netography_access_token(netosecret)
    if token is None:
        # Without a token the request would be sent as "Bearer None".
        logging.error(
            f"Cannot create flow source for {vpc_id}: authentication failed"
        )
        return

    create_payload = {
        "awsauthtype": "RoleARN",
        "role": {"arn": role_arn},
        "bucket": log_bucket,
        "bucketregion": log_bucket_region,
        "prefix": log_bucket_prefix,
        "region": vpc_region,
        "sqsurl": "",
        "name": f"{account_id}-{vpc_id}",
        "enabled": True,
        "hiveprefix": False,
        "flowtype": "aws",
        "flowresource": "s3",
        "accountid": account_id,
        "traffictype": "flow",
        "deleteobjects": False,
        "samplerate": 100,
        "tags": [f"{vpc_region}", f"{vpc_id}"],
    }
    # Build the headers locally; no `headers` name exists in this scope.
    headers = {
        "accept": "application/json",
        "content-type": "application/json",
        "authorization": f"Bearer {token}",
    }
    response = requests.post(
        f"{netosecret.url}/vpc",
        json=create_payload,
        headers=headers,
    )
    if response.status_code >= 400:
        logging.error(
            f"Failed to create flow source for {vpc_id}: "
            f"{response.status_code}, {response.text}"
        )
def get_flow_sources(token, netosecret=None):
    """Return the flow sources listed by ``GET {netosecret.url}/vpc``.

    Args:
        token: bearer access token for the Fusion API.
        netosecret: decoded NETOSECRET supplying the API base URL. It must
            be provided; the default exists only so the one-argument call
            form is still accepted by the signature.

    Returns:
        The value of the ``data`` key in the JSON response, or an empty
        list when that key is absent.

    Raises:
        ValueError: if *netosecret* is not supplied.
    """
    # This is a plain function, so there is no `self` to read the URL from.
    if netosecret is None:
        raise ValueError("netosecret is required to locate the Fusion API URL")

    url = f"{netosecret.url}/vpc"
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(url, headers=headers)
    return response.json().get("data", [])
def delete_flow_source(fs_id, token, netosecret=None):
    """Delete a flow source via ``DELETE {netosecret.url}/vpc/{fs_id}``.

    Args:
        fs_id: identifier of the flow source, placed in the URL path.
        token: bearer access token for the Fusion API.
        netosecret: decoded NETOSECRET supplying the API base URL. It must
            be provided; the default exists only so the two-argument call
            form is still accepted by the signature.

    Returns:
        None. An HTTP status of 400 or above is logged rather than raised.

    Raises:
        ValueError: if *netosecret* is not supplied.
    """
    # This is a plain function, so there is no `self` to read the URL from.
    if netosecret is None:
        raise ValueError("netosecret is required to locate the Fusion API URL")

    url = f"{netosecret.url}/vpc/{fs_id}"
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.delete(url, headers=headers)
    if response.status_code >= 400:
        logging.error(
            f"Failed to delete flow source {fs_id}: "
            f"{response.status_code}, {response.text}"
        )
Last updated