#!/usr/bin/env python # -*- coding:utf-8 -*- ######################################################### # (C) 2000-2012 NSFOCUS Corporation. All rights Reserved# ######################################################### import time import gc import os from multiprocessing import Process from threading import Thread, RLock mutex = RLock() class Worker(Thread): def __init__(self): super(Worker, self).__init__() def run(self): print 'thread %s is running' % self.ident mutex.acquire(1) print '%s got the lock %s' % (self.ident, mutex) time.sleep(3600) class Master(Process): def __init__(self): super(Master, self).__init__() def run(self): print 'subprocess %s is running' % self.pid print '%s try to get the lock %s' % (self.pid, mutex) mutex.acquire(1) print '%s got the lock %s' % (self.pid, mutex) print 'did not got it' def make_circle_ref0(): a = [] b = [a] a.append(b) print 'refcount of a:', sys.getrefcount(a) print 'refcount of b:', sys.getrefcount(b) class CGcLeak(object): def __init__(self): self._text = '#'*10 def __del__(self): pass def make_circle_ref1(): _gcleak = CGcLeak() _gcleak._self = _gcleak # test_code_1 print 'ref count of _gcleak:%d' % sys.getrefcount(_gcleak) class CGcLeakA(object): def __init__(self): self._text = '#'*10 def __del__(self): pass class CGcLeakB(object): def __init__(self): self._text = '*'*10 def __del__(self): pass def make_circle_ref2(): _a = CGcLeakA() _b = CGcLeakB() _a._b = _b # test_code_2 _b._a = _a # test_code_3 print 'ref count0:a=%d b=%d' % \ (sys.getrefcount(_a), sys.getrefcount(_b)) def demo_1(): import traceback import bdb #show __file__ print bdb.__file__ try: import sys print sys.__file__ except: print traceback.format_exc() #show dir print dir(gc) #show assert print '__debug__ is:', __debug__ try: assert(1==0) except: print traceback.format_exc() def demo_2(): print 'main process %s is running' % os.getpid() worker = Worker() worker.setDaemon(True) worker.start() print 'main process %s is sleeping' % os.getpid() time.sleep(3600) def demo_3(): data = range(1,10000000) wdict = dict(zip(data,data)) def demo_4(): print 'gc.get_threshold:', gc.get_threshold() gc.disable() data = range(1,10000000) wdict = dict(zip(data,data)) def demo_5(): gc.set_debug(gc.DEBUG_LEAK | gc.DEBUG_COLLECTABLE | gc.DEBUG_UNCOLLECTABLE | \ gc.DEBUG_INSTANCES | gc.DEBUG_OBJECTS) make_circle_ref0() #make_circle_ref1() #make_circle_ref2() def demo_6(): import objgraph print dir(objgraph) objgraph.show_most_common_types() objgraph.show_growth() a = [] b = [a] a.append(b) objgraph.show_most_common_types() objgraph.show_growth() objgraph.show_refs([a], filename='refs_a.svg') objgraph.show_refs([b], filename='refs_b.svg') objgraph.show_backrefs([a], filename='backrefs_a.svg') objgraph.show_backrefs([b], filename='backrefs_b.svg') def demo_7(): glist = [] @profile def make_test(): i = 0 while i < 10: glist.append('*'*(1<<20)) time.sleep(0.01) i += 1 make_test() #show multithread def demo_8(): demo_2() #show segmentfault def demo_9(): import sys, os, libxml2 as libxml2 def segv_test(): s = "
" options = libxml2.HTML_PARSE_RECOVER + libxml2.HTML_PARSE_NOERROR + libxml2.HTML_PARSE_NOWARNING doc = libxml2.htmlReadDoc(s, None, 'utf-8', options).doc ctxt = doc.xpathNewContext() nodes = ctxt.xpathEval('//body/node()') nodes.reverse() for note in nodes: nexts = note.xpathEval('node()') note.unlinkNode() note.freeNode() #freeNode会将该节点及其子节点释放掉 nexts[0].unlinkNode() nexts[0].freeNode() #资源已经释放,再次释放会造成段错误 segv_test() #show deadlock def demo_10(): def start_thread(): worker = Worker() worker.setDaemon(True) worker.start() def start_process(): master = Master() master.start() master.join() print 'main process %s is running' % os.getpid() start_thread() start_process() print 'main process %s is sleeping' % os.getpid() time.sleep(3600) if __name__ == '__main__': import sys if len(sys.argv) < 2: print 'usage:python demo.py $target_index' sys.exit(0) target_index = int(sys.argv[1]) func_name = 'demo_%d()' % target_index print 'run %s...' % func_name eval(func_name)