projects-jenz/nosteam_verificiation/python/read_access_logs.py

91 lines
3.8 KiB
Python
Raw Normal View History

#!/home/nonroot/nginx_reader/venv/bin/python3
from settings import get_connection
def _between(text, start, end):
    """Return the part of ``text`` that follows ``start``, cut at ``end``.

    Equivalent to ``text.split(start)[1].split(end)[0]``.

    Raises:
        IndexError: if ``start`` does not occur in ``text``.
    """
    return text.split(start)[1].split(end)[0]


def _parse_motd_line(line):
    """Extract the request fields from one access-log line.

    The line is expected to carry the labelled fields this script has always
    looked for (``code: ``, ``body bytes: ``, ``bytes: ``, ``user agent: "``,
    ``x forwarded: "``, ``request length: ``, ``request_time: ``,
    ``ssl_protocol: ``, ``ssl_cipher: ``).

    Returns:
        A tuple in the column order of the ``requests_info`` INSERT:
        (ipv4, status_code, user_agent, x_forwarded, request_length,
        request_time, body_bytes_sent, bytes_sent, ssl_protocol, ssl_cipher).

    Raises:
        IndexError: if one of the expected markers is missing from ``line``.
    """
    ipv4 = _between(line, "] ", " status")
    status_code = _between(line, "code: ", " body")
    user_agent = _between(line, 'user agent: "', '" x ')
    x_forwarded = _between(line, 'x forwarded: "', '" request length:')
    request_length = _between(line, "request length: ", " request_time:")
    request_time = _between(line, "request_time: ", " content length:")
    body_bytes_sent = _between(line, "body bytes: ", " bytes:")
    # "body bytes: " itself contains "bytes: ", so splitting the whole line on
    # "bytes: " would return the body-bytes value again. Skip past the
    # "body bytes" field first, then take the value of the following
    # " bytes: " field.
    after_body_bytes = line.split("body bytes: ")[1]
    bytes_sent = (
        after_body_bytes.split(" bytes: ")[1].split(" user agent:")[0].strip(" ")
    )
    ssl_protocol = _between(line, "ssl_protocol: ", " ssl_cipher:")
    # Read the cipher from this line, not from the first collected line.
    ssl_cipher = _between(line, "ssl_cipher: ", "\n")
    return (
        ipv4,
        status_code,
        user_agent,
        x_forwarded,
        request_length,
        request_time,
        body_bytes_sent,
        bytes_sent,
        ssl_protocol,
        ssl_cipher,
    )


def main():
    """Import MOTD.html requests from the nginx access log into MySQL.

    Collects every line of ``/var/log/nginx/access_xenforo.log`` that mentions
    ``MOTD.html``, makes sure the ``requests_info`` and ``user_agent_history``
    tables exist, inserts one ``requests_info`` row per IP (``INSERT IGNORE``
    on the ``ipv4`` primary key) and records each (ipv4, user_agent) pair in
    ``user_agent_history`` if that pair is not already stored.

    Raises:
        IndexError: if a collected line lacks one of the expected field markers.
    """
    # Iterate the file object directly so the whole log is never held in
    # memory; only the matching lines are kept.
    with open("/var/log/nginx/access_xenforo.log", "r") as f:
        motd_accessed = [line for line in f if 'MOTD.html' in line]

    create_requests_info = """
        CREATE TABLE IF NOT EXISTS
        `unloze_anti-spoofing`.requests_info
        (
            ipv4 varchar(64) not null,
            status_code int4,
            user_agent varchar(512),
            x_forwarded varchar(512),
            request_length int4,
            request_time int4,
            body_bytes_sent varchar(64),
            bytes_sent varchar(64),
            ssl_protocol varchar(256),
            ssl_cipher varchar(256),
            inserted_on datetime default now(),
            primary key (ipv4)
        )
    """
    create_user_agent_history = """
        CREATE TABLE IF NOT EXISTS
        `unloze_anti-spoofing`.user_agent_history
        (
            id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
            ipv4 varchar(64) not null,
            user_agent varchar(512),
            inserted_on datetime default now(),
            FOREIGN KEY (ipv4) REFERENCES requests_info(ipv4)
        )
    """
    insert_request = """
        INSERT IGNORE
        `unloze_anti-spoofing`.requests_info
        (ipv4, status_code, user_agent, x_forwarded, request_length, request_time, body_bytes_sent, bytes_sent,
        ssl_protocol, ssl_cipher)
        VALUES
        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    select_history = """
        select * from `unloze_anti-spoofing`.user_agent_history
        WHERE ipv4 = %s and user_agent
        = %s
    """
    insert_history = """
        INSERT IGNORE
        `unloze_anti-spoofing`.user_agent_history
        (ipv4, user_agent)
        VALUES
        (%s, %s)
    """

    with get_connection() as conn:
        with conn.cursor() as cur:
            # requests_info must exist first: user_agent_history references it.
            cur.execute(create_requests_info)
            cur.execute(create_user_agent_history)
            for line in motd_accessed:
                row = _parse_motd_line(line)
                ipv4, user_agent = row[0], row[2]
                cur.execute(insert_request, list(row))
                # Only add a history row when this exact (ip, user agent)
                # pair has not been stored yet.
                cur.execute(select_history, [ipv4, user_agent])
                if not cur.fetchall():
                    cur.execute(insert_history, [ipv4, user_agent])
        conn.commit()
    # Kept from the original: explicit close in case the connection object's
    # context manager does not close it on exit.
    conn.close()
# Script entry point: run the import once, then report completion on stdout.
if __name__ == "__main__":
    main()
    print("finished")