threads.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. __filename__ = "threads.py"
  2. __author__ = "Bob Mottram"
  3. __license__ = "AGPL3+"
  4. __version__ = "1.0.0"
  5. __maintainer__ = "Bob Mottram"
  6. __email__ = "bob@freedombone.net"
  7. __status__ = "Production"
  8. import threading
  9. import sys
  10. import trace
  11. import time
  12. import datetime
  13. class threadWithTrace(threading.Thread):
  14. def __init__(self, *args, **keywords):
  15. self.startTime=datetime.datetime.utcnow()
  16. self.isStarted=False
  17. tries=0
  18. while tries<3:
  19. try:
  20. self._args, self._keywords = args, keywords
  21. threading.Thread.__init__(self, *self._args, **self._keywords)
  22. self.killed = False
  23. break
  24. except Exception as e:
  25. print('ERROR: threads.py/__init__ failed - '+str(e))
  26. time.sleep(1)
  27. tries+=1
  28. def start(self):
  29. tries=0
  30. while tries<3:
  31. try:
  32. self.__run_backup = self.run
  33. self.run = self.__run
  34. threading.Thread.start(self)
  35. break
  36. except Exception as e:
  37. print('ERROR: threads.py/start failed - '+str(e))
  38. time.sleep(1)
  39. tries+=1
  40. # note that this is set True even if all tries failed
  41. self.isStarted=True
  42. def __run(self):
  43. sys.settrace(self.globaltrace)
  44. self.__run_backup()
  45. self.run = self.__run_backup
  46. def globaltrace(self, frame, event, arg):
  47. if event == 'call':
  48. return self.localtrace
  49. else:
  50. return None
  51. def localtrace(self, frame, event, arg):
  52. if self.killed:
  53. if event == 'line':
  54. raise SystemExit()
  55. return self.localtrace
  56. def kill(self):
  57. self.killed = True
  58. def clone(self,fn):
  59. return threadWithTrace(target=fn, \
  60. args=self._args, \
  61. daemon=True)
  62. def removeDormantThreads(baseDir: str,threadsList: [],debug: bool) -> None:
  63. """Removes threads whose execution has completed
  64. """
  65. if len(threadsList)==0:
  66. return
  67. dormantThreads=[]
  68. currTime=datetime.datetime.utcnow()
  69. changed=False
  70. # which threads are dormant?
  71. noOfActiveThreads=0
  72. for th in threadsList:
  73. removeThread=False
  74. if th.isStarted:
  75. if not th.is_alive():
  76. if (currTime-th.startTime).total_seconds()>10:
  77. if debug:
  78. print('DEBUG: thread is not alive ten seconds after start')
  79. removeThread=True
  80. # timeout for started threads
  81. if (currTime-th.startTime).total_seconds()>600:
  82. if debug:
  83. print('DEBUG: started thread timed out')
  84. removeThread=True
  85. else:
  86. # timeout for threads which havn't been started
  87. if (currTime-th.startTime).total_seconds()>600:
  88. if debug:
  89. print('DEBUG: unstarted thread timed out')
  90. removeThread=True
  91. if removeThread:
  92. dormantThreads.append(th)
  93. else:
  94. noOfActiveThreads+=1
  95. if debug:
  96. print('DEBUG: '+str(noOfActiveThreads) + ' active threads out of '+str(len(threadsList)))
  97. # remove the dormant threads
  98. dormantCtr=0
  99. for th in dormantThreads:
  100. if debug:
  101. print('DEBUG: Removing dormant thread '+str(dormantCtr))
  102. dormantCtr+=1
  103. threadsList.remove(th)
  104. th.kill()
  105. changed=True
  106. # start scheduled threads
  107. if len(threadsList)<10:
  108. ctr=0
  109. for th in threadsList:
  110. if not th.isStarted:
  111. print('Starting new send thread '+str(ctr))
  112. th.start()
  113. changed=True
  114. break
  115. ctr+=1
  116. if not changed:
  117. return
  118. if debug:
  119. sendLogFilename=baseDir+'/send.csv'
  120. with open(sendLogFilename, "a+") as logFile:
  121. logFile.write(currTime.strftime("%Y-%m-%dT%H:%M:%SZ")+','+str(noOfActiveThreads)+','+str(len(threadsList))+'\n')