threads.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
# Module metadata for threads.py: file name, authorship, licence, version,
# maintainer contact and release status, held as module-level attributes.
__filename__ = "threads.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.1.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
  8. import threading
  9. import sys
  10. import time
  11. import datetime
  12. class threadWithTrace(threading.Thread):
  13. def __init__(self, *args, **keywords):
  14. self.startTime = datetime.datetime.utcnow()
  15. self.isStarted = False
  16. tries = 0
  17. while tries < 3:
  18. try:
  19. self._args, self._keywords = args, keywords
  20. threading.Thread.__init__(self, *self._args, **self._keywords)
  21. self.killed = False
  22. break
  23. except Exception as e:
  24. print('ERROR: threads.py/__init__ failed - ' + str(e))
  25. time.sleep(1)
  26. tries += 1
  27. def start(self):
  28. tries = 0
  29. while tries < 3:
  30. try:
  31. self.__run_backup = self.run
  32. self.run = self.__run
  33. threading.Thread.start(self)
  34. break
  35. except Exception as e:
  36. print('ERROR: threads.py/start failed - ' + str(e))
  37. time.sleep(1)
  38. tries += 1
  39. # note that this is set True even if all tries failed
  40. self.isStarted = True
  41. def __run(self):
  42. sys.settrace(self.globaltrace)
  43. self.__run_backup()
  44. self.run = self.__run_backup
  45. def globaltrace(self, frame, event, arg):
  46. if event == 'call':
  47. return self.localtrace
  48. else:
  49. return None
  50. def localtrace(self, frame, event, arg):
  51. if self.killed:
  52. if event == 'line':
  53. raise SystemExit()
  54. return self.localtrace
  55. def kill(self):
  56. self.killed = True
  57. def clone(self, fn):
  58. return threadWithTrace(target=fn,
  59. args=self._args,
  60. daemon=True)
  61. def removeDormantThreads(baseDir: str, threadsList: [], debug: bool) -> None:
  62. """Removes threads whose execution has completed
  63. """
  64. if len(threadsList) == 0:
  65. return
  66. dormantThreads = []
  67. currTime = datetime.datetime.utcnow()
  68. changed = False
  69. # which threads are dormant?
  70. noOfActiveThreads = 0
  71. for th in threadsList:
  72. removeThread = False
  73. if th.isStarted:
  74. if not th.is_alive():
  75. if (currTime - th.startTime).total_seconds() > 10:
  76. if debug:
  77. print('DEBUG: ' +
  78. 'thread is not alive ten seconds after start')
  79. removeThread = True
  80. # timeout for started threads
  81. if (currTime - th.startTime).total_seconds() > 600:
  82. if debug:
  83. print('DEBUG: started thread timed out')
  84. removeThread = True
  85. else:
  86. # timeout for threads which havn't been started
  87. if (currTime - th.startTime).total_seconds() > 600:
  88. if debug:
  89. print('DEBUG: unstarted thread timed out')
  90. removeThread = True
  91. if removeThread:
  92. dormantThreads.append(th)
  93. else:
  94. noOfActiveThreads += 1
  95. if debug:
  96. print('DEBUG: ' + str(noOfActiveThreads) +
  97. ' active threads out of ' + str(len(threadsList)))
  98. # remove the dormant threads
  99. dormantCtr = 0
  100. for th in dormantThreads:
  101. if debug:
  102. print('DEBUG: Removing dormant thread ' + str(dormantCtr))
  103. dormantCtr += 1
  104. threadsList.remove(th)
  105. th.kill()
  106. changed = True
  107. # start scheduled threads
  108. if len(threadsList) < 10:
  109. ctr = 0
  110. for th in threadsList:
  111. if not th.isStarted:
  112. print('Starting new send thread ' + str(ctr))
  113. th.start()
  114. changed = True
  115. break
  116. ctr += 1
  117. if not changed:
  118. return
  119. if debug:
  120. sendLogFilename = baseDir + '/send.csv'
  121. try:
  122. with open(sendLogFilename, "a+") as logFile:
  123. logFile.write(currTime.strftime("%Y-%m-%dT%H:%M:%SZ") +
  124. ',' + str(noOfActiveThreads) +
  125. ',' + str(len(threadsList)) + '\n')
  126. except BaseException:
  127. pass