Rewrite using mouse library

This commit is contained in:
coolneng 2020-07-28 13:24:50 +02:00
parent 00d24222b1
commit 2f1d586946
Signed by: coolneng
GPG Key ID: 9893DA236405AF57
20 changed files with 1196 additions and 14 deletions

276
mouse/__init__.py Normal file
View File

@ -0,0 +1,276 @@
# -*- coding: utf-8 -*-
"""
mouse
=====
Take full control of your mouse with this small Python library. Hook global events, register hotkeys, simulate mouse movement and clicks, and much more.
_Huge thanks to [Kirill Pavlov](http://kirillpavlov.com/) for donating the package name. If you are looking for the Cheddargetter.com client implementation, [`pip install mouse==0.5.0`](https://pypi.python.org/pypi/mouse/0.5.0)._
## Features
- Global event hook on all mice devices (captures events regardless of focus).
- **Listen** and **sends** mouse events.
- Works with **Windows** and **Linux** (requires sudo).
- **Pure Python**, no C modules to be compiled.
- **Zero dependencies**. Trivial to install and deploy, just copy the files.
- **Python 2 and 3**.
- Includes **high level API** (e.g. [record](#mouse.record) and [play](#mouse.play).
- Events automatically captured in separate thread, doesn't block main program.
- Tested and documented.
This program makes no attempt to hide itself, so don't use it for keyloggers.
## Usage
Install the [PyPI package](https://pypi.python.org/pypi/mouse/):
$ sudo pip install mouse
or clone the repository (no installation required, source files are sufficient):
$ git clone https://github.com/boppreh/mouse
Then check the [API docs](https://github.com/boppreh/mouse#api) to see what features are available.
## Known limitations:
- Events generated under Windows don't report device id (`event.device == None`). [#21](https://github.com/boppreh/keyboard/issues/21)
- To avoid depending on X the Linux parts reads raw device files (`/dev/input/input*`) but this requries root.
- Other applications, such as some games, may register hooks that swallow all key events. In this case `mouse` will be unable to report events.
"""
# TODO
# - infinite wait
# - mouse.on_move
version = '0.7.1'
import time as _time
import platform as _platform
if _platform.system() == 'Windows':
from. import _winmouse as _os_mouse
elif _platform.system() == 'Linux':
from. import _nixmouse as _os_mouse
else:
raise OSError("Unsupported platform '{}'".format(_platform.system()))
from ._mouse_event import ButtonEvent, MoveEvent, WheelEvent, LEFT, RIGHT, MIDDLE, X, X2, UP, DOWN, DOUBLE
from ._generic import GenericListener as _GenericListener
_pressed_events = set()
class _MouseListener(_GenericListener):
def init(self):
_os_mouse.init()
def pre_process_event(self, event):
if isinstance(event, ButtonEvent):
if event.event_type in (UP, DOUBLE):
_pressed_events.discard(event.button)
else:
_pressed_events.add(event.button)
return True
def listen(self):
_os_mouse.listen(self.queue)
_listener = _MouseListener()
def is_pressed(button=LEFT):
""" Returns True if the given button is currently pressed. """
_listener.start_if_necessary()
return button in _pressed_events
def press(button=LEFT):
""" Presses the given button (but doesn't release). """
_os_mouse.press(button)
def release(button=LEFT):
""" Releases the given button. """
_os_mouse.release(button)
def click(button=LEFT):
""" Sends a click with the given button. """
_os_mouse.press(button)
_os_mouse.release(button)
def double_click(button=LEFT):
""" Sends a double click with the given button. """
click(button)
click(button)
def right_click():
""" Sends a right click with the given button. """
click(RIGHT)
def wheel(delta=1):
""" Scrolls the wheel `delta` clicks. Sign indicates direction. """
_os_mouse.wheel(delta)
def move(x, y, absolute=True, duration=0):
"""
Moves the mouse. If `absolute`, to position (x, y), otherwise move relative
to the current position. If `duration` is non-zero, animates the movement.
"""
x = int(x)
y = int(y)
# Requires an extra system call on Linux, but `move_relative` is measured
# in millimiters so we would lose precision.
position_x, position_y = get_position()
if not absolute:
x = position_x + x
y = position_y + y
if duration:
start_x = position_x
start_y = position_y
dx = x - start_x
dy = y - start_y
if dx == 0 and dy == 0:
_time.sleep(duration)
else:
# 120 movements per second.
# Round and keep float to ensure float division in Python 2
steps = max(1.0, float(int(duration * 120.0)))
for i in range(int(steps)+1):
move(start_x + dx*i/steps, start_y + dy*i/steps)
_time.sleep(duration/steps)
else:
_os_mouse.move_to(x, y)
def drag(start_x, start_y, end_x, end_y, absolute=True, duration=0):
"""
Holds the left mouse button, moving from start to end position, then
releases. `absolute` and `duration` are parameters regarding the mouse
movement.
"""
if is_pressed():
release()
move(start_x, start_y, absolute, 0)
press()
move(end_x, end_y, absolute, duration)
release()
def on_button(callback, args=(), buttons=(LEFT, MIDDLE, RIGHT, X, X2), types=(UP, DOWN, DOUBLE)):
""" Invokes `callback` with `args` when the specified event happens. """
if not isinstance(buttons, (tuple, list)):
buttons = (buttons,)
if not isinstance(types, (tuple, list)):
types = (types,)
def handler(event):
if isinstance(event, ButtonEvent):
if event.event_type in types and event.button in buttons:
callback(*args)
_listener.add_handler(handler)
return handler
def on_pressed(callback, args=()):
""" Invokes `callback` with `args` when the left button is pressed. """
return on_button(callback, args, [LEFT], [DOWN])
def on_click(callback, args=()):
""" Invokes `callback` with `args` when the left button is clicked. """
return on_button(callback, args, [LEFT], [UP])
def on_double_click(callback, args=()):
"""
Invokes `callback` with `args` when the left button is double clicked.
"""
return on_button(callback, args, [LEFT], [DOUBLE])
def on_right_click(callback, args=()):
""" Invokes `callback` with `args` when the right button is clicked. """
return on_button(callback, args, [RIGHT], [UP])
def on_middle_click(callback, args=()):
""" Invokes `callback` with `args` when the middle button is clicked. """
return on_button(callback, args, [MIDDLE], [UP])
def wait(button=LEFT, target_types=(UP, DOWN, DOUBLE)):
"""
Blocks program execution until the given button performs an event.
"""
from threading import Lock
lock = Lock()
lock.acquire()
handler = on_button(lock.release, (), [button], target_types)
lock.acquire()
_listener.remove_handler(handler)
def get_position():
""" Returns the (x, y) mouse position. """
return _os_mouse.get_position()
def hook(callback):
"""
Installs a global listener on all available mouses, invoking `callback`
each time it is moved, a key status changes or the wheel is spun. A mouse
event is passed as argument, with type either `mouse.ButtonEvent`,
`mouse.WheelEvent` or `mouse.MoveEvent`.
Returns the given callback for easier development.
"""
_listener.add_handler(callback)
return callback
def unhook(callback):
"""
Removes a previously installed hook.
"""
_listener.remove_handler(callback)
def unhook_all():
"""
Removes all hooks registered by this application. Note this may include
hooks installed by high level functions, such as `record`.
"""
del _listener.handlers[:]
def record(button=RIGHT, target_types=(DOWN,)):
"""
Records all mouse events until the user presses the given button.
Then returns the list of events recorded. Pairs well with `play(events)`.
Note: this is a blocking function.
Note: for more details on the mouse hook and events see `hook`.
"""
recorded = []
hook(recorded.append)
wait(button=button, target_types=target_types)
unhook(recorded.append)
return recorded
def play(events, speed_factor=1.0, include_clicks=True, include_moves=True, include_wheel=True):
"""
Plays a sequence of recorded events, maintaining the relative time
intervals. If speed_factor is <= 0 then the actions are replayed as fast
as the OS allows. Pairs well with `record()`.
The parameters `include_*` define if events of that type should be inluded
in the replay or ignored.
"""
last_time = None
for event in events:
if speed_factor > 0 and last_time is not None:
_time.sleep((event.time - last_time) / speed_factor)
last_time = event.time
if isinstance(event, ButtonEvent) and include_clicks:
if event.event_type == UP:
_os_mouse.release(event.button)
else:
_os_mouse.press(event.button)
elif isinstance(event, MoveEvent) and include_moves:
_os_mouse.move_to(event.x, event.y)
elif isinstance(event, WheelEvent) and include_wheel:
_os_mouse.wheel(event.delta)
replay = play
hold = press
if __name__ == '__main__':
print('Recording... Double click to stop and replay.')
play(record())

