-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
44 lines (30 loc) · 860 Bytes
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import logging
import asyncio
import aiofiles
import os
from functools import partial, wraps
# Module-level logger named "Sandbox"; log() sends its debug messages here.
logger = logging.getLogger("Sandbox")
def wrap(func):
    """Turn a blocking callable into a coroutine function run in an executor.

    The returned coroutine function forwards its positional and keyword
    arguments to ``func`` and additionally accepts two keyword-only options:

    loop: event loop whose ``run_in_executor`` is used; defaults to the
        currently running loop.
    executor: executor handed to ``run_in_executor``; ``None`` selects the
        loop's default executor.

    Awaiting the coroutine yields ``func``'s return value, or raises
    whatever ``func`` raised.
    """

    # ``asyncio.coroutine`` was deprecated in Python 3.8 and removed in 3.11,
    # so the wrapper is written as a native coroutine instead.
    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_running_loop()
        # run_in_executor cannot forward keyword arguments to the callee,
        # so bind everything up front with partial.
        pfunc = partial(func, *args, **kwargs)
        return await loop.run_in_executor(executor, pfunc)

    return run
# Coroutine versions of os.mkdir / os.unlink: wrap() hands the blocking call
# to an executor, so callers use them as ``await mkdir(path)``.
mkdir = wrap(os.mkdir)
unlink = wrap(os.unlink)
def openReadFIFO(name):
    """Create a FIFO at ``name`` and schedule opening it for binary reading.

    The FIFO is created synchronously with ``os.mkfifo`` (which raises
    ``FileExistsError`` if ``name`` already exists). The open itself is
    wrapped in a task, so this must be called while an event loop is running.

    Returns:
        The ``asyncio.Task`` wrapping ``aiofiles.open(name, "rb")``;
        presumably awaiting it yields the opened async file object.
    """
    os.mkfifo(name)
    pending_open = aiofiles.open(name, "rb")
    return asyncio.create_task(pending_open)
def openWriteFIFO(name):
    """Create a FIFO at ``name`` and schedule opening it for binary writing.

    The FIFO is created synchronously with ``os.mkfifo`` (which raises
    ``FileExistsError`` if ``name`` already exists). The open itself is
    wrapped in a task, so this must be called while an event loop is running.

    Returns:
        The ``asyncio.Task`` wrapping ``aiofiles.open(name, "wb")``;
        presumably awaiting it yields the opened async file object.
    """
    os.mkfifo(name)
    pending_open = aiofiles.open(name, "wb")
    return asyncio.create_task(pending_open)
async def enSureDir(path):
    """Make sure a directory exists at ``path``, creating it if necessary.

    Only the final path component is created; missing parents still raise
    ``FileNotFoundError``, as before.

    Raises:
        FileExistsError: ``path`` exists but is not a directory.
        OSError: any other failure reported by ``os.mkdir``.
    """

    def _make():
        # Try first and inspect on failure: checking isdir() before mkdir()
        # leaves a window in which another task or process can create the
        # directory and turn a harmless situation into an error.
        try:
            os.mkdir(path)
        except FileExistsError:
            if not os.path.isdir(path):
                raise

    # Run the blocking syscalls in the default executor so the event loop
    # is not stalled.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, _make)
async def rmdir(path):
    """Remove the empty directory ``path`` without blocking the event loop.

    ``os.rmdir`` runs in the running loop's default executor, so this
    coroutine no longer performs the blocking call inline.

    Raises:
        OSError: propagated from ``os.rmdir`` (for example when the
            directory is missing or not empty).
    """
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, os.rmdir, path)
def log(string):
    """Emit ``string`` at DEBUG level on the module-level ``logger``."""
    logger.log(logging.DEBUG, string)