selectpoll.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. # Written by Bram Cohen
  2. # see LICENSE.txt for license information
  3. from select import select, error
  4. from time import sleep
  5. from types import IntType
  6. from bisect import bisect
  7. POLLIN = 1
  8. POLLOUT = 2
  9. POLLERR = 8
  10. POLLHUP = 16
  11. class poll:
  12. def __init__(self):
  13. self.rlist = []
  14. self.wlist = []
  15. def register(self, f, t):
  16. if type(f) != IntType:
  17. f = f.fileno()
  18. if (t & POLLIN):
  19. insert(self.rlist, f)
  20. else:
  21. remove(self.rlist, f)
  22. if (t & POLLOUT):
  23. insert(self.wlist, f)
  24. else:
  25. remove(self.wlist, f)
  26. def unregister(self, f):
  27. if type(f) != IntType:
  28. f = f.fileno()
  29. remove(self.rlist, f)
  30. remove(self.wlist, f)
  31. def poll(self, timeout = None):
  32. if self.rlist or self.wlist:
  33. try:
  34. r, w, e = select(self.rlist, self.wlist, [], timeout)
  35. except ValueError:
  36. return None
  37. else:
  38. sleep(timeout)
  39. return []
  40. result = []
  41. for s in r:
  42. result.append((s, POLLIN))
  43. for s in w:
  44. result.append((s, POLLOUT))
  45. return result
  46. def remove(list, item):
  47. i = bisect(list, item)
  48. if i > 0 and list[i-1] == item:
  49. del list[i-1]
  50. def insert(list, item):
  51. i = bisect(list, item)
  52. if i == 0 or list[i-1] != item:
  53. list.insert(i, item)
  54. def test_remove():
  55. x = [2, 4, 6]
  56. remove(x, 2)
  57. assert x == [4, 6]
  58. x = [2, 4, 6]
  59. remove(x, 4)
  60. assert x == [2, 6]
  61. x = [2, 4, 6]
  62. remove(x, 6)
  63. assert x == [2, 4]
  64. x = [2, 4, 6]
  65. remove(x, 5)
  66. assert x == [2, 4, 6]
  67. x = [2, 4, 6]
  68. remove(x, 1)
  69. assert x == [2, 4, 6]
  70. x = [2, 4, 6]
  71. remove(x, 7)
  72. assert x == [2, 4, 6]
  73. x = [2, 4, 6]
  74. remove(x, 5)
  75. assert x == [2, 4, 6]
  76. x = []
  77. remove(x, 3)
  78. assert x == []
  79. def test_insert():
  80. x = [2, 4]
  81. insert(x, 1)
  82. assert x == [1, 2, 4]
  83. x = [2, 4]
  84. insert(x, 3)
  85. assert x == [2, 3, 4]
  86. x = [2, 4]
  87. insert(x, 5)
  88. assert x == [2, 4, 5]
  89. x = [2, 4]
  90. insert(x, 2)
  91. assert x == [2, 4]
  92. x = [2, 4]
  93. insert(x, 4)
  94. assert x == [2, 4]
  95. x = [2, 3, 4]
  96. insert(x, 3)
  97. assert x == [2, 3, 4]
  98. x = []
  99. insert(x, 3)
  100. assert x == [3]