BIN
mouse/__init__.pyc Normal file

Binary file not shown.

27
mouse/__main__.py Normal file
View File

@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-
import mouse
import fileinput
import json
import sys
class_by_name = {
'ButtonEvent': mouse.ButtonEvent,
'WheelEvent': mouse.WheelEvent,
'MoveEvent': mouse.MoveEvent,
}
def print_event_json(event):
# Could use json.dumps(event.__dict__()), but this way we guarantee semantic order.
d = event._asdict()
d['event_class'] = event.__class__.__name__
print(json.dumps(d))
sys.stdout.flush()
mouse.hook(print_event_json)
def load(line):
d = json.loads(line)
class_ = class_by_name[d['event_class']]
del d['event_class']
return class_(**d)
mouse.play(load(line) for line in fileinput.input())

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

72
mouse/_generic.py Normal file
View File

@ -0,0 +1,72 @@
# -*- coding: utf-8 -*-
from threading import Thread, Lock
import traceback
import functools
try:
from queue import Queue
except ImportError:
from Queue import Queue
class GenericListener(object):
lock = Lock()
def __init__(self):
self.handlers = []
self.listening = False
self.queue = Queue()
def invoke_handlers(self, event):
for handler in self.handlers:
try:
if handler(event):
# Stop processing this hotkey.
return 1
except Exception as e:
traceback.print_exc()
def start_if_necessary(self):
"""
Starts the listening thread if it wasn't already.
"""
self.lock.acquire()
try:
if not self.listening:
self.init()
self.listening = True
self.listening_thread = Thread(target=self.listen)
self.listening_thread.daemon = True
self.listening_thread.start()
self.processing_thread = Thread(target=self.process)
self.processing_thread.daemon = True
self.processing_thread.start()
finally:
self.lock.release()
def pre_process_event(self, event):
raise NotImplementedError('This method should be implemented in the child class.')
def process(self):
"""
Loops over the underlying queue of events and processes them in order.
"""
assert self.queue is not None
while True:
event = self.queue.get()
if self.pre_process_event(event):
self.invoke_handlers(event)
self.queue.task_done()
def add_handler(self, handler):
"""
Adds a function to receive each event captured, starting the capturing
process if necessary.
"""
self.start_if_necessary()
self.handlers.append(handler)
def remove_handler(self, handler):
""" Removes a previously added event handler. """
self.handlers.remove(handler)

