I am new to Python and to threading, and I had to build an inline script routine for use within an OpenSesame experiment. The routine should open a program, play tones, and record reaction times. It runs OK so far, but I would be glad if you could point out mistakes and any best practices that I did not follow, as I want to refresh my programming skills.
#import os.path
import os
import sys
import time
import datetime
from PIL import Image
from PIL import ImageGrab
import win32con
import win32gui
import win32process
import win32ui
import win32api
import subprocess
import threading
from openexp.synth import synth
import random
import pythoncom, pyHook, ctypes
""" GAME """
# Length of the game in seconds.
# NOTE(review): only named in a `global` statement in ToneTimer.run; not otherwise used in the visible code.
playlength = 50
""" SOUND """
# Number of tones (should fit the game length and the tone intervals).
# NOTE(review): only named in a `global` statement in ToneTimer.run; the number of tones
# actually played is the length of `intervallTimes` below.
nTones = 5
# Length of a tone (ms); passed to the synth as `length` and to `stop_after`.
soundlength = 60
# Frequency of a tone (Hz); passed to the synth as `freq`.
frequency = 1000
# Attack time (ms); passed to the synth as `attack`.
attacktime = 0
# Decay time (ms); passed to the synth as `decay`.
decaytime = 0
# Bounds of a random time interval between tones (s).
# NOTE(review): only named in `global` statements; the fixed `intervallTimes` list is what drives playback.
shortest = 10
longest = 15
""" MISC """
# Shared state exchanged between the key handler thread and the tone timer thread.
# The numeric start values (99, 19999) are placeholders that are overwritten at runtime.
# NOTE(review): this module-level `responseTime` is not reassigned by the visible code;
# the key handler assigns a local variable of the same name.
responseTime=99
# time.clock() value taken right after the most recent tone was started.
toneStart = 19999
# True from the moment a tone has been started until an 'E' response to it is recorded.
tonePlayed = False
# Set to True by the key handler when an 'E' response is recorded.
responded = False
# time.clock() value of the most recent 'E' key press.
strokeResponse = 99
# True until the first tone has been played.
firstTone = True
# Per-tone logs; the -99 entries are appended by ToneTimer.playSound for tones without a response.
toneStartLog = []
strokeResponseLog = []
responseTimeLog = []
# Stop flag (internal use): set to True by ToneTimer.run, checked by the key handler on each key event.
STOP_KEY_HANDLER = False
# NOTE(review): only named in a `global` statement in ToneTimer.run; never read in the visible code.
STOP_TETRIS = False
# Gameplay variables; 99 is a placeholder until the real time.clock() values are stored.
gameStarted = 99
gameEnded = 99
# Incremented by the key handler for each key-down event it processes.
keyStrokes = 0
# Interval times for the tones (pseudo random), in seconds: the wait before each tone.
# !!! Make sure you change the logging as well (logToExperiment) when using more intervals.
# After the last tone the timer sleeps for `totalexptime` minus the sum of the intervals.
totalexptime = 45
intervallTimes = [10.0,10.0]
# Full 20-tone schedule, kept for reference:
#intervallTimes = [15.689,17.313,15.822,16.227,18.717,17.027,14.401,16.634,15.797,15.357,10.336,11.454,17.464,14.889,16.649,11.421,13.407,10.964,15.613,12.457]
#total seconds: 297.638
#---------------------------------------------------------
# User settings: directory that screenshot() saves its images into.
SaveDirectory=r'C:\...'
#---------------------------------------------------------
class KeyHandler(threading.Thread):
    """Thread that installs a keyboard hook and records responses to tones.

    Every key-down event is counted in the module-level ``keyStrokes``.
    An 'E' press while a tone is pending is logged as the response to that
    tone.  A 'Return' press dismisses the highscore prompt, takes a
    screenshot and starts a new game.
    """

    # Hook manager, created once at class definition and shared by all instances.
    hm = pyHook.HookManager()
    n = 0

    def __init__(self):
        threading.Thread.__init__(self)

    def OnKeyboardCharEvent(self, event):
        """Handle one key-down event delivered by the hook.

        Always returns True so the key is passed on to the application
        that has the focus.
        """
        # Only names that are *assigned* here need a global declaration;
        # the log lists are mutated in place and the flags are only read.
        global strokeResponse
        global tonePlayed
        global keyStrokes
        global responded
        if STOP_KEY_HANDLER:
            # The experiment is over: remove the hook and do not log, count
            # or act on this key any more.
            self.killKey()
            return True
        if event.Key == 'E':
            # Take the timestamp first so later work does not inflate it.
            strokeResponse = time.clock()
            if tonePlayed:
                responseTime = strokeResponse - toneStart
                tonePlayed = False
                # log for later experiment logging
                toneStartLog.append(toneStart)
                strokeResponseLog.append(strokeResponse)
                responseTimeLog.append(responseTime)
                responded = True
                print("added to log tic %s" % time.clock())
        elif event.Key == 'Return':
            # send Return to put nothing in the highscore
            for hwnd in get_hwnds_for_pid(Tetris.pid):
                win32gui.PostMessage(hwnd, win32con.WM_KEYDOWN, win32con.VK_RETURN, 0)
            # NOTE(review): the screenshot and the sleep run inside the hook
            # callback, so no further key event is handled until they are
            # done - confirm this cannot delay an 'E' response timestamp.
            screenshot()
            time.sleep(0.2)
            # send F2 to start a new game
            for hwnd in get_hwnds_for_pid(Tetris.pid):
                win32gui.PostMessage(hwnd, win32con.WM_KEYDOWN, win32con.VK_F2, 0)
        keyStrokes = keyStrokes + 1
        return True

    def killKey(self):
        """Remove the keyboard hook and ask the message loop to quit."""
        KeyHandler.hm.UnhookKeyboard()
        ctypes.windll.user32.PostQuitMessage(0)

    def run(self):
        """Install the hook and pump messages until a quit message arrives."""
        # watch for all key-down events
        KeyHandler.hm.KeyDown = self.OnKeyboardCharEvent
        # set the hook
        KeyHandler.hm.HookKeyboard()
        # activate message pipeline
        pythoncom.PumpMessages()
