Retrieve a list of source IP addresses from the blocklist with the API
This example shows how to authenticate and then use the API to retrieve values from the blocklist.
Enter the API key values
Replace the CHANGEME values with the appropriate API key fields.
#
# Copyright (c) 2022 Netography, Inc. All rights reserved.
#
# A Python script to fetch source IP addresses from the blocklist, using API Keys
#
# Requires pyjwt and requests:
# $ python3 -m pip install pyjwt requests
#
# Usage:
# $ python3 api-key-search-blocks.py
#
import jwt
import requests
import random
import time
import json
import os
from http.client import responses
#~~ BEGIN Configuration ~~~
# API Key Name
APPNAME = 'CHANGEME'
# API Key Secret
APPKEY = 'CHANGEME'
# Shortname
SHORTNAME = 'CHANGEME'
# API Shared Secret
SHARED_SECRET = 'CHANGEME'
API_BASE_URL = 'https://api.netography.com/api/v1'
# Construct the API Post Request
API_URL = API_BASE_URL + '/search/block'
API_POST_BODY = {
    "start": -3600000,
    "end": 0,
    "search": "active == true",
}
# path/filename to cache the JWT auth token
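# expanduser resolves '~'; os.path.exists() and open() do not expand it themselves
CACHE_FILE = os.path.expanduser('~/.neto.token')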
# ~~ END Configuration
Check for a cached JWT bearer token
Check whether a cached JWT bearer token is already stored in a file, and use it if it has not expired.
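The same freshness check can be packaged as a small function, which makes it easier to test in isolation. This is a minimal sketch, not part of the original script: `cached_token_if_fresh` and its `now` and `skew_seconds` parameters are hypothetical names, and it assumes the cache file holds JSON with `access_token` and `expires_in` fields, as the script's cache does.

```python
import json
import os
import time


def cached_token_if_fresh(cache_file, now=None, skew_seconds=60):
    """Return the cached access token, or None if it is missing or expired."""
    path = os.path.expanduser(cache_file)
    if not os.path.exists(path):
        return None
    with open(path) as f:
        cache_data = json.load(f)
    # The file's modification time stands in for the token's issue time.
    issued_at = os.path.getmtime(path)
    expires_at = issued_at + cache_data['expires_in'] - skew_seconds
    current = time.time() if now is None else now
    return cache_data['access_token'] if current <= expires_at else None
```

Passing `now` explicitly is only a convenience for testing; in normal use the function falls back to `time.time()`.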
# Cache Bearer token and re-use if not expired
jwt_valid = False
access_token = None
if os.path.exists(CACHE_FILE):
    with open(CACHE_FILE) as f:
        cache_data = json.load(f)
    access_token = cache_data['access_token']
    expires_in = cache_data['expires_in']
    token_date = os.path.getmtime(CACHE_FILE)
    expire_timestamp = token_date + expires_in - 60 # account for some clock skew
    # print('JWT expires at ' + time.strftime("%Y-%m-%d %H:%M:%S+00:00 (UTC)", time.gmtime(expire_timestamp)) + "\n")
    if (time.time() <= expire_timestamp):
        jwt_valid = True
if not jwt_valid:
Encode a JWT request token
Create a JWT request token, encoded as a string, to send to the API for authentication. The jwt parameter of the API call is set to this value.
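To show what an HS256-encoded token consists of, here is a stdlib-only sketch of the encoding step. It is not PyJWT's implementation, and the script itself relies on `jwt.encode`; the function names `b64url` and `encode_hs256` and all sample values are hypothetical.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as the compact JWT format requires."""
    return base64.urlsafe_b64encode(data).rstrip(b'=').decode('ascii')


def encode_hs256(payload: dict, secret: str) -> str:
    """Build a compact HS256 JWT of the form header.payload.signature."""
    header = {'alg': 'HS256', 'typ': 'JWT'}
    signing_input = '.'.join(
        b64url(json.dumps(part, separators=(',', ':')).encode('utf-8'))
        for part in (header, payload)
    )
    # The signature is an HMAC-SHA256 over "header.payload" keyed by the shared secret.
    signature = hmac.new(
        secret.encode('utf-8'), signing_input.encode('ascii'), hashlib.sha256
    ).digest()
    return signing_input + '.' + b64url(signature)


# Placeholder values for illustration only.
token = encode_hs256(
    {'iat': 1700000000, 'jti': 42, 'appname': 'demo', 'appkey': 'k', 'shortname': 's'},
    'not-a-real-secret',
)
```

The resulting string has three dot-separated segments; only the holder of the shared secret can produce a signature segment that verifies.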
    # Generate the JWT request token
    payload = {
        'iat': int(time.time()),
        'jti': random.randint(0,10000000),
        'appname': APPNAME,
        'appkey': APPKEY,
        'shortname': SHORTNAME
    }
    token = jwt.encode(payload, SHARED_SECRET, algorithm="HS256")
    # Create the HTTP POST request with a JSON payload containing the JWT request token
    body = {
        'jwt': token
    }
Authenticate to API
Send a POST to /auth/token with the JWT request token. If authentication succeeds, the response provides the JWT bearer token in the access_token field.
    try:
        resp = requests.post(API_BASE_URL + '/auth/token', json=body)
        data = resp.json()
        # print(data)
        if 'access_token' not in data:
            if 'message' in data:
                print(f"{str(resp.status_code)} {data['message']}.")
            else:
                print(f"{str(resp.status_code)} {responses[resp.status_code]}. access_token not found in response")
            raise SystemExit(1)
        # print("bearer: %s" % (data['access_token']))
        access_token = data['access_token']
Cache the JWT bearer token to a file
Stores the JWT bearer token to a file cache so it can be reused between API calls that span multiple process sessions without re-authenticating each time.
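Because the cache file holds a bearer token, it may be worth restricting who can read it. The following is a minimal sketch of one way to do that on a POSIX system; `write_token_cache` is a hypothetical helper and not part of the original script, which uses a plain `open()`.

```python
import json
import os


def write_token_cache(cache_file, token_response):
    """Write the /auth/token response as JSON, readable only by the owner."""
    path = os.path.expanduser(cache_file)
    # Mode 0o600 applies only when the file is created; an existing file keeps its mode.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as f:
        json.dump(token_response, f)
    return path
```

The whole response is written, not just the token, so that the `expires_in` field is available when the cache is read back.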
        with open(CACHE_FILE, 'w') as f:
            json.dump(data, f)
    except Exception as e:
        # resp is unbound if the request itself failed, so report the exception instead
        print(f"Authentication error: {e}. Verify your configuration parameters")
        raise SystemExit(1)
Make the API call to /search/block
Using the JWT bearer token to authenticate the call, make an API call to retrieve blocklist data and parse the output.
# Now have a valid Bearer Token, construct the API request
headers = {
    'Authorization': 'Bearer ' + access_token
}
try:
    resp = requests.post(API_URL, headers=headers, json=API_POST_BODY)
    data = resp.json()
    # print(data)
    if 'data' not in data:
        print("data key not found in response data")
        raise SystemExit(1)
    if not data['data']:
        print("The response had no results")
        raise SystemExit(1)
    for row in data['data']:
        print(row['srcip'])
except Exception as e:
    print('API Error: ' + str(e))
    raise SystemExit(1)
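The parsing step can also be written as a function that returns the addresses instead of printing them, which is convenient when the list feeds another tool. This is a sketch under assumptions: `extract_source_ips` is a hypothetical name, the sample response is made up to match the shape the script expects (a `data` list of rows with a `srcip` field), and the de-duplication is an addition in case the same source IP appears in more than one row.

```python
def extract_source_ips(response):
    """Return the unique srcip values from a /search/block-style response, in order."""
    seen = set()
    ips = []
    for row in response.get('data') or []:
        ip = row.get('srcip')
        if ip and ip not in seen:
            seen.add(ip)
            ips.append(ip)
    return ips


# Hypothetical sample shaped like the response the script expects.
sample = {'data': [{'srcip': '192.0.2.1'}, {'srcip': '198.51.100.7'}, {'srcip': '192.0.2.1'}]}
print(extract_source_ips(sample))
```

A missing or empty `data` key yields an empty list, so the caller can decide whether that is an error.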