BIN
mouse/_generic.pyc Normal file

Binary file not shown.

20
mouse/_mouse_event.py Normal file
View File

@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
from collections import namedtuple
LEFT = 'left'
RIGHT = 'right'
MIDDLE = 'middle'
WHEEL = 'wheel'
X = 'x'
X2 = 'x2'
UP = 'up'
DOWN = 'down'
DOUBLE = 'double'
VERTICAL = 'vertical'
HORIZONTAL = 'horizontal'
ButtonEvent = namedtuple('ButtonEvent', ['event_type', 'button', 'time'])
WheelEvent = namedtuple('WheelEvent', ['delta', 'time'])
MoveEvent = namedtuple('MoveEvent', ['x', 'y', 'time'])

BIN
mouse/_mouse_event.pyc Normal file

Binary file not shown.

271
mouse/_mouse_tests.py Normal file
View File

@ -0,0 +1,271 @@
# -*- coding: utf-8 -*-
import unittest
import time
from ._mouse_event import MoveEvent, ButtonEvent, WheelEvent, LEFT, RIGHT, MIDDLE, X, X2, UP, DOWN, DOUBLE
import mouse
class FakeOsMouse(object):
def __init__(self):
self.append = None
self.position = (0, 0)
self.queue = None
self.init = lambda: None
def listen(self, queue):
self.listening = True
self.queue = queue
def press(self, button):
self.append((DOWN, button))
def release(self, button):
self.append((UP, button))
def get_position(self):
return self.position
def move_to(self, x, y):
self.append(('move', (x, y)))
self.position = (x, y)
def wheel(self, delta):
self.append(('wheel', delta))
def move_relative(self, x, y):
self.position = (self.position[0] + x, self.position[1] + y)
class TestMouse(unittest.TestCase):
@staticmethod
def setUpClass():
mouse._os_mouse= FakeOsMouse()
mouse._listener.start_if_necessary()
assert mouse._os_mouse.listening
def setUp(self):
self.events = []
mouse._pressed_events.clear()
mouse._os_mouse.append = self.events.append
def tearDown(self):
mouse.unhook_all()
# Make sure there's no spill over between tests.
self.wait_for_events_queue()
def wait_for_events_queue(self):
mouse._listener.queue.join()
def flush_events(self):
self.wait_for_events_queue()
events = list(self.events)
# Ugly, but requried to work in Python2. Python3 has list.clear
del self.events[:]
return events
def press(self, button=LEFT):
mouse._os_mouse.queue.put(ButtonEvent(DOWN, button, time.time()))
self.wait_for_events_queue()
def release(self, button=LEFT):
mouse._os_mouse.queue.put(ButtonEvent(UP, button, time.time()))
self.wait_for_events_queue()
def double_click(self, button=LEFT):
mouse._os_mouse.queue.put(ButtonEvent(DOUBLE, button, time.time()))
self.wait_for_events_queue()
def click(self, button=LEFT):
self.press(button)
self.release(button)
def wheel(self, delta=1):
mouse._os_mouse.queue.put(WheelEvent(delta, time.time()))
self.wait_for_events_queue()
def move(self, x=0, y=0):
mouse._os_mouse.queue.put(MoveEvent(x, y, time.time()))
self.wait_for_events_queue()
def test_hook(self):
events = []
self.press()
mouse.hook(events.append)
self.press()
mouse.unhook(events.append)
self.press()
self.assertEqual(len(events), 1)
def test_is_pressed(self):
self.assertFalse(mouse.is_pressed())
self.press()
self.assertTrue(mouse.is_pressed())
self.release()
self.press(X2)
self.assertFalse(mouse.is_pressed())
self.assertTrue(mouse.is_pressed(X2))
self.press(X2)
self.assertTrue(mouse.is_pressed(X2))
self.release(X2)
self.release(X2)
self.assertFalse(mouse.is_pressed(X2))
def test_buttons(self):
mouse.press()
self.assertEqual(self.flush_events(), [(DOWN, LEFT)])
mouse.release()
self.assertEqual(self.flush_events(), [(UP, LEFT)])
mouse.click()
self.assertEqual(self.flush_events(), [(DOWN, LEFT), (UP, LEFT)])
mouse.double_click()
self.assertEqual(self.flush_events(), [(DOWN, LEFT), (UP, LEFT), (DOWN, LEFT), (UP, LEFT)])
mouse.right_click()
self.assertEqual(self.flush_events(), [(DOWN, RIGHT), (UP, RIGHT)])
mouse.click(RIGHT)
self.assertEqual(self.flush_events(), [(DOWN, RIGHT), (UP, RIGHT)])
mouse.press(X2)
self.assertEqual(self.flush_events(), [(DOWN, X2)])
def test_position(self):
self.assertEqual(mouse.get_position(), mouse._os_mouse.get_position())
def test_move(self):
mouse.move(0, 0)
self.assertEqual(mouse._os_mouse.get_position(), (0, 0))
mouse.move(100, 500)
self.assertEqual(mouse._os_mouse.get_position(), (100, 500))
mouse.move(1, 2, False)
self.assertEqual(mouse._os_mouse.get_position(), (101, 502))
mouse.move(0, 0)
mouse.move(100, 499, True, duration=0.01)
self.assertEqual(mouse._os_mouse.get_position(), (100, 499))
mouse.move(100, 1, False, duration=0.01)
self.assertEqual(mouse._os_mouse.get_position(), (200, 500))
mouse.move(0, 0, False, duration=0.01)
self.assertEqual(mouse._os_mouse.get_position(), (200, 500))
def triggers(self, fn, events, **kwargs):
self.triggered = False
def callback():
self.triggered = True
handler = fn(callback, **kwargs)
for event_type, arg in events:
if event_type == DOWN:
self.press(arg)
elif event_type == UP:
self.release(arg)
elif event_type == DOUBLE:
self.double_click(arg)
elif event_type == 'WHEEL':
self.wheel()
mouse._listener.remove_handler(handler)
return self.triggered
def test_on_button(self):
self.assertTrue(self.triggers(mouse.on_button, [(DOWN, LEFT)]))
self.assertTrue(self.triggers(mouse.on_button, [(DOWN, RIGHT)]))
self.assertTrue(self.triggers(mouse.on_button, [(DOWN, X)]))
self.assertFalse(self.triggers(mouse.on_button, [('WHEEL', '')]))
self.assertFalse(self.triggers(mouse.on_button, [(DOWN, X)], buttons=MIDDLE))
self.assertTrue(self.triggers(mouse.on_button, [(DOWN, MIDDLE)], buttons=MIDDLE))
self.assertTrue(self.triggers(mouse.on_button, [(DOWN, MIDDLE)], buttons=MIDDLE))
self.assertFalse(self.triggers(mouse.on_button, [(DOWN, MIDDLE)], buttons=MIDDLE, types=UP))
self.assertTrue(self.triggers(mouse.on_button, [(UP, MIDDLE)], buttons=MIDDLE, types=UP))
self.assertTrue(self.triggers(mouse.on_button, [(UP, MIDDLE)], buttons=[MIDDLE, LEFT], types=[UP, DOWN]))
self.assertTrue(self.triggers(mouse.on_button, [(DOWN, LEFT)], buttons=[MIDDLE, LEFT], types=[UP, DOWN]))
self.assertFalse(self.triggers(mouse.on_button, [(UP, X)], buttons=[MIDDLE, LEFT], types=[UP, DOWN]))
def test_ons(self):
self.assertTrue(self.triggers(mouse.on_click, [(UP, LEFT)]))
self.assertFalse(self.triggers(mouse.on_click, [(UP, RIGHT)]))
self.assertFalse(self.triggers(mouse.on_click, [(DOWN, LEFT)]))
self.assertFalse(self.triggers(mouse.on_click, [(DOWN, RIGHT)]))
self.assertTrue(self.triggers(mouse.on_double_click, [(DOUBLE, LEFT)]))
self.assertFalse(self.triggers(mouse.on_double_click, [(DOUBLE, RIGHT)]))
self.assertFalse(self.triggers(mouse.on_double_click, [(DOWN, RIGHT)]))
self.assertTrue(self.triggers(mouse.on_right_click, [(UP, RIGHT)]))
self.assertTrue(self.triggers(mouse.on_middle_click, [(UP, MIDDLE)]))
def test_wait(self):
# If this fails it blocks. Unfortunately, but I see no other way of testing.
from threading import Thread, Lock
lock = Lock()
lock.acquire()
def t():
mouse.wait()
lock.release()
Thread(target=t).start()
self.press()
lock.acquire()
def test_record_play(self):
from threading import Thread, Lock
lock = Lock()
lock.acquire()
def t():
self.recorded = mouse.record(RIGHT)
lock.release()
Thread(target=t).start()
self.click()
self.wheel(5)
self.move(100, 50)
self.press(RIGHT)
lock.acquire()
self.assertEqual(len(self.recorded), 5)
self.assertEqual(self.recorded[0]._replace(time=None), ButtonEvent(DOWN, LEFT, None))
self.assertEqual(self.recorded[1]._replace(time=None), ButtonEvent(UP, LEFT, None))
self.assertEqual(self.recorded[2]._replace(time=None), WheelEvent(5, None))
self.assertEqual(self.recorded[3]._replace(time=None), MoveEvent(100, 50, None))
self.assertEqual(self.recorded[4]._replace(time=None), ButtonEvent(DOWN, RIGHT, None))
mouse.play(self.recorded, speed_factor=0)
events = self.flush_events()
self.assertEqual(len(events), 5)
self.assertEqual(events[0], (DOWN, LEFT))
self.assertEqual(events[1], (UP, LEFT))
self.assertEqual(events[2], ('wheel', 5))
self.assertEqual(events[3], ('move', (100, 50)))
self.assertEqual(events[4], (DOWN, RIGHT))
mouse.play(self.recorded)
events = self.flush_events()
self.assertEqual(len(events), 5)
self.assertEqual(events[0], (DOWN, LEFT))
self.assertEqual(events[1], (UP, LEFT))
self.assertEqual(events[2], ('wheel', 5))
self.assertEqual(events[3], ('move', (100, 50)))
self.assertEqual(events[4], (DOWN, RIGHT))
mouse.play(self.recorded, include_clicks=False)
events = self.flush_events()
self.assertEqual(len(events), 2)
self.assertEqual(events[0], ('wheel', 5))
self.assertEqual(events[1], ('move', (100, 50)))
mouse.play(self.recorded, include_moves=False)
events = self.flush_events()
self.assertEqual(len(events), 4)
self.assertEqual(events[0], (DOWN, LEFT))
self.assertEqual(events[1], (UP, LEFT))
self.assertEqual(events[2], ('wheel', 5))
self.assertEqual(events[3], (DOWN, RIGHT))
mouse.play(self.recorded, include_wheel=False)
events = self.flush_events()
self.assertEqual(len(events), 4)
self.assertEqual(events[0], (DOWN, LEFT))
self.assertEqual(events[1], (UP, LEFT))
self.assertEqual(events[2], ('move', (100, 50)))
self.assertEqual(events[3], (DOWN, RIGHT))
if __name__ == '__main__':
unittest.main()