class ToneTimer(threading.Thread):
    """Thread that plays the tones on schedule and then ends the session.

    After the last tone and the remaining experiment time it stops the key
    handler, logs the results, takes a final screenshot and closes the game
    window.
    """

    def __init__(self):
        threading.Thread.__init__(self)

    def _logMissedTone(self):
        """Append the -99 'no response' marker to all three response logs."""
        toneStartLog.append(-99)
        strokeResponseLog.append(-99)
        responseTimeLog.append(-99)

    def playSound(self): #play sound and log reaction time
        """Play one tone and mark it as waiting for a response.

        If the previous tone got no response, a -99 entry is logged for it
        before the new tone starts.
        """
        global toneStart
        global tonePlayed
        global responded
        global firstTone
        # create synthesizer object - lower freq = lower tone, length in ms
        mySynth = synth(exp, osc="sine", freq=frequency, length=soundlength,
                        attack=attacktime, decay=decaytime)
        mySynth.stop_after(soundlength)
        if not responded and not firstTone:
            self._logMissedTone()
        mySynth.play()
        toneStart = time.clock()
        # Reset for the new tone; without this a single response would hide
        # every later missed tone.
        responded = False
        tonePlayed = True
        firstTone = False

    def run(self):
        """Play all tones, wait out the experiment time, then shut down."""
        global STOP_KEY_HANDLER
        global gameEnded
        global tonePlayed
        # seconds spent waiting for tones so far
        elapsed = 0
        for interval in intervallTimes:
            elapsed = elapsed + interval
            time.sleep(interval)
            self.playSound()
        # Wait for the rest of the experiment time; never pass a negative
        # value to sleep when the intervals already exceed the total.
        time.sleep(max(0, totalexptime - elapsed))
        # Close the response window, then account for the last tone: no
        # later playSound call would log it as missed.
        tonePlayed = False
        if not firstTone and not responded:
            self._logMissedTone()
        STOP_KEY_HANDLER = True
        # backup send key to kill handler - not pretty but works if routine does not stop
        # NOTE(review): a message posted straight to a window may not pass
        # through the keyboard hook - confirm this really wakes the handler.
        for hwnd in get_hwnds_for_pid(Tetris.pid):
            win32gui.PostMessage(hwnd, win32con.WM_KEYDOWN, win32con.VK_F5, 0)
        # sleep to make sure processing is done
        time.sleep(0.1)
        # Record the end time before logging: logToExperiment() derives the
        # play time and the actions per minute from it.
        gameEnded = time.clock()
        logToExperiment()
        # sleep to make sure processing is done
        time.sleep(0.5)
        # take a screenshot to know the score
        screenshot()
        # sleep to make sure processing is done
        time.sleep(0.1)
        # close the Tetris window
        finished()
