Coverage for Lib/asyncio/mixins.py: 95%

14 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-15 02:02 +0000

1"""Event loop mixins.""" 

2 

3import threading 

4from . import events 

5 

6_global_lock = threading.Lock() 

7 

8 

9class _LoopBoundMixin: 

10 _loop = None 

11 

12 def _get_loop(self): 

13 loop = events._get_running_loop() 

14 

15 if self._loop is None: 

16 with _global_lock: 

17 if self._loop is None: 17 ↛ 19line 17 didn't jump to line 19

18 self._loop = loop 

19 if loop is not self._loop: 

20 raise RuntimeError(f'{self!r} is bound to a different event loop') 

21 return loop