171
mouse/_nixcommon.py Normal file
View File

@ -0,0 +1,171 @@
# -*- coding: utf-8 -*-
import struct
import os
import atexit
from time import time as now
from threading import Thread
from glob import glob
try:
from queue import Queue
except ImportError:
from Queue import Queue
event_bin_format = 'llHHI'
# Taken from include/linux/input.h
# https://www.kernel.org/doc/Documentation/input/event-codes.txt
EV_SYN = 0x00
EV_KEY = 0x01
EV_REL = 0x02
EV_ABS = 0x03
EV_MSC = 0x04
INVALID_ARGUMENT_ERRNO = 22
def make_uinput():
import fcntl, struct
# Requires uinput driver, but it's usually available.
uinput = open("/dev/uinput", 'wb')
UI_SET_EVBIT = 0x40045564
fcntl.ioctl(uinput, UI_SET_EVBIT, EV_KEY)
UI_SET_KEYBIT = 0x40045565
try:
for i in range(0x300):
fcntl.ioctl(uinput, UI_SET_KEYBIT, i)
except OSError as e:
if e.errno != INVALID_ARGUMENT_ERRNO:
raise e
BUS_USB = 0x03
uinput_user_dev = "80sHHHHi64i64i64i64i"
axis = [0] * 64 * 4
uinput.write(struct.pack(uinput_user_dev, b"Virtual Keyboard", BUS_USB, 1, 1, 1, 0, *axis))
uinput.flush() # Without this you may get Errno 22: Invalid argument.
UI_DEV_CREATE = 0x5501
fcntl.ioctl(uinput, UI_DEV_CREATE)
UI_DEV_DESTROY = 0x5502
#fcntl.ioctl(uinput, UI_DEV_DESTROY)
return uinput
class EventDevice(object):
def __init__(self, path):
self.path = path
self._input_file = None
self._output_file = None
@property
def input_file(self):
if self._input_file is None:
try:
self._input_file = open(self.path, 'rb')
except IOError as e:
if e.strerror == 'Permission denied':
print('Permission denied ({}). You must be sudo to access global events.'.format(self.path))
exit()
def try_close():
try:
self._input_file.close
except:
pass
atexit.register(try_close)
return self._input_file
@property
def output_file(self):
if self._output_file is None:
self._output_file = open(self.path, 'wb')
atexit.register(self._output_file.close)
return self._output_file
def read_event(self):
data = self.input_file.read(struct.calcsize(event_bin_format))
seconds, microseconds, type, code, value = struct.unpack(event_bin_format, data)
return seconds + microseconds / 1e6, type, code, value, self.path
def write_event(self, type, code, value):
integer, fraction = divmod(now(), 1)
seconds = int(integer)
microseconds = int(fraction * 1e6)
data_event = struct.pack(event_bin_format, seconds, microseconds, type, code, value)
# Send a sync event to ensure other programs update.
sync_event = struct.pack(event_bin_format, seconds, microseconds, EV_SYN, 0, 0)
self.output_file.write(data_event + sync_event)
self.output_file.flush()
class AggregatedEventDevice(object):
def __init__(self, devices, output=None):
self.event_queue = Queue()
self.devices = devices
self.output = output or self.devices[0]
def start_reading(device):
while True:
self.event_queue.put(device.read_event())
for device in self.devices:
thread = Thread(target=start_reading, args=[device])
thread.setDaemon(True)
thread.start()
def read_event(self):
return self.event_queue.get(block=True)
def write_event(self, type, code, value):
self.output.write_event(type, code, value)
import re
from collections import namedtuple
DeviceDescription = namedtuple('DeviceDescription', 'event_file is_mouse is_keyboard')
device_pattern = r"""N: Name="([^"]+?)".+?H: Handlers=([^\n]+)"""
def list_devices_from_proc(type_name):
try:
with open('/proc/bus/input/devices') as f:
description = f.read()
except FileNotFoundError:
return
devices = {}
for name, handlers in re.findall(device_pattern, description, re.DOTALL):
path = '/dev/input/event' + re.search(r'event(\d+)', handlers).group(1)
if type_name in handlers:
yield EventDevice(path)
def list_devices_from_by_id(type_name):
for path in glob('/dev/input/by-id/*-event-' + type_name):
yield EventDevice(path)
def aggregate_devices(type_name):
# Some systems have multiple keyboards with different range of allowed keys
# on each one, like a notebook with a "keyboard" device exclusive for the
# power button. Instead of figuring out which keyboard allows which key to
# send events, we create a fake device and send all events through there.
uinput = make_uinput()
fake_device = EventDevice('uinput Fake Device')
fake_device._input_file = uinput
fake_device._output_file = uinput
# We don't aggregate devices from different sources to avoid
# duplicates.
devices_from_proc = list(list_devices_from_proc(type_name))
if devices_from_proc:
return AggregatedEventDevice(devices_from_proc, output=fake_device)
# breaks on mouse for virtualbox
# was getting /dev/input/by-id/usb-VirtualBox_USB_Tablet-event-mouse
devices_from_by_id = list(list_devices_from_by_id(type_name))
if devices_from_by_id:
return AggregatedEventDevice(devices_from_by_id, output=fake_device)
# If no keyboards were found we can only use the fake device to send keys.
return fake_device
def ensure_root():
if os.geteuid() != 0:
raise ImportError('You must be root to use this library on linux.')

