Implement storedevs, prepare to port littlefs
This commit is contained in:
265
kernel/fs/littlefs/scripts/watch.py
Executable file
265
kernel/fs/littlefs/scripts/watch.py
Executable file
@ -0,0 +1,265 @@
|
||||
#!/usr/bin/env python3
|
||||
#
|
||||
# Traditional watch command, but with higher resolution updates and a bit
|
||||
# different options/output format
|
||||
#
|
||||
# Example:
|
||||
# ./scripts/watch.py -s0.1 date
|
||||
#
|
||||
# Copyright (c) 2022, The littlefs authors.
|
||||
# SPDX-License-Identifier: BSD-3-Clause
|
||||
#
|
||||
|
||||
import collections as co
|
||||
import errno
|
||||
import fcntl
|
||||
import io
|
||||
import os
|
||||
import pty
|
||||
import re
|
||||
import shutil
|
||||
import struct
|
||||
import subprocess as sp
|
||||
import sys
|
||||
import termios
|
||||
import time
|
||||
|
||||
try:
|
||||
import inotify_simple
|
||||
except ModuleNotFoundError:
|
||||
inotify_simple = None
|
||||
|
||||
|
||||
def openio(path, mode='r', buffering=-1):
|
||||
# allow '-' for stdin/stdout
|
||||
if path == '-':
|
||||
if mode == 'r':
|
||||
return os.fdopen(os.dup(sys.stdin.fileno()), mode, buffering)
|
||||
else:
|
||||
return os.fdopen(os.dup(sys.stdout.fileno()), mode, buffering)
|
||||
else:
|
||||
return open(path, mode, buffering)
|
||||
|
||||
def inotifywait(paths):
|
||||
# wait for interesting events
|
||||
inotify = inotify_simple.INotify()
|
||||
flags = (inotify_simple.flags.ATTRIB
|
||||
| inotify_simple.flags.CREATE
|
||||
| inotify_simple.flags.DELETE
|
||||
| inotify_simple.flags.DELETE_SELF
|
||||
| inotify_simple.flags.MODIFY
|
||||
| inotify_simple.flags.MOVED_FROM
|
||||
| inotify_simple.flags.MOVED_TO
|
||||
| inotify_simple.flags.MOVE_SELF)
|
||||
|
||||
# recurse into directories
|
||||
for path in paths:
|
||||
if os.path.isdir(path):
|
||||
for dir, _, files in os.walk(path):
|
||||
inotify.add_watch(dir, flags)
|
||||
for f in files:
|
||||
inotify.add_watch(os.path.join(dir, f), flags)
|
||||
else:
|
||||
inotify.add_watch(path, flags)
|
||||
|
||||
# wait for event
|
||||
inotify.read()
|
||||
|
||||
class LinesIO:
|
||||
def __init__(self, maxlen=None):
|
||||
self.maxlen = maxlen
|
||||
self.lines = co.deque(maxlen=maxlen)
|
||||
self.tail = io.StringIO()
|
||||
|
||||
# trigger automatic sizing
|
||||
if maxlen == 0:
|
||||
self.resize(0)
|
||||
|
||||
def write(self, s):
|
||||
# note using split here ensures the trailing string has no newline
|
||||
lines = s.split('\n')
|
||||
|
||||
if len(lines) > 1 and self.tail.getvalue():
|
||||
self.tail.write(lines[0])
|
||||
lines[0] = self.tail.getvalue()
|
||||
self.tail = io.StringIO()
|
||||
|
||||
self.lines.extend(lines[:-1])
|
||||
|
||||
if lines[-1]:
|
||||
self.tail.write(lines[-1])
|
||||
|
||||
def resize(self, maxlen):
|
||||
self.maxlen = maxlen
|
||||
if maxlen == 0:
|
||||
maxlen = shutil.get_terminal_size((80, 5))[1]
|
||||
if maxlen != self.lines.maxlen:
|
||||
self.lines = co.deque(self.lines, maxlen=maxlen)
|
||||
|
||||
canvas_lines = 1
|
||||
def draw(self):
|
||||
# did terminal size change?
|
||||
if self.maxlen == 0:
|
||||
self.resize(0)
|
||||
|
||||
# first thing first, give ourself a canvas
|
||||
while LinesIO.canvas_lines < len(self.lines):
|
||||
sys.stdout.write('\n')
|
||||
LinesIO.canvas_lines += 1
|
||||
|
||||
# clear the bottom of the canvas if we shrink
|
||||
shrink = LinesIO.canvas_lines - len(self.lines)
|
||||
if shrink > 0:
|
||||
for i in range(shrink):
|
||||
sys.stdout.write('\r')
|
||||
if shrink-1-i > 0:
|
||||
sys.stdout.write('\x1b[%dA' % (shrink-1-i))
|
||||
sys.stdout.write('\x1b[K')
|
||||
if shrink-1-i > 0:
|
||||
sys.stdout.write('\x1b[%dB' % (shrink-1-i))
|
||||
sys.stdout.write('\x1b[%dA' % shrink)
|
||||
LinesIO.canvas_lines = len(self.lines)
|
||||
|
||||
for i, line in enumerate(self.lines):
|
||||
# move cursor, clear line, disable/reenable line wrapping
|
||||
sys.stdout.write('\r')
|
||||
if len(self.lines)-1-i > 0:
|
||||
sys.stdout.write('\x1b[%dA' % (len(self.lines)-1-i))
|
||||
sys.stdout.write('\x1b[K')
|
||||
sys.stdout.write('\x1b[?7l')
|
||||
sys.stdout.write(line)
|
||||
sys.stdout.write('\x1b[?7h')
|
||||
if len(self.lines)-1-i > 0:
|
||||
sys.stdout.write('\x1b[%dB' % (len(self.lines)-1-i))
|
||||
sys.stdout.flush()
|
||||
|
||||
|
||||
def main(command, *,
|
||||
lines=0,
|
||||
cat=False,
|
||||
sleep=None,
|
||||
keep_open=False,
|
||||
keep_open_paths=None,
|
||||
exit_on_error=False):
|
||||
returncode = 0
|
||||
try:
|
||||
while True:
|
||||
# reset ring each run
|
||||
if cat:
|
||||
ring = sys.stdout
|
||||
else:
|
||||
ring = LinesIO(lines)
|
||||
|
||||
try:
|
||||
# run the command under a pseudoterminal
|
||||
mpty, spty = pty.openpty()
|
||||
|
||||
# forward terminal size
|
||||
w, h = shutil.get_terminal_size((80, 5))
|
||||
if lines:
|
||||
h = lines
|
||||
fcntl.ioctl(spty, termios.TIOCSWINSZ,
|
||||
struct.pack('HHHH', h, w, 0, 0))
|
||||
|
||||
proc = sp.Popen(command,
|
||||
stdout=spty,
|
||||
stderr=spty,
|
||||
close_fds=False)
|
||||
os.close(spty)
|
||||
mpty = os.fdopen(mpty, 'r', 1)
|
||||
|
||||
while True:
|
||||
try:
|
||||
line = mpty.readline()
|
||||
except OSError as e:
|
||||
if e.errno != errno.EIO:
|
||||
raise
|
||||
break
|
||||
if not line:
|
||||
break
|
||||
|
||||
ring.write(line)
|
||||
if not cat:
|
||||
ring.draw()
|
||||
|
||||
mpty.close()
|
||||
proc.wait()
|
||||
if exit_on_error and proc.returncode != 0:
|
||||
returncode = proc.returncode
|
||||
break
|
||||
except OSError as e:
|
||||
if e.errno != errno.ETXTBSY:
|
||||
raise
|
||||
pass
|
||||
|
||||
# try to inotifywait
|
||||
if keep_open and inotify_simple is not None:
|
||||
if keep_open_paths:
|
||||
paths = set(keep_paths)
|
||||
else:
|
||||
# guess inotify paths from command
|
||||
paths = set()
|
||||
for p in command:
|
||||
for p in {
|
||||
p,
|
||||
re.sub('^-.', '', p),
|
||||
re.sub('^--[^=]+=', '', p)}:
|
||||
if p and os.path.exists(p):
|
||||
paths.add(p)
|
||||
ptime = time.time()
|
||||
inotifywait(paths)
|
||||
# sleep for a minimum amount of time, this helps issues around
|
||||
# rapidly updating files
|
||||
time.sleep(max(0, (sleep or 0.1) - (time.time()-ptime)))
|
||||
else:
|
||||
time.sleep(sleep or 0.1)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
if not cat:
|
||||
sys.stdout.write('\n')
|
||||
sys.exit(returncode)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Traditional watch command, but with higher resolution "
|
||||
"updates and a bit different options/output format.",
|
||||
allow_abbrev=False)
|
||||
parser.add_argument(
|
||||
'command',
|
||||
nargs=argparse.REMAINDER,
|
||||
help="Command to run.")
|
||||
parser.add_argument(
|
||||
'-n', '--lines',
|
||||
nargs='?',
|
||||
type=lambda x: int(x, 0),
|
||||
const=0,
|
||||
help="Show this many lines of history. 0 uses the terminal height. "
|
||||
"Defaults to 0.")
|
||||
parser.add_argument(
|
||||
'-z', '--cat',
|
||||
action='store_true',
|
||||
help="Pipe directly to stdout.")
|
||||
parser.add_argument(
|
||||
'-s', '--sleep',
|
||||
type=float,
|
||||
help="Seconds to sleep between runs. Defaults to 0.1.")
|
||||
parser.add_argument(
|
||||
'-k', '--keep-open',
|
||||
action='store_true',
|
||||
help="Try to use inotify to wait for changes.")
|
||||
parser.add_argument(
|
||||
'-K', '--keep-open-path',
|
||||
dest='keep_open_paths',
|
||||
action='append',
|
||||
help="Use this path for inotify. Defaults to guessing.")
|
||||
parser.add_argument(
|
||||
'-e', '--exit-on-error',
|
||||
action='store_true',
|
||||
help="Exit on error.")
|
||||
sys.exit(main(**{k: v
|
||||
for k, v in vars(parser.parse_args()).items()
|
||||
if v is not None}))
|
Reference in New Issue
Block a user