demgirlz/parser.py

380 lines
11 KiB
Python

#!/usr/bin/env python3
import sys
import orjson
import time
import os.path
import multiprocessing
import signal
import re
import traceback
from datetime import datetime, timedelta
from sqlalchemy import exc, func
from sqlalchemy.sql.expression import bindparam
from flask import current_app
from demweb.app import create_app
from demweb.extensions import db
from demweb import models
DEMO_PATH_PREFIX = ''
DEMO_REGEX = re.compile(r'auto-(\d+-\d+)-(\w+)')
def _itery(x):
if x is None:
return ()
if isinstance(x, list):
return x
return (x,)
def remove_chatcolors(message):
    """Return *message* with control characters and colour codes removed.

    Every character whose code point is 8 or lower is dropped.  Two of them
    also swallow the characters that follow:

    * ``\\x07`` skips the next 6 characters (e.g. the ``FF0000`` in
      ``\\x07FF0000``),
    * ``\\x08`` skips the next 8 characters (presumably a colour value with
      two extra digits).

    A code that is cut off by the end of the string simply ends the output.
    """
    kept = []  # collected pieces; joined once instead of growing a str
    length = len(message)
    i = 0
    while i < length:
        c = message[i]
        code = ord(c)
        if code > 8:
            kept.append(c)
        elif code == 7:
            i += 6
        elif code == 8:
            i += 8
        # step over the current character (the control byte itself, if any)
        i += 1
    return ''.join(kept)
# Exact prefix carried by console ("BOT") chat lines in their msgName field.
_CONSOLE_CHAT_PREFIX = '\x01\x07FF0000Console: '


def _execute_with_retry(label, statement, params=None):
    """Execute *statement* on the shared session and commit it.

    When the database driver reports an error (``sqlalchemy.exc.DBAPIError``;
    the labels suggest deadlocks between parallel workers are the expected
    case) the session is rolled back, *label* is printed and the statement is
    tried again, without limit.  Any other exception propagates to the caller
    instead of being retried forever.

    *params*, when given, is passed as the second argument of
    ``db.session.execute``.
    """
    while True:
        try:
            if params is None:
                db.session.execute(statement)
            else:
                db.session.execute(statement, params)
            db.session.commit()
            return
        except exc.DBAPIError:
            db.session.rollback()
            print(label)


def _store_player_totals(guid, obj, starttime):
    """Fold one player's stats from this demo into the global player tables."""
    player = models.Player.__table__
    player_names = models.PlayerNames.__table__
    player_sprays = models.PlayerSprays.__table__

    # try insert new player
    _execute_with_retry(
        'deadlock 1',
        player.insert().prefix_with('IGNORE').values(
            guid=guid,
            name=None,
            first_seen=starttime,
            last_seen=starttime,
            playtime=0,
            chats=0,
            deaths=0,
            kills=0,
            voicetime=0,
        ),
    )

    # update player stats atomically (the additions are evaluated by the
    # database, not in Python)
    _execute_with_retry(
        'deadlock 2',
        player.update().where(models.Player.guid == guid).values(
            playtime=models.Player.playtime + obj['playtime'],
            chats=models.Player.chats + obj['chats'],
            deaths=models.Player.deaths + obj['deaths'],
            kills=models.Player.kills + obj['kills'],
            voicetime=models.Player.voicetime + obj['voicetime'],
            first_seen=func.least(models.Player.first_seen, starttime),
            last_seen=func.greatest(models.Player.last_seen, starttime),
        ),
    )

    names = obj['names']
    if names:  # an empty mapping would produce empty multi-row statements
        # try insert new player names
        _execute_with_retry(
            'deadlock 3',
            player_names.insert().prefix_with('IGNORE').values([
                dict(guid=guid, name=name, time=0)
                for name in names
            ]),
        )

        # update player names atomically, one execution per name
        _execute_with_retry(
            'deadlock 4',
            player_names.update()
            .where(models.PlayerNames.guid == bindparam('_guid'))
            .where(models.PlayerNames.name == bindparam('_name'))
            .values(time=models.PlayerNames.time + bindparam('_nametime')),
            [
                dict(_guid=guid, _name=name, _nametime=nametime)
                for name, nametime in names.items()
            ],
        )

    # try insert new player sprays
    if obj['sprays']:
        _execute_with_retry(
            'deadlock 5',
            player_sprays.insert().prefix_with('IGNORE').values(
                [(guid, spray) for spray in obj['sprays']]
            ),
        )


