NetoAPI Python class to create traffic sources in Fusion
Decode & Validate NETOSECRET
Base64-decode the input string, parse JSON, and verify all required fields (appname, appkey, sharedsecret, shortname, url). Raises ValueError on failure.
import base64
import json
import logging
import random
import time
from functools import wraps
import os
import json
import jwt
import requests
from requests.exceptions import HTTPError, RequestException
class NetoAPIError(Exception):
    """Raised when a NetoAPI operation (authentication or a flow-source call) fails."""
class NetoSecret:
    """Decode and validate a Base64-encoded NETOSECRET.

    The secret is a Base64 string wrapping a JSON object that must contain
    the keys ``appname``, ``appkey``, ``sharedsecret``, ``shortname`` and
    ``url``.  They are exposed as the attributes ``app_name``, ``app_key``,
    ``shared_secret``, ``short_name`` and ``url``.

    Raises:
        ValueError: if the string cannot be decoded, the payload is not a
            JSON object, or any required key is absent (or ``null``).
    """

    def __init__(self, netosecret: str):
        try:
            app_data = self.decode(netosecret)
        except Exception as e:
            # Deliberately broad: any decoding problem (bad Base64, bad
            # UTF-8, bad JSON, wrong input type) is reported uniformly.
            raise ValueError("Invalid NETOSECRET") from e

        # Valid JSON that is not an object (list, string, number, ...) has no
        # ``.get``; reject it here so callers still see ValueError rather
        # than an AttributeError.
        if not isinstance(app_data, dict):
            raise ValueError("Invalid NETOSECRET")

        self.app_name = app_data.get("appname")
        self.app_key = app_data.get("appkey")
        self.shared_secret = app_data.get("sharedsecret")
        self.short_name = app_data.get("shortname")
        self.url = app_data.get("url")

        required = (
            self.app_name,
            self.app_key,
            self.shared_secret,
            self.short_name,
            self.url,
        )
        if any(value is None for value in required):
            raise ValueError("NETOSECRET missing required fields")

    def decode(self, api_key_base64: str) -> dict:
        """Base64-decode *api_key_base64* and parse the result as JSON."""
        decoded = base64.b64decode(api_key_base64).decode()
        return json.loads(decoded)
# Initialize NetoAPI Client
Instantiate NetoAPI with optional pre-set token. If auth_on_init, force an immediate authentication, raising NetoAPIError if it fails.
class NetoAPI:
    """Client for Netography Fusion traffic-source CRUD + auth."""

    def __init__(
        self,
        netosecret_string: str,
        logger: logging.Logger = None,
        auth_on_init: bool = True,
        token: str = None,
    ):
        """Build a client from a Base64-encoded NETOSECRET.

        Args:
            netosecret_string: Base64-encoded NETOSECRET; parsed by
                ``NetoSecret``, whose ``ValueError`` propagates to the caller.
            logger: logger to use; defaults to ``logging.getLogger(__name__)``.
            auth_on_init: when true and no ``token`` was supplied,
                authenticate immediately.
            token: previously obtained access token to reuse.

        Raises:
            NetoAPIError: if the immediate authentication fails.
        """
        self.logger = logger or logging.getLogger(__name__)
        self.netosecret = NetoSecret(netosecret_string)
        self.access_token = token
        # A caller-supplied token counts as a completed auth attempt, so no
        # request is made until a refresh is forced.
        self.auth_attempt = bool(token)

        needs_auth = auth_on_init and not self.auth_attempt
        if needs_auth and not self.auth(force=True):
            raise NetoAPIError("Initial authentication failed")
# Authentication & Token Management
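Use get_netography_access_token(force) to obtain or refresh a bearer token. The token is cached after the first successful request; pass force=True to request a new one. A failed request is logged and returns None (it is not retried); the authenticated_api decorator re-authenticates and retries a call once when it sees an HTTP 401. Success and failure are both logged.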
def authenticated_api(func):
    """Decorator: ensure a token exists, retry the call once after an HTTP 401.

    Before the call, ``self.auth()`` is invoked if ``self.access_token`` is
    empty.  If the call fails with an HTTP 401, the token is refreshed with
    ``self.auth(force=True)`` and the call is repeated exactly once.

    The 401 may arrive either as a bare ``HTTPError`` or wrapped in a
    ``NetoAPIError`` raised ``from`` it (the flow-source methods convert every
    ``RequestException`` into ``NetoAPIError``), so both forms are inspected.

    Raises:
        NetoAPIError: if authentication or re-authentication fails.
    """

    def _is_unauthorized(err):
        """Return True if *err*, or the exception it was raised from, is a 401."""
        for candidate in (err, err.__cause__):
            response = getattr(candidate, "response", None)
            if getattr(response, "status_code", None) == 401:
                return True
        return False

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        if not self.access_token and not self.auth():
            raise NetoAPIError("Authentication failed")
        try:
            return func(self, *args, **kwargs)
        except (HTTPError, NetoAPIError) as e:
            if not _is_unauthorized(e):
                raise
            self.logger.info("Token expired, retrying authentication")
            if not self.auth(force=True):
                raise NetoAPIError("Re-authentication failed") from e
            # Second and final attempt; any failure now propagates.
            return func(self, *args, **kwargs)

    return wrapper
def auth(self, force: bool = False) -> str:
    """Get or refresh the bearer token.

    Thin alias for ``get_netography_access_token``; returns whatever that
    method returns (the token, or ``None`` when authentication failed).

    Args:
        force: request a new token even if one was already obtained.
    """
    return self.get_netography_access_token(force=force)
def headers(self, subaccount: str = None, include_content: bool = True) -> dict:
    """Build the HTTP headers for an API request.

    Args:
        subaccount: if truthy, sent as the ``Neto-Shortname`` header.
        include_content: if true, add ``Content-Type: application/json``.

    Returns:
        A new dict that always contains ``Accept`` and a bearer
        ``Authorization`` header built from ``self.access_token``.
    """
    hdr = {
        "Accept": "application/json",
        "Authorization": f"Bearer {self.access_token}",
    }
    if include_content:
        hdr["Content-Type"] = "application/json"
    if subaccount:
        hdr["Neto-Shortname"] = subaccount
    return hdr
def get_netography_access_token(self, force: bool = False, timeout: float = 30.0) -> str:
    """Request a bearer token using an HS256-signed JWT, or return the cached one.

    Args:
        force: ignore any previous attempt and request a new token.
        timeout: seconds to wait for the auth endpoint before giving up.

    Returns:
        The access token, or ``None`` if the request failed, the response
        was not valid JSON, or it carried no ``access_token``.  Failures are
        logged and reset ``auth_attempt`` so a later call tries again.
    """
    if self.auth_attempt and not force:
        return self.access_token

    self.auth_attempt = True
    payload = {
        "iat": int(time.time()),
        "jti": random.randint(0, 10_000_000),
        "appname": self.netosecret.app_name,
        "appkey": self.netosecret.app_key,
        "shortname": self.netosecret.short_name,
    }
    jwt_token = jwt.encode(payload, self.netosecret.shared_secret, algorithm="HS256")
    if isinstance(jwt_token, bytes):
        # PyJWT 1.x returns bytes; a str is needed for the header and JSON body.
        jwt_token = jwt_token.decode("ascii")

    self.logger.info(f"Authenticating to {self.netosecret.url}")
    try:
        resp = requests.post(
            f"{self.netosecret.url}/auth/token",
            json={"jwt": jwt_token},
            headers={
                "Accept": "application/json",
                "Content-Type": "application/json",
                "Authorization": f"Bearer {jwt_token}",
            },
            # Without a timeout an unresponsive server blocks forever.
            timeout=timeout,
        )
        resp.raise_for_status()
        # Parsed inside the try: a non-JSON body raises ValueError.
        data = resp.json()
    except (RequestException, ValueError) as e:
        self.logger.error(f"Auth request failed: {e}")
        self.auth_attempt = False
        return None

    # A JSON body that is not an object cannot hold an access_token.
    token = data.get("access_token") if isinstance(data, dict) else None
    if not token:
        self.logger.error("No access_token in response")
        self.auth_attempt = False
        return None

    self.access_token = token
    self.logger.info("Authentication successful")
    return token
# Create Flow Source
POST /vpc with the provided payload; on success returns parsed JSON, on failure raises NetoAPIError.
@authenticated_api
def create_flow_source(self, payload: dict, subaccount: str = None, timeout: float = 30.0) -> dict:
    """Create a new VPC flow source via ``POST /vpc``.

    Args:
        payload: JSON-serialisable flow-source definition.
        subaccount: optional shortname sent as the ``Neto-Shortname`` header.
        timeout: seconds to wait for the server before giving up.

    Returns:
        The parsed JSON response.

    Raises:
        NetoAPIError: if the request fails, returns an error status, or the
            response body is not valid JSON.
    """
    try:
        resp = requests.post(
            f"{self.netosecret.url}/vpc",
            json=payload,
            headers=self.headers(subaccount),
            timeout=timeout,
        )
        resp.raise_for_status()
        # ValueError covers a response body that is not JSON.
        return resp.json()
    except (RequestException, ValueError) as e:
        msg = f"Failed to create flow source: {e}"
        self.logger.error(msg)
        raise NetoAPIError(msg) from e
# Update Flow Source
PUT /vpc/{id} to modify an existing source; returns parsed JSON or raises NetoAPIError.
@authenticated_api
def update_flow_source(
    self, flow_id: str, payload: dict, subaccount: str = None, timeout: float = 30.0
) -> dict:
    """Update an existing flow source via ``PUT /vpc/{flow_id}``.

    Args:
        flow_id: identifier of the flow source to modify.
        payload: JSON-serialisable flow-source definition.
        subaccount: optional shortname sent as the ``Neto-Shortname`` header.
        timeout: seconds to wait for the server before giving up.

    Returns:
        The parsed JSON response.

    Raises:
        NetoAPIError: if the request fails, returns an error status, or the
            response body is not valid JSON.
    """
    try:
        resp = requests.put(
            f"{self.netosecret.url}/vpc/{flow_id}",
            json=payload,
            headers=self.headers(subaccount),
            timeout=timeout,
        )
        resp.raise_for_status()
        # ValueError covers a response body that is not JSON.
        return resp.json()
    except (RequestException, ValueError) as e:
        msg = f"Failed to update flow source {flow_id}: {e}"
        self.logger.error(msg)
        raise NetoAPIError(msg) from e
# Delete Flow Source
DELETE /vpc/{id} to remove a source; returns True on HTTP 200/204 (False for any other success status) and raises NetoAPIError if the request fails.
@authenticated_api
def delete_flow_source(self, flow_id: str, subaccount: str = None, timeout: float = 30.0) -> bool:
    """Delete a flow source via ``DELETE /vpc/{flow_id}``.

    Args:
        flow_id: identifier of the flow source to remove.
        subaccount: optional shortname sent as the ``Neto-Shortname`` header.
        timeout: seconds to wait for the server before giving up.

    Returns:
        True if the server answered 200 or 204, False for any other
        non-error status.

    Raises:
        NetoAPIError: if the request fails or returns an error status.
    """
    try:
        resp = requests.delete(
            f"{self.netosecret.url}/vpc/{flow_id}",
            headers=self.headers(subaccount),
            timeout=timeout,
        )
        resp.raise_for_status()
    except RequestException as e:
        msg = f"Failed to delete flow source {flow_id}: {e}"
        self.logger.error(msg)
        raise NetoAPIError(msg) from e
    return resp.status_code in (200, 204)
# Calling the NetoAPI Class
Read NETOSECRET from environment, instantiate client, authenticate, define payload, and call API
# ----------------------------------------------------------------
# Example usage: authenticate and create a new flow source
# ----------------------------------------------------------------
def main() -> int:
    """Create an example AWS S3 flow source; return a process exit code."""
    # Without a configured handler the INFO message below is never shown.
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("NetoAPIExample")

    # Load your NETOSECRET (base64-encoded JSON) from env var or other secure store
    netosecret_b64 = os.getenv("NETOSECRET")
    if not netosecret_b64:
        logger.error("Please set the NETOSECRET environment variable")
        return 1

    # Instantiate client (will authenticate on init)
    try:
        api = NetoAPI(netosecret_b64, logger=logger)
    except Exception as e:  # top-level boundary: report any failure and exit
        logger.error(f"Failed to initialize NetoAPI client: {e}")
        return 1

    # Define the payload (equivalent to your curl --data JSON).
    # AWS account ids are exactly 12 digits.
    flow_source_payload = {
        "flowtype": "aws",
        "flowresource": "s3",
        "enabled": True,
        "awsauthtype": "RoleARN",
        "role": {
            "arn": "arn:aws:iam::123456789012:role/NetoFlowLogReader",
        },
        "name": "vpc-01",
        "samplerate": 1,
        "bucket": "neto-myflowlogs-bucket-example",
        "bucketregion": "us-east-1",
        "prefix": "myflowlogs-for-vpc-01",
        "region": "us-east-1",
        "accountid": "123456789012",
    }

    # Attempt to create the flow source
    try:
        result = api.create_flow_source(flow_source_payload)
    except NetoAPIError as err:
        logger.error(f"Error creating flow source: {err}")
        return 1

    logger.info("Flow source created successfully:")
    print(json.dumps(result, indent=2))
    return 0


if __name__ == "__main__":
    # SystemExit works even where the interactive ``exit`` helper is absent.
    raise SystemExit(main())