172 lines
5.6 KiB
Python
172 lines
5.6 KiB
Python
# -*- coding: utf-8 -*-
|
|
import struct
|
|
import os
|
|
import atexit
|
|
from time import time as now
|
|
from threading import Thread
|
|
from glob import glob
|
|
try:
|
|
from queue import Queue
|
|
except ImportError:
|
|
from Queue import Queue
|
|
|
|
# struct layout used to pack and unpack one input event record: two longs
# (timestamp seconds and microseconds), two unsigned shorts (event type and
# code) and one unsigned int (value).
event_bin_format = 'llHHI'

# Event type codes. Taken from include/linux/input.h
# https://www.kernel.org/doc/Documentation/input/event-codes.txt
EV_SYN = 0x00  # synchronization marker between groups of events
EV_KEY = 0x01  # key / button state changes
EV_REL = 0x02  # relative axis changes
EV_ABS = 0x03  # absolute axis changes
EV_MSC = 0x04  # miscellaneous events

# errno value ("Invalid argument") that is tolerated while registering key
# codes on the uinput device.
INVALID_ARGUMENT_ERRNO = 22
|
|
|
|
def make_uinput():
    """Create a virtual keyboard through ``/dev/uinput`` and return its file.

    Opens ``/dev/uinput`` for binary writing, enables ``EV_KEY`` events,
    registers key codes from 0 up to 0x2FF (stopping quietly at the first
    code rejected with errno 22), writes a device description named
    "Virtual Keyboard" and asks the driver to create the device.

    Returns:
        The open, writable file object for ``/dev/uinput``. No
        ``UI_DEV_DESTROY`` ioctl is ever issued here; the caller owns the
        returned file.

    Raises:
        IOError / OSError: if ``/dev/uinput`` cannot be opened or any setup
            step fails. On failure after opening, the file is closed before
            the error propagates so the descriptor is not leaked.
    """
    import fcntl
    import struct

    # ioctl request numbers of the uinput driver.
    UI_SET_EVBIT = 0x40045564
    UI_SET_KEYBIT = 0x40045565
    UI_DEV_CREATE = 0x5501

    BUS_USB = 0x03
    # 80-byte name, four unsigned shorts, one int, then four arrays of 64
    # ints each (all zero-filled below).
    uinput_user_dev = "80sHHHHi64i64i64i64i"

    # Requires uinput driver, but it's usually available.
    uinput = open("/dev/uinput", 'wb')
    created = False
    try:
        fcntl.ioctl(uinput, UI_SET_EVBIT, EV_KEY)

        try:
            for i in range(0x300):
                fcntl.ioctl(uinput, UI_SET_KEYBIT, i)
        except (IOError, OSError) as e:
            # Python 2 raises IOError here, Python 3 raises OSError (the two
            # are aliases there). "Invalid argument" only means the driver
            # refused this key code; anything else is a real failure.
            if e.errno != INVALID_ARGUMENT_ERRNO:
                raise

        axis = [0] * 64 * 4
        uinput.write(struct.pack(uinput_user_dev, b"Virtual Keyboard", BUS_USB, 1, 1, 1, 0, *axis))
        uinput.flush()  # Without this you may get Errno 22: Invalid argument.

        fcntl.ioctl(uinput, UI_DEV_CREATE)
        created = True
    finally:
        if not created:
            # Setup failed part-way: release the descriptor instead of
            # leaking it; the original error keeps propagating.
            uinput.close()

    return uinput
|
|
|
|
class EventDevice(object):
    """A single input event file, opened lazily for reading and writing.

    ``path`` is only stored by the constructor. The file is opened in binary
    read mode on first access to ``input_file`` and in binary write mode on
    first access to ``output_file``; both handles are cached on the instance.
    """

    def __init__(self, path):
        self.path = path
        self._input_file = None
        self._output_file = None

    @property
    def input_file(self):
        """Return the cached read handle, opening ``self.path`` on first use.

        On a permission error a hint is printed and the process exits via
        ``exit()``. Any other ``IOError`` is re-raised, so callers never get
        ``None`` back. A best-effort close of the handle is registered with
        ``atexit`` when the file is first opened.
        """
        if self._input_file is None:
            try:
                self._input_file = open(self.path, 'rb')
            except IOError as e:
                import errno
                # Compare the error number, not the message text, which can
                # vary with the locale.
                if e.errno != errno.EACCES:
                    raise
                print('Permission denied ({}). You must be sudo to access global events.'.format(self.path))
                exit()

            def try_close():
                # Best effort at interpreter exit: never let a close failure
                # surface during shutdown.
                try:
                    self._input_file.close()
                except Exception:
                    pass
            atexit.register(try_close)
        return self._input_file

    @property
    def output_file(self):
        """Return the cached write handle, opening ``self.path`` on first use.

        The handle's ``close`` is registered with ``atexit`` when opened.
        """
        if self._output_file is None:
            self._output_file = open(self.path, 'wb')
            atexit.register(self._output_file.close)
        return self._output_file

    def read_event(self):
        """Read and decode one event record from ``input_file``.

        Returns:
            A tuple ``(timestamp, type, code, value, path)`` where
            ``timestamp`` is seconds plus microseconds / 1e6 as a float and
            ``path`` is this device's path.

        Raises:
            struct.error: if fewer bytes than one full record were read.
        """
        data = self.input_file.read(struct.calcsize(event_bin_format))
        seconds, microseconds, type, code, value = struct.unpack(event_bin_format, data)
        return seconds + microseconds / 1e6, type, code, value, self.path

    def write_event(self, type, code, value):
        """Write one event stamped with the current time, then flush.

        The event is followed by an ``EV_SYN`` record carrying the same
        timestamp, and both are written in a single call.
        """
        integer, fraction = divmod(now(), 1)
        seconds = int(integer)
        microseconds = int(fraction * 1e6)
        data_event = struct.pack(event_bin_format, seconds, microseconds, type, code, value)

        # Send a sync event to ensure other programs update.
        sync_event = struct.pack(event_bin_format, seconds, microseconds, EV_SYN, 0, 0)

        self.output_file.write(data_event + sync_event)
        self.output_file.flush()
