demgirlz/parser.py

435 lines
13 KiB
Python

#!/usr/bin/env python3
import sys
import orjson
import time
import os.path
import multiprocessing
import signal
import re
import traceback
import subprocess
from datetime import datetime, timedelta
from sqlalchemy import exc, func
from sqlalchemy.sql.expression import bindparam
from flask import current_app
from demweb.app import create_app
from demweb.extensions import db
from demweb import models
DEMO_PATH_PREFIX = ''
DEMO_REGEX = re.compile(r'auto-(\d+-\d+)-(\w+)')
def _itery(x):
if x is None:
return ()
if isinstance(x, list):
return x
return (x,)
def remove_chatcolors(message):
output = ''
msglen = len(message)
i = 0
while i < msglen:
c = message[i]
if ord(c) <= 8:
if ord(c) == 7:
i += 6
elif ord(c) == 8:
i += 8
else:
output += c
i += 1
return output
silence_start_re = re.compile(' silence_start: (?P<start>[0-9]+(\.?[0-9]*))$')
silence_end_re = re.compile(' silence_end: (?P<end>[0-9]+(\.?[0-9]*)) ')
total_duration_re = re.compile('size=[^ ]+ time=(?P<hours>[0-9]{2}):(?P<minutes>[0-9]{2}):(?P<seconds>[0-9\.]{5}) bitrate=')
def get_chunk_times(path):
proc = subprocess.Popen([
'ffmpeg',
'-i', path,
'-af', 'silencedetect=n=-80dB:d=0.1',
'-f', 'null',
'-'],
stderr=subprocess.PIPE,
stdout=subprocess.DEVNULL
)
output = proc.communicate()[1].decode('utf-8')
lines = output.splitlines()
# Chunks start when silence ends, and chunks end when silence starts.
chunk_starts = []
chunk_ends = []
for line in lines:
silence_start_match = silence_start_re.search(line)
silence_end_match = silence_end_re.search(line)
total_duration_match = total_duration_re.search(line)
if silence_start_match:
start = float(silence_start_match.group('start'))
if start == 0.:
# Ignore initial silence.
continue
chunk_ends.append(start)
if len(chunk_starts) == 0:
# Started with non-silence.
chunk_starts.append(0.)
elif silence_end_match:
chunk_starts.append(float(silence_end_match.group('end')))
elif total_duration_match:
hours = int(total_duration_match.group('hours'))
minutes = int(total_duration_match.group('minutes'))
seconds = float(total_duration_match.group('seconds'))
end_time = hours * 3600 + minutes * 60 + seconds
if len(chunk_starts) == 0:
# No silence found.
chunk_starts.append(0)
if len(chunk_starts) > len(chunk_ends):
if abs(chunk_starts[-1] - end_time) < 0.1:
# Last chunk starts at very the end? nah.
del chunk_starts[-1]
else:
# Finished with non-silence.
chunk_ends.append(end_time)
return list(zip(chunk_starts, chunk_ends))
def parse_demo(path):
jmodified = False
with open(DEMO_PATH_PREFIX + path + '/out.json', 'r') as fp:
data = orjson.loads(fp.read())
demoname = os.path.basename(path)
match = DEMO_REGEX.match(demoname)
starttime = datetime.strptime(match.group(1), '%Y%m%d-%H%M%S')
tickinterval = data['serverinfo']['tickInterval']
session = models.Session(
time=starttime,
length=data['demoheader']['playback_time'],
demoname=demoname,
mapname=match.group(2),
mapmd5=data['serverinfo']['mapMD5'],
servername=data['demoheader']['servername'],
frames=data['demoheader']['playback_frames'],
ticks=data['demoheader']['playback_ticks'],
tickinterval=tickinterval,
dirty=data['demoheader']['dirty'],
playtime=0,
rounds=0,
ct_wins=0,
t_wins=0,
chats=0,
deaths=0,
kills=0,
voice_active=data['voice']['active_time'],
voice_total=data['voice']['total_time'],
silence_chunks=data['voice']['silence'] if 'silence' in data['voice'] else []
)
db.session.add(session)
db.session.commit()
session_playtime = 0
session_chats = 0
session_deaths = 0
session_kills = 0
if not data['players']:
data['players'] = dict()
for guid, obj in data['players'].items():
# convert to seconds
obj['playtime'] *= tickinterval
# try insert new player
while True:
try:
db.session.execute(
models.Player.__table__.insert().prefix_with('IGNORE').values(
guid=guid,
name=None,
first_seen=starttime,
last_seen=starttime,
playtime=0,
chats=0,
deaths=0,
kills=0,
voicetime=0
))
db.session.commit()
break
except Exception:
db.session.rollback()
print('deadlock 1')
# update player stats atomically
while True:
try:
db.session.execute(
models.Player.__table__.update(models.Player.guid == guid).values(
playtime = models.Player.playtime + obj['playtime'],
chats = models.Player.chats + obj['chats'],
deaths = models.Player.deaths + obj['deaths'],
kills = models.Player.kills + obj['kills'],
voicetime = models.Player.voicetime + obj['voicetime'],
first_seen = func.least(models.Player.first_seen, starttime),
last_seen = func.greatest(models.Player.first_seen, starttime),
))
db.session.commit()
break
except Exception:
db.session.rollback()
print('deadlock 2')
# try insert new player names
while True:
try:
db.session.execute(
models.PlayerNames.__table__.insert().prefix_with('IGNORE').values([
dict(
guid = guid,
name = name,
time = 0
)
for name in obj['names'].keys()
]))
db.session.commit()
break
except Exception:
db.session.rollback()
print('deadlock 3')
# update player names atomically
while True:
try:
db.session.execute(
models.PlayerNames.__table__.update()
.where(models.PlayerNames.guid == bindparam('_guid'))
.where(models.PlayerNames.name == bindparam('_name'))
.values(time = models.PlayerNames.time + bindparam('_nametime')),
[
dict(
_guid = guid,
_name = name,
_nametime = nametime
) for name, nametime in obj['names'].items()
]
)
db.session.commit()
break
except Exception:
db.session.rollback()
print('deadlock 4')
# try insert new player sprays
while True:
try:
if obj['sprays']:
db.session.execute(
models.PlayerSprays.__table__.insert().prefix_with('IGNORE').values(
[(guid, spray) for spray in obj['sprays']]
))
db.session.commit()
break
except Exception:
db.session.rollback()
print('deadlock 5')
# calculate player voice chunks/timing if possible and not exists
if obj['voicetime'] > 0 and 'voice_chunks' not in obj:
voicefile = DEMO_PATH_PREFIX + path + '/voice/' + guid + '.opus'
if os.path.isfile(voicefile):
obj['voice_chunks'] = get_chunk_times(voicefile)
jmodified = True
# insert new player session
player_session = models.PlayerSession(
player_guid=guid,
session_id=session.id,
playtime=obj['playtime'],
chats=obj['chats'],
deaths=obj['deaths'],
kills=obj['kills'],
voicetime=obj['voicetime'],
voice_chunks=obj['voice_chunks'] if 'voice_chunks' in obj else None
)
db.session.add(player_session)
db.session.commit()
if guid != "BOT":
session_playtime += player_session.playtime
session_chats += player_session.chats
session_deaths += player_session.deaths
session_kills += player_session.kills
session.playtime = session_playtime
session.chats = session_chats
session.deaths = session_deaths
session.kills = session_kills
db.session.commit()
for obj in _itery(data['events']):
if obj['event'] == 'round_start':
session.rounds += 1
elif obj['event'] == 'round_end':
if obj['winner'] == 2:
session.t_wins += 1
elif obj['winner'] == 3:
session.ct_wins += 1
event = models.Event(
player_guid=guid,
session_id=session.id,
time=starttime + timedelta(seconds=obj['tick'] / tickinterval),
event=obj['event'],
data=obj
)
db.session.add(event)
db.session.commit()
for obj in _itery(data['chat']):
msg = obj['msgName']
if msg == '#Cstrike_Name_Change':
nick = obj['msgSender']
msg = 'changed their name to "{}"'.format(obj['msgText'])
elif obj['steamid'] == 'BOT':
nick = 'Console'
msg = obj['msgName'].lstrip('\x01\x07FF0000Console: ')
else:
try:
nick, msg = obj['msgName'][1:].split('\x01', 1)
except Exception:
msg = None
if msg:
msg = msg.lstrip(': ')
if msg:
nick = remove_chatcolors(nick)
msg = remove_chatcolors(msg)
chat = models.Chat(
player_guid=obj['steamid'],
session_id=session.id,
time=starttime + timedelta(seconds=obj['tick'] / tickinterval),
name=nick,
chat=msg
)
db.session.add(chat)
db.session.commit()
if jmodified:
with open(DEMO_PATH_PREFIX + path + '/out.json', 'wb') as fp:
fp.write(orjson.dumps(data, option=orjson.OPT_INDENT_2))
def worker(queue, error_queue):
app = create_app()
with app.app_context():
while True:
item = queue.get()
if not item:
queue.task_done()
break
try:
db.session.commit()
parse_demo(item)
except Exception as e:
error_queue.put(item)
print(f'vvv error from: {item} vvv')
traceback.print_exception(type(e), e, e.__traceback__)
print(f'^^^ error from: {item} ^^^')
queue.task_done()
if __name__ == '__main__':
if len(sys.argv) >= 2:
DEMO_PATH_PREFIX = sys.argv[1]
with open('work.txt', 'r') as fp:
work = set(fp.read().splitlines())
with open('done.txt', 'r') as fp:
done = set(fp.read().splitlines())
if True:
done = set()
app = create_app()
with app.app_context():
meta = db.metadata
for table in reversed(meta.sorted_tables):
db.session.execute(table.delete())
db.session.commit()
todo = work - done
q = multiprocessing.JoinableQueue()
q_err = multiprocessing.SimpleQueue()
# populate queue with jobs
print('Populating queue...')
for task in todo:
q.put(task)
# ignore STDINT(^C) and store original sigint handler
# this is so our forked processes ignore ^C and can finish their work cleanly
original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
# start workers
print('Starting workers...')
workers = []
for i in range(20):
p = multiprocessing.Process(target=worker, args=(q, q_err))
workers.append(p)
p.start()
# restore original sigint handler (don't ignore ^C)
signal.signal(signal.SIGINT, original_sigint_handler)
# wait for queue to empty or ^C
print('Waiting for jobs to complete or ^C...')
try:
q.join()
except KeyboardInterrupt:
print('Quit!')
# get any jobs which were left in the queue
print('Emptying queue...')
left = set()
while not q.empty():
item = q.get()
left.add(item)
q.task_done()
# send N kill signals for each worker
print('Putting kill jobs into queue...')
for _ in workers:
q.put(None)
q.close()
# wait until workers are dead
print('Waiting for workers to finish...')
for w in workers:
w.join()
# get the failed jobs
print('Emptying failed queue...')
while not q_err.empty():
item = q_err.get()
left.add(item)
q_err.close()
done.update(todo - left)
with open('done.txt', 'w') as fp:
fp.write('\n'.join(done))