#!/usr/bin/env python3 ''' This script will query the DarkOwl Search API and return results found in the last x number of hours, days, or weeks. The amount of time to go back is specified by the command line arguments '-weeks', '-days', and '-hours' Example usage: DO_Search_Monitor.py -days 1 Run once daily to emulate a Monitor with a 24 hour frequency DO_Search_Monitor.py -hours 6 Run every 6 hours to emulate a Monitor with a 6 hour frequency For example, '-hours 6' will run a search with your given paramaters, but only show results crawled in the last 6 hours. Other query paramaters are outlined in the list of tuples in the 'payload' variable in the main function. ''' import requests import hmac import hashlib import base64 import argparse import sys from datetime import datetime, timedelta publicKey = 'PUBLIC_KEY_HERE' privateKey = 'PRIVATE_KEY_HERE' def main(): weeks, days, hours = parse_command_line() date_format = '%Y-%m-%dT%H:%M:%SZ' fromDate = (datetime.utcnow() - timedelta(weeks=weeks, days=days, hours=hours)).strftime(date_format) #API Parameters and values in form of a list of tuples #('API_Parameter', 'Parameter_value') payload = [ ('from', fromDate), ('q', '"this is a test search"') ] print('Your query:') for key, value in payload: print(f'\t{key}: {value}') response = perform_query(payload) print_data(response) def print_data(response): response = response.json() print(f'Total Results: {response["total"]}\n') #example, this just prints the document ID of each result from the query #modify as needed for result in response['results']: print(result['id']) def perform_query(payload): host = 'api.darkowl.com' endpoint = '/api/v1/search' #Generate search string search = payloadToString(payload) url = f'https://{host}{endpoint}{search}' abs_path = endpoint + search r = requests.get(url, headers=generate_headers(abs_path)) if r.status_code != 200: print(r.content) exit(1) return r def generate_headers(abs_path, http_method='GET'): date = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT') string2hash = http_method + abs_path + date bkey = bytes(source=privateKey, encoding='UTF-8') bpayload = bytes(source=string2hash, encoding='UTF-8') hmacsha1 = hmac.new(bkey, bpayload, hashlib.sha1).digest() base64encoded = base64.b64encode(hmacsha1).decode('UTF-8') auth_header = f'OWL {publicKey}:{base64encoded}' headers = {'Authorization': auth_header, 'X-VISION-DATE': date, 'Accept': 'application/json'} return headers ### Takes a payload (list of tuples) and generates a URL query string def payloadToString(payload): search = '' count = 0 for key, value in payload: if count == 0: search += f'?{key}={value}' count = 1 else: search += f'&{key}={value}' return search def parse_command_line(): description = "This tool will mimic the 'monitor' functionality of the DarkOwl user interface. Set the time (weeks, days, hours) to the value you would use in the UI and with the same query paramaters, will provide the same functionality if run on a regular basis" parser = argparse.ArgumentParser(description=description) parser.add_argument('-weeks', help="Number of weeks", type=int, default=0) parser.add_argument('-days', help="Number of days", type=int, default=0) parser.add_argument('-hours', help="Number of hours", type=int, default=0) if len(sys.argv) < 2: parser.print_help() exit(1) args = parser.parse_args() return args.weeks, args.days, args.hours if __name__ == '__main__': main()