|
|
|
|
class AggregatedEventDevice(object):
    """Merge the events of several devices into one blocking queue.

    One daemon thread per device loops forever on ``device.read_event()``
    and puts each result on ``event_queue``. Writes are forwarded to
    ``output``, which defaults to the first device when not given (or falsy).
    """

    def __init__(self, devices, output=None):
        self.event_queue = Queue()
        self.devices = devices
        self.output = output or self.devices[0]

        def start_reading(device):
            while True:
                self.event_queue.put(device.read_event())

        for device in self.devices:
            thread = Thread(target=start_reading, args=[device])
            # Daemon threads do not keep the interpreter alive. The
            # attribute is used because Thread.setDaemon() is deprecated.
            thread.daemon = True
            thread.start()

    def read_event(self):
        """Block until any device produces an event and return it."""
        return self.event_queue.get(block=True)

    def write_event(self, type, code, value):
        """Forward the event to the output device's ``write_event``."""
        self.output.write_event(type, code, value)
|
|
|
|
import re
|
|
from collections import namedtuple
|
|
DeviceDescription = namedtuple('DeviceDescription', 'event_file is_mouse is_keyboard')
|
|
device_pattern = r"""N: Name="([^"]+?)".+?H: Handlers=([^\n]+)"""
|
|
def list_devices_from_proc(type_name):
|
|
try:
|
|
with open('/proc/bus/input/devices') as f:
|
|
description = f.read()
|
|
except FileNotFoundError:
|
|
return
|
|
|
|
devices = {}
|
|
for name, handlers in re.findall(device_pattern, description, re.DOTALL):
|
|
path = '/dev/input/event' + re.search(r'event(\d+)', handlers).group(1)
|
|
if type_name in handlers:
|
|
yield EventDevice(path)
|
|
|
|
def list_devices_from_by_id(type_name):
    """Yield an EventDevice per ``/dev/input/by-id/*-event-<type_name>`` match."""
    pattern = '/dev/input/by-id/*-event-' + type_name
    matches = glob(pattern)
    for device_path in matches:
        yield EventDevice(device_path)
|
|
|
|
def aggregate_devices(type_name):
    """Return a device that reads from all ``type_name`` devices at once.

    All writes go through a fake uinput device. If real devices are found,
    the result is an AggregatedEventDevice reading from them; otherwise the
    fake device itself is returned.
    """
    # Some systems have multiple keyboards with different range of allowed keys
    # on each one, like a notebook with a "keyboard" device exclusive for the
    # power button. Instead of figuring out which keyboard allows which key to
    # send events, we create a fake device and send all events through there.
    uinput_file = make_uinput()
    fake_device = EventDevice('uinput Fake Device')
    fake_device._input_file = fake_device._output_file = uinput_file

    # We don't aggregate devices from different sources to avoid
    # duplicates: the first source that yields anything wins. /proc is tried
    # first; the by-id lookup breaks on mouse for virtualbox
    # (was getting /dev/input/by-id/usb-VirtualBox_USB_Tablet-event-mouse).
    for list_devices in (list_devices_from_proc, list_devices_from_by_id):
        found = list(list_devices(type_name))
        if found:
            return AggregatedEventDevice(found, output=fake_device)

    # If no keyboards were found we can only use the fake device to send keys.
    return fake_device
|
|
|
|
|
|
def ensure_root():
    """Raise ImportError unless the effective user id is 0 (root)."""
    running_as_root = os.geteuid() == 0
    if running_as_root:
        return
    raise ImportError('You must be root to use this library on linux.')
|