Connection Dropper

Today I was playing with Python and ctypes to build a connection dropper, because we (swirl and I) need a small library to embed in one of our tools.

Before proceeding, we need to know how the following Windows functions work: GetTcpTable (retrieves the IPv4 TCP connection table) and SetTcpEntry (sets the state of a TCP connection).

So all we have to do is retrieve the connection list with GetTcpTable, select the connection we want to drop, and then change its state to MIB_TCP_STATE_DELETE_TCB with SetTcpEntry. Easy… let’s code…

First we have to create some defines…

#connection dropper - defines.py
# by ratsoul

from ctypes import *

# Windows type aliases used by the structures below
BYTE      = c_ubyte
WORD      = c_ushort
DWORD     = c_ulong

# value GetTcpTable is compared against when the supplied buffer is too small
ERROR_INSUFFICIENT_BUFFER = 122

# short display names for every MIB_TCP_STATE_* value (1-12) that
# MIB_TCPROW.dwState can hold; the four original names are unchanged
CONNECTION_STATUS = {
  1 : "closed",
  2 : "listen",
  3 : "syn_sent",
  4 : "syn_rcvd",
  5 : "estab",
  6 : "fin_wait1",
  7 : "fin_wait2",
  8 : "close_wait",
  9 : "closing",
  10 : "last_ack",
  11 : "time_wait",
  12 : "delete_tcb"
  }

# state written into a row before SetTcpEntry to drop that connection
DROP = MIB_TCP_STATE_DELETE_TCB = 12

class MIB_TCPROW(Structure):
    """One TCP connection entry: its state plus local and remote endpoints.

    Every member is a DWORD, declared in the order state, local address,
    local port, remote address, remote port.
    """

    _fields_ = [
        ("dwState", DWORD),
        ("dwLocalAddr", DWORD),
        ("dwLocalPort", DWORD),
        ("dwRemoteAddr", DWORD),
        ("dwRemotePort", DWORD),
    ]

class MIB_TCPTABLE(Structure):
    """TCP connection table: an entry count followed by the rows themselves.

    The row array is declared with a fixed capacity of 4000 MIB_TCPROW
    entries; dwNumEntries says how many of them are in use.
    """

    _fields_ = [
        ("dwNumEntries", DWORD),
        ("table", MIB_TCPROW * 4000),
    ]

class S_UN_B(Structure):
    """Byte-wise view of an address: four consecutive BYTE members."""

    _fields_ = [
        ("s_b1", BYTE),
        ("s_b2", BYTE),
        ("s_b3", BYTE),
        ("s_b4", BYTE),
    ]
class S_UN_W(Structure):
    """Word-wise view of an address: two consecutive WORD members.

    ctypes only reads the attribute named ``_fields_``; under any other
    name the list is ignored and the structure ends up with no members.
    The members are WORDs so the two halves together span the same four
    bytes as the byte-wise view, matching Winsock's S_un_w (two u_short).
    """

    _fields_ = [
        ("s_w1", WORD),
        ("s_w2", WORD),
    ]

class S_UN(Union):
    """Overlapping views of one address: byte-wise, word-wise, or one DWORD."""

    _fields_ = [
        ("s_un_b", S_UN_B),
        ("s_un_w", S_UN_W),
        ("s_addr", DWORD),
    ]

class IN_ADDR(Structure):
    """Address structure wrapping a single S_UN union member named s_un."""

    _fields_ = [
        ("s_un", S_UN),
    ]

Then we can proceed with the main code…

#connection dropper - cdrop.py
# by ratsoul

from ctypes import *
from defines import *
import socket

#dll
iph = windll.Iphlpapi
ws2 = windll.Ws2_32
msv = windll.msvcrt

#struct
ipaddr = IN_ADDR()
mtable = MIB_TCPTABLE()

#misc
size   = c_ulong(sizeof(mtable))
connections_list = {}

# raw_input only exists on Python 2; fall back to input on Python 3
try:
    read_line = raw_input
except NameError:
    read_line = input


def _addr_to_text(addr):
    """Return the dotted-quad string for a DWORD address read from a MIB_TCPROW."""
    ipaddr.s_un.s_addr = addr
    return socket.inet_ntoa(ipaddr)


def _host_or_ip(address):
    """Return the reverse-DNS name of address, or address itself if the lookup fails."""
    try:
        return socket.gethostbyaddr(address)[0]
    except socket.error:
        # herror/gaierror derive from socket.error: no name, show the raw IP
        return address


#GetTcpTable
ret = iph.GetTcpTable(byref(mtable), byref(size), True)
while ret == ERROR_INSUFFICIENT_BUFFER:
    # size now holds the byte count the API asked for: grow the memory
    # behind mtable to that size (a fresh MIB_TCPTABLE() would be exactly
    # as small as the one that just failed) and ask again
    resize(mtable, size.value)
    ret = iph.GetTcpTable(byref(mtable), byref(size), True)
if ret:
    raise SystemExit("[!] GetTcpTable failed (error %d)" % ret)

# view the rows through an array sized from dwNumEntries, so rows beyond the
# capacity declared in MIB_TCPTABLE stay reachable after a resize
mt = (MIB_TCPROW * mtable.dwNumEntries).from_address(
    addressof(mtable) + MIB_TCPTABLE.table.offset)

conn_id = 0
print("> Connections list:")
for t in mt:
    #get local and remote info
    local_address = _addr_to_text(t.dwLocalAddr)
    remote_address = _addr_to_text(t.dwRemoteAddr)
    #skip local
    if remote_address in ("0.0.0.0", "127.0.0.1"):
        continue
    #save current MIB_TCPROW
    connections_list[conn_id] = t
    #print some info about current MIB_TCPROW; each side falls back to its
    #IP on its own, so one failed lookup cannot abort the listing
    print("\t%d) link: %s -> %s" % (conn_id, _host_or_ip(local_address), _host_or_ip(remote_address)))
    conn_id += 1
print("> ---")

if not connections_list:
    raise SystemExit("> no remote connections to drop")

try:
    val = int(read_line("> connection to drop [0-%d]: " % (len(connections_list) - 1)))
except ValueError:
    raise SystemExit("[!] connection id must be a number")
if val not in connections_list:
    raise SystemExit("[!] no connection with id=%d" % val)

conn2drop = connections_list[val]
conn2drop.dwState = DROP
#SetTcpEntry (a non-zero return value is an error code)
if iph.SetTcpEntry(byref(conn2drop)):
    print("[!] unable to drop selected connection id=%d (on Vista run as Admin)" % val)
else:
    print("[*] bye bye connection id=%d" % val)

Full source code here.

Have fun 😉

Advertisements


%d bloggers like this: