380 lines
11 KiB
Python
380 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
import sys
|
|
import orjson
|
|
import time
|
|
import os.path
|
|
import multiprocessing
|
|
import signal
|
|
import re
|
|
import traceback
|
|
from datetime import datetime, timedelta
|
|
from sqlalchemy import exc, func
|
|
from sqlalchemy.sql.expression import bindparam
|
|
|
|
from flask import current_app
|
|
|
|
from demweb.app import create_app
|
|
from demweb.extensions import db
|
|
from demweb import models
|
|
|
|
DEMO_PATH_PREFIX = ''
|
|
DEMO_REGEX = re.compile(r'auto-(\d+-\d+)-(\w+)')
|
|
|
|
def _itery(x):
|
|
if x is None:
|
|
return ()
|
|
if isinstance(x, list):
|
|
return x
|
|
return (x,)
|
|
|
|
|
|
def remove_chatcolors(message):
|
|
output = ''
|
|
msglen = len(message)
|
|
i = 0
|
|
while i < msglen:
|
|
c = message[i]
|
|
if ord(c) <= 8:
|
|
if ord(c) == 7:
|
|
i += 6
|
|
elif ord(c) == 8:
|
|
i += 8
|
|
else:
|
|
output += c
|
|
i += 1
|
|
return output
|
|
|
|
|
|
def parse_demo(path):
|
|
jmodified = False
|
|
with open(DEMO_PATH_PREFIX + path + '/out.json', 'r') as fp:
|
|
data = orjson.loads(fp.read())
|
|
|
|
demoname = os.path.basename(path)
|
|
match = DEMO_REGEX.match(demoname)
|
|
|
|
starttime = datetime.strptime(match.group(1), '%Y%m%d-%H%M%S')
|
|
tickinterval = data['serverinfo']['tickInterval']
|
|
|
|
session = models.Session(
|
|
time=starttime,
|
|
length=data['demoheader']['playback_time'],
|
|
demoname=demoname,
|
|
mapname=match.group(2),
|
|
mapmd5=data['serverinfo']['mapMD5'],
|
|
servername=data['demoheader']['servername'],
|
|
frames=data['demoheader']['playback_frames'],
|
|
ticks=data['demoheader']['playback_ticks'],
|
|
tickinterval=tickinterval,
|
|
dirty=data['demoheader']['dirty'],
|
|
playtime=0,
|
|
rounds=0,
|
|
ct_wins=0,
|
|
t_wins=0,
|
|
chats=0,
|
|
deaths=0,
|
|
kills=0,
|
|
voice_active=data['voice']['active_time'],
|
|
voice_total=data['voice']['total_time'],
|
|
silence_chunks=data['voice']['silence'] if 'silence' in data['voice'] else []
|
|
)
|
|
db.session.add(session)
|
|
db.session.commit()
|
|
|
|
session_playtime = 0
|
|
session_chats = 0
|
|
session_deaths = 0
|
|
session_kills = 0
|
|
|
|
if not data['players']:
|
|
data['players'] = dict()
|
|
|
|
for guid, obj in data['players'].items():
|
|
# convert to seconds
|
|
obj['playtime'] *= tickinterval
|
|
|
|
# try insert new player
|
|
while True:
|
|
try:
|
|
db.session.execute(
|
|
models.Player.__table__.insert().prefix_with('IGNORE').values(
|
|
guid=guid,
|
|
name=None,
|
|
first_seen=starttime,
|
|
last_seen=starttime,
|
|
playtime=0,
|
|
chats=0,
|
|
deaths=0,
|
|
kills=0,
|
|
voicetime=0
|
|
))
|
|
db.session.commit()
|
|
break
|
|
except Exception:
|
|
db.session.rollback()
|
|
print('deadlock 1')
|
|
|
|
# update player stats atomically
|
|
while True:
|
|
try:
|
|
db.session.execute(
|
|
models.Player.__table__.update(models.Player.guid == guid).values(
|
|
playtime = models.Player.playtime + obj['playtime'],
|
|
chats = models.Player.chats + obj['chats'],
|
|
deaths = models.Player.deaths + obj['deaths'],
|
|
kills = models.Player.kills + obj['kills'],
|
|
voicetime = models.Player.voicetime + obj['voicetime'],
|
|
first_seen = func.least(models.Player.first_seen, starttime),
|
|
last_seen = func.greatest(models.Player.last_seen, starttime),
|
|
))
|
|
db.session.commit()
|
|
break
|
|
except Exception:
|
|
db.session.rollback()
|
|
print('deadlock 2')
|
|
|
|
# try insert new player names
|
|
while True:
|
|
try:
|
|
db.session.execute(
|
|
models.PlayerNames.__table__.insert().prefix_with('IGNORE').values([
|
|
dict(
|
|
guid = guid,
|
|
name = name,
|
|
time = 0
|
|
)
|
|
for name in obj['names'].keys()
|
|
]))
|
|
db.session.commit()
|
|
break
|
|
except Exception:
|
|
db.session.rollback()
|
|
print('deadlock 3')
|
|
|
|
# update player names atomically
|
|
while True:
|
|
try:
|
|
db.session.execute(
|
|
models.PlayerNames.__table__.update()
|
|
.where(models.PlayerNames.guid == bindparam('_guid'))
|
|
.where(models.PlayerNames.name == bindparam('_name'))
|
|
.values(time = models.PlayerNames.time + bindparam('_nametime')),
|
|
[
|
|
dict(
|
|
_guid = guid,
|
|
_name = name,
|
|
_nametime = nametime
|
|
) for name, nametime in obj['names'].items()
|
|
]
|
|
)
|
|
db.session.commit()
|
|
break
|
|
except Exception:
|
|
db.session.rollback()
|
|
print('deadlock 4')
|
|
|
|
# try insert new player sprays
|
|
while True:
|
|
try:
|
|
if obj['sprays']:
|
|
db.session.execute(
|
|
models.PlayerSprays.__table__.insert().prefix_with('IGNORE').values(
|
|
[(guid, spray) for spray in obj['sprays']]
|
|
))
|
|
db.session.commit()
|
|
break
|
|
except Exception:
|
|
db.session.rollback()
|
|
print('deadlock 5')
|
|
|
|
# insert new player session
|
|
player_session = models.PlayerSession(
|
|
player_guid=guid,
|
|
session_id=session.id,
|
|
playtime=obj['playtime'],
|
|
chats=obj['chats'],
|
|
deaths=obj['deaths'],
|
|
kills=obj['kills'],
|
|
voicetime=obj['voicetime']
|
|
)
|
|
db.session.add(player_session)
|
|
db.session.commit()
|
|
|
|
if guid != "BOT":
|
|
session_playtime += player_session.playtime
|
|
session_chats += player_session.chats
|
|
session_deaths += player_session.deaths
|
|
session_kills += player_session.kills
|
|
|
|
session.playtime = session_playtime
|
|
session.chats = session_chats
|
|
session.deaths = session_deaths
|
|
session.kills = session_kills
|
|
db.session.commit()
|
|
|
|
for obj in _itery(data['events']):
|
|
if obj['event'] == 'round_start':
|
|
session.rounds += 1
|
|
elif obj['event'] == 'round_end':
|
|
if obj['winner'] == 2:
|
|
session.t_wins += 1
|
|
elif obj['winner'] == 3:
|
|
session.ct_wins += 1
|
|
|
|
# remove redundant info
|
|
eventdata = obj.copy()
|
|
del eventdata['tick']
|
|
del eventdata['event']
|
|
if 'steamid' in eventdata:
|
|
del eventdata['steamid']
|
|
|
|
event = models.Event(
|
|
player_guid=obj['steamid'] if 'steamid' in obj else None,
|
|
session_id=session.id,
|
|
tick=obj['tick'],
|
|
time=starttime + timedelta(seconds=obj['tick'] * tickinterval),
|
|
event=obj['event'],
|
|
data=eventdata
|
|
)
|
|
db.session.add(event)
|
|
db.session.commit()
|
|
|
|
for obj in _itery(data['chat']):
|
|
msg = obj['msgName']
|
|
if msg == '#Cstrike_Name_Change':
|
|
nick = obj['msgSender']
|
|
msg = 'changed their name to "{}"'.format(obj['msgText'])
|
|
elif obj['steamid'] == 'BOT':
|
|
nick = 'Console'
|
|
msg = obj['msgName'].lstrip('\x01\x07FF0000Console: ')
|
|
else:
|
|
try:
|
|
nick, msg = obj['msgName'][1:].split('\x01', 1)
|
|
except Exception:
|
|
msg = None
|
|
if msg:
|
|
msg = msg.lstrip(': ')
|
|
|
|
if msg:
|
|
nick = remove_chatcolors(nick)
|
|
msg = remove_chatcolors(msg)
|
|
|
|
chat = models.Chat(
|
|
player_guid=obj['steamid'],
|
|
session_id=session.id,
|
|
tick=obj['tick'],
|
|
time=starttime + timedelta(seconds=obj['tick'] * tickinterval),
|
|
name=nick,
|
|
chat=msg
|
|
)
|
|
db.session.add(chat)
|
|
db.session.commit()
|
|
|
|
if jmodified:
|
|
with open(DEMO_PATH_PREFIX + path + '/out.json', 'wb') as fp:
|
|
fp.write(orjson.dumps(data, option=orjson.OPT_INDENT_2))
|
|
|
|
|
|
def worker(queue, error_queue):
|
|
app = create_app()
|
|
with app.app_context():
|
|
while True:
|
|
item = queue.get()
|
|
if not item:
|
|
queue.task_done()
|
|
break
|
|
|
|
try:
|
|
db.session.commit()
|
|
parse_demo(item)
|
|
except Exception as e:
|
|
error_queue.put(item)
|
|
print(f'vvv error from: {item} vvv')
|
|
traceback.print_exception(type(e), e, e.__traceback__)
|
|
print(f'^^^ error from: {item} ^^^')
|
|
|
|
queue.task_done()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
if len(sys.argv) >= 2:
|
|
DEMO_PATH_PREFIX = sys.argv[1]
|
|
|
|
with open('work.txt', 'r') as fp:
|
|
work = set(fp.read().splitlines())
|
|
|
|
with open('done.txt', 'r') as fp:
|
|
done = set(fp.read().splitlines())
|
|
|
|
if False:
|
|
done = set()
|
|
app = create_app()
|
|
with app.app_context():
|
|
meta = db.metadata
|
|
for table in reversed(meta.sorted_tables):
|
|
db.session.execute(table.delete())
|
|
db.session.commit()
|
|
|
|
todo = work - done
|
|
|
|
q = multiprocessing.JoinableQueue()
|
|
q_err = multiprocessing.SimpleQueue()
|
|
|
|
# populate queue with jobs
|
|
print('Populating queue...')
|
|
for task in todo:
|
|
q.put(task)
|
|
|
|
# ignore STDINT(^C) and store original sigint handler
|
|
# this is so our forked processes ignore ^C and can finish their work cleanly
|
|
original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
|
|
|
|
# start workers
|
|
print('Starting workers...')
|
|
workers = []
|
|
for i in range(20):
|
|
p = multiprocessing.Process(target=worker, args=(q, q_err))
|
|
workers.append(p)
|
|
p.start()
|
|
|
|
# restore original sigint handler (don't ignore ^C)
|
|
signal.signal(signal.SIGINT, original_sigint_handler)
|
|
|
|
# wait for queue to empty or ^C
|
|
print('Waiting for jobs to complete or ^C...')
|
|
try:
|
|
q.join()
|
|
except KeyboardInterrupt:
|
|
print('Quit!')
|
|
|
|
# get any jobs which were left in the queue
|
|
print('Emptying queue...')
|
|
left = set()
|
|
while not q.empty():
|
|
item = q.get()
|
|
left.add(item)
|
|
q.task_done()
|
|
|
|
# send N kill signals for each worker
|
|
print('Putting kill jobs into queue...')
|
|
for _ in workers:
|
|
q.put(None)
|
|
q.close()
|
|
|
|
# wait until workers are dead
|
|
print('Waiting for workers to finish...')
|
|
for w in workers:
|
|
w.join()
|
|
|
|
# get the failed jobs
|
|
print('Emptying failed queue...')
|
|
while not q_err.empty():
|
|
item = q_err.get()
|
|
left.add(item)
|
|
q_err.close()
|
|
|
|
done.update(todo - left)
|
|
|
|
with open('done.txt', 'w') as fp:
|
|
fp.write('\n'.join(done))
|