Walking Heap Using Pydbg


I'm a big fan of pydbg. Although it has many great features, it also has a few limitations. One of them is the lack of control over the process heap. For a long time I had been thinking of writing something that makes heap manipulation, parsing, and traversal with pydbg a little easier for reverse engineers. So last weekend I finally wrote a couple of small Python scripts that can parse Windows 7 process heaps on the fly.
In this blog post I'm going to share one of them.
This is the simplest implementation of the HeapWalk() API based on pydbg. The HeapWalk API enumerates the memory blocks in the specified heap. If you are not very familiar with the HeapWalk() API, this page has a very good example in C++.
Right now the best available tool for heap analysis is WinDbg. The script I'm going to share does something similar to WinDbg's "!heap -a 0xmyheaphandle" command.
Sample output of that command (temp.txt) is reproduced at the end of this post.

You can use the HeapWalk() function [@ Line 103] as a breakpoint handler in your pydbg script. The example below does exactly that.

First, I run an application (on 32-bit Windows 7) that calls the user32!MessageBoxA API somewhere.

Next, I attach my pydbg script to that process, set a breakpoint at user32!MessageBoxA, and register HeapWalk() as the breakpoint handler.

Now, whenever the application calls the MessageBoxA API, our breakpoint handler HeapWalk() is invoked and starts traversing all of the process heaps and their segments.
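
The per-block step that HeapWalk() repeats can be sketched without a debugger. This is only a minimal illustration, not the script itself. It assumes the 32-bit Windows 7 layout the script relies on: XOR-ing the first dword of a block header with the heap's encoding dword (read from heap+0x50 in the script) gives the block size in 8-byte units in the low 16 bits and the flags in the next byte. The names decode_header, make_block and walk, the encoding value 0x12345678, and the fake segment are all made up for this example.

```python
from struct import pack, unpack

GRANULARITY = 8          # 32-bit heap blocks are sized in 8-byte units
ENCODING = 0x12345678    # hypothetical stand-in for the dword at heap+0x50

def decode_header(encoding, raw_header):
    """XOR the first header dword with the encoding dword.
    Returns (size_in_bytes, flags)."""
    decoded = encoding ^ unpack('<L', raw_header[:4])[0]
    size = (decoded & 0xFFFF) * GRANULARITY   # low 16 bits: size in units
    flags = (decoded >> 16) & 0xFF            # next byte: flags (1 = busy)
    return size, flags

def make_block(units, flags, encoding):
    """Build a fake encoded block: 4-byte header plus zero padding."""
    header = pack('<L', ((flags << 16) | units) ^ encoding)
    return header + b'\x00' * (units * GRANULARITY - 4)

def walk(memory, encoding):
    """Yield (offset, size, flags) for consecutive blocks in a byte string."""
    offset = 0
    while offset + 4 <= len(memory):
        size, flags = decode_header(encoding, memory[offset:offset + 4])
        if size == 0:        # guard: a zero size would never advance
            break
        yield offset, size, flags
        offset += size       # the next header starts right after this block

# Two fake blocks: 16 bytes busy, then 24 bytes free.
fake_segment = make_block(2, 1, ENCODING) + make_block(3, 0, ENCODING)
blocks = list(walk(fake_segment, ENCODING))
for offset, size, flags in blocks:
    print('offset %#x size %#x flags %#x' % (offset, size, flags))
```

In the real script the same loop reads headers through dbg.read_process_memory() and stops when the read fails on uncommitted memory, instead of stopping at the end of a byte string.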

Script 1: HeapWalker.py (full listing at the end of this post)

The output of this script will look something like this: https://gist.github.com/debasishm89/1264d7a6726b9e910a5d

Since this script gives you the addresses and sizes of all heap blocks, you now have more control over the process heap. You should be able to search for strings, data, bytes, or pointers in the process heaps very easily.
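
As a rough sketch of that kind of search, the helper below looks for a string in a block's bytes in both ASCII and UTF-16LE form, since Windows applications often store text as wide characters. The function name find_in_block and the sample buffer are hypothetical. In the script, block_data would be the bytes returned by dbg.read_process_memory() for each block.

```python
def find_in_block(block_data, text):
    """Return a list of (encoding_name, offset) hits for text in block_data."""
    hits = []
    for name in ('ascii', 'utf-16-le'):
        needle = text.encode(name)
        pos = block_data.find(needle)
        while pos != -1:
            hits.append((name, pos))
            pos = block_data.find(needle, pos + 1)
    return hits

# Fake block contents: an ASCII copy followed by a UTF-16LE copy.
sample = b'\x00\x00DEBASISH\x00' + 'DEBASISH'.encode('utf-16-le')
hits = find_in_block(sample, 'DEBASISH')
for name, offset in hits:
    print('%s hit at offset %#x' % (name, offset))
```

Adding the block's user pointer (block address + 8 in the script) to a hit offset gives the address of the match in the debuggee.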

Thank you for reading. I hope you enjoyed it :)
0:001> !heap -a 00240000
Index Address Name Debugging options enabled
1: 00240000
Segment at 00240000 to 00340000 (00031000 bytes committed)
Flags: 00000002
ForceFlags: 00000000
Granularity: 8 bytes
Segment Reserve: 00100000
Segment Commit: 00002000
DeCommit Block Thres: 00000800
DeCommit Total Thres: 00002000
Total Free Size: 0000040e
Max. Allocation Size: 7ffdefff
Lock Variable at: 00240138
Next TagIndex: 0000
Maximum TagIndex: 0000
Tag Entries: 00000000
PsuedoTag Entries: 00000000
Virtual Alloc List: 002400a0
Uncommitted ranges: 00240090
00271000: 000cf000 (847872 bytes)
FreeList[ 00 ] at 002400c4: 0026ef88 . 0026bf68
0026bf60: 00010 . 00010 [100] - free
0026ef80: 00108 . 02060 [100] - free
Segment00 at 00240000:
Flags: 00000000
Base: 00240000
First Entry: 00240588
Last Entry: 00340000
Total Pages: 00000100
Total UnCommit: 000000cf
Largest UnCommit:00000000
UnCommitted Ranges: (1)
Heap entries for Segment00 in Heap 00240000
address: psize . size flags state (requested size)
00240000: 00000 . 00588 [101] - busy (587)
00240588: 00588 . 00240 [101] - busy (23f)
002407c8: 00240 . 00020 [101] - busy (18)
002407e8: 00020 . 00ce0 [101] - busy (cd6)
002414c8: 00ce0 . 01540 [101] - busy (1532)
00242a08: 01540 . 00048 [101] - busy (3c)
00242a50: 00048 . 00038 [101] - busy (30)
00242a88: 00038 . 00080 [101] - busy (78)
00242b08: 00080 . 00080 [101] - busy (78)
00242b88: 00080 . 00048 [101] - busy (3c)
00242bd0: 00048 . 00228 [101] - busy (220)
00242df8: 00228 . 00050 [101] - busy (42)
00242e48: 00050 . 00080 [101] - busy (78)
00242ec8: 00080 . 00018 [101] - busy (10)
00242ee0: 00018 . 00050 [101] - busy (46)
00242f30: 00050 . 00080 [101] - busy (78)
00242fb0: 00080 . 00018 [101] - busy (10)
'''

HeapWalker - Walking Windows 7 process heaps using pydbg
Author : Debasish Mandal
Blog :http://www.debasish.in/
Twitter : https://twitter.com/debasishm89

Description : This Python script is the simplest implementation of the Windows HeapWalk() API.
It uses pydbg and lets the user walk through the debuggee process's heaps on the fly.

In this example

The output of this script will be similar to that of the windbg command "!heap -a 0xMyHeapHandle".
0:002> !heap -a 00220000
...
...
Heap entries for Segment00 in Heap 00220000
address: psize . size flags state (requested size)
00220000: 00000 . 00588 [101] - busy (587)
00220588: 00588 . 00240 [101] - busy (23f)
002207c8: 00240 . 00020 [101] - busy (18)
002207e8: 00020 . 00ce0 [101] - busy (cd6)
..
And so on.

'''
from pydbg import *
from pydbg.defines import *
from struct import unpack
from struct import pack
def getHeapBlockDetails(dbg,heap_handle, chunk_addr):
    '''
    Return heap block details: state and size
    '''
    xor_res = unpack('<L',dbg.read_process_memory( heap_handle+0x50, 4 ))[0] ^ unpack('<L',dbg.read_process_memory( chunk_addr, 4 ))[0]
    h = pack('>L',xor_res).encode('hex')
    size = int(h[4:],16)*8
    state_code = int(h[2:4],16)
    if state_code == 1:
        state = "Busy"
    elif state_code == 0:
        state = "Free"
    elif state_code == 9:
        state = "Busy - Internal"
    else:
        state = "Unknown"
    return size,state
def ReadListEntry(dbg,addr):
    '''
    Read and return two dword from any pointer(Mostly Blink of _LIST_ENTRY Linked List).
    '''
    flink = dbg.read_process_memory( addr, 4 ) # +0x010 SegmentListEntry : _LIST_ENTRY
    blink = dbg.read_process_memory( addr+4, 4 )
    return flink,blink
def getSegmentsIfAny(dbg,heap_hnd):
    '''
    A bit "hacky" way to find out if the heap has more than one segment in it.
    If it has more than one segment, this function is going to return a list with all segment base addresses.
    Otherwise it will return a list with only one item (the same heap handle)
    Example : If heap handle is 04c10000,
    0:027> dt _LIST_ENTRY 04c10000+0x010
    ntdll!_LIST_ENTRY
    [ 0x4010010 - 0x4c100a8 ]
    +0x000 Flink : 0x04010010 _LIST_ENTRY [ 0x5420010 - 0x4c10010 ]
    +0x004 Blink : 0x04c100a8 _LIST_ENTRY [ 0x4c10010 - 0xa100010 ]

    0:027> dt _LIST_ENTRY 0x04c100a8
    ntdll!_LIST_ENTRY
    [ 0x4c10010 - 0xa100010 ]
    +0x000 Flink : 0x04c10010 _LIST_ENTRY [ 0x4010010 - 0x4c100a8 ]
    +0x004 Blink : 0x0a100010 _LIST_ENTRY [ 0x4c100a8 - 0x5420010 ]

    And so on...
    '''
    seg_list = []
    first_flink,first_blink = ReadListEntry(dbg,heap_hnd+0x010)# 0xheaphandle+0x010 SegmentListEntry : _LIST_ENTRY
    if first_flink == first_blink:
        seg_list.append(heap_hnd)
    else:
        # try to find out all available heap segments by iterating through the linked list.
        next_blink = first_blink
        while 1:
            flink , blink = ReadListEntry(dbg,unpack('<L',next_blink)[0])
            # Get the exact segment base
            if flink.encode('hex')[:2] == "10":
                seg_list.append (unpack('<L',flink)[0] - 16)
            '''
            Otherwise this can also be done.
            if unpack('<L',flink)[0] % 16 == 0: # A nasty hack.
            seg_list.append (unpack('<L',flink)[0] - 16)
            '''
            #print hex(unpack('<L',flink)[0]),hex(unpack('<L',blink)[0])
            if blink == first_blink:
                # Break the loop - End of Linked List (_LIST_ENTRY ) Reached
                break
            next_blink = blink
    return set(seg_list) # Remove any duplicate value if any
def HeapWalk(dbg):
    print '[+] Address of PEB : ', hex(dbg.peb)
    total_heaps = unpack('<L',dbg.read_process_memory( dbg.peb+0x088, 4 ))[0] # Total number of process heaps
    print '[+] Total Number of Process Heaps : ', total_heaps
    h_poi = unpack('<L',dbg.read_process_memory( dbg.peb+0x090, 4 ))[0] # Pointer where all the heap handles are present in memory
    heaps = []
    offset = 0
    # Now we read all the heap handles from memory
    for i in range(1,total_heaps+1):
        # unpack() already returns an integer, so no hex()/int() round trip is needed
        heaps.append( unpack('<L',dbg.read_process_memory( h_poi+offset, 4 ))[0] )
        offset += 4
    # Start iterating through all the heaps
    for heap in heaps:
        print '[+] Walking ',hex(heap)
        # Small check to determine if the heap is LFH
        heap_type = unpack('<L',dbg.read_process_memory( heap+0x0d4, 4 ))[0]
        if heap_type != 0:
            print '[+] Heap type : Low Fragmentation Heap'
        else:
            print '[+] Heap type : Other'
        seg_list = getSegmentsIfAny(dbg,heap)
        if len(seg_list) > 1:
            print '[+] Heap has total',len(seg_list),'segments'
        else:
            print '[+] Heap has only 1 segment'
        for seg in seg_list:
            # Try to iterate through all heaps or heap segments.
            print '[+] Parsing Segment ',hex(seg),'of Heap :',hex(heap)
            next_addr = seg
            # Try to iterate through all the available heap blocks.
            while 1:
                try:
                    size,state = getHeapBlockDetails(dbg,heap,next_addr)
                    print '\t Heap',hex(heap),'Segment :',hex(seg),'Block :',hex(next_addr),'Size : ', hex(size), 'User Pointer : ',hex(next_addr + 8) ,'(',state,')'
                    block_data = dbg.read_process_memory( next_addr+8, size )
                    '''
                    # Do this when you need to search for string / Unicode string "DEBASISH" in all heap blocks
                    if "D\x00E\x00B\x00A\x00S\x00I\x00S\x00H" in block_data:
                    print '\t Heap Block : ',hex(next_addr), 'Size : ', hex(size), 'User Pointer : ',hex(next_addr + 8) ,'(',state,')'
                    '''
                    next_addr = next_addr + size
                except Exception,e:
                    print '\t [+] Uncommited Bytes Reached'
                    break
    return DBG_CONTINUE
def main():
    dbg = pydbg()
    pid = raw_input ('Enter PID : ')
    dbg.attach(int(pid))
    print '[+] Attached'
    #HeapWalk(dbg)
    try:
        mb = dbg.func_resolve_debuggee('user32.dll','MessageBoxA')
        dbg.bp_set(mb,handler=HeapWalk)
    except Exception,e:
        print '[+] Failed'
    dbg.run()
if __name__ == '__main__':
    main()
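
The segment enumeration in getSegmentsIfAny() follows a circular doubly linked _LIST_ENTRY list. A simpler way to picture it, offered only as a sketch, is to start at the list head and follow Flink until the walk comes back to the head. The fake memory below reuses the addresses from the docstring's WinDbg dump. The head offset 0xa8 is taken from where the head appears in that dump (0x4c100a8), and the entry for 0x05420010 is inferred from its neighbours rather than shown there. list_segments and fake_memory are hypothetical names.

```python
HEAD_OFFSET = 0xa8     # where the list head appears in the dump (0x4c100a8)
ENTRY_OFFSET = 0x10    # SegmentListEntry offset inside each segment

# address -> (Flink, Blink), copied from the docstring's dump
fake_memory = {
    0x04c100a8: (0x04c10010, 0x0a100010),   # list head
    0x04c10010: (0x04010010, 0x04c100a8),
    0x04010010: (0x05420010, 0x04c10010),
    0x05420010: (0x0a100010, 0x04010010),   # inferred, not in the dump
    0x0a100010: (0x04c100a8, 0x05420010),
}

def list_segments(read_entry, heap):
    """Follow Flink from the list head until it wraps around.
    read_entry(addr) must return the (Flink, Blink) pair stored at addr."""
    head = heap + HEAD_OFFSET
    segments = []
    seen = set()
    flink, _ = read_entry(head)
    while flink != head and flink not in seen:   # 'seen' guards against loops
        seen.add(flink)
        segments.append(flink - ENTRY_OFFSET)    # entry sits at segment+0x10
        flink, _ = read_entry(flink)
    return segments

segments = list_segments(fake_memory.__getitem__, 0x04c10000)
print([hex(s) for s in segments])
```

With pydbg, read_entry could be a small wrapper around ReadListEntry() that unpacks both dwords into integers.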
