-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
146 lines (124 loc) · 5.51 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from flask import Flask, render_template, request, session
import psycopg2
conn = psycopg2.connect('postgres://localhost')
conn.set_session(readonly=True, autocommit=True)
conn.set_client_encoding('UTF8')
cur = conn.cursor()
app = Flask(__name__)
app.secret_key = '76tv6rcs3x32azx43scrfybyuniu'
@app.route('/')
def start_page():
return render_template('sql_editor.html')
@app.route('/schemas')
def get_schemas():
cur.execute("select schema_name from information_schema.schemata;")
schemas = [s[0] for s in cur.fetchall() if not s[0].startswith("pg_")]
schemas.remove('information_schema')
session['schemas'] = schemas
return render_template("schema_explorer.html", schemas=schemas)
@app.route('/explore/<schema>/tables')
def get_tables(schema):
cur.execute("select table_name from information_schema.tables where table_schema = '" + schema + "'")
tables = [t[0] for t in cur.fetchall()]
session[schema] = tables
return render_template("table_explorer.html", tables=tables, focussed_schema=schema)
@app.route('/explore/<schema>/<table>/columns')
def show_columns(schema, table):
cur.execute(
"select column_name, data_type from information_schema.columns where table_schema = '" + schema + "' and table_name = '" + table + "'")
columns = cur.fetchall()
columns = [c for c in columns]
cur.execute('select count(*) from "' + schema + '"."' + table + '";')
items = cur.fetchone()[0]
return render_template("column_explorer.html",
schemas=session['schemas'],
tables=session[schema],
columns=columns,
focussed_schema=schema,
focussed_table=table,
items=items)
@app.route('/process', methods=['POST'])
def process():
try:
cur.execute(request.form['sql'])
rows = cur.fetchmany(1000)
columns = [a.name for a in cur.description]
return render_template('sql_editor.html', columns=columns, rows=rows, length=len(rows), sql=request.form['sql'])
except psycopg2.Error as e:
return render_template('sql_editor.html', error=e.pgerror, sql=request.form['sql'])
@app.route('/view100/<table>')
def view_100(table):
try:
sql = 'select * from ' + table + ' limit 100;'
cur.execute(sql)
rows = cur.fetchall()
columns = [a.name for a in cur.description]
return render_template('sql_editor.html', columns=columns, rows=rows, length=len(rows), sql=sql)
except psycopg2.Error as e:
return render_template('sql_editor.html', error=e.pgerror, sql=sql)
@app.route('/start/<schema>/<table>')
@app.route('/start/<schema>/<table>/<column>')
def start_sql_table(schema, table, column=None):
try:
col = '*'
if column:
col = column
sql = f"select {col} from \"{schema}\".\"{table}\""
cur.execute(sql)
rows = cur.fetchmany(1000)
columns = [a.name for a in cur.description]
return render_template('sql_editor.html', columns=columns, rows=rows, sql=sql)
except psycopg2.Error as e:
return render_template('sql_editor.html', error=e.pgerror, sql=sql)
def get_columns(cur, schema, table):
cur.execute(
"select column_name from information_schema.columns where table_schema = '"
+ schema + "' and table_name = '" + table + "'")
return [c[0] for c in cur.fetchall()]
# TODO refactor
@app.route('/autofilter/<schema>/<table>', methods=['GET', 'POST'])
def auto_filter(schema, table):
limit = 200
dropdowns = {}
other_cols = {}
try:
columns = get_columns(cur, schema, table)
for c in columns:
cur.execute(f'select "{c}" from "{schema}"."{table}" group by 1 limit {limit + 1}')
count = -1
res = cur.fetchall()
if res is not None:
count = len(res)
if c != 'id' and count and limit > count > -1:
cur.execute(f'select distinct "{c}" from "{schema}"."{table}" order by 1 asc')
vals = cur.fetchall()
dropdowns[c] = [{'value': '------', 'selected': False}]
for v in vals:
if len(request.form) > 0 and request.form[c] != '' and request.form[c] == v:
dropdowns[c].append({'value': v[0], 'selected': True})
else:
dropdowns[c].append({'value': v[0], 'selected': False})
else:
other_cols[c] = [{'name':c, 'value':''}]
# if len(request.form) > 0 and request.form[c] != '':
# other_cols[c].append({'name':c , 'value': request.form[c]})
add_str = ''
extra_params = []
if len(request.form) > 0:
for tup in request.form:
if request.form[tup] != '------' and request.form[tup] != '':
extra_params.append(f"\"{tup}\" = '{request.form[tup]}'")
add_str = ' WHERE ' + ' AND '.join(extra_params)
cur.execute(f"select * from {table} {add_str}")
rows = cur.fetchmany(500)
except psycopg2.Error as e:
return render_template('sql_editor.html', error=e.pgerror)
return render_template('auto_filter.html',
dropdowns=dropdowns,
other_cols=other_cols,
rows=rows,
columns=columns,
schema=schema,
table=table)
if __name__ == '__main__':
app.run()