BIN
mouse/_nixcommon.pyc Normal file

Binary file not shown.

131
mouse/_nixmouse.py Normal file
View File

@ -0,0 +1,131 @@
# -*- coding: utf-8 -*-
import struct
from subprocess import check_output
import re
from ._nixcommon import EV_KEY, EV_REL, EV_MSC, EV_SYN, EV_ABS, aggregate_devices, ensure_root
from ._mouse_event import ButtonEvent, WheelEvent, MoveEvent, LEFT, RIGHT, MIDDLE, X, X2, UP, DOWN
import ctypes
import ctypes.util
from ctypes import c_uint32, c_uint, c_int, c_void_p, byref
display = None
window = None
x11 = None
def build_display():
global display, window, x11
if display and window and x11: return
x11 = ctypes.cdll.LoadLibrary(ctypes.util.find_library('X11'))
# Required because we will have multiple threads calling x11,
# such as the listener thread and then main using "move_to".
x11.XInitThreads()
# Explicitly set XOpenDisplay.restype to avoid segfault on 64 bit OS.
# http://stackoverflow.com/questions/35137007/get-mouse-position-on-linux-pure-python
x11.XOpenDisplay.restype = c_void_p
display = c_void_p(x11.XOpenDisplay(0))
window = x11.XDefaultRootWindow(display)
def get_position():
build_display()
root_id, child_id = c_void_p(), c_void_p()
root_x, root_y, win_x, win_y = c_int(), c_int(), c_int(), c_int()
mask = c_uint()
ret = x11.XQueryPointer(display, c_uint32(window), byref(root_id), byref(child_id),
byref(root_x), byref(root_y),
byref(win_x), byref(win_y), byref(mask))
return root_x.value, root_y.value
def move_to(x, y):
build_display()
x11.XWarpPointer(display, None, window, 0, 0, 0, 0, x, y)
x11.XFlush(display)
REL_X = 0x00
REL_Y = 0x01
REL_Z = 0x02
REL_HWHEEL = 0x06
REL_WHEEL = 0x08
ABS_X = 0x00
ABS_Y = 0x01
BTN_MOUSE = 0x110
BTN_LEFT = 0x110
BTN_RIGHT = 0x111
BTN_MIDDLE = 0x112
BTN_SIDE = 0x113
BTN_EXTRA = 0x114
button_by_code = {
BTN_LEFT: LEFT,
BTN_RIGHT: RIGHT,
BTN_MIDDLE: MIDDLE,
BTN_SIDE: X,
BTN_EXTRA: X2,
}
code_by_button = {button: code for code, button in button_by_code.items()}
device = None
def build_device():
global device
if device: return
ensure_root()
device = aggregate_devices('mouse')
init = build_device
def listen(queue):
build_device()
while True:
time, type, code, value, device_id = device.read_event()
if type == EV_SYN or type == EV_MSC:
continue
event = None
arg = None
if type == EV_KEY:
event = ButtonEvent(DOWN if value else UP, button_by_code.get(code, '?'), time)
elif type == EV_REL:
value, = struct.unpack('i', struct.pack('I', value))
if code == REL_WHEEL:
event = WheelEvent(value, time)
elif code in (REL_X, REL_Y):
x, y = get_position()
event = MoveEvent(x, y, time)
if event is None:
# Unknown event type.
continue
queue.put(event)
def press(button=LEFT):
build_device()
device.write_event(EV_KEY, code_by_button[button], 0x01)
def release(button=LEFT):
build_device()
device.write_event(EV_KEY, code_by_button[button], 0x00)
def move_relative(x, y):
build_device()
# Note relative events are not in terms of pixels, but millimeters.
if x < 0:
x += 2**32
if y < 0:
y += 2**32
device.write_event(EV_REL, REL_X, x)
device.write_event(EV_REL, REL_Y, y)
def wheel(delta=1):
build_device()
if delta < 0:
delta += 2**32
device.write_event(EV_REL, REL_WHEEL, delta)
if __name__ == '__main__':
#listen(print)
move_to(100, 200)