def _split_chat(obj):
    """Return ``(nick, text)`` for one chat record, or ``None`` to skip it."""
    raw = obj['msgName']
    if raw == '#Cstrike_Name_Change':
        return obj['msgSender'], 'changed their name to "{}"'.format(obj['msgText'])
    if obj['steamid'] == 'BOT':
        # Remove the literal prefix only.  str.lstrip() would treat it as a
        # set of characters and also eat the start of the message itself.
        return 'Console', raw.removeprefix(_CONSOLE_CHAT_PREFIX)
    try:
        # Drop the first character, then split "<nick>\x01<rest>".
        nick, text = raw[1:].split('\x01', 1)
    except (ValueError, TypeError, AttributeError):
        # No separator or not a string: nothing usable in this record.
        return None
    # Strip the ':' / ' ' separator characters left in front of the text.
    return nick, text.lstrip(': ')


def parse_demo(path):
    """Load ``DEMO_PATH_PREFIX + path + '/out.json'`` and store it in the database.

    Creates one ``Session`` row for the demo, then for every player updates
    the global ``Player`` / ``PlayerNames`` / ``PlayerSprays`` rows and adds a
    ``PlayerSession`` row, and finally stores every event and chat line.
    Work is committed in several steps, so a failure part-way through leaves
    the rows written so far in place.

    Args:
        path: Demo directory, appended to ``DEMO_PATH_PREFIX``.  Its basename
            must match ``DEMO_REGEX``; the first group is parsed as the start
            time (``%Y%m%d-%H%M%S``) and the second is stored as the map name.

    Raises:
        ValueError: If the basename of *path* does not match ``DEMO_REGEX``.
    """
    # Nothing in this function sets the flag, so the write-back at the very
    # end is currently inert.
    jmodified = False
    json_path = DEMO_PATH_PREFIX + path + '/out.json'

    # orjson parses bytes directly, so no text decoding step is needed.
    with open(json_path, 'rb') as fp:
        data = orjson.loads(fp.read())

    demoname = os.path.basename(path)
    match = DEMO_REGEX.match(demoname)
    if match is None:
        raise ValueError(
            f'demo name {demoname!r} does not match {DEMO_REGEX.pattern!r}')
    starttime = datetime.strptime(match.group(1), '%Y%m%d-%H%M%S')
    tickinterval = data['serverinfo']['tickInterval']
    header = data['demoheader']
    voice = data['voice']

    session = models.Session(
        time=starttime,
        length=header['playback_time'],
        demoname=demoname,
        mapname=match.group(2),
        mapmd5=data['serverinfo']['mapMD5'],
        servername=header['servername'],
        frames=header['playback_frames'],
        ticks=header['playback_ticks'],
        tickinterval=tickinterval,
        dirty=header['dirty'],
        playtime=0,
        rounds=0,
        ct_wins=0,
        t_wins=0,
        chats=0,
        deaths=0,
        kills=0,
        voice_active=voice['active_time'],
        voice_total=voice['total_time'],
        silence_chunks=voice.get('silence', []),
    )
    db.session.add(session)
    db.session.commit()

    session_playtime = 0
    session_chats = 0
    session_deaths = 0
    session_kills = 0

    if not data['players']:
        data['players'] = {}

    for guid, obj in data['players'].items():
        # convert to seconds
        obj['playtime'] *= tickinterval

        _store_player_totals(guid, obj, starttime)

        # insert new player session
        player_session = models.PlayerSession(
            player_guid=guid,
            session_id=session.id,
            playtime=obj['playtime'],
            chats=obj['chats'],
            deaths=obj['deaths'],
            kills=obj['kills'],
            voicetime=obj['voicetime'],
        )
        db.session.add(player_session)
        db.session.commit()

        # The "BOT" entry gets its own rows but is left out of the session
        # totals.  Values are read back from the stored row on purpose.
        if guid != "BOT":
            session_playtime += player_session.playtime
            session_chats += player_session.chats
            session_deaths += player_session.deaths
            session_kills += player_session.kills

    session.playtime = session_playtime
    session.chats = session_chats
    session.deaths = session_deaths
    session.kills = session_kills
    db.session.commit()

    for obj in _itery(data['events']):
        if obj['event'] == 'round_start':
            session.rounds += 1
        elif obj['event'] == 'round_end':
            # winner 2 is counted as a T win, winner 3 as a CT win
            if obj['winner'] == 2:
                session.t_wins += 1
            elif obj['winner'] == 3:
                session.ct_wins += 1

        # remove redundant info (already stored in dedicated columns)
        eventdata = obj.copy()
        del eventdata['tick']
        del eventdata['event']
        eventdata.pop('steamid', None)

        event = models.Event(
            player_guid=obj.get('steamid'),
            session_id=session.id,
            tick=obj['tick'],
            time=starttime + timedelta(seconds=obj['tick'] * tickinterval),
            event=obj['event'],
            data=eventdata,
        )
        db.session.add(event)
    db.session.commit()

    for obj in _itery(data['chat']):
        parsed = _split_chat(obj)
        if parsed is None:
            continue
        nick, msg = parsed
        if not msg:
            continue

        chat = models.Chat(
            player_guid=obj['steamid'],
            session_id=session.id,
            tick=obj['tick'],
            time=starttime + timedelta(seconds=obj['tick'] * tickinterval),
            name=remove_chatcolors(nick),
            chat=remove_chatcolors(msg),
        )
        db.session.add(chat)
    db.session.commit()

    if jmodified:
        with open(json_path, 'wb') as fp:
            fp.write(orjson.dumps(data, option=orjson.OPT_INDENT_2))
def worker(queue, error_queue):
    """Process demo paths from *queue* until a falsy item arrives.

    Each item is handed to ``parse_demo`` inside a Flask application context
    created by this process.  When a demo fails, its path is put on
    *error_queue*, the traceback is printed, and the database session is
    rolled back so that nothing left pending by the failed demo is committed
    together with the next one.  ``queue.task_done()`` is called exactly once
    for every item taken, including the terminating falsy item.

    Args:
        queue: Joinable queue of demo paths; a falsy item (``None``) ends the
            loop.
        error_queue: Queue that receives the path of every demo that raised.
    """
    app = create_app()
    with app.app_context():
        while True:
            item = queue.get()
            if not item:
                queue.task_done()
                break

            try:
                # NOTE(review): presumably ends whatever transaction is still
                # open before the next demo starts - confirm intent.
                db.session.commit()
                parse_demo(item)
            except Exception as e:
                # Record the failure first so it is never lost.
                error_queue.put(item)
                print(f'vvv error from: {item} vvv')
                traceback.print_exception(type(e), e, e.__traceback__)
                print(f'^^^ error from: {item} ^^^')
                # Discard pending/partial state of the failed demo.
                try:
                    db.session.rollback()
                except Exception:
                    traceback.print_exc()
            finally:
                # Always acknowledge, otherwise the parent's join() never returns.
                queue.task_done()
if __name__ == '__main__':
    # Only this script block needs it, so the import is kept local to it.
    from queue import Empty

    # Optional first argument: string prepended to every demo path.
    # NOTE(review): workers see this value only because they are forked after
    # the assignment; a "spawn" start method would re-import the module with
    # the default prefix.
    if len(sys.argv) >= 2:
        DEMO_PATH_PREFIX = sys.argv[1]

    # work.txt lists every demo path, done.txt the ones already imported.
    with open('work.txt', 'r') as fp:
        work = set(fp.read().splitlines())

    with open('done.txt', 'r') as fp:
        done = set(fp.read().splitlines())

    # Manual switch: change to True to wipe every table and re-import all.
    if False:
        done = set()
        app = create_app()
        with app.app_context():
            meta = db.metadata
            for table in reversed(meta.sorted_tables):
                db.session.execute(table.delete())
            db.session.commit()

    todo = work - done

    q = multiprocessing.JoinableQueue()
    q_err = multiprocessing.SimpleQueue()

    # populate queue with jobs
    print('Populating queue...')
    for task in todo:
        q.put(task)

    # ignore SIGINT (^C) and store original sigint handler
    # this is so our forked processes ignore ^C and can finish their work cleanly
    original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)

    # start workers
    print('Starting workers...')
    workers = []
    for _ in range(20):
        p = multiprocessing.Process(target=worker, args=(q, q_err))
        workers.append(p)
        p.start()

    # restore original sigint handler (don't ignore ^C)
    signal.signal(signal.SIGINT, original_sigint_handler)

    # wait for queue to empty or ^C
    # NOTE(review): q_err is only drained after the workers exit; a very
    # large number of failures could fill its pipe and stall the workers.
    print('Waiting for jobs to complete or ^C...')
    try:
        q.join()
    except KeyboardInterrupt:
        print('Quit!')

    # get any jobs which were left in the queue
    print('Emptying queue...')
    left = set()
    while not q.empty():
        try:
            # Workers are still running and may take the item we just saw;
            # a timed get keeps us from blocking forever in that case.
            item = q.get(timeout=1)
        except Empty:
            continue
        left.add(item)
        q.task_done()

    # send N kill signals for each worker
    print('Putting kill jobs into queue...')
    for _ in workers:
        q.put(None)
    q.close()

    # wait until workers are dead
    print('Waiting for workers to finish...')
    for w in workers:
        w.join()

    # get the failed jobs
    print('Emptying failed queue...')
    while not q_err.empty():
        item = q_err.get()
        left.add(item)
    q_err.close()

    # everything that was neither left in the queue nor failed is done
    done.update(todo - left)

    # sorted so the file content is stable between runs
    with open('done.txt', 'w') as fp:
        fp.write('\n'.join(sorted(done)))