Category Archives: Python

Passive DNS mining from PCAP with dpkt & Python

Update 04/14: A friend pointed me to dnssnarf, a project that looks like it was written at a DojoSec meeting by Christopher McBee and then updated a bit later by Grant Stavely. It uses Scapy (which I hear is really neat if you haven’t played with it). Check out Grant’s blog post about dnssnarf.

So, here is another quickie in case anyone needs it out there in the Intertubes. Say you have one .pcap file, or many, and you want to mine the DNS responses out of them so you can build up a passive DNS database, track malicious resolutions, and put together a list of ban-able IP addresses. This script parses a given .pcap file (tcpdump/wireshark libpcap format) and prints the answers for the query types you’re interested in.

This script is built around dpkt, a tool by Dug Song, and is heavily inspired by the tutorials on the site of Jon Oberheide (also a dpkt developer). Honestly, most of the time writing this was spent understanding how dpkt handles its internal data structures and how to get at the data. The dpkt documentation is not the most mature, but the source is pretty readable if you keep the references I mention in the comments at hand. Also, this script was only tested with Python 2.6 and dpkt 1.7 on Linux; it was confirmed not to work on Windows, as dpkt appears to have some serious problems there at the moment.

#!/usr/bin/env python

import dpkt, socket, sys

if len(sys.argv) != 2:
 print "Usage:\n", sys.argv[0], "filename.pcap"
 sys.exit()

f = open(sys.argv[1], 'rb')
pcap = dpkt.pcap.Reader(f)

for ts, buf in pcap:
 # make sure we are dealing with IP traffic
 # ref: http://www.iana.org/assignments/ethernet-numbers
 try: eth = dpkt.ethernet.Ethernet(buf)
 except: continue
 if eth.type != 2048: continue
 # make sure we are dealing with UDP
 # ref: http://www.iana.org/assignments/protocol-numbers/
 try: ip = eth.data
 except: continue
 if ip.p != 17: continue
 # filter on UDP assigned ports for DNS
 # ref: http://www.iana.org/assignments/port-numbers
 try: udp = ip.data
 except: continue
 if udp.sport != 53 and udp.dport != 53: continue
 # make the dns object out of the udp data and check for it being a RR (answer)
 # and for opcode QUERY (I know, counter-intuitive)
 try: dns = dpkt.dns.DNS(udp.data)
 except: continue
 if dns.qr != dpkt.dns.DNS_R: continue
 if dns.opcode != dpkt.dns.DNS_QUERY: continue
 if dns.rcode != dpkt.dns.DNS_RCODE_NOERR: continue
 if len(dns.an) < 1: continue
 # now we're going to process and spit out responses based on record type
 # ref: http://en.wikipedia.org/wiki/List_of_DNS_record_types
 for answer in dns.an:
   if answer.type == 5:
     print "CNAME request", answer.name, "\tresponse", answer.cname
   elif answer.type == 1:
     print "A request", answer.name, "\tresponse", socket.inet_ntoa(answer.rdata)
   elif answer.type == 12:
     print "PTR request", answer.name, "\tresponse", answer.ptrname

Python Unescape 16-bit Unicode String to File

Archived here for me; maybe someone else will need it. Frequently when our analysts are doing malcode analysis, particularly on malicious PDF documents, they see shellcode in the form of 16-bit Unicode values that are then unescaped into the heap by calling the JavaScript unescape() function. Problem is, we do most of our malicious JavaScript analysis from the command line with SpiderMonkey, and it has some truncation issues with unescaping 16-bit Unicode correctly (it handles ASCII just fine). The devs are well aware of the issue, btw, so don’t bother them ;-).

So I wrote a quickie to take a string, massage it to the right byte order, and slap it to STDOUT, which the analyst can then redirect to a file or whatever. If there is a much easier way to do this, I’m all ears.

#!/usr/bin/python

import binascii
import sys
import re

# print usage if args wrong
if len(sys.argv) != 2:
  print "Usage: " + sys.argv[0] + " <string to decode>"
  print "where string is something like '%u30CC%u4560'"
  print "Keep in mind this only works for unicode 16-bit"
  print "which means 2 bytes (four hexadecimal chars with %u"
  print "in front of them)."
  sys.exit()

# convert string to upper since we don't care
string = sys.argv[1].upper()

# clean up the string for processing, do some rudimentary input validation
# strip any surrounding quotes first so they don't trip the character check below
string = string.strip('"').strip("'")
if re.findall(r'[^UA-F0-9\\%]', string):
  print "invalid string submitted\nonly the following chars are allowed:"
  print ''' % \ u U A-F a-f 0-9 ' " '''
  sys.exit()
string = re.sub(r'(%|\\)[U]', '', string)

# check one last time that we have only hex
if re.findall(r'[^A-F0-9]', string):
  print "invalid string submitted\nonly the following chars are allowed:"
  print ''' % \ u U A-F a-f 0-9 ' " '''
  sys.exit()

# split up the string, do our stuff with hex
a = list(string)
if len(a) % 4 != 0:
  print "you are missing some characters, must be in groups of 4"
  print "did your copy mess up?"
  sys.exit()
b = ""
while len(a) > 0:
  b1 = a.pop(0) + a.pop(0)
  b2 = a.pop(0) + a.pop(0)
  b = b + b2 + b1

result = binascii.a2b_hex(b)
sys.stdout.write(result)
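
To make the byte-order handling concrete: each %uXXYY group encodes a 16-bit value that unescape() writes to memory little-endian, so the low byte has to be emitted first. A tiny standalone illustration of the same swap the while loop performs (hypothetical helper name, arbitrary sample value):

import binascii

def swap_group(hexgroup):
    # "30CC" -> "CC30": put the low byte of the 16-bit value first
    return hexgroup[2:4] + hexgroup[0:2]

print swap_group("30CC")                          # CC30
print repr(binascii.a2b_hex(swap_group("30CC")))  # '\xcc0', i.e. the bytes 0xCC 0x30

So the example string from the usage message, '%u30CC%u4560', ends up on STDOUT as the four bytes CC 30 60 45.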

Bluecoat ProxySG Cache Retrieval Script in Python

So, I was actually looking at this script today and thought folks who use Bluecoat as proxies at their jobs (I get the impression that they are pretty popular) might be interested in checking it out. It’s kind of like a poor-man’s pcap solution for sites that use a robust Bluecoat proxy but don’t have pcap instrumentation everywhere.

Give this script a URI, a list of Bluecoat proxies, and credentials for those proxies, and it checks each proxy’s cache for the URI, grabs it, writes it to disk, and records some metadata such as when the object was last retrieved. Sometimes you can use this to retrieve a malicious payload that is otherwise unavailable to you due to take-down by law enforcement or replay-filtering by the adversary.

Print usage with --help, and make sure you define the setup variables appropriately before you run it. I hope you find it useful.
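
For instance, something like './bluecoat_retrieve.py --uri http://www.example.com/payload.exe -p 192.168.1.2 -p 192.168.1.3 --log' (the script name, URI, and proxy addresses here are just placeholders) would check both proxies, since using -p more than once implies --all, and write a <filename>_<proxy>_<timestamp> copy plus a .log metadata file for each proxy that has the object cached.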

#!/usr/bin/env python
# creds: I wrote most of this, only thing I used for inspiration was this HTML table parser article: http://simbot.wordpress.com/2006/05/17/html-table-parser-using-python/
# though honestly, his parser is much more feature-rich, his code taught me how the HTMLParser class works
# email me at mishley at-sign gmail dot com for cake and/or questions

import sys
import os
import urllib
from HTMLParser import HTMLParser
import optparse
import re
import time

# setup variables
default_proxies = [ "192.168.1.2", "192.168.1.3" ] # default list of proxies to use if -p is not provided
bluecoat_web_port = "3443" # web port to access bluecoat proxy web admin interface
bluecoat_web_user = "username" # username for above interface
bluecoat_web_pass = "password" # password for above interface
bluecoat_proxy_port = "3128" # proxy port to request that a proxy directly proxy a request, may also probably use 80

# parse command line args
parser = optparse.OptionParser()
parser.add_option("-u", "--uri", type="string", action="store", dest="uri", help="URI to retrieve. Must be a file object, not a directory.")
parser.add_option("-p", "--proxyip", type="string", action="append", dest="proxyip", help="Proxy IP addresses to search (defaults to all Bluecoats), can be used multiple times for multiple IP addresses. (if used more than once, --all is assumed)")
parser.add_option("-l", "--log", dest="log", action="store_true", default=False, help="Write file object metadata to log file, <filename>.log.")
parser.add_option("-a", "--all", dest="all", action="store_true", default=False, help="Grab a copy of the file from every proxy on which it is found, not just the first in the list. These files may be identical, use md5sum to check.")
options, args = parser.parse_args()

# input validation
if len(sys.argv) == 1:
	parser.print_help()
	sys.exit()
if options.proxyip and len(options.proxyip) > 1:
	options.all = True
if not options.proxyip:
	options.proxyip = default_proxies
else:
	for i in options.proxyip:
		if re.search('[^0-9\.]', i):
			parser.error("Option --proxyip must use a valid IP address, exiting.")
if not options.uri:
	parser.error("Option --uri is required for use, exiting.")

class proxyopen(urllib.FancyURLopener):
	def prompt_user_passwd(self, host, realm):
		return bluecoat_web_user, bluecoat_web_pass
	def http_error_401(self, url, fp, errcode, errmsg, headers, data=None):
		"""Error 401 -- authentication required. This function supports Basic authentication only."""
		self.tries += 1
		if self.maxtries and self.tries >= self.maxtries:
			self.tries = 0
			return self.http_error_default(url, fp, 500, "HTTPS Basic Auth timed out after "+str(self.maxtries)+" attempts.", headers)
		if not 'www-authenticate' in headers:
			urllib.URLopener.http_error_default(self, url, fp, errcode, errmsg, headers)
		stuff = headers['www-authenticate']
		import re
		match = re.match('[ \t]*([^ \t]+)[ \t]+realm="([^"]*)"', stuff)
		if not match:
			urllib.URLopener.http_error_default(self, url, fp, errcode, errmsg, headers)
		scheme, realm = match.groups()
		if scheme.lower() != 'basic':
			urllib.URLopener.http_error_default(self, url, fp, errcode, errmsg, headers)
		name = 'retry_' + self.type + '_basic_auth'
		if data is None:
			return getattr(self,name)(url, realm)
		else:
			self.tries = 0
			return getattr(self,name)(url, realm, data)

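# ask the proxy's web admin interface (HTTPS on bluecoat_web_port) for cache information about the URI;
# returns the raw "CE URL Information" page on a hit, or a sentinel string on connection/auth/lookup failure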
def checkURI(uri="http://www.google.com/favicon.ico", proxyip="192.168.1.2"):
	opener = proxyopen()
	protocol, domainandpath = uri.split('//', 1)
	protocol = protocol.rstrip(':')
	if protocol != 'http':
		sys.exit("Cannot process non-http requests, exiting.")
	try: page = opener.open("https://" + proxyip + ":" + bluecoat_web_port + "/CE/Info/" + protocol + "/" + domainandpath).read()
	except: return "NOCONN_0xDEADBEEF"
	if page.find('Authentication required') > -1: return "NOAUTH_0xDEADBEEF"
	if page.find('0x00000007') == -1 and page.find('CE URL Information') > -1: return page
	else: return "NOTFOUND_0xDEADBEEF"

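# fetch the URI through the given proxy (via bluecoat_proxy_port) and hand back the file-like object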
def fdURI(uri="http://www.google.com/favicon.ico", proxyip="192.168.1.2"):
	proxy = { 'http': 'http://'+proxyip+':'+bluecoat_proxy_port }
	fd = urllib.urlopen(uri, proxies=proxy)
	return fd

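# minimal HTMLParser subclass that flattens every table cell on the cache info page into a flat list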
class parseTable(HTMLParser):
	def __init__(self):
		HTMLParser.__init__(self)
		self.in_table = 0
		self.in_tr = 0
		self.in_td = 0
		self.tabledata = []
	def handle_starttag(self, tag, attrs):
		if tag == 'table': self.in_table = 1
		if tag == 'tr': self.in_tr = 1
		if tag == 'td': self.in_td = 1
	def handle_data(self, data):
		if self.in_td and self.in_tr and self.in_table:
			self.tabledata.append(data)
	def handle_endtag(self, tag):
		if tag == 'table': self.in_table = 0
		if tag == 'tr': self.in_tr = 0
		if tag == 'td': self.in_td = 0

if __name__ == "__main__":
	filename = options.uri.split('/')[-1]
	for proxy in options.proxyip:
		meta = checkURI(options.uri, proxy)
		if meta == "NOCONN_0xDEADBEEF":
			print "Unable to connect to proxy "+proxy+" via urllib to find URL '"+options.uri+"'."
			continue
		elif meta == "NOTFOUND_0xDEADBEEF":
			print "Unable to locate URL '"+options.uri+"' in proxy "+proxy+"."
			continue
		elif meta == "NOAUTH_0xDEADBEEF":
			print "Unable to authenticate to proxy "+proxy+"."
			continue
		else:
			fd = fdURI(options.uri, proxy)
			outstring = fd.read()
			# we are going to re-grab meta data now that we've potentially
			# modified the last-cached timestamp
			meta = checkURI(options.uri, proxy)
			tableparser = parseTable()
			tableparser.feed(meta)
			tableparser.close()
			parsed = tableparser.tabledata
			tableparser = None
			lastretrieved = time.strftime("%Y%m%d_%H:%M:%S_UTC", time.strptime(' '.join(parsed[9].split()[2:4]), "%m/%d/%Y %H:%M:%S"))
			fullname = filename+"_"+proxy+"_"+lastretrieved
			outfile = open(fullname, 'wb')
			outfile.write(outstring)
			outfile.close()
			fd.close()
			print "Downloaded file '"+fullname+"' successfully."
			if options.log:
				logfile = open(fullname+".log", 'wb')
				j = 0
				for i in parsed:
					j = j + 1
					if j % 2 == 0: logfile.write(i+"\n")
					else: logfile.write(i+" :: ")
				logfile.close()
				print "Successfully wrote metadata to file '"+fullname+".log'."
			if options.all: continue
			else: break
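
As the --all help text notes, copies pulled from different proxies may well be identical. A throwaway way to check from Python rather than md5sum (the glob pattern below is just an example matching the files the script writes):

import glob, hashlib

for fname in glob.glob("payload.exe_*"):
    data = open(fname, 'rb').read()
    print hashlib.md5(data).hexdigest(), fname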

automagic Python urllib basic HTTP authentication

So, I have a Python script here at work that needs to use urllib to grab some pages from a site where HTTP basic access authentication is used. I had to work through some issues on my own after reading the code, and after not finding many references on Google, I decided to document it here in case someone else wants it. There were two basic problems I had to figure out.

  • By default, if you use urllib.urlopen() to request a page that is protected by HTTP basic auth in IDLE or some other interactive prompt, you are prompted to enter your user name and password using the prompt_user_passwd() function defined in the urllib.FancyURLopener class. If you want to automate your login to the web server, you have to override this method to return the user name and password.
  • The other bit, which was harder (for me) to figure out, is how to handle timeouts correctly. Since Python expects the authentication to happen manually, it doesn’t do anything to keep the automatically-provided user name and password pair from looping indefinitely if they are incorrect. So we must override the http_error_401() method as well so that it times out. Fortunately, we can use the urllib.FancyURLopener attribute maxtries, which is set on instantiation and limits the number of authentication tries in the case of an incorrect password. This attribute is originally used by the http_error_302() method to prevent infinite looping due to redirect recursion. We could end up with slightly fewer tries to authenticate if we go through a few redirects before getting the 401 error that requires us to authenticate, but since we only need one successful try at authenticating it shouldn’t be a big deal.
  • So basically, we’ll create our own class, inheriting from urllib.FancyURLopener, and override those two methods. The code, with the salient bits highlighted (and a short usage example after it):

    class basicAuth(urllib.FancyURLopener):
    	def prompt_user_passwd(self, host, realm):
    		return "our_username", "our_password"
    	def http_error_401(self, url, fp, errcode, errmsg, headers, data=None):
    		"""Error 401 -- authentication required. This function supports Basic authentication only."""
    		self.tries += 1
    		if self.maxtries and self.tries >= self.maxtries:
    			self.tries = 0
    			return self.http_error_default(url, fp, 500, "HTTPS Basic Auth timed out after "+str(self.maxtries)+" attempts.", headers)
    		if not 'www-authenticate' in headers:
    			urllib.URLopener.http_error_default(self, url, fp, errcode, errmsg, headers)
    		stuff = headers['www-authenticate']
    		import re
    		match = re.match('[ \t]*([^ \t]+)[ \t]+realm="([^"]*)"', stuff)
    		if not match:
    			urllib.URLopener.http_error_default(self, url, fp, errcode, errmsg, headers)
    		scheme, realm = match.groups()
    		if scheme.lower() != 'basic':
    			urllib.URLopener.http_error_default(self, url, fp, errcode, errmsg, headers)
    		name = 'retry_' + self.type + '_basic_auth'
    		if data is None:
    			return getattr(self,name)(url, realm)
    		else:
    			self.tries = 0
    			return getattr(self,name)(url, realm, data)
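
    Using it is then just a matter of instantiating the class and calling open() as usual; a minimal example (the URL below is a placeholder, and note that FancyURLopener’s __init__ already sets maxtries to 10):

    opener = basicAuth()
    # maxtries comes from FancyURLopener.__init__ (default 10); lower it to fail faster
    opener.maxtries = 3
    page = opener.open("http://intranet.example.com/protected/report.html").read()

    If the credentials returned by prompt_user_passwd() are wrong, the http_error_401() override above gives up after maxtries attempts instead of looping forever.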