NetoAPI Python class to create traffic sources in Fusion
1
Decode & Validate NETOSECRET
import base64
import json
import logging
import random
import time
from functools import wraps
import os
import json
import jwt
import requests
from requests.exceptions import HTTPError, RequestException
class NetoAPIError(Exception):
"""Exception for NetoAPI operations."""
pass
class NetoSecret:
"""Decode and validate a Base64-encoded NETOSECRET."""
def __init__(self, netosecret: str):
try:
app_data = self.decode(netosecret)
except Exception as e:
raise ValueError("Invalid NETOSECRET") from e
self.app_name = app_data.get("appname")
self.app_key = app_data.get("appkey")
self.shared_secret = app_data.get("sharedsecret")
self.short_name = app_data.get("shortname")
self.url = app_data.get("url")
if any(v is None for v in (self.app_name, self.app_key, self.shared_secret, self.short_name, self.url)):
raise ValueError("NETOSECRET missing required fields")
def decode(self, api_key_base64: str) -> dict:
"""Base64-decode and parse JSON payload."""
decoded = base64.b64decode(api_key_base64).decode()
return json.loads(decoded)2
Initialize NetoAPI Client
class NetoAPI:
"""Client for Netography Fusion traffic-source CRUD + auth."""
def __init__(
self,
netosecret_string: str,
logger: logging.Logger = None,
auth_on_init: bool = True,
token: str = None,
):
self.logger = logger or logging.getLogger(__name__)
self.netosecret = NetoSecret(netosecret_string)
self.access_token = token
self.auth_attempt = bool(token)
if auth_on_init and not self.auth_attempt:
if not self.auth(force=True):
raise NetoAPIError("Initial authentication failed")3
Authentication & Token Management
def authenticated_api(func):
"""Decorator: ensure valid token, retry once on 401."""
@wraps(func)
def wrapper(self, *args, **kwargs):
if not self.access_token:
if not self.auth():
raise NetoAPIError("Authentication failed")
try:
return func(self, *args, **kwargs)
except HTTPError as e:
if e.response.status_code == 401:
self.logger.info("Token expired, retrying authentication")
if not self.auth(force=True):
raise NetoAPIError("Re-authentication failed") from e
return func(self, *args, **kwargs)
raise
return wrapper
def auth(self, force: bool = False) -> str:
"""Get or refresh bearer token."""
return self.get_netography_access_token(force=force)
def headers(self, subaccount: str = None, include_content: bool = True) -> dict:
hdr = {
"Accept": "application/json",
"Authorization": f"Bearer {self.access_token}",
}
if include_content:
hdr["Content-Type"] = "application/json"
if subaccount:
hdr["Neto-Shortname"] = subaccount
return hdr
def get_netography_access_token(self, force: bool = False) -> str:
"""
Request a new JWT-signed token or return cached.
If `force`, ignore previous attempts.
"""
if self.auth_attempt and not force:
return self.access_token
self.auth_attempt = True
payload = {
"iat": int(time.time()),
"jti": random.randint(0, 10_000_000),
"appname": self.netosecret.app_name,
"appkey": self.netosecret.app_key,
"shortname": self.netosecret.short_name,
}
jwt_token = jwt.encode(payload, self.netosecret.shared_secret, algorithm="HS256")
self.logger.info(f"Authenticating to {self.netosecret.url}")
try:
resp = requests.post(
f"{self.netosecret.url}/auth/token",
json={"jwt": jwt_token},
headers={
"Accept": "application/json",
"Content-Type": "application/json",
"Authorization": f"Bearer {jwt_token}",
},
)
resp.raise_for_status()
except RequestException as e:
self.logger.error(f"Auth request failed: {e}")
self.auth_attempt = False
return None
data = resp.json()
token = data.get("access_token")
if not token:
self.logger.error("No access_token in response")
self.auth_attempt = False
return None
self.access_token = token
self.logger.info("Authentication successful")
return token4
Create Flow Source
@authenticated_api
def create_flow_source(self, payload: dict, subaccount: str = None) -> dict:
"""Create a new VPC flow source; returns response JSON."""
try:
resp = requests.post(
f"{self.netosecret.url}/vpc", json=payload, headers=self.headers(subaccount)
)
resp.raise_for_status()
except RequestException as e:
msg = f"Failed to create flow source: {e}"
self.logger.error(msg)
raise NetoAPIError(msg) from e
return resp.json()5
Update Flow Source
@authenticated_api
def update_flow_source(self, flow_id: str, payload: dict, subaccount: str = None) -> dict:
"""Update an existing flow source by ID; returns response JSON."""
try:
resp = requests.put(
f"{self.netosecret.url}/vpc/{flow_id}", json=payload, headers=self.headers(subaccount)
)
resp.raise_for_status()
except RequestException as e:
msg = f"Failed to update flow source {flow_id}: {e}"
self.logger.error(msg)
raise NetoAPIError(msg) from e
return resp.json()6
Delete Flow Source
@authenticated_api
def delete_flow_source(self, flow_id: str, subaccount: str = None) -> bool:
"""Delete flow source by ID; returns True on success."""
try:
resp = requests.delete(
f"{self.netosecret.url}/vpc/{flow_id}", headers=self.headers(subaccount)
)
resp.raise_for_status()
except RequestException as e:
msg = f"Failed to delete flow source {flow_id}: {e}"
self.logger.error(msg)
raise NetoAPIError(msg) from e
return resp.status_code in (200, 204)7
Calling the NetoAPI Class
# ----------------------------------------------------------------
# Example usage: authenticate and create a new flow source
# ----------------------------------------------------------------
if __name__ == "__main__":
logger = logging.getLogger("NetoAPIExample")
# Load your NETOSECRET (base64‐encoded JSON) from env var or other secure store
netosecret_b64 = os.getenv("NETOSECRET")
if not netosecret_b64:
logger.error("Please set the NETOSECRET environment variable")
exit(1)
# Instantiate client (will authenticate on init)
try:
api = NetoAPI(netosecret_b64, logger=logger)
except Exception as e:
logger.error(f"Failed to initialize NetoAPI client: {e}")
exit(1)
# Define the payload (equivalent to your curl --data JSON)
flow_source_payload = {
"flowtype": "aws",
"flowresource": "s3",
"enabled": True,
"awsauthtype": "RoleARN",
"role": {
"arn": "arn:aws:iam::1234567890123:role/NetoFlowLogReader"
},
"name": "vpc-01",
"samplerate": 1,
"bucket": "neto-myflowlogs-bucket-example",
"bucketregion": "us-east-1",
"prefix": "myflowlogs-for-vpc-01",
"region": "us-east-1",
"accountid": "1234567890123"
}
# Attempt to create the flow source
try:
result = api.create_flow_source(flow_source_payload)
logger.info("Flow source created successfully:")
print(json.dumps(result, indent=2))
except NetoAPIError as err:
logger.error(f"Error creating flow source: {err}")
exit(1)Previouscurl: Authenticate to API using NETOSECRETNextRetrieve a list of source IP addresses from the blocklist with the API
Last updated