#!/usr/bin/env python3 import sys import orjson import time import os.path import multiprocessing import signal import re import traceback from datetime import datetime, timedelta from sqlalchemy import exc, func from sqlalchemy.sql.expression import bindparam from flask import current_app from demweb.app import create_app from demweb.extensions import db from demweb import models DEMO_PATH_PREFIX = '' DEMO_REGEX = re.compile(r'auto-(\d+-\d+)-(\w+)') def _itery(x): if x is None: return () if isinstance(x, list): return x return (x,) def remove_chatcolors(message): output = '' msglen = len(message) i = 0 while i < msglen: c = message[i] if ord(c) <= 8: if ord(c) == 7: i += 6 elif ord(c) == 8: i += 8 else: output += c i += 1 return output def parse_demo(path): jmodified = False with open(DEMO_PATH_PREFIX + path + '/out.json', 'r') as fp: data = orjson.loads(fp.read()) demoname = os.path.basename(path) match = DEMO_REGEX.match(demoname) starttime = datetime.strptime(match.group(1), '%Y%m%d-%H%M%S') tickinterval = data['serverinfo']['tickInterval'] session = models.Session( time=starttime, length=data['demoheader']['playback_time'], demoname=demoname, mapname=match.group(2), mapmd5=data['serverinfo']['mapMD5'], servername=data['demoheader']['servername'], frames=data['demoheader']['playback_frames'], ticks=data['demoheader']['playback_ticks'], tickinterval=tickinterval, dirty=data['demoheader']['dirty'], playtime=0, rounds=0, ct_wins=0, t_wins=0, chats=0, deaths=0, kills=0, voice_active=data['voice']['active_time'], voice_total=data['voice']['total_time'], silence_chunks=data['voice']['silence'] if 'silence' in data['voice'] else [] ) db.session.add(session) db.session.commit() session_playtime = 0 session_chats = 0 session_deaths = 0 session_kills = 0 if not data['players']: data['players'] = dict() for guid, obj in data['players'].items(): # convert to seconds obj['playtime'] *= tickinterval # try insert new player while True: try: db.session.execute( models.Player.__table__.insert().prefix_with('IGNORE').values( guid=guid, name=None, first_seen=starttime, last_seen=starttime, playtime=0, chats=0, deaths=0, kills=0, voicetime=0 )) db.session.commit() break except Exception: db.session.rollback() print('deadlock 1') # update player stats atomically while True: try: db.session.execute( models.Player.__table__.update(models.Player.guid == guid).values( playtime = models.Player.playtime + obj['playtime'], chats = models.Player.chats + obj['chats'], deaths = models.Player.deaths + obj['deaths'], kills = models.Player.kills + obj['kills'], voicetime = models.Player.voicetime + obj['voicetime'], first_seen = func.least(models.Player.first_seen, starttime), last_seen = func.greatest(models.Player.last_seen, starttime), )) db.session.commit() break except Exception: db.session.rollback() print('deadlock 2') # try insert new player names while True: try: db.session.execute( models.PlayerNames.__table__.insert().prefix_with('IGNORE').values([ dict( guid = guid, name = name, time = 0 ) for name in obj['names'].keys() ])) db.session.commit() break except Exception: db.session.rollback() print('deadlock 3') # update player names atomically while True: try: db.session.execute( models.PlayerNames.__table__.update() .where(models.PlayerNames.guid == bindparam('_guid')) .where(models.PlayerNames.name == bindparam('_name')) .values(time = models.PlayerNames.time + bindparam('_nametime')), [ dict( _guid = guid, _name = name, _nametime = nametime ) for name, nametime in obj['names'].items() ] ) db.session.commit() break except Exception: db.session.rollback() print('deadlock 4') # try insert new player sprays while True: try: if obj['sprays']: db.session.execute( models.PlayerSprays.__table__.insert().prefix_with('IGNORE').values( [(guid, spray) for spray in obj['sprays']] )) db.session.commit() break except Exception: db.session.rollback() print('deadlock 5') # insert new player session player_session = models.PlayerSession( player_guid=guid, session_id=session.id, playtime=obj['playtime'], chats=obj['chats'], deaths=obj['deaths'], kills=obj['kills'], voicetime=obj['voicetime'] ) db.session.add(player_session) db.session.commit() if guid != "BOT": session_playtime += player_session.playtime session_chats += player_session.chats session_deaths += player_session.deaths session_kills += player_session.kills session.playtime = session_playtime session.chats = session_chats session.deaths = session_deaths session.kills = session_kills db.session.commit() for obj in _itery(data['events']): if obj['event'] == 'round_start': session.rounds += 1 elif obj['event'] == 'round_end': if obj['winner'] == 2: session.t_wins += 1 elif obj['winner'] == 3: session.ct_wins += 1 # remove redundant info eventdata = obj.copy() del eventdata['tick'] del eventdata['event'] if 'steamid' in eventdata: del eventdata['steamid'] event = models.Event( player_guid=obj['steamid'] if 'steamid' in obj else None, session_id=session.id, tick=obj['tick'], time=starttime + timedelta(seconds=obj['tick'] * tickinterval), event=obj['event'], data=eventdata ) db.session.add(event) db.session.commit() for obj in _itery(data['chat']): msg = obj['msgName'] if msg == '#Cstrike_Name_Change': nick = obj['msgSender'] msg = 'changed their name to "{}"'.format(obj['msgText']) elif obj['steamid'] == 'BOT': nick = 'Console' msg = obj['msgName'].lstrip('\x01\x07FF0000Console: ') else: try: nick, msg = obj['msgName'][1:].split('\x01', 1) except Exception: msg = None if msg: msg = msg.lstrip(': ') if msg: nick = remove_chatcolors(nick) msg = remove_chatcolors(msg) chat = models.Chat( player_guid=obj['steamid'], session_id=session.id, tick=obj['tick'], time=starttime + timedelta(seconds=obj['tick'] * tickinterval), name=nick, chat=msg ) db.session.add(chat) db.session.commit() if jmodified: with open(DEMO_PATH_PREFIX + path + '/out.json', 'wb') as fp: fp.write(orjson.dumps(data, option=orjson.OPT_INDENT_2)) def worker(queue, error_queue): app = create_app() with app.app_context(): while True: item = queue.get() if not item: queue.task_done() break try: db.session.commit() parse_demo(item) except Exception as e: error_queue.put(item) print(f'vvv error from: {item} vvv') traceback.print_exception(type(e), e, e.__traceback__) print(f'^^^ error from: {item} ^^^') queue.task_done() if __name__ == '__main__': if len(sys.argv) >= 2: DEMO_PATH_PREFIX = sys.argv[1] with open('work.txt', 'r') as fp: work = set(fp.read().splitlines()) with open('done.txt', 'r') as fp: done = set(fp.read().splitlines()) if False: done = set() app = create_app() with app.app_context(): meta = db.metadata for table in reversed(meta.sorted_tables): db.session.execute(table.delete()) db.session.commit() todo = work - done q = multiprocessing.JoinableQueue() q_err = multiprocessing.SimpleQueue() # populate queue with jobs print('Populating queue...') for task in todo: q.put(task) # ignore STDINT(^C) and store original sigint handler # this is so our forked processes ignore ^C and can finish their work cleanly original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) # start workers print('Starting workers...') workers = [] for i in range(20): p = multiprocessing.Process(target=worker, args=(q, q_err)) workers.append(p) p.start() # restore original sigint handler (don't ignore ^C) signal.signal(signal.SIGINT, original_sigint_handler) # wait for queue to empty or ^C print('Waiting for jobs to complete or ^C...') try: q.join() except KeyboardInterrupt: print('Quit!') # get any jobs which were left in the queue print('Emptying queue...') left = set() while not q.empty(): item = q.get() left.add(item) q.task_done() # send N kill signals for each worker print('Putting kill jobs into queue...') for _ in workers: q.put(None) q.close() # wait until workers are dead print('Waiting for workers to finish...') for w in workers: w.join() # get the failed jobs print('Emptying failed queue...') while not q_err.empty(): item = q_err.get() left.add(item) q_err.close() done.update(todo - left) with open('done.txt', 'w') as fp: fp.write('\n'.join(done))