""" ---- Main thread functions ---- """
def get_hwnds_for_pid (pid):
    """Return the handles of all visible, enabled windows owned by process *pid*."""
    def _collect(handle, matches):
        # Keep only windows that are shown and can take input.
        usable = win32gui.IsWindowVisible(handle) and win32gui.IsWindowEnabled(handle)
        if usable:
            owner_pid = win32process.GetWindowThreadProcessId(handle)[1]
            if owner_pid == pid:
                matches.append(handle)
        # Returning True keeps the enumeration going.
        return True

    found = []
    win32gui.EnumWindows(_collect, found)
    return found
def screenshot():
    """Grab the screen and save it as a JPEG in ``SaveDirectory``.

    The file name is built from "VP", the subject number and the current
    date and time; the full path is printed before saving.
    """
    global SaveDirectory
    grabbed = ImageGrab.grab()
    subject = str(exp.get('subject_nr'))
    stamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    filename = "VP" + subject + stamp + '.jpg'
    saveas = os.path.join(SaveDirectory, filename)
    print(saveas)
    grabbed.save(saveas)
def finished():
    """Send WM_CLOSE to every window of the Tetris process."""
    tetris_windows = get_hwnds_for_pid(Tetris.pid)
    for window in tetris_windows:
        win32gui.SendMessage(window, win32con.WM_CLOSE, 0, 0)
def logToExperiment():
    """Write the tone logs and the keystroke statistics to the experiment.

    One variable per scheduled tone is set for each of the three logs
    (``TStart_RTTnn``, ``SResponse_RTTnn``, ``ResponseTime_RTTnn``, numbered
    from 01).  The number of tones is taken from ``intervallTimes``, so the
    schedule can change without editing this function.  A tone without a
    log entry is written as -99, the same marker used for missed tones.
    """
    expected = len(intervallTimes)
    columns = (
        ('TStart_RTT', toneStartLog),
        ('SResponse_RTT', strokeResponseLog),
        ('ResponseTime_RTT', responseTimeLog),
    )
    for prefix, values in columns:
        for index in range(expected):
            # A short log must not abort the logging with an IndexError.
            value = values[index] if index < len(values) else -99
            exp.set('%s%02d' % (prefix, index + 1), value)
    exp.set('keyStrokes_RTT', keyStrokes)
    playtime = gameEnded - gameStarted
    exp.set('playtime_Tetris', playtime)
    # Actions per minute; guard against a zero play time and force float
    # division so integer operands are not truncated.
    if playtime:
        apm = (keyStrokes / float(playtime)) * 60
    else:
        apm = 0
    exp.set('APM_RTT', apm)
    print('keyStrokes %s' % keyStrokes)
""" ---- Main Routine ---- """
# launch the Tetris executable as a child process
Tetris = subprocess.Popen(["C:...\Tetris.exe"])
# give the game window time to appear before messaging it
time.sleep(2.0)
# an F2 key-down posted to each of the game's windows starts a new game
for hwnd in get_hwnds_for_pid(Tetris.pid):
    win32gui.PostMessage(hwnd, win32con.WM_KEYDOWN, win32con.VK_F2, 0)
# reference point for the play time written by logToExperiment()
gameStarted = time.clock()
# Build both worker threads as daemons, then start them. The tone timer
# sets STOP_KEY_HANDLER once the experiment time is over; the key handler
# checks that flag on each key event.
keyH = KeyHandler()
ToneT = ToneTimer()
keyH.daemon = True
ToneT.daemon = True
keyH.start()
ToneT.start()