# threads.py
  1. __filename__ = "threads.py"
  2. __author__ = "Bob Mottram"
  3. __license__ = "AGPL3+"
  4. __version__ = "1.1.0"
  5. __maintainer__ = "Bob Mottram"
  6. __email__ = "bob@freedombone.net"
  7. __status__ = "Production"
import datetime
import os
import sys
import threading
import time
import trace
  14. class threadWithTrace(threading.Thread):
  15. def __init__(self, *args, **keywords):
  16. self.startTime=datetime.datetime.utcnow()
  17. self.isStarted=False
  18. tries=0
  19. while tries<3:
  20. try:
  21. self._args, self._keywords = args, keywords
  22. threading.Thread.__init__(self, *self._args, **self._keywords)
  23. self.killed = False
  24. break
  25. except Exception as e:
  26. print('ERROR: threads.py/__init__ failed - '+str(e))
  27. time.sleep(1)
  28. tries+=1
  29. def start(self):
  30. tries=0
  31. while tries<3:
  32. try:
  33. self.__run_backup = self.run
  34. self.run = self.__run
  35. threading.Thread.start(self)
  36. break
  37. except Exception as e:
  38. print('ERROR: threads.py/start failed - '+str(e))
  39. time.sleep(1)
  40. tries+=1
  41. # note that this is set True even if all tries failed
  42. self.isStarted=True
  43. def __run(self):
  44. sys.settrace(self.globaltrace)
  45. self.__run_backup()
  46. self.run = self.__run_backup
  47. def globaltrace(self, frame, event, arg):
  48. if event == 'call':
  49. return self.localtrace
  50. else:
  51. return None
  52. def localtrace(self, frame, event, arg):
  53. if self.killed:
  54. if event == 'line':
  55. raise SystemExit()
  56. return self.localtrace
  57. def kill(self):
  58. self.killed = True
  59. def clone(self,fn):
  60. return threadWithTrace(target=fn, \
  61. args=self._args, \
  62. daemon=True)
  63. def removeDormantThreads(baseDir: str,threadsList: [],debug: bool) -> None:
  64. """Removes threads whose execution has completed
  65. """
  66. if len(threadsList)==0:
  67. return
  68. dormantThreads=[]
  69. currTime=datetime.datetime.utcnow()
  70. changed=False
  71. # which threads are dormant?
  72. noOfActiveThreads=0
  73. for th in threadsList:
  74. removeThread=False
  75. if th.isStarted:
  76. if not th.is_alive():
  77. if (currTime-th.startTime).total_seconds()>10:
  78. if debug:
  79. print('DEBUG: thread is not alive ten seconds after start')
  80. removeThread=True
  81. # timeout for started threads
  82. if (currTime-th.startTime).total_seconds()>600:
  83. if debug:
  84. print('DEBUG: started thread timed out')
  85. removeThread=True
  86. else:
  87. # timeout for threads which havn't been started
  88. if (currTime-th.startTime).total_seconds()>600:
  89. if debug:
  90. print('DEBUG: unstarted thread timed out')
  91. removeThread=True
  92. if removeThread:
  93. dormantThreads.append(th)
  94. else:
  95. noOfActiveThreads+=1
  96. if debug:
  97. print('DEBUG: '+str(noOfActiveThreads) + ' active threads out of '+str(len(threadsList)))
  98. # remove the dormant threads
  99. dormantCtr=0
  100. for th in dormantThreads:
  101. if debug:
  102. print('DEBUG: Removing dormant thread '+str(dormantCtr))
  103. dormantCtr+=1
  104. threadsList.remove(th)
  105. th.kill()
  106. changed=True
  107. # start scheduled threads
  108. if len(threadsList)<10:
  109. ctr=0
  110. for th in threadsList:
  111. if not th.isStarted:
  112. print('Starting new send thread '+str(ctr))
  113. th.start()
  114. changed=True
  115. break
  116. ctr+=1
  117. if not changed:
  118. return
  119. if debug:
  120. sendLogFilename=baseDir+'/send.csv'
  121. try:
  122. with open(sendLogFilename, "a+") as logFile:
  123. logFile.write(currTime.strftime("%Y-%m-%dT%H:%M:%SZ")+','+str(noOfActiveThreads)+','+str(len(threadsList))+'\n')
  124. except:
  125. pass