BIN
mouse/_nixmouse.pyc Normal file

Binary file not shown.

216
mouse/_winmouse.py Normal file
View File

@ -0,0 +1,216 @@
# -*- coding: utf-8 -*-
import ctypes
import time
from ctypes import c_short, c_char, c_uint8, c_int32, c_int, c_uint, c_uint32, c_long, byref, Structure, CFUNCTYPE, POINTER
from ctypes.wintypes import DWORD, BOOL, HHOOK, MSG, LPWSTR, WCHAR, WPARAM, LPARAM
LPMSG = POINTER(MSG)
import atexit
from ._mouse_event import ButtonEvent, WheelEvent, MoveEvent, LEFT, RIGHT, MIDDLE, X, X2, UP, DOWN, DOUBLE, WHEEL, HORIZONTAL, VERTICAL
#user32 = ctypes.windll.user32
user32 = ctypes.WinDLL('user32', use_last_error = True)
class MSLLHOOKSTRUCT(Structure):
_fields_ = [("x", c_long),
("y", c_long),
('data', c_int32),
('reserved', c_int32),
("flags", DWORD),
("time", c_int),
]
LowLevelMouseProc = CFUNCTYPE(c_int, WPARAM, LPARAM, POINTER(MSLLHOOKSTRUCT))
SetWindowsHookEx = user32.SetWindowsHookExA
#SetWindowsHookEx.argtypes = [c_int, LowLevelMouseProc, c_int, c_int]
SetWindowsHookEx.restype = HHOOK
CallNextHookEx = user32.CallNextHookEx
#CallNextHookEx.argtypes = [c_int , c_int, c_int, POINTER(MSLLHOOKSTRUCT)]
CallNextHookEx.restype = c_int
UnhookWindowsHookEx = user32.UnhookWindowsHookEx
UnhookWindowsHookEx.argtypes = [HHOOK]
UnhookWindowsHookEx.restype = BOOL
GetMessage = user32.GetMessageW
GetMessage.argtypes = [LPMSG, c_int, c_int, c_int]
GetMessage.restype = BOOL
TranslateMessage = user32.TranslateMessage
TranslateMessage.argtypes = [LPMSG]
TranslateMessage.restype = BOOL
DispatchMessage = user32.DispatchMessageA
DispatchMessage.argtypes = [LPMSG]
GetDoubleClickTime = user32.GetDoubleClickTime
# Beware, as of 2016-01-30 the official docs have a very incomplete list.
# This one was compiled from experience and may be incomplete.
WM_MOUSEMOVE = 0x200
WM_LBUTTONDOWN = 0x201
WM_LBUTTONUP = 0x202
WM_LBUTTONDBLCLK = 0x203
WM_RBUTTONDOWN = 0x204
WM_RBUTTONUP = 0x205
WM_RBUTTONDBLCLK = 0x206
WM_MBUTTONDOWN = 0x207
WM_MBUTTONUP = 0x208
WM_MBUTTONDBLCLK = 0x209
WM_MOUSEWHEEL = 0x20A
WM_XBUTTONDOWN = 0x20B
WM_XBUTTONUP = 0x20C
WM_XBUTTONDBLCLK = 0x20D
WM_NCXBUTTONDOWN = 0x00AB
WM_NCXBUTTONUP = 0x00AC
WM_NCXBUTTONDBLCLK = 0x00AD
WM_MOUSEHWHEEL = 0x20E
WM_LBUTTONDOWN = 0x0201
WM_LBUTTONUP = 0x0202
WM_MOUSEMOVE = 0x0200
WM_MOUSEWHEEL = 0x020A
WM_MOUSEHWHEEL = 0x020E
WM_RBUTTONDOWN = 0x0204
WM_RBUTTONUP = 0x0205
buttons_by_wm_code = {
WM_LBUTTONDOWN: (DOWN, LEFT),
WM_LBUTTONUP: (UP, LEFT),
WM_LBUTTONDBLCLK: (DOUBLE, LEFT),
WM_RBUTTONDOWN: (DOWN, RIGHT),
WM_RBUTTONUP: (UP, RIGHT),
WM_RBUTTONDBLCLK: (DOUBLE, RIGHT),
WM_MBUTTONDOWN: (DOWN, MIDDLE),
WM_MBUTTONUP: (UP, MIDDLE),
WM_MBUTTONDBLCLK: (DOUBLE, MIDDLE),
WM_XBUTTONDOWN: (DOWN, X),
WM_XBUTTONUP: (UP, X),
WM_XBUTTONDBLCLK: (DOUBLE, X),
}
MOUSEEVENTF_ABSOLUTE = 0x8000
MOUSEEVENTF_MOVE = 0x1
MOUSEEVENTF_WHEEL = 0x800
MOUSEEVENTF_HWHEEL = 0x1000
MOUSEEVENTF_LEFTDOWN = 0x2
MOUSEEVENTF_LEFTUP = 0x4
MOUSEEVENTF_RIGHTDOWN = 0x8
MOUSEEVENTF_RIGHTUP = 0x10
MOUSEEVENTF_MIDDLEDOWN = 0x20
MOUSEEVENTF_MIDDLEUP = 0x40
MOUSEEVENTF_XDOWN = 0x0080
MOUSEEVENTF_XUP = 0x0100
simulated_mouse_codes = {
(WHEEL, HORIZONTAL): MOUSEEVENTF_HWHEEL,
(WHEEL, VERTICAL): MOUSEEVENTF_WHEEL,
(DOWN, LEFT): MOUSEEVENTF_LEFTDOWN,
(UP, LEFT): MOUSEEVENTF_LEFTUP,
(DOWN, RIGHT): MOUSEEVENTF_RIGHTDOWN,
(UP, RIGHT): MOUSEEVENTF_RIGHTUP,
(DOWN, MIDDLE): MOUSEEVENTF_MIDDLEDOWN,
(UP, MIDDLE): MOUSEEVENTF_MIDDLEUP,
(DOWN, X): MOUSEEVENTF_XDOWN,
(UP, X): MOUSEEVENTF_XUP,
}
NULL = c_int(0)
WHEEL_DELTA = 120
init = lambda: None
previous_button_event = None # defined in global scope
def listen(queue):
def low_level_mouse_handler(nCode, wParam, lParam):
global previous_button_event
struct = lParam.contents
# Can't use struct.time because it's usually zero.
t = time.time()
if wParam == WM_MOUSEMOVE:
event = MoveEvent(struct.x, struct.y, t)
elif wParam == WM_MOUSEWHEEL:
event = WheelEvent(struct.data / (WHEEL_DELTA * (2<<15)), t)
elif wParam in buttons_by_wm_code:
type, button = buttons_by_wm_code.get(wParam, ('?', '?'))
if wParam >= WM_XBUTTONDOWN:
button = {0x10000: X, 0x20000: X2}[struct.data]
event = ButtonEvent(type, button, t)
if (event.event_type == DOWN) and previous_button_event is not None:
# https://msdn.microsoft.com/en-us/library/windows/desktop/gg153548%28v=vs.85%29.aspx?f=255&MSPPError=-2147217396
if event.time - previous_button_event.time <= GetDoubleClickTime() / 1000.0:
event = ButtonEvent(DOUBLE, event.button, event.time)
previous_button_event = event
else:
# Unknown event type.
return CallNextHookEx(NULL, nCode, wParam, lParam)
queue.put(event)
return CallNextHookEx(NULL, nCode, wParam, lParam)
WH_MOUSE_LL = c_int(14)
mouse_callback = LowLevelMouseProc(low_level_mouse_handler)
mouse_hook = SetWindowsHookEx(WH_MOUSE_LL, mouse_callback, NULL, NULL)
# Register to remove the hook when the interpreter exits. Unfortunately a
# try/finally block doesn't seem to work here.
atexit.register(UnhookWindowsHookEx, mouse_hook)
msg = LPMSG()
while not GetMessage(msg, NULL, NULL, NULL):
TranslateMessage(msg)
DispatchMessage(msg)
def _translate_button(button):
if button == X or button == X2:
return X, {X: 0x10000, X2: 0x20000}[button]
else:
return button, 0
def press(button=LEFT):
button, data = _translate_button(button)
code = simulated_mouse_codes[(DOWN, button)]
user32.mouse_event(code, 0, 0, data, 0)
def release(button=LEFT):
button, data = _translate_button(button)
code = simulated_mouse_codes[(UP, button)]
user32.mouse_event(code, 0, 0, data, 0)
def wheel(delta=1):
code = simulated_mouse_codes[(WHEEL, VERTICAL)]
user32.mouse_event(code, 0, 0, int(delta * WHEEL_DELTA), 0)
def move_to(x, y):
user32.SetCursorPos(int(x), int(y))
def move_relative(x, y):
user32.mouse_event(MOUSEEVENTF_MOVE, int(x), int(y), 0, 0)
class POINT(Structure):
_fields_ = [("x", c_long), ("y", c_long)]
def get_position():
point = POINT()
user32.GetCursorPos(byref(point))
return (point.x, point.y)
if __name__ == '__main__':
def p(e):
print(e)
listen(p)

View File

@ -1,23 +1,21 @@
from pynput.mouse import Controller from mouse import get_position, move
from secrets import randbits from random import randint
from time import sleep from time import sleep
def initialize_mouse(): def erratic_movement(screen_width, screen_height):
mouse = Controller() x, y = get_position()
return mouse move(
x + randint(-100, 100) % screen_width, y + randint(-100, 100) % screen_height,
)
def erratic_movement(mouse):
x, y = mouse.position
mouse.move(x + randbits(2), y + randbits(2))
sleep(randbits(2))
def main(): def main():
mouse = initialize_mouse() screen_width = 1920
screen_height = 1080
while True: while True:
erratic_movement(mouse) erratic_movement(screen_width, screen_height)
sleep(randint(2, 10))
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -2,4 +2,4 @@
with pkgs; with pkgs;
mkShell { buildInputs = [ python38 python38Packages.pynput ]; } mkShell { buildInputs = [ python38 ]; }