#!/usr/bin/python
#
# mapdata.py
#
# PURPOSE
# Display a simple interactive map of data points, allowing points to
# be highlighted by clicking on the map or table or by querying,
# and allowing some simple data plots.
#
# COPYRIGHT AND LICENSE
# Copyright (c) 2023, R. Dreas Nielsen
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# The GNU General Public License is available at <http://www.gnu.org/licenses/>
#
# AUTHOR
# Dreas Nielsen (RDN)
#
# ==================================================================
# Program version number and version date (ISO format, YYYY-MM-DD).
version = "2.8.6"
vdate = "2023-07-03"
# Copyright year.
# NOTE(review): this module-level name shadows the interactive 'copyright'
# builtin; it is kept because other code may refer to it by this name.
copyright = "2023"
import sys
import os.path
import io
import codecs
import argparse
from configparser import ConfigParser
import csv
import re
import datetime
import time
import math
import collections
import webbrowser
import threading
import queue
import sqlite3
import tempfile
import random
import uuid
import traceback
import subprocess
import multiprocessing
import copy
import tkinter as tk
import tkinter.ttk as ttk
import tkinter.font as tkfont
import tkinter.filedialog as tkfiledialog
import tkintermapview as tkmv
from PIL import ImageGrab
from PIL import ImageTk
import odf.opendocument
import odf.table
import odf.text
import odf.number
import odf.style
import xlrd
import openpyxl
import numpy as np
import matplotlib
matplotlib.use('TkAgg')
from matplotlib.figure import Figure
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg, NavigationToolbar2Tk
# Default name of configuration file. Files with other names may be read.
config_file_name = "mapdata.conf"
# Configuration files read on startup
config_files = []
# Configuration file read post-startup
config_files_user = []
# Default options
# Initial value of the "Multi-select" checkbutton variable. This is a string,
# not a number: the UI compares it with '0' to mean single selection.
multiselect = "0"
#-- Default location marker. This may be overridden
# The default marker and selection symbol names match keys of 'icon_xbm'.
location_marker = "triangle_open"
location_color = "black"
# NOTE(review): presumably these control whether symbol and color columns in
# the data override the defaults above -- confirm against the code that reads them.
use_data_marker = True
use_data_color = True
#-- Selected item marker
select_symbol = "wedge"
select_color = "red"
#-- Label appearance
label_color = "black"
label_font = "Liberation Sans"
# 'label_size' and 'label_position' are read by 'new_calc_text_offset()' to
# place label text relative to a marker icon.
label_size = 10
label_bold = False
# Either "above" or "below"
label_position = "below"
# Name of editor, read from environment if it exists (None otherwise)
editor = os.environ.get("EDITOR")
#-- Operational configuration
# Whether to use a temporary file for Sqlite (as opposed to memory).
temp_dbfile = False
# Patch the tkintermapview CanvasPositionMarker 'calculate_text_y_offset()' function to
# allow labeling below the icon. The icon anchor position is always "center" for this app.
def new_calc_text_offset(self):
    """Set 'self.text_y_offset' for a map marker's label.

    Replacement for tkintermapview's CanvasPositionMarker method of the same
    purpose.  Without an icon the offset is fixed at -56.  With an icon the
    label is placed below it (offset by half the icon height plus padding and
    the global 'label_size') when the global 'label_position' is "below",
    and above it otherwise.
    """
    if self.icon is None:
        self.text_y_offset = -56
        return
    if label_position == "below":
        offset = self.icon.height()/2 + 6 + label_size
    else:
        offset = -self.icon.height()/2 - 3
    self.text_y_offset = offset
tkmv.canvas_position_marker.CanvasPositionMarker.calculate_text_y_offset = new_calc_text_offset
# Patch function for ImageTk.PhotoImage.__del__
def new_img_del(img_self):
    """Best-effort replacement for ImageTk.PhotoImage.__del__.

    Clears the name of the wrapped Tk photo image and asks Tk to delete the
    image, ignoring any error (for example, when the Tk interpreter has
    already been destroyed).

    The wrapped image is stored by PIL in a private attribute written as
    'self.__photo' inside the PhotoImage class, so its real (name-mangled)
    attribute name is '_PhotoImage__photo'.  Because this function is defined
    at module level, no mangling is applied here and the mangled name must be
    spelled out; plain 'img_self.__photo' never exists and would turn this
    function into a silent no-op.
    """
    try:
        photo = img_self._PhotoImage__photo
        name = photo.name
        photo.name = None
        photo.tk.call("image", "delete", name)
    except Exception:
        # Deliberately silent: errors in a destructor cannot be handled usefully.
        pass
#===== Global constants and variables =====
# Tile servers for map basemap layers.  Keys are the basemap names shown to the
# user; values are tile URL templates with {x}, {y}, and {z} placeholders.
bm_servers = {"OpenStreetMap": "https://a.tile.openstreetmap.org/{z}/{x}/{y}.png",
"Google streets": "https://mt0.google.com/vt/lyrs=m&hl=en&x={x}&y={y}&z={z}&s=Ga",
"Google satellite": "https://mt0.google.com/vt/lyrs=s&hl=en&x={x}&y={y}&z={z}&s=Ga",
"Open topo map": "https://tile.opentopomap.org/{z}/{x}/{y}.png",
"Stamen terrain": "https://stamen-tiles.a.ssl.fastly.net/terrain/{z}/{x}/{y}.png"
#, "Stamen toner": "https://stamen-tiles.a.ssl.fastly.net/toner/{z}/{x}/{y}.png"
}
# API keys for tile servers that require them. The dictionary keys should match basemap names.
api_keys = {}
# Initial basemap to use.  If this name is not a key of 'bm_servers' when the
# map window is built, the first key of 'bm_servers' is used instead.
#initial_basemap = tuple(bm_servers.keys())[0]
initial_basemap = "OpenStreetMap"
# List of initial basemap names, for use when saving config
initial_bms = list(bm_servers.keys())
# X11 bitmaps for map icons.
# Keys are the symbol names; each value is the source text of a 16 x 16
# X11 bitmap (XBM).
# NOTE(review): the identifiers inside some bitmap texts do not match their
# dictionary key (e.g., 'ball' declares 'circle_bits'), and the 'drop' text
# begins with a comma -- presumably the bitmap reader tolerates this; confirm
# before normalizing any of the text, because it is data read at runtime.
icon_xbm = {
'ball': """#define ball_width 16
#define ball_height 16
static unsigned char circle_bits[] = {
0xc0, 0x03, 0xf0, 0x0f, 0xf8, 0x1f, 0xfc, 0x3f, 0xfe, 0x7f, 0xfe, 0x7f,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xfe, 0x7f, 0xfe, 0x7f,
0xfc, 0x3f, 0xf8, 0x1f, 0xf0, 0x0f, 0xc0, 0x03};""",
'bird': """#define bird_width 16
#define bird_height 16
static unsigned char bird.xbm_bits[] = {
0x00, 0x00, 0x00, 0x1c, 0x00, 0x3f, 0x80, 0xef, 0xc0, 0x7f, 0xe0, 0x3f,
0xf0, 0x3f, 0xf8, 0x1f, 0xff, 0x1f, 0xfc, 0x0f, 0xe0, 0x07, 0x80, 0x01,
0x00, 0x01, 0x00, 0x01, 0x80, 0x03, 0xe0, 0x0f};""",
'block': """#define block_width 16
#define block_height 16
static unsigned char square_bits[] = {
0x00, 0x00, 0x00, 0x00, 0xfc, 0x3f, 0xfc, 0x3f, 0xfc, 0x3f, 0xfc, 0x3f,
0xfc, 0x3f, 0xfc, 0x3f, 0xfc, 0x3f, 0xfc, 0x3f, 0xfc, 0x3f, 0xfc, 0x3f,
0xfc, 0x3f, 0xfc, 0x3f, 0x00, 0x00, 0x00, 0x00};""",
'bookmark': """#define bookmark_width 16
#define bookmark_height 16
static unsigned char bookmark_bits[] = {
0xfe, 0x7f, 0xfe, 0x7f, 0xfe, 0x7f, 0xfe, 0x7f, 0xfe, 0x7f, 0xfe, 0x7f,
0xfe, 0x7f, 0xfe, 0x7f, 0xfe, 0x7f, 0xfe, 0x7f, 0x7e, 0x7e, 0x3e, 0x7c,
0x1e, 0x78, 0x0e, 0x70, 0x06, 0x60, 0x02, 0x40};""",
'circle': """#define circle_width 16
#define circle_height 16
static unsigned char circle_bits[] = {
0xc0, 0x03, 0xf0, 0x0f, 0x78, 0x1e, 0x1c, 0x38, 0x0e, 0x70, 0x06, 0x60,
0x07, 0xe0, 0x03, 0xc0, 0x03, 0xc0, 0x07, 0xe0, 0x06, 0x60, 0x0e, 0x70,
0x1c, 0x38, 0x78, 0x1e, 0xf0, 0x0f, 0xc0, 0x03};""",
'deposition': """
#define deposition_width 16
#define deposition_height 16
static unsigned char deposition_bits[] = {
0x55, 0xab, 0x00, 0x00, 0x6b, 0xdd, 0x00, 0x00, 0xf7, 0xee, 0x00, 0x00,
0xbb, 0xbb, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};""",
'diamond': """#define diamond_width 16
#define diamond_height 16
static unsigned char diamond_bits[] = {
0x80, 0x01, 0xc0, 0x03, 0xe0, 0x07, 0xf0, 0x0f, 0xf8, 0x1f, 0xfc, 0x3f,
0xfe, 0x7f, 0xff, 0xff, 0xff, 0xff, 0xfe, 0x7f, 0xfc, 0x3f, 0xf8, 0x1f,
0xf0, 0x0f, 0xe0, 0x07, 0xc0, 0x03, 0x80, 0x01};""",
'drop': """,
#define drop.xbm_width 16
#define drop.xbm_height 16
static unsigned char drop.xbm_bits[] = {
0x80, 0x01, 0x80, 0x01, 0xc0, 0x03, 0xc0, 0x03, 0xe0, 0x07, 0xe0, 0x07,
0xf8, 0x1f, 0xf8, 0x1f, 0xfc, 0x3f, 0xfc, 0x3f, 0xfc, 0x3f, 0xfc, 0x3f,
0xfc, 0x3f, 0xf0, 0x1f, 0xf0, 0x0f, 0xc0, 0x03};""",
'fish': """#define fish_width 16
#define fish_height 16
static unsigned char fish.xbm_bits[] = {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x01, 0x0f,
0xe3, 0x7f, 0xf7, 0x78, 0x3e, 0xea, 0x9e, 0xfb, 0x73, 0x84, 0xc1, 0x63,
0x01, 0x3e, 0x00, 0x18, 0x00, 0x18, 0x00, 0x08};""",
'flag': """#define flag_width 16
#define flag_height 16
static unsigned char flag.xbm_bits[] = {
0x00, 0x00, 0x0e, 0x00, 0x3e, 0x00, 0xfe, 0x01, 0xfe, 0x1f, 0xfe, 0xff,
0xfe, 0xff, 0xfe, 0xff, 0xfe, 0xff, 0xfe, 0xff, 0xfe, 0xff, 0xf6, 0xff,
0x86, 0xff, 0x06, 0xf8, 0x06, 0x00, 0x06, 0x00};""",
'house': """#define house_width 16
#define house_height 16
static unsigned char house_bits[] = {
0x80, 0x01, 0xc0, 0x33, 0x60, 0x36, 0xb0, 0x3d, 0xd8, 0x3b, 0xec, 0x37,
0xf6, 0x6f, 0xfb, 0xdf, 0xfd, 0xbf, 0xfc, 0x3f, 0xfc, 0x3f, 0xfc, 0x3f,
0x7c, 0x3e, 0x7c, 0x3e, 0x7c, 0x3e, 0x7c, 0x3e};""",
'info': """#define info_width 16
#define info_height 16
static unsigned char info_bits[] = {
0xc0, 0x03, 0xf0, 0x0f, 0x78, 0x1e, 0x7c, 0x3e, 0xfe, 0x7f, 0xfe, 0x7f,
0x3f, 0xfe, 0x7f, 0xfe, 0x7f, 0xfe, 0x7f, 0xfe, 0x7e, 0x7e, 0x7e, 0x7e,
0x3c, 0x3c, 0xf8, 0x1f, 0xf0, 0x0f, 0xc0, 0x03};""",
'lightning': """#define lightning_width 16
#define lightning_height 16
static unsigned char Lightning_bits[] = {
0x00, 0xc0, 0x00, 0x70, 0x00, 0x1c, 0x00, 0x07, 0x80, 0x03, 0xe0, 0x01,
0xf0, 0x00, 0xf8, 0x03, 0xc0, 0x3f, 0x00, 0x1f, 0x80, 0x07, 0xc0, 0x01,
0xe0, 0x00, 0x38, 0x00, 0x0e, 0x00, 0x03, 0x00};""",
'plus': """#define plus_width 16
#define plus_height 16
static unsigned char plus_bits[] = {
0x80, 0x01, 0x80, 0x01, 0x80, 0x01, 0x80, 0x01, 0x80, 0x01, 0x80, 0x01,
0x80, 0x01, 0xff, 0xff, 0xff, 0xff, 0x80, 0x01, 0x80, 0x01, 0x80, 0x01,
0x80, 0x01, 0x80, 0x01, 0x80, 0x01, 0x80, 0x01};""",
'rose': """#define rose_width 16
#define rose_height 16
static unsigned char rose_bits[] = {
0x80, 0x01, 0x80, 0x01, 0xc0, 0x03, 0xc0, 0x03, 0xc0, 0x03, 0xe0, 0x07,
0xfc, 0x3f, 0xff, 0xff, 0xff, 0xff, 0xfc, 0x3f, 0xe0, 0x07, 0xc0, 0x03,
0xc0, 0x03, 0xc0, 0x03, 0x80, 0x01, 0x80, 0x01};""",
'square': """#define square_width 16
#define square_height 16
static unsigned char square_bits[] = {
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x07, 0xe0, 0x07, 0xe0, 0x07, 0xe0,
0x07, 0xe0, 0x07, 0xe0, 0x07, 0xe0, 0x07, 0xe0, 0x07, 0xe0, 0x07, 0xe0,
0x07, 0xe0, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};""",
'star': """#define star_width 16
#define star_height 16
static unsigned char star_bits[] = {
0x80, 0x01, 0x80, 0x01, 0xc0, 0x03, 0xc0, 0x03, 0xc0, 0x03, 0xe0, 0x07,
0xff, 0xff, 0xff, 0xff, 0xfc, 0x3f, 0xf0, 0x0f, 0xf8, 0x1f, 0xf8, 0x1f,
0x7c, 0x3e, 0x3c, 0x3c, 0x0e, 0x70, 0x06, 0x60};""",
'target': """#define target_width 16
#define target_height 16
static unsigned char target_bits[] = {
0xc0, 0x03, 0xf0, 0x0f, 0x78, 0x1e, 0x1c, 0x38, 0x0e, 0x70, 0x86, 0x61,
0xc7, 0xe3, 0xe3, 0xc7, 0xe3, 0xc7, 0xc7, 0xe3, 0x86, 0x61, 0x0e, 0x70,
0x1c, 0x38, 0x78, 0x1e, 0xf0, 0x0f, 0xc0, 0x03};""",
'tree': """
#define tree_width 16
#define tree_height 16
static unsigned char tree_bits[] = {
0xf8, 0x00, 0xa8, 0x37, 0x7c, 0x7d, 0xef, 0x4a, 0x37, 0xf5, 0xdf, 0xaf,
0xbe, 0xdb, 0xfc, 0x7f, 0xb0, 0x77, 0xc0, 0x7b, 0xc0, 0x1f, 0xc0, 0x03,
0xc0, 0x07, 0xc0, 0x07, 0xe0, 0x0f, 0xfc, 0x0f};""",
'triangle': """#define triangle_width 16
#define triangle_height 16
static unsigned char triangle_bits[] = {
0x80, 0x01, 0x80, 0x01, 0xc0, 0x03, 0xc0, 0x03, 0xe0, 0x07, 0xe0, 0x07,
0xf0, 0x0f, 0xf0, 0x0f, 0xf8, 0x1f, 0xf8, 0x1f, 0xfc, 0x3f, 0xfc, 0x3f,
0xfe, 0x7f, 0xfe, 0x7f, 0xff, 0xff, 0xff, 0xff};""",
'triangle_open': """#define triangle_open_width 16
#define triangle_open_height 16
static unsigned char triangle_open_bits[] = {
0x80, 0x01, 0x80, 0x01, 0xc0, 0x03, 0xc0, 0x03, 0xe0, 0x07, 0xe0, 0x07,
0x70, 0x0e, 0x70, 0x0e, 0x38, 0x1c, 0x38, 0x1c, 0x1c, 0x38, 0x1c, 0x38,
0x0e, 0x70, 0xfe, 0x7f, 0xff, 0xff, 0xff, 0xff};""",
'vapor': """
#define vapor_width 16
#define vapor_height 16
static unsigned char vapor_bits[] = {
0x20, 0x01, 0x30, 0x03, 0xcc, 0x4c, 0xc4, 0xcc, 0x13, 0x32, 0x31, 0x23,
0xc8, 0x8c, 0x4c, 0xc4, 0x11, 0x22, 0x33, 0x33, 0xc4, 0xc4, 0x4c, 0x4c,
0x32, 0x33, 0x20, 0x23, 0xc8, 0x0c, 0x80, 0x04};""",
'wave': """#define wave_width 16
#define wave_height 16
static unsigned char wave_bits[] = {
0x00, 0x00, 0x70, 0x00, 0xf8, 0x00, 0xce, 0x00, 0x83, 0x01, 0x00, 0xc3,
0x00, 0xe6, 0x70, 0x3e, 0xf8, 0x1c, 0xce, 0x00, 0x83, 0x01, 0x00, 0xc3,
0x00, 0xe6, 0x00, 0x3e, 0x00, 0x1c, 0x00, 0x00};""",
'wedge': """#define wedge_width 16
#define wedge_height 16
static unsigned char stn_marker_inv_bits[] = {
0xff, 0xff, 0xff, 0x7f, 0xfe, 0x7f, 0xfe, 0x3f, 0xfc, 0x3f, 0xfc, 0x1f,
0xf8, 0x1f, 0xf8, 0x0f, 0xf0, 0x0f, 0xf0, 0x07, 0xe0, 0x07, 0xe0, 0x03,
0xc0, 0x03, 0xc0, 0x01, 0x80, 0x01, 0x80, 0x00};""",
'x': """#define x_width 16
#define x_height 16
static unsigned char x_bits[] = {
0x00, 0x00, 0x06, 0x60, 0x0e, 0x70, 0x1c, 0x38, 0x38, 0x1c, 0x70, 0x0e,
0xe0, 0x07, 0xc0, 0x03, 0xc0, 0x03, 0xe0, 0x07, 0x70, 0x0e, 0x38, 0x1c,
0x1c, 0x38, 0x0e, 0x70, 0x06, 0x60, 0x00, 0x00};"""
}
# X11 bitmaps for map button bar icons.
# Each is the source text of a 16 x 16 X11 bitmap.  'expand_xbm',
# 'wedges_3_xbm', 'wedge_sm_xbm', and 'cancel_xbm' are turned into
# tk.BitmapImage objects when the map window is built.
expand_xbm = """#define expand_width 16
#define expand_height 16
static unsigned char expand_bits[] = {
0x3f, 0xfc, 0x07, 0xe0, 0x0f, 0xf0, 0x1d, 0xb8, 0x39, 0x9c, 0x71, 0x8e,
0x60, 0x06, 0x00, 0x00, 0x00, 0x00, 0x61, 0x06, 0x71, 0x8e, 0x39, 0x9c,
0x1d, 0xb8, 0x0f, 0xf0, 0x07, 0xe0, 0x3f, 0xfc};"""
wedges_3_xbm = """#define wedges_3_width 16
#define wedges_3_height 16
static unsigned char wedges_3_bits[] = {
0xff, 0x01, 0xfe, 0x00, 0x7c, 0x00, 0x38, 0x00, 0x10, 0x00, 0x00, 0x00,
0x80, 0xff, 0x00, 0x7f, 0x00, 0x3e, 0x00, 0x1c, 0x00, 0x08, 0xff, 0x01,
0xfe, 0x00, 0x7c, 0x00, 0x38, 0x00, 0x10, 0x00};"""
wedge_sm_xbm = """#define wedge_sm_width 16
#define wedge_sm_height 16
static unsigned char wedge_sm_bits[] = {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xf8, 0x1f, 0xf8, 0x1f, 0xf0, 0x0f,
0xf0, 0x0f, 0xe0, 0x07, 0xe0, 0x07, 0xc0, 0x03, 0xc0, 0x03, 0x80, 0x01,
0x80, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};"""
circle_xbm = """#define circle_width 16
#define circle_height 16
static unsigned char circle_bits[] = {
0xc0, 0x03, 0xf0, 0x0f, 0x78, 0x1e, 0x1c, 0x38, 0x0e, 0x70, 0x06, 0x60,
0x07, 0xe0, 0x03, 0xc0, 0x03, 0xc0, 0x07, 0xe0, 0x06, 0x60, 0x0e, 0x70,
0x1c, 0x38, 0x78, 0x1e, 0xf0, 0x0f, 0xc0, 0x03};"""
cancel_xbm = """#define cancel_width 16
#define cancel_height 16
static unsigned char cancel_bits[] = {
0xc0, 0x03, 0xf0, 0x0f, 0x78, 0x1e, 0x1c, 0x38, 0x0e, 0x7c, 0x06, 0x6e,
0x07, 0xe7, 0x83, 0xc3, 0xc3, 0xc1, 0xe7, 0xe0, 0x76, 0x60, 0x3e, 0x78,
0x1c, 0x38, 0x78, 0x3e, 0xf0, 0x0f, 0xc0, 0x03};"""
# Color names for map symbols. See https://www.w3schools.com/colors/colors_names.asp.
# Spelling matters: these names are passed to Tk as colors, and a misspelled
# name (e.g., "darkturquose" or "fuschia") is not a recognized color.
color_names = ("aliceblue", "antiquewhite", "aqua", "aquamarine", "azure", "beige", "bisque", "black", "blanchedalmond",
    "blue", "blueviolet", "brown", "burlywood", "cadetblue", "chartreuse", "chocolate", "coral", "cornflowerblue",
    "cornsilk", "crimson", "cyan", "darkblue", "darkcyan", "darkgoldenrod", "darkgray", "darkgrey", "darkgreen",
    "darkkhaki", "darkmagenta", "darkolivegreen", "darkorange", "darkorchid", "darkred", "darksalmon", "darkseagreen",
    "darkslateblue", "darkslategray", "darkslategrey", "darkturquoise", "darkviolet", "deeppink", "deepskyblue",
    "dimgray", "dimgrey", "dodgerblue", "firebrick", "floralwhite", "forestgreen", "fuchsia", "gainsboro", "ghostwhite",
    "gold", "goldenrod", "gray", "grey", "green", "greenyellow", "honeydew", "hotpink", "indianred", "indigo", "ivory",
    "khaki", "lavender", "lavenderblush", "lawngreen", "lemonchiffon", "lightblue", "lightcoral", "lightcyan",
    "lightgoldenrodyellow", "lightgray", "lightgrey", "lightgreen", "lightpink", "lightsalmon", "lightseagreen",
    "lightskyblue", "lightslategray", "lightslategrey", "lightsteelblue", "lightyellow", "lime", "limegreen", "linen",
    "magenta", "maroon", "mediumaquamarine", "mediumblue", "mediumorchid", "mediumpurple", "mediumseagreen",
    "mediumslateblue", "mediumspringgreen", "mediumturquoise", "mediumvioletred", "midnightblue", "mintcream", "mistyrose",
    "moccasin", "navajowhite", "navy", "oldlace", "olive", "olivedrab", "orange", "orangered", "orchid", "palegoldenrod",
    "palegreen", "paleturquoise", "palevioletred", "papayawhip", "peachpuff", "peru", "pink", "plum", "powderblue",
    "purple", "rebeccapurple", "red", "rosybrown", "royalblue", "saddlebrown", "salmon", "sandybrown", "seagreen",
    "seashell", "sienna", "silver", "skyblue", "slateblue", "slategray", "slategrey", "snow", "springgreen",
    "steelblue", "tan", "teal", "thistle", "tomato", "turquoise", "violet", "wheat", "white", "whitesmoke", "yellow",
    "yellowgreen")
# A shorter list for interactive selection of the marker color.
# Every name here also appears in 'color_names'.
select_colors = ('aqua', 'black', 'blue', 'blueviolet', 'brown', 'chartreuse', 'cornflowerblue', 'crimson',
    'cyan', 'darkblue', 'darkgreen', 'darkmagenta', 'darkorange', 'darkred', 'darkslategray', 'deeppink',
    'forestgreen', 'fuchsia', 'green', 'greenyellow', 'magenta', 'maroon', 'navy', 'orange', 'orangered',
    'purple', 'red', 'violet', 'white', 'yellow', 'yellowgreen')
# List of imported symbol names and paths (initially empty)
imported_symbols = []
# Keys for custom symbols are made up of the color name and the icon name, separated with a space.
# (Initially empty.)
custom_icons = {}
# X11 bitmap (16 x 16) for the application window icon
win_icon_xbm = """#define window_icon_width 16
#define window_icon_height 16
static unsigned char window_icon_bits[] = {
0xff, 0xff, 0x01, 0x80, 0x01, 0x84, 0x01, 0x8e, 0x01, 0x9f, 0x81, 0xbf,
0x21, 0x80, 0x71, 0x80, 0xf9, 0x80, 0xfd, 0x81, 0x01, 0x84, 0x01, 0x8e,
0x01, 0x9f, 0x81, 0xbf, 0x01, 0x80, 0xff, 0xff};"""
#===== Functions and classes =====
def warning(message, kwargs):
    """Show a warning message in a gold-colored message dialog.

    'kwargs' is an ordinary dictionary argument (not '**kwargs'); its
    optional 'parent' entry is used as the parent of the dialog.
    """
    owner = kwargs.get('parent')
    MsgDialog("Warning", message, parent=owner, bgcolor="Gold").show()
def fatal_error(message, kwargs):
    """Show an error message in a red message dialog, then exit the program.

    'kwargs' is an ordinary dictionary argument (not '**kwargs'); its
    optional 'parent' entry is used as the parent of the dialog.
    """
    owner = kwargs.get('parent')
    MsgDialog("Fatal Error", message, parent=owner, bgcolor="Red").show()
    sys.exit()
class CsvFile(object):
    """Iterator over the rows of a CSV file.

    Each row is returned as a list of strings, as produced by 'csv.reader'.
    The first row read is also saved in 'colnames', and 'rows_read' counts
    the rows returned so far.

    Arguments:
        csvfname: name of the CSV file to read.
        junk_header_lines: stored as an attribute only; this class does not
            itself skip any lines.
        dialect: passed to 'csv.reader'.

    The underlying file is held in 'inf'.  It is closed automatically when
    the last row has been read, by 'close()', or on leaving a 'with' block.
    """
    def __init__(self, csvfname, junk_header_lines=0, dialect=None):
        self.csvfname = csvfname
        self.junk_header_lines = junk_header_lines
        self.lineformat_set = False     # Indicates whether delimiter, quotechar, and escapechar have been set
        self.delimiter = None
        self.quotechar = None
        self.escapechar = None
        self.colnames = None
        self.rows_read = 0
        # Keep a reference to the file so that it can be closed.
        self.inf = open(csvfname, mode="rt", newline='')
        self.reader = csv.reader(self.inf, dialect=dialect)

    def __iter__(self):
        return self

    def __next__(self):
        # Keep raising StopIteration after the file has been closed, rather
        # than letting the reader fail on a closed file.
        if self.inf is None or self.inf.closed:
            raise StopIteration
        try:
            row = next(self.reader)
        except StopIteration:
            self.close()
            raise
        self.rows_read += 1
        if self.rows_read == 1:
            self.colnames = row
        return row

    # Explicit-call form; identical to the iterator protocol method.
    next = __next__

    def close(self):
        """Close the underlying file.  It is safe to call this repeatedly."""
        if self.inf is not None and not self.inf.closed:
            self.inf.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()
        return False
def treeview_sort_column(tv, col, reverse):
    """Sort the rows of Treeview 'tv' by the displayed values of column 'col'.

    Rows are ordered by (cell value, item id), descending if 'reverse' is
    true.  The column heading is then re-bound so that the next click sorts
    in the opposite direction.
    Adapted from https://stackoverflow.com/questions/1966929/tk-treeview-column-sort#1967793
    """
    keyed_items = sorted(
        ((tv.set(item, col), item) for item in tv.get_children()),
        reverse=reverse,
    )
    # Move every item to its sorted position.
    for position, (_cell, item) in enumerate(keyed_items):
        tv.move(item, '', position)

    def resort():
        treeview_sort_column(tv, col, not reverse)

    # Reverse the sort order on the next click.
    tv.heading(col, command=resort)
def set_tv_headers(tvtable, column_headers, colwidths, charpixels):
    """Set the width, heading text, and sort command of each table column.

    'colwidths' holds column widths in characters, in the same order as
    'column_headers'; each is multiplied by 'charpixels' to get a width in
    pixels.  Clicking a heading sorts the table by that column, ascending.
    """
    pixwidths = [charpixels * nchars for nchars in colwidths]

    def sorter_for(column):
        # Bind the column name now, so each heading sorts its own column.
        return lambda _col=column: treeview_sort_column(tvtable, _col, False)

    for position, hdr in enumerate(column_headers):
        tvtable.column(hdr, width=pixwidths[position])
        tvtable.heading(hdr, text=hdr, command=sorter_for(hdr))
def fill_tv_table(tvtable, rowset, status_label=None):
    """Append every row of 'rowset' to the Treeview 'tvtable'.

    None values are shown as empty strings, and each item's id is its row
    index as a string.  If 'status_label' is given, its text is set to the
    number of rows.
    """
    for rowno, record in enumerate(rowset):
        cells = ['' if cell is None else cell for cell in record]
        tvtable.insert(parent='', index='end', iid=str(rowno), values=cells)
    if status_label is None:
        return
    status_label.config(text = " %d rows" % len(rowset))
def treeview_table(parent, rowset, column_headers, select_mode="none"):
    """Create a Treeview table of 'rowset', with scrollbars and a status bar.

    The table is built inside a new frame, which is NOT gridded in 'parent';
    the caller must place it.

    Arguments:
        parent: the widget that will contain the table frame.  As a side
            effect, 'parent.statusbar' is set to the status bar label.
        rowset: a list of rows, each a sequence with one value per column.
        column_headers: a list of column names.
        select_mode: the Treeview 'selectmode' option.

    Returns a tuple of 0: the frame containing the table (which also has a
    'statuslabel' attribute referring to the status bar label), and 1: the
    table widget itself.
    """
    ncols = range(len(column_headers))
    # Column widths in characters: the wider of the header and the data.
    hdrwidths = [len(column_headers[j]) for j in ncols]
    if len(rowset) > 0:
        datawidths = [
            max(len(row[j] if isinstance(row[j], str) else str(row[j])) for row in rowset)
            for j in ncols
        ]
    else:
        datawidths = hdrwidths
    colwidths = [max(hdrwidths[j], datawidths[j]) for j in ncols]
    # Set the font.  The column widths are computed from the fixed-width font,
    # so the table must actually be displayed in it.  A custom style is only
    # applied to a Treeview if it is named '<something>.Treeview' and passed
    # as the widget's 'style' option.
    ff = tkfont.nametofont("TkFixedFont")
    tblstyle = ttk.Style()
    tblstyle.configure("tblstyle.Treeview", font=ff)
    charpixels = int(1.3 * ff.measure(u"0"))
    tableframe = ttk.Frame(master=parent, padding="3 3 3 3")
    statusframe = ttk.Frame(master=tableframe)
    # Create and configure the Treeview table widget
    tv_widget = ttk.Treeview(tableframe, columns=column_headers, selectmode=select_mode,
                             show="headings", style="tblstyle.Treeview")
    ysb = ttk.Scrollbar(tableframe, orient='vertical', command=tv_widget.yview)
    xsb = ttk.Scrollbar(tableframe, orient='horizontal', command=tv_widget.xview)
    tv_widget.configure(yscrollcommand=ysb.set, xscrollcommand=xsb.set)
    # Status bar
    parent.statusbar = ttk.Label(statusframe, text=" %d rows" % len(rowset), relief=tk.RIDGE, anchor=tk.W)
    tableframe.statuslabel = parent.statusbar
    # Fill the Treeview table widget with data
    set_tv_headers(tv_widget, column_headers, colwidths, charpixels)
    fill_tv_table(tv_widget, rowset, parent.statusbar)
    # Place the table
    tv_widget.grid(column=0, row=0, sticky=tk.NSEW)
    ysb.grid(column=1, row=0, sticky=tk.NS)
    xsb.grid(column=0, row=1, sticky=tk.EW)
    statusframe.grid(column=0, row=3, sticky=tk.EW)
    tableframe.columnconfigure(0, weight=1)
    tableframe.rowconfigure(0, weight=1)
    # Place the status bar
    parent.statusbar.pack(side=tk.BOTTOM, fill=tk.X)
    #
    return tableframe, tv_widget
def dquote(v):
    """Return 'v' double-quoted for use as an SQL identifier, if needed.

    A value that is a valid identifier (per 'str.isidentifier()') is returned
    unchanged.  Any other value is enclosed in double quotes, with each
    embedded double quote doubled so that the result is still one
    well-formed quoted identifier.
    """
    if v.isidentifier():
        return v
    return '"%s"' % v.replace('"', '""')
def db_colnames(tbl_hdrs):
    """Convert a list of table headers to a list of database column names.

    Each header is passed through 'dquote()', which double-quotes any name
    that is not a valid identifier.
    """
    return [dquote(hdr) for hdr in tbl_hdrs]
def isint(v):
    """Return True if 'v' is an integer or can be converted to one.

    Missing values (None or a blank string) match, and will be handled by
    'conv_int()'.  A float is never treated as an integer, even if it has no
    fractional part.  Any value for which 'int()' fails, including by
    overflow (e.g., an infinite Decimal), does not match; this agrees with
    'conv_int()', which converts such values to None.
    """
    if v is None or (type(v) is str and v.strip() == ''):
        return True
    if type(v) == int:
        return True
    if type(v) == float:
        return False
    try:
        int(v)
    except (ValueError, TypeError, OverflowError):
        return False
    return True
def conv_int(v):
    """Convert 'v' to an int, or return None if that is not possible.

    Missing values (None or a blank string) are converted to None.  Only the
    errors that 'int()' raises for unconvertible values are suppressed, so
    that unrelated exceptions such as KeyboardInterrupt are not swallowed.
    """
    if v is None or (type(v) is str and v.strip() == ''):
        return None
    try:
        return int(v)
    except (ValueError, TypeError, OverflowError):
        return None
def isfloat(v):
    """Return True if 'v' can be converted to a float.

    Missing values (None or a blank string) match, and will be handled by
    'conv_float()'.  Any value for which 'float()' fails does not match.
    That includes an integer too large to be represented as a float, for
    which 'float()' raises OverflowError; this agrees with 'conv_float()',
    which converts such values to None.
    """
    if v is None or (type(v) is str and v.strip() == ''):
        return True
    try:
        float(v)
    except (ValueError, TypeError, OverflowError):
        return False
    return True
def conv_float(v):
    """Convert 'v' to a float, or return None if that is not possible.

    Missing values (None or a blank string) are converted to None.  Only the
    errors that 'float()' raises for unconvertible values are suppressed, so
    that unrelated exceptions such as KeyboardInterrupt are not swallowed.
    """
    if v is None or (type(v) is str and v.strip() == ''):
        return None
    try:
        return float(v)
    except (ValueError, TypeError, OverflowError):
        return None
def isboolean(v):
    """Return True if 'parse_boolean()' recognizes 'v', that is, does not return None."""
    parsed = parse_boolean(v)
    return parsed is not None
def conv_datetime(v):
    """Convert 'v' to a datetime.datetime, or return None if not possible.

    Missing values (None or a blank string) are converted to None.  The value
    is first parsed as a date/time; if that fails, it is parsed as a date,
    which is converted to a datetime at midnight.

    A parse is treated as failed if the parsing function either raises an
    exception or returns None, because 'dt_type()' relies on the parsing
    functions returning a false value for unrecognized input.
    """
    if v is None or (type(v) is str and v.strip() == ''):
        return None
    try:
        rv = parse_datetime(v)
    except Exception:
        rv = None
    if rv is not None:
        return rv
    # Fall back to a date-only value.
    try:
        d = parse_date(v)
        # 'combine()' raises TypeError if 'd' is None (an unrecognized date).
        return datetime.datetime.combine(d, datetime.datetime.min.time())
    except Exception:
        return None
def dt_type(v):
    """Return the kind of date/time value that 'v' represents.

    Returns "date", "timestamp", or "timestamptz" for the first of
    'parse_date()', 'parse_datetime()', and 'parse_datetimetz()' (tried in
    that order) that returns a true value, or None if none of them does.
    A blank string is treated as None.
    """
    if type(v) is str and v.strip() == '':
        v = None
    parsers = (
        (parse_date, "date"),
        (parse_datetime, "timestamp"),
        (parse_datetimetz, "timestamptz"),
    )
    for parse, type_name in parsers:
        if parse(v):
            return type_name
    return None
def data_type(v):
    """Characterize the value 'v' as one of a simple set of data types.

    Returns None for a missing value (None or a blank string); otherwise the
    first that applies of "int", "float", a date/time type from 'dt_type()'
    ("date", "timestamp", or "timestamptz"), "boolean", and finally "string".
    """
    if v is None:
        return None
    if type(v) is str and v.strip() == '':
        return None
    # The order of these tests matters: the most restrictive types come first.
    if isint(v):
        return "int"
    if isfloat(v):
        return "float"
    temporal = dt_type(v)
    if temporal is None:
        return "boolean" if isboolean(v) else "string"
    return temporal
# Lookup table for priorities among data types.
# The outer key is one data type and the inner key is another; the value is
# the data type that can represent both.  None stands for "no type yet" (a
# missing value).  Every row must have the same set of inner keys as there
# are outer keys, or 'priority_data_type()' fails with a KeyError.
data_type_pair_priorities = {
    "int" : {"int":"int", "float":"float", "date":"string", "timestamp":"string", "timestamptz":"string", "boolean":"string", "string":"string", None:"int"},
    "float" : {"int":"float", "float":"float", "date":"string", "timestamp":"string", "timestamptz":"string", "boolean":"string", "string":"string", None:"float"},
    "date" : {"int":"string", "float":"string", "date":"date", "timestamp":"timestamp", "timestamptz":"string", "boolean":"string", "string":"string", None:"date"},
    "timestamp" : {"int":"string", "float":"string", "date":"timestamp", "timestamp":"timestamp", "timestamptz":"string", "boolean":"string", "string":"string", None:"timestamp"},
    "timestamptz" : {"int":"string", "float":"string", "date":"string", "timestamp":"string", "timestamptz":"timestamptz", "boolean":"string", "string":"string", None:"timestamptz"},
    "string" : {"int":"string", "float":"string", "date":"string", "timestamp":"string", "timestamptz":"string", "boolean":"string", "string":"string", None:"string"},
    "boolean" : {"int":"string", "float":"string", "date":"string", "timestamp":"string", "timestamptz":"string", "boolean":"boolean", "string":"string", None:"boolean"},
    None : {"int":"int", "float":"float", "date":"date", "timestamp":"timestamp", "timestamptz":"timestamptz", "boolean":"boolean", "string":"string", None:None}
    }
def priority_data_type(dt1, dt2):
    """Return the data type that has priority for the pair of types 'dt1' and 'dt2'.

    The result is looked up in 'data_type_pair_priorities'.
    """
    choices = data_type_pair_priorities[dt1]
    return choices[dt2]
def data_type_cast_fn(data_type_str):
    """Return the function that converts a value to the named data type.

    'data_type_str' is one of "string", "date", "timestamp", "timestamptz",
    "int", "float", or "boolean".  None is returned for any other value.
    """
    if data_type_str == "string":
        return str
    converters = (
        ("date", parse_date),
        ("timestamp", conv_datetime),
        ("timestamptz", parse_datetimetz),
        ("int", conv_int),
        ("float", conv_float),
        ("boolean", parse_boolean),
    )
    for type_name, converter in converters:
        if data_type_str == type_name:
            return converter
    return None
def common_data_type(values):
    """Return a data type common to all the values in the list.

    Missing values (None and blank strings) are ignored; if every value is
    missing, "string" is returned.  If all remaining values have the same
    type, that type is returned.  A mix of only ints and floats is "float",
    and a mix of only dates and timestamps is "timestamp".  Any other
    mixture is "string".
    """
    present = [v for v in values if not (v is None or (type(v) is str and v.strip() == ''))]
    if not present:
        return "string"
    found = [data_type(v) for v in present]
    kinds = set(found)
    if len(kinds) == 1:
        return found[0]
    if kinds == {'int', 'float'}:
        return 'float'
    if kinds == {'date', 'timestamp'}:
        return 'timestamp'
    return "string"
def set_data_types_core(headers, rows):
    """Determine the data type and some summary counts for each column.

    Returns a list with one tuple per header, in order, consisting of: the
    column name; the data type that can represent all of the column's values
    (None if there are no non-missing values); the number of missing values
    (None or blank strings); and the number of distinct non-missing values.
    """
    def is_missing(val):
        return val is None or (type(val) is str and val.strip() == '')

    summary = []
    # Column-by-column processing is slightly faster than row-by-row processing.
    for col_no, colname in enumerate(headers):
        column = [record[col_no] for record in rows]
        col_type = None
        for val in column:
            val_type = data_type(val)
            if val_type == col_type:
                continue
            col_type = priority_data_type(val_type, col_type)
            if col_type == "string":
                # Nothing takes priority over "string", so stop looking.
                break
        present = [val for val in column if not is_missing(val)]
        summary.append((colname, col_type, len(column) - len(present), len(set(present))))
    return summary
def set_data_types(headers, rows, q):
    """Put the result of 'set_data_types_core(headers, rows)' on the queue 'q'."""
    column_types = set_data_types_core(headers, rows)
    q.put(column_types)
# Translations from this program's data type names to SQLite type affinity names
sqlite_type_x = {
    'int': 'INTEGER',
    'float': 'REAL',
    'string': 'TEXT',
    'timestamptz': 'TEXT',
    'timestamp': 'TEXT',
    'date': 'TEXT',
    'boolean': 'INTEGER',
}
def center_window(win, x_offset=0, y_offset=0):
    """Center the window 'win' on the screen, keeping its current size.

    'x_offset' and 'y_offset' are added to the centered position.  Nothing is
    changed if the window's geometry string is not of the form WxH+X+Y.
    """
    win.update_idletasks()
    geom = re.match(r"(\d+)x(\d+)\+(-?\d+)\+(-?\d+)", win.geometry())
    if geom is None:
        return
    wwd, wht = int(geom.group(1)), int(geom.group(2))
    swd = win.winfo_screenwidth()
    sht = win.winfo_screenheight()
    xpos = (swd/2) - (wwd/2) + x_offset
    ypos = (sht/2) - (wht/2) + y_offset
    win.geometry("%dx%d+%d+%d" % (wwd, wht, xpos, ypos))
def raise_window(win):
    """Bring the window 'win' to the front without leaving it always-on-top.

    The 'topmost' attribute is switched on and then immediately off again.
    """
    for topmost in (1, 0):
        win.attributes('-topmost', topmost)
def shift_window(win, x_offset=0, y_offset=0):
    """Move the window 'win' by the given offsets, in pixels, keeping its size.

    The current position is taken from the window's geometry string, which
    has the form WxH+X+Y: the position is in the third and fourth fields
    (the first two are the width and height).  Nothing is changed if the
    geometry string does not have that form.
    """
    win.update_idletasks()
    m = re.match(r"(\d+)x(\d+)\+(-?\d+)\+(-?\d+)", win.geometry())
    if m is not None:
        xpos = int(m.group(3)) + x_offset
        ypos = int(m.group(4)) + y_offset
        win.geometry("+%d+%d" % (xpos, ypos))
class MapUI(object):
def __init__(self, src_name, message, lat_col, lon_col, crs=4326, sheet=None,
label_col=None, symbol_col=None, color_col=None,
map_export_file=None, export_time_sec=10):
self.win = tk.Tk()
self.win.withdraw()
self.loading_dlg = LoadingDialog(self.win)
self.loading_dlg.display("Preparing map")
# Size and position window.
self.win.geometry("1200x1000")
center_window(self.win)
self.win.protocol("WM_DELETE_WINDOW", self.cancel)
self.data_src_name = src_name
self.win.title("Map of %s" % src_name)
# Patch ImageTk.PhotoImage.__del__
ImageTk.PhotoImage.__del__ = new_img_del
# Set the font
self.mapfont = self.makefont()
# Set the application window icon
#win_icon = tk.BitmapImage(data=win_icon_xbm, foreground="black", background="tan")
#self.win.iconbitmap(win_icon)
# The markers for all the locations in the data table
self.loc_map_markers = []
# The markers for the selected location(s)
self.sel_map_markers = []
# The number of table rows without coordinates
self.missing_latlon = 0
# Map bounds
self.min_lat = None
self.max_lat = None
self.min_lon = None
self.max_lon = None
# List of PlotDialog objects, so they can have data pushed or be deleted.
self.plot_list = []
# Database connection is set in 'add_data()'; variables are initialized here
self.dbtmpdir = None
self.dbname = None
self.db = None
# Create default markers for the map
self.loc_marker_icon = self.set_get_loc_marker()
# Initializes selection marker to the global settings
self.set_sel_marker(select_symbol, select_color)
# Create icons for the buttonbar
expand_icon = tk.BitmapImage(data=expand_xbm, foreground="black")
focus_icon = tk.BitmapImage(data=wedge_sm_xbm, foreground="red")
zoom_sel_icon = tk.BitmapImage(data=wedges_3_xbm, foreground="red")
unselect_icon = tk.BitmapImage(data=cancel_xbm, foreground="black")
# Use stacked frames for the main application window components. Map and table in a PanedWindow.
msgframe = ttk.Frame(self.win, padding="3 2")
ctrlframe = ttk.Frame(self.win, padding="3 2")
datapanes = ttk.PanedWindow(self.win, orient=tk.VERTICAL)
mapframe = ttk.Frame(datapanes, borderwidth=2, relief=tk.RIDGE)
self.tblframe = ttk.Frame(datapanes, padding="3 2")
datapanes.add(mapframe, weight=1)
datapanes.add(self.tblframe, weight=1)
# Allow vertical resizing of map and table frames, not of message and control frames
self.win.columnconfigure(0, weight=1)
self.win.rowconfigure(0, weight=0) # msgframe
self.win.rowconfigure(1, weight=0) # ctrlframe
self.win.rowconfigure(2, weight=1) # datapanes
# Grid all the main frames
msgframe.grid(row=0, column=0, sticky=tk.NSEW)
ctrlframe.grid(row=1, column=0, sticky=tk.W)
datapanes.grid(row=2, column=0, sticky=tk.NSEW)
# Populate the message frame
self.msg_label = ttk.Label(msgframe, text=message)
def wrap_msg(event):
self.msg_label.configure(wraplength=event.width - 5)
self.msg_label.bind("", wrap_msg)
self.msg_label.grid(column=0, row=0, sticky=tk.EW, padx=(3,3), pady=(3,3))
msgframe.columnconfigure(0, weight=1)
msgframe.rowconfigure(0, weight=1)
# Populate the map control frame
ctrlframe.rowconfigure(0, weight=0)
ctrlframe.columnconfigure(0, weight=0)
# Basemap controls and buttons
self.basemap_label = ttk.Label(ctrlframe, text="Basemap:", anchor="w")
self.basemap_label.grid(row=0, column=0, padx=(5, 5), pady=(2, 5), sticky=tk.W)
global initial_basemap
bm_name = initial_basemap
if bm_name not in bm_servers:
bm_name = tuple(bm_servers.keys())[0]
initial_basemap = bm_name
self.basemap_var = tk.StringVar(self.win, bm_name)
self.map_option_menu = ttk.Combobox(ctrlframe, state="readonly", textvariable=self.basemap_var,
values=self.available_tile_servers(), width=18)
self.map_option_menu.bind('<>', self.change_basemap)
self.map_option_menu.grid(row=0, column=1, padx=(5, 30), pady=(2, 5), sticky=tk.W)
# Multi-select option
def ck_changed():
ck = self.multiselect_var.get()
if ck == '0':
self.unselect_map()
self.tbl.configure(selectmode = tk.BROWSE)
else:
self.tbl.configure(selectmode = tk.EXTENDED)
self.set_status()
# Set by global variable
self.multiselect_var = tk.StringVar(self.win, multiselect)
ck_multiselect = ttk.Checkbutton(ctrlframe, text="Multi-select", variable=self.multiselect_var, command=ck_changed)
ck_multiselect.grid(row=0, column=2, sticky=tk.W, padx=(0, 20))
# Map control buttons
zoomsel_btn = ttk.Button(ctrlframe, text="Zoom selected", image=zoom_sel_icon, compound=tk.LEFT, command=self.zoom_selected)
zoomsel_btn.image = zoom_sel_icon
zoomsel_btn.grid(row=0, column=3, sticky=tk.W)
expand_btn = ttk.Button(ctrlframe, text="Zoom full", image=expand_icon, compound=tk.LEFT, command=self.zoom_full)
expand_btn.image = expand_icon
expand_btn.grid(row=0, column=4, sticky=tk.W)
focus_btn = ttk.Button(ctrlframe, text="Center", image=focus_icon, compound=tk.LEFT, command=self.focus_map)
focus_btn.image = focus_icon
focus_btn.grid(row=0, column=5, sticky=tk.W)
unselect_btn = ttk.Button(ctrlframe, text="Un-select", image=unselect_icon, compound=tk.LEFT, command=self.unselect_map)
unselect_btn.image = unselect_icon
unselect_btn.grid(row=0, column=6, sticky=tk.W)
# Map widget
mapframe.rowconfigure(0, weight=1)
mapframe.columnconfigure(0, weight=1)
self.map_widget = tkmv.TkinterMapView(mapframe, height=600, width=600, corner_radius=0)
if initial_basemap != "OpenMapServer":
tileserver = self.tile_url(initial_basemap)
self.map_widget.set_tile_server(tileserver)
self.map_widget.grid(row=0, column=0, sticky=tk.NSEW)
# Remove the splash screen message and show the main UI.
self.loading_dlg.hide()
# Re-setting the size is necessary on Windows in at least some environments.
self.win.geometry("1200x1000")
self.win.deiconify()
# Get data if not provided as arguments
if src_name is None:
sdsd = SelDataSrcDialog(parent=self.win, mapui=self)
src_name, label_col, lat_col, lon_col, crs, symbol_col, color_col, message, headers, rows = sdsd.select()
if src_name is None:
self.cancel()
else:
# src_name is a filename, either CSV or spreadsheet
self.loading_dlg.display("Loading data")
fn, ext = os.path.splitext(src_name)
if ext.lower() == ".csv":
try:
headers, rows = file_data(src_name, 0)
except:
self.loading_dlg.hide_all()
fatal_error("Could not read data from %s" % src_data, kwargs={'parent': self.win})
else:
if sheet is None:
self.loading_dlg.hide_all()
fatal_error("A sheet name must be specified for spreadsheets", kwargs={'parent': self.win})
try:
if ext.lower() == '.ods':
headers, rows = ods_data(src_name, sheet, 0)
else:
headers, rows = xls_data(src_name, sheet, 0)
except:
self.loading_dlg.hide_all()
fatal_error("Could not read table from %s, sheet %s" % (src_name, sheet), kwargs={'parent': self.win})
self.loading_dlg.hide()
self.msg_label.config(text=message)
# Source and possibly un-projected crs
self.src_crs = crs
self.crs = crs
# Populate the table frame
self.tblframe.rowconfigure(0, weight=1)
self.tblframe.columnconfigure(0, weight=1)
try:
self.tableframe, self.tbl = self.add_data(rows, headers, lat_col, lon_col, label_col, symbol_col, color_col)
except:
self.loading_dlg.hide_all()
fatal_error("Cannot load data. Check latitude, longitude, and CRS values.", kwargs={'parent': self.win})
self.tableframe.grid(column=0, row=0, sticky=tk.NSEW)
self.set_tbl_selectmode()
self.set_status()
# Add menu
self.add_menu(table_object = self.tbl, column_headers=headers)
self.tbl.bind('', self.mark_map)
# Other key bindings
self.win.protocol("WM_DELETE_WINDOW", self.cancel)
# Limit resizing
self.win.minsize(width=860, height=640)
# Set table status message
self.set_status()
# Just export the map and quit?
if map_export_file is not None:
self.imageoutputfile = map_export_file
self.win.after(export_time_sec * 1000, self.export_map_and_quit)
self.loading_dlg.hide()
def makefont(self):
    """Return the tk font for map labels, built from the global label settings.

    When the configured family is not installed, the global ``label_font``
    is replaced by the first installed family from a fallback list, or,
    if none of those is installed, by the family of Tk's default font.
    """
    global label_font, label_size, label_bold
    installed = tkfont.families()
    if label_font not in installed:
        fallbacks = ["Liberation Sans", "Arial", "Helvetica", "Nimbus Sans", "Liberation Sans", "Trebuchet MS", "Tahoma", "DejaVu Sans", "Bitstream Vera Sans", "Open Sans"]
        substitute = next((fam for fam in fallbacks if fam in installed), None)
        if substitute is None:
            substitute = tkfont.nametofont("TkDefaultFont").actual()["family"]
        label_font = substitute
    weight = "bold" if label_bold else "normal"
    return tkfont.Font(family=label_font, size=label_size, weight=weight)
def available_tile_servers(self):
    """Return the names of the basemap servers that can currently be used.

    A server is usable when tile_url() yields a URL for it, i.e. it needs
    no API key or a key has been provided.
    """
    return [name for name in bm_servers if self.tile_url(name) is not None]
def tile_url(self, source_name):
    """Return the tile URL for source_name with its API key filled in.

    Returns the URL unchanged when it contains no key placeholder, and
    None when a key is required but none is configured for this source.
    """
    # NOTE(review): the placeholder literal is an empty string in this copy
    # of the source -- confirm against the original file.
    source_url = bm_servers[source_name]
    if "" not in source_url.lower():
        return source_url
    if source_name not in api_keys:
        return None
    api_key = api_keys[source_name]
    # Replace every spelling of the placeholder, whatever its letter case.
    for matched in re.findall("", source_url, re.IGNORECASE):
        source_url = source_url.replace(matched, api_key)
    return source_url
def mark_map(self, event):
    """Make the map's selection markers match the rows selected in the table.

    event -- the triggering tk event; not used (callers may pass None).

    A selection marker is placed for every selected row whose latitude and
    longitude both convert to float.  The previous selection markers are
    deleted and self.sel_map_markers is replaced by the new list -- an
    empty list when nothing is selected, so that no deleted marker is left
    behind for focus_map() or zoom_selected() to use.
    """
    old_markers = self.sel_map_markers
    new_markers = []
    for sel_row in self.tbl.selection():
        rowdata = self.tbl.item(sel_row)["values"]
        try:
            lat_val = float(rowdata[self.lat_index])
            lon_val = float(rowdata[self.lon_index])
        except (TypeError, ValueError, IndexError, OverflowError):
            # Row has no usable coordinates; it stays selected but unmarked.
            continue
        new_markers.append(self.map_widget.set_marker(lat_val, lon_val, icon=self.sel_marker_icon))
    # New markers are drawn before the old ones are removed.
    for m in old_markers:
        m.delete()
    self.sel_map_markers = new_markers
    self.update_plot_data()
    self.set_status()
def set_sel_marker(self, symbol, color):
    """Set the icon used for selection markers.

    symbol -- key into icon_xbm naming the bitmap to use.
    color  -- foreground color for the bitmap.

    Icons are cached in custom_icons under "<color> <symbol>", so a bitmap
    image is only created the first time a combination is requested.
    """
    mkr_key = "%s %s" % (color, symbol)
    if mkr_key not in custom_icons:
        custom_icons[mkr_key] = tk.BitmapImage(data=icon_xbm[symbol], foreground=color)
    self.sel_marker_icon = custom_icons[mkr_key]
def redraw_sel_markers(self):
    """Delete every selection marker and re-create it at the same position
    with the same icon, replacing self.sel_map_markers with the new markers.
    """
    def recreate(old):
        # Capture position and icon before the old marker is deleted.
        pos, icon = old.position, old.icon
        old.delete()
        return self.map_widget.set_marker(pos[0], pos[1], icon=icon)

    self.sel_map_markers = [recreate(m) for m in self.sel_map_markers]
def draw_sel_markers(self):
    """Call draw() on each marker in self.sel_map_markers, in list order."""
    for marker in self.sel_map_markers:
        marker.draw()
def set_get_loc_marker(self):
    """Return the icon for the global location marker and color.

    The icon is taken from the custom_icons cache, and is created and
    cached first if this color/symbol combination has not been used yet.
    """
    mkr_key = "%s %s" % (location_color, location_marker)
    if mkr_key in custom_icons:
        return custom_icons[mkr_key]
    custom_icons[mkr_key] = tk.BitmapImage(data=icon_xbm[location_marker], foreground=location_color)
    return custom_icons[mkr_key]
def redraw_loc_markers(self, tdata):
    """Delete all location markers and draw them again from the table.

    tdata -- the treeview control containing the data table.
    """
    self.loading_dlg.display("Redrawing markers")
    markers = self.loc_map_markers
    # Empty the list in place, deleting from the end.
    while markers:
        markers.pop().delete()
    self.draw_loc_markers(tdata)
    self.loading_dlg.hide()
def draw_loc_markers(self, tdata):
    """Place a location marker on the map for every table row with coordinates.

    tdata -- the treeview control containing the data table.

    Side effects: appends the new markers to self.loc_map_markers, widens
    the bounding box (self.min_lat/max_lat/min_lon/max_lon) to include
    every plotted point, and sets self.missing_latlon to the number of rows
    whose latitude or longitude could not be converted to float.
    """
    self.loading_dlg.display("Preparing map")
    self.missing_latlon = 0
    for row_id in tdata.get_children():
        rowdata = tdata.item(row_id)["values"]
        try:
            lat_val = float(rowdata[self.lat_index])
            lon_val = float(rowdata[self.lon_index])
        except (TypeError, ValueError, IndexError, OverflowError):
            self.missing_latlon += 1
            continue
        # Widen the bounding box to include this point.
        if self.min_lat is None or lat_val < self.min_lat:
            self.min_lat = lat_val
        if self.max_lat is None or lat_val > self.max_lat:
            self.max_lat = lat_val
        if self.min_lon is None or lon_val < self.min_lon:
            self.min_lon = lon_val
        if self.max_lon is None or lon_val > self.max_lon:
            self.max_lon = lon_val
        if self.color_index is None and self.symbol_index is None:
            marker_icon = self.loc_marker_icon
        else:
            # Start from the global defaults; use the row's own value only
            # when data-driven styling is enabled and the value is known.
            color = location_color
            if self.color_index is not None and use_data_color:
                # str() so that a cell value that is not a string (e.g. a
                # number) falls back to the default instead of raising.
                data_color = str(rowdata[self.color_index]).lower()
                if data_color in color_names:
                    color = data_color
            symbol = location_marker
            if self.symbol_index is not None and use_data_marker:
                data_symbol = str(rowdata[self.symbol_index]).lower()
                if data_symbol in icon_xbm:
                    symbol = data_symbol
            mkr_key = "%s %s" % (color, symbol)
            if mkr_key not in custom_icons:
                custom_icons[mkr_key] = tk.BitmapImage(data=icon_xbm[symbol], foreground=color)
            marker_icon = custom_icons[mkr_key]
        marker_args = {"icon": marker_icon, "command": self.map_sel_table}
        if self.label_index is not None:
            marker_args.update(text=rowdata[self.label_index], font=self.mapfont, text_color=label_color)
        self.loc_map_markers.append(self.map_widget.set_marker(lat_val, lon_val, **marker_args))
    self.update_plot_data()
    self.loading_dlg.hide()
def add_data(self, rows, headers, lat_col, lon_col, label_col, symbol_col, color_col):
    """Load a data set into the table, the map, and the query database.

    rows    -- list of row lists; when self.crs is not 4326 each row is
               modified in place to hold the un-projected coordinates.
    headers -- list of column names; when re-projecting it is extended in
               place with the names of the un-projected coordinate columns.
    lat_col, lon_col -- names of the source coordinate columns.
    label_col, symbol_col, color_col -- optional column names (None or '').

    Returns the (frame, treeview) pair produced by treeview_table().
    """
    # Re-set all data-specific variables and widgets
    self.headers = headers
    self.rows = rows
    self.lat_col = lat_col
    self.lon_col = lon_col
    self.src_lat_col = lat_col
    self.src_lon_col = lon_col
    self.lat_4326_col = None
    self.lon_4326_col = None
    self.label_col = label_col
    self.symbol_col = symbol_col
    self.color_col = color_col
    self.lat_index = headers.index(lat_col)
    self.lon_index = headers.index(lon_col)
    self.src_lat_index = headers.index(lat_col)
    self.src_lon_index = headers.index(lon_col)
    self.label_index = headers.index(label_col) if label_col is not None and label_col != '' else None
    self.symbol_index = headers.index(symbol_col) if symbol_col is not None and symbol_col != '' else None
    self.color_index = headers.index(color_col) if color_col is not None and color_col != '' else None
    if self.crs != 4326:
        # Coordinates are in another CRS: add columns holding EPSG:4326
        # values and map from those columns instead of the source columns.
        try:
            from pyproj import CRS, Transformer
        except ImportError:
            self.loading_dlg.hide_all()
            fatal_error("The pyproj library is required to re-project spatial coordinates", kwargs={})
        try:
            crs_proj = CRS(self.crs)
        except Exception:
            self.loading_dlg.hide_all()
            fatal_error("Invalid CRS (%s)" % self.crs, kwargs={})
        # Use the first candidate column name that is not already taken.
        if self.lat_4326_col is None:
            for colname in ('lat_4326', 'latitude_4326', 'y_4326', 'unprojected_lat'):
                if colname not in headers:
                    self.lat_4326_col = colname
                    headers.append(colname)
                    break
        if self.lon_4326_col is None:
            for colname in ('lon_4326', 'longitude_4326', 'x_4326', 'unprojected_lon'):
                if colname not in headers:
                    self.lon_4326_col = colname
                    headers.append(colname)
                    break
        self.lat_col = self.lat_4326_col
        self.lon_col = self.lon_4326_col
        self.lat_index = headers.index(self.lat_col)
        self.lon_index = headers.index(self.lon_col)
        crs_4326 = CRS(4326)
        reproj = Transformer.from_crs(crs_proj, crs_4326, always_xy=True)
        for r in rows:
            y = r[self.src_lat_index]
            x = r[self.src_lon_index]
            newx = newy = None
            # Missing and zero coordinates are not transformed.
            if y is not None and y != 0 and x is not None and x != 0:
                try:
                    newx, newy = reproj.transform(x, y)
                except Exception:
                    newx = newy = None
            if len(r) < len(headers):
                # Row does not have the new columns yet; the latitude
                # column was appended to headers before the longitude.
                r.extend([newy, newx])
            else:
                r[self.lat_index] = newy
                r[self.lon_index] = newx
    # Populate the treeview
    tframe, tdata = treeview_table(self.tblframe, rows, headers, "browse")
    self.table_row_count = len(tdata.get_children())
    # Scan the table, put points on the map, and find the map extent.
    self.min_lat = self.max_lat = self.min_lon = self.max_lon = None
    self.sel_map_markers = []
    self.missing_latlon = 0
    self.draw_loc_markers(tdata)
    # Set the map extent based on coordinates.
    self.map_widget.fit_bounding_box((self.max_lat, self.min_lon), (self.min_lat, self.max_lon))
    # Copy data from the treeview table to the database.  This includes the treeview IDs
    # Database connection
    if self.db is not None:
        self.db.close()
    if self.dbtmpdir is not None:
        # TemporaryDirectory.cleanup() takes no arguments; removal errors
        # are ignored here so that a leftover file cannot block a reload.
        try:
            self.dbtmpdir.cleanup()
        except OSError:
            pass
        self.dbtmpdir = None
    if temp_dbfile:
        self.dbtmpdir = tempfile.TemporaryDirectory()
        self.dbname = os.path.join(self.dbtmpdir.name, "mapdata.db")
        self.db = sqlite3.connect(self.dbname)
    else:
        # self.tmpdir is still reset in case other code reads it.
        self.tmpdir = None
        self.dbname = None
        self.db = sqlite3.connect(":memory:")
    cur = self.db.cursor()
    colnames = db_colnames(headers)
    colnames.append("treeviewid")
    cur.execute("create table mapdata (%s);" % ",".join(colnames))
    tbldata = []
    for row_id in tdata.get_children():
        row_vals = tdata.item(row_id)["values"]
        # Store blank strings as NULL.
        row_vals = [None if isinstance(x, str) and x.strip() == '' else x for x in row_vals]
        row_vals.append(row_id)
        tbldata.append(row_vals)
    params = ",".join(['?'] * len(colnames))
    cur.executemany("insert into mapdata values (%s)" % params, tbldata)
    cur.close()
    # Initial value for user-entered WHERE clause
    self.whereclause = ""
    # Determine data types for use in table statistics display and in column selection for plotting
    self.data_types = None
    if os.name == 'posix':
        # Evaluate in a separate process; the result is read from the
        # queue by whichever feature first needs self.data_types.
        self.data_types_queue = multiprocessing.Queue()
        self.data_types_process = multiprocessing.Process(target=set_data_types, args=(headers, rows, self.data_types_queue))
        self.data_types_process.start()
    else:
        self.data_types = set_data_types_core(headers, rows)
    # Return frame and data table
    return tframe, tdata
def remove_data(self):
    """Remove the current data set from the UI: delete all selection and
    location markers, close all plots, and destroy the table widgets.
    """
    for markers in (self.sel_map_markers, self.loc_map_markers):
        # Empty each list in place, deleting from the end.
        while markers:
            markers.pop().delete()
    self.map_widget.delete_all_marker()
    self.close_all_plots()
    self.tableframe.destroy()
    self.tbl.destroy()
def set_tbl_selectmode(self):
    """Set the table's selection mode from the multi-select setting and
    bind the table to mark_map().
    """
    single = self.multiselect_var.get() == '0'
    mode = tk.BROWSE if single else tk.EXTENDED
    self.tbl.configure(selectmode = mode)
    # NOTE(review): the event sequence is an empty string in this copy of
    # the source -- confirm against the original file.
    self.tbl.bind('', self.mark_map)
def replace_data(self, rows, headers, lat_col, lon_col, label_col, symbol_col, color_col):
    """Discard the current data set and load a new one in its place.

    The arguments are passed unchanged to add_data().  Any failure while
    loading is reported through fatal_error().
    """
    self.remove_data()
    try:
        loaded = self.add_data(rows, headers, lat_col, lon_col, label_col, symbol_col, color_col)
        self.tableframe, self.tbl = loaded
    except:
        self.loading_dlg.hide_all()
        fatal_error("Cannot load data.  Check latitude, longitude, and CRS values.", kwargs={'parent': self.win})
    self.tableframe.grid(column=0, row=0, sticky=tk.NSEW)
    self.set_tbl_selectmode()
    self.set_status()
def new_data_file(self):
    """Ask for a data file through DataFileDialog and, if one is chosen,
    replace the displayed data with it and update the title and message.
    """
    chosen = DataFileDialog().get_datafile()
    fn, id_col, lat_col, lon_col, crs, sym_col, col_col, msg, headers, rows = chosen
    if fn is None or fn == '':
        # No file was chosen; leave the current data in place.
        return
    self.crs = crs
    self.data_src_name = os.path.abspath(fn)
    self.win.title("Map of %s" % os.path.basename(fn))
    self.replace_data(rows, headers, lat_col, lon_col, id_col, sym_col, col_col)
    if not (msg is None or msg == ''):
        self.msg_label['text'] = msg
def new_spreadsheet_file(self):
    """Ask for a spreadsheet through ImportSpreadsheetDialog and, if one is
    chosen, replace the displayed data with it and update the title and message.
    """
    chosen = ImportSpreadsheetDialog(self.win, self).get_datafile()
    fn, id_col, lat_col, lon_col, crs, sym_col, col_col, msg, headers, rows = chosen
    if fn is None or fn == '':
        # No file was chosen; leave the current data in place.
        return
    self.crs = crs
    self.data_src_name = os.path.abspath(fn)
    self.win.title("Map of %s" % os.path.basename(fn))
    self.replace_data(rows, headers, lat_col, lon_col, id_col, sym_col, col_col)
    if not (msg is None or msg == ''):
        self.msg_label['text'] = msg
def new_db_table(self):
    """Ask for a database table through DbConnectDialog and, if one is
    chosen, replace the displayed data with it and update the title and message.
    """
    chosen = DbConnectDialog(self.win, self).get_data()
    tablename, id_col, lat_col, lon_col, crs, sym_col, col_col, desc, headers, rows = chosen
    if tablename is None or tablename == '':
        # No table was chosen; leave the current data in place.
        return
    self.crs = crs
    self.win.title("Map of %s" % tablename)
    self.replace_data(rows, headers, lat_col, lon_col, id_col, sym_col, col_col)
    if not (desc is None or desc == ''):
        self.msg_label['text'] = desc
def zoom_full(self):
    """Fit the map to the bounding box of all plotted data points."""
    top_left = (self.max_lat, self.min_lon)
    bottom_right = (self.min_lat, self.max_lon)
    self.map_widget.fit_bounding_box(top_left, bottom_right)
def zoom_selected(self):
    """Zoom the map to the selected markers.

    Does nothing without a selection, centers on the marker when exactly
    one is selected, and otherwise fits the map to the bounding box of all
    selection markers.
    """
    markers = self.sel_map_markers
    if not len(markers) > 0:
        return
    if len(markers) == 1:
        self.focus_map()
        return
    lats = []
    lons = []
    for m in markers:
        lat, lon = m.position
        lats.append(lat)
        lons.append(lon)
    self.map_widget.fit_bounding_box((max(lats), min(lons)), (min(lats), max(lons)))
def focus_map(self):
    """Center the map on the last selection marker, if there is one."""
    if not len(self.sel_map_markers) > 0:
        return
    last = self.sel_map_markers[-1]
    lat = last.position[0]
    lon = last.position[1]
    self.map_widget.set_position(lat, lon)
def unselect_map(self):
    """Clear the selection: remove the selection markers from the map,
    deselect all table rows, then refresh plots and the status line.
    """
    for marker in self.sel_map_markers:
        self.map_widget.delete(marker)
    selected_rows = self.tbl.selection()
    self.tbl.selection_remove(*selected_rows)
    self.sel_map_markers = []
    self.update_plot_data()
    self.set_status()
def change_basemap(self, *args):
    """Switch the map to the tile server named by the basemap variable.

    *args -- ignored; accepts whatever the caller passes (e.g. an event).
    """
    self.map_widget.set_tile_server(self.tile_url(self.basemap_var.get()))
def map_sel_table(self, marker):
    """Select the table row(s) located at a clicked map marker.

    marker -- the clicked marker; its position is compared for exact
              equality with each row's latitude and longitude.

    When multi-select is off, the existing selection markers and table
    selection are cleared first.  Each matching row is selected, scrolled
    into view, and given a selection marker on the map.
    """
    lat, lon = marker.position
    if self.multiselect_var.get() == '0':
        for mkr in self.sel_map_markers:
            self.map_widget.delete(mkr)
        self.sel_map_markers = []
        self.tbl.selection_remove(*self.tbl.selection())
    for row_id in self.tbl.get_children():
        rowdata = self.tbl.item(row_id)["values"]
        try:
            lat_val = float(rowdata[self.lat_index])
            lon_val = float(rowdata[self.lon_index])
        except (TypeError, ValueError, IndexError, OverflowError):
            # Row has no usable coordinates, so it cannot match the marker.
            continue
        if lat_val == lat and lon_val == lon:
            self.tbl.selection_add(row_id)
            self.tbl.see(row_id)
            new_marker = self.map_widget.set_marker(lat, lon, icon=self.sel_marker_icon)
            if new_marker not in self.sel_map_markers:
                self.sel_map_markers.append(new_marker)
    self.update_plot_data()
    self.set_status()
def set_status(self):
    """Update the table status bar with the row count, the number of rows
    without coordinates, the number selected, and a multi-select hint.
    """
    parts = [" %d rows" % self.table_row_count]
    if self.missing_latlon > 0:
        parts.append(" (%d without lat/lon)" % self.missing_latlon)
    n_selected = len(self.tbl.selection())
    if n_selected > 0:
        parts.append(" | %s selected" % n_selected)
    if self.multiselect_var.get() == "1":
        parts.append(" | Ctrl-click to select multiple rows")
    self.tblframe.statusbar.config(text = "".join(parts))
def get_all_data(self, column_list):
    """Plotting support.  Return the values of the named columns for all
    rows, as a list holding one list of values per requested column.
    """
    positions = (self.headers.index(name) for name in column_list)
    return [[row[pos] for row in self.rows] for pos in positions]
def get_sel_data(self, column_list):
    """Plotting support.  Return the values of the named columns for the
    selected table rows, as one list of values per requested column.
    """
    positions = [self.headers.index(name) for name in column_list]
    picked = [self.tbl.item(row_id)["values"] for row_id in self.tbl.selection()]
    return [[values[pos] for values in picked] for pos in positions]
def update_plot_data(self):
    """Queue a redraw of every open plot that shows selected data only
    and has auto-update enabled.
    """
    for plot in self.plot_list:
        if plot.sel_only_var.get() != "1":
            continue
        if not plot.auto_update:
            continue
        plot.q_redraw()
def clone_plot(self, plot_obj):
    """Open a new PlotDialog that duplicates the settings and data of plot_obj.

    plot_obj -- the existing plot dialog object to copy from.
    """
    # The data types may not have been collected yet; if so, block on the
    # queue until they arrive, then join and close the worker process.
    if self.data_types is None:
        self.loading_dlg.display("Evaluating data types")
        self.data_types = self.data_types_queue.get()
        self.data_types_process.join()
        self.data_types_process.close()
        self.loading_dlg.hide()
    clone = PlotDialog(self, self.data_types)
    self.plot_list.append(clone)
    # Same size and position as the source dialog, shifted by 10 in x and y.
    clone.dlg.geometry(plot_obj.dlg.geometry())
    shift_window(clone.dlg, x_offset=10, y_offset=10)
    # Copy the values of the source dialog's control variables.
    clone.type_var.set(plot_obj.type_var.get())
    clone.sel_only_var.set(plot_obj.sel_only_var.get())
    clone.autoupdate_var.set(plot_obj.autoupdate_var.get())
    clone.x_var.set(plot_obj.x_var.get())
    clone.y_var.set(plot_obj.y_var.get())
    clone.xlog_var.set(plot_obj.xlog_var.get())
    clone.ylog_var.set(plot_obj.ylog_var.get())
    # Copy the selector value lists and the widget states.
    clone.x_sel["values"] = copy.copy(plot_obj.x_sel["values"])
    clone.y_sel["values"] = copy.copy(plot_obj.y_sel["values"])
    clone.x_sel["state"] = plot_obj.x_sel["state"]
    clone.y_sel["state"] = plot_obj.y_sel["state"]
    clone.xlog_ck["state"] = plot_obj.xlog_ck["state"]
    clone.ylog_ck["state"] = plot_obj.ylog_ck["state"]
    clone.data_btn["state"] = plot_obj.data_btn["state"]
    clone.plot_data_btn["state"] = plot_obj.plot_data_btn["state"]
    # Bind the clone's handlers on its dialog window.
    # NOTE(review): every event-sequence string below is empty in this copy
    # of the source -- confirm the intended sequences against the original file.
    clone.dlg.bind("", clone.do_help)
    clone.dlg.bind("", clone.clone_plot)
    clone.dlg.bind("", clone.do_close)
    clone.dlg.bind("", clone.do_close)
    clone.dlg.bind("", clone.set_title)
    clone.dlg.bind("", clone.set_xlabel)
    clone.dlg.bind("", clone.set_ylabel)
    # The bin-setting handler is bound only for histograms.
    if clone.type_var.get() == "Histogram":
        clone.dlg.bind("", clone.set_bins)
    # Shallow-copy the source plot's data and labels, then redraw from
    # those copies (get_data=False).
    clone.dataset = copy.copy(plot_obj.dataset)
    clone.plot_data = copy.copy(plot_obj.plot_data)
    clone.data_labels = copy.copy(plot_obj.data_labels)
    clone.plot_data_labels = copy.copy(plot_obj.plot_data_labels)
    clone.q_redraw(get_data=False)
    raise_window(clone.dlg)
    clone.dlg.focus()
def remove_plot(self, plot_obj):
    """Remove plot_obj from the list of open plots.

    For use by the plot 'do_close()' method.  A plot that is not in the
    list is ignored.
    """
    try:
        self.plot_list.remove(plot_obj)
    except ValueError:
        # Already removed; nothing to do.
        pass
def close_plot(self, plot_obj):
    """Close a plot dialog and drop it from the list of open plots.

    plot_obj -- the plot dialog object to close.

    Closing is best-effort: an error raised by do_close() is ignored, and
    the plot is removed from the list either way.
    """
    try:
        plot_obj.do_close()
    except Exception:
        pass
    # remove_plot() needs the plot object as its argument.
    self.remove_plot(plot_obj)
def close_all_plots(self):
    """Close every open plot dialog and empty the list of open plots.

    The loop runs over a copy of the list, so it ends whether or not a
    plot's do_close() removes that plot from self.plot_list.
    """
    for plot in list(self.plot_list):
        plot.do_close()
    self.plot_list = []
def change_crs(self):
    """Prompt for a new source CRS and, if it differs from the current one,
    re-project the source coordinates to EPSG:4326 and reload the data.

    The table selection in effect before the reload is re-applied afterwards.
    An invalid CRS produces a warning and leaves the data unchanged.
    """
    crsdlg = NewCrsDialog(self.crs)
    new_crs = crsdlg.get_crs()
    if new_crs is not None:
        if new_crs != self.crs:
            try:
                from pyproj import CRS, Transformer
            except:
                self.loading_dlg.hide_all()
                fatal_error("The pyproj library is required to re-project spatial coordinates", kwargs={})
            try:
                crs_proj = CRS(new_crs)
            except:
                warning("Invalid CRS (%s)" % new_crs, kwargs={})
            else:
                # Make sure columns exist for the un-projected coordinates:
                # take the first candidate name not already used as a header
                # and add an empty cell for it to every row.
                if self.lat_4326_col is None:
                    for colname in ('lat_4326', 'latitude_4326', 'y_4326', 'unprojected_lat'):
                        if colname not in self.headers:
                            self.lat_4326_col = colname
                            self.headers.append(colname)
                            for r in self.rows:
                                r.append(None)
                            break
                if self.lon_4326_col is None:
                    for colname in ('lon_4326', 'longitude_4326', 'x_4326', 'unprojected_lon'):
                        if colname not in self.headers:
                            self.lon_4326_col = colname
                            self.headers.append(colname)
                            for r in self.rows:
                                r.append(None)
                            break
                # Mapping now uses the un-projected columns.
                self.lat_col = self.lat_4326_col
                self.lon_col = self.lon_4326_col
                self.lat_index = self.headers.index(self.lat_4326_col)
                self.lon_index = self.headers.index(self.lon_4326_col)
                crs_4326 = CRS(4326)
                self.crs = new_crs
                reproj = Transformer.from_crs(crs_proj, crs_4326, always_xy=True)
                # Transform each row's source coordinates; missing, zero, or
                # untransformable coordinates leave the new cells as None.
                for r in self.rows:
                    y = r[self.src_lat_index]
                    x = r[self.src_lon_index]
                    if y is not None and y != 0 and x is not None and x != 0:
                        try:
                            newx, newy = reproj.transform(x, y)
                            r[self.lat_index] = newy
                            r[self.lon_index] = newx
                        except:
                            r[self.lat_index] = None
                            r[self.lon_index] = None
                    else:
                        r[self.lat_index] = None
                        r[self.lon_index] = None
                # Remember the selection, rebuild table and map from the
                # source columns, then re-apply the selection.
                selected = self.tbl.selection()
                self.replace_data(self.rows, self.headers, self.src_lat_col, self.src_lon_col, self.label_col, self.symbol_col, self.color_col)
                self.tbl.selection_set(tuple(selected))
                # mark_map() does not use its event argument.
                self.mark_map({})
def cancel(self):
    """Destroy the main window and exit the program."""
    main_window = self.win
    main_window.destroy()
    sys.exit()
def export_map_and_quit(self):
    """Write the map to self.imageoutputfile and destroy the main window.

    A '.ps' extension (any letter case) selects Postscript output; any
    other extension saves a screen grab of the map.
    """
    target = self.imageoutputfile
    extension = os.path.splitext(target)[1].lower()
    if extension == ".ps":
        self.export_map_to_ps(target)
    else:
        # Let pending idle tasks run before the map is grabbed.
        self.map_widget.update_idletasks()
        self.save_imageoutputfile()
    self.win.destroy()
def export_map_to_ps(self, outfile):
    """Write the map canvas to outfile as color Postscript."""
    canvas = self.map_widget.canvas
    canvas.postscript(file=outfile, colormode='color')
def save_imageoutputfile(self):
    """Grab the screen area covered by the map canvas and save it to
    self.imageoutputfile.
    """
    canvas = self.map_widget.canvas
    left = canvas.winfo_rootx()
    top = canvas.winfo_rooty()
    right = left + canvas.winfo_width()
    bottom = top + canvas.winfo_height()
    ImageGrab.grab(bbox=(left, top, right, bottom)).save(self.imageoutputfile)
def export_map_to_img(self, outfile):
    """Schedule saving the map as an image file.

    outfile -- name of the image file; stored in self.imageoutputfile.

    The save runs one second later so that the map can recover from being
    blocked by the file dialog box before the canvas is grabbed.
    """
    delay_ms = 1000
    self.map_widget.update_idletasks()
    self.imageoutputfile = outfile
    self.win.after(delay_ms, self.save_imageoutputfile)
def add_menu(self, table_object, column_headers):
mnu = tk.Menu(self.win)
self.win.config(menu=mnu)
file_menu = tk.Menu(mnu, tearoff=0)
tbl_menu = tk.Menu(mnu, tearoff=0)
map_menu = tk.Menu(mnu, tearoff=0)
sel_menu = tk.Menu(mnu, tearoff=0)
plot_menu = tk.Menu(mnu, tearoff=0)
help_menu = tk.Menu(mnu, tearoff=0)
mnu.add_cascade(label="File", menu=file_menu, underline=0)
mnu.add_cascade(label="Table", menu=tbl_menu, underline=0)
mnu.add_cascade(label="Map", menu=map_menu, underline=0)
mnu.add_cascade(label="Selections", menu=sel_menu, underline=0)
mnu.add_cascade(label="Plot", menu=plot_menu, underline=0)
mnu.add_cascade(label="Help", menu=help_menu, underline=0)
def save_table():
if table_object.selection():
rowset = []
for sel_row in table_object.selection():
rowset.append(table_object.item(sel_row)["values"])
outfile = tkfiledialog.asksaveasfilename(title="File to save selected rows",
filetypes=[('CSV files', '.csv'), ('ODS files', '.ods'), ('TSV files', '.tsv'), ('Plain text', '.txt'), ('LaTeX', '.tex')])
if outfile:
if outfile[-3:].lower() == 'csv':
write_delimited_file(outfile, "csv", column_headers, rowset)
elif outfile[-3:].lower() == 'tsv':
write_delimited_file(outfile, "tsv", column_headers, rowset)
elif outfile[-3:].lower() == 'txt':
write_delimited_file(outfile, "plain", column_headers, rowset)
elif outfile[-3:].lower() == 'tex':
write_delimited_file(outfile, "tex", column_headers, rowset)
elif outfile[-3:].lower() == 'ods':
export_ods(outfile, column_headers, rowset, append=True, sheetname="Selected map items")
else:
# Force write as CSV.
outfile = outfile + ".csv"
write_delimited_file(outfile, "csv", column_headers, rowset)
def save_map():
outfile = tkfiledialog.asksaveasfilename(title="File to save map",
filetypes=[('Postscript files', '.ps'), ('JPEG files', '.jpg'), ('PNG files', '.png')])
fn, ext = os.path.splitext(outfile)
if len(ext) > 1 and outfile[-2:].lower() == 'ps':
self.export_map_to_ps(outfile)
else:
self.export_map_to_img(outfile)
def change_marker():
global select_symbol, select_color
marker_dlg = MarkerDialog(map_menu)
symbol, color = marker_dlg.get_marker()
if symbol is not None or color is not None:
if symbol is None or symbol == '':
symbol = select_symbol
if color is None or color == '':
color = select_color
symb_name = "%s %s" % (color, symbol)
if symb_name not in custom_icons:
custom_icons[symb_name] = tk.BitmapImage(data=icon_xbm[symbol], foreground=color)
select_symbol = symbol
select_color = color
self.sel_marker_icon = custom_icons[symb_name]
def change_labeling():
global location_marker, location_color, use_data_marker, use_data_color
global label_font, label_size, label_bold, label_position
lbl_dlg = LabelDialog(map_menu, self.headers, self.label_col)
mkr, clr, datamkr, dataclr, column, ffam, fsize, fbold, pos = lbl_dlg.get_labeling()
fsize = int(fsize)
fbold = False if "0" else True
if mkr is not None:
if mkr != location_marker or clr != location_color or datamkr != use_data_marker or \
dataclr != use_data_color or column != self.label_col or ffam != label_font or \
fsize != label_size or fbold != label_bold or pos != label_position:
fontchanged = ffam != label_font or fsize != label_size or fbold != label_bold
location_marker = mkr
location_color = clr
use_data_marker = datamkr == '1'
use_data_color = dataclr == '1'
self.label_col = column if column != '' else None
label_font = ffam
label_size = fsize
label_bold = fbold
label_position = pos
if fontchanged:
self.mapfont = self.makefont()
self.loc_marker_icon = self.set_get_loc_marker()
self.label_index = self.headers.index(self.label_col) if self.label_col is not None and self.label_col != '' else None
self.redraw_loc_markers(self.tbl)
self.redraw_sel_markers()
def import_symbol_file():
sd = ImportSymbolDialog()
name, fn = sd.run()
if name is not None and fn is not None:
import_symbol(name, fn)
fqfn = os.path.abspath(fn)
symb_spec = (name, fqfn)
if not symb_spec in imported_symbols:
imported_symbols.append(symb_spec)
def read_config_file():
fn = tkfiledialog.askopenfilename(filetypes=([('Config files', '.conf')]))
if fn != '':
global multiselect, select_symbol, select_color
pre_select = multiselect
pre_basemap = self.basemap_var.get()
pre_symbol = select_symbol
pre_color = select_color
pre_loc_symbol = location_marker
pre_loc_color = location_color
pre_label_color = label_color
pre_label_font = label_font
pre_label_size = label_size
pre_label_bold = label_bold
pre_label_position = label_position
read_config(fn)
# (Re)set configuration options to global defaults
self.map_option_menu['values'] = self.available_tile_servers()
if multiselect != pre_select:
self.multiselect_var.set(multiselect)
if initial_basemap != pre_basemap:
self.basemap_var.set(initial_basemap)
tileserver = self.tile_url(initial_basemap)
self.map_widget.set_tile_server(tileserver)
if select_symbol != pre_symbol or select_color != pre_color:
self.set_sel_marker(select_symbol, select_color)
# Redraw markers if any setting has changed
if location_marker != pre_loc_symbol or location_color != pre_loc_color or \
label_color != pre_label_color or label_font != pre_label_font or \
label_size != pre_label_size or label_bold != pre_label_bold or \
label_position != pre_label_position:
if label_font != pre_label_font or label_size != pre_label_size or label_bold != pre_label_bold:
self.mapfont = self.makefont()
self.loc_marker_icon = self.set_get_loc_marker()
self.redraw_loc_markers(self.tbl)
self.redraw_sel_markers()
global config_files_user
config_files_user.append(os.path.abspath(fn))
def save_config():
fn = tkfiledialog.asksaveasfilename(filetypes=([('Config files', '.conf')]))
if fn != '':
f = open(fn, "w")
f.write("# Configuration file for mapdata.py\n# Created by export from mapdata.py at %s\n" % datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
f.write("\n[basemap_tile_servers]\n")
added_bms = [k for k in bm_servers if not k in initial_bms]
for k in added_bms:
f.write("%s=%s\n" % (k, bm_servers[k]))
f.write("\n[api_keys]\n")
for k in api_keys:
f.write("%s=%s\n" % (k, api_keys[k]))
f.write("\n[symbols]\n")
for s in imported_symbols:
f.write("%s=%s\n" % (s[0], s[1]))
f.write("\n[defaults]\n")
f.write("basemap=%s\n" % self.basemap_var.get())
f.write("location_marker=%s\n" % location_marker)
f.write("location_color=%s\n" % location_color)
f.write("use_data_marker=%s\n" % use_data_marker)
f.write("use_data_color=%s\n" % use_data_color)
f.write("multilselect=%s\n" % ('Yes' if self.multiselect_var.get() == '1' else 'No'))
f.write("select_symbol=%s\n" % select_symbol)
f.write("select_color=%s\n" % select_color)
f.write("label_color=%s\n" % label_color)
f.write("label_font=%s\n" % label_font)
f.write("label_size=%s\n" % label_size)
f.write("label_bold=%s\n" % ('No' if not label_bold else 'Yes'))
f.write("label_position=%s\n" % label_position)
if editor is not None:
f.write("\n[config]\n")
f.write("editor=%s\n" % editor)
def set_editor():
global editor
dlg = GetEditorDialog(self.win, editor)
new_editor = dlg.show()
if new_editor is not None:
editor = new_editor
def show_data_types():
if self.data_types is None:
self.loading_dlg.display("Evaluating data types")
self.data_types = self.data_types_queue.get()
self.data_types_process.join()
self.data_types_process.close()
self.loading_dlg.hide()
dlg = MsgDialog2("Data Types", "Data types, data completeness, and number of unique non-missing values for columns of the data table:", can_resize=True)
tframe, tdata = treeview_table(dlg.content_frame, self.data_types, ["Column", "Type", "Missing", "Unique"], "browse")
tframe.grid(row=0, column=0, sticky=tk.NSEW)
dlg.show()
def run_query():
dlg = QueryDialog(self.headers, self.db, self.whereclause)
whereclause, action = dlg.get_where()
if whereclause is not None:
self.whereclause = whereclause
sqlcmd = "SELECT treeviewid FROM mapdata WHERE %s" % whereclause
cur = self.db.cursor()
try:
result = cur.execute(sqlcmd)
id_list = [r[0] for r in result.fetchall()]
except:
cur.close()
warning("Invalid data selection expression: %s" % whereclause, kwargs={})
else:
cur.close()
# Enable multiselect
global multiselect
multiselect = "1"
self.multiselect_var.set("1")
self.tbl.configure(selectmode = tk.EXTENDED)
if action == "Replace":
self.unselect_map()
self.tbl.selection_set(list(id_list))
elif action == "Union":
all_selections = tuple(set(self.tbl.selection()) | set(id_list))
self.tbl.selection_set(all_selections)
elif action == "Intersection":
int_selections = tuple(set(self.tbl.selection()) & set(id_list))
self.tbl.selection_set(int_selections)
elif action == "Difference O-N":
# Old - New
diff_selections = tuple(set(self.tbl.selection()) - set(id_list))
self.tbl.selection_set(diff_selections)
else:
# New - Old
diff_selections = tuple(set(id_list) - set(self.tbl.selection()))
self.tbl.selection_set(diff_selections)
self.mark_map(None)
self.set_status()
def invert_selections():
    """Select every table row that is currently unselected, and vice versa.

    The map markers and the status bar are refreshed afterwards.
    """
    # A set makes each membership test O(1); the tuple returned by
    # selection() would make the loop quadratic on large tables.
    selected = set(self.tbl.selection())
    new_selections = tuple(iid for iid in self.tbl.get_children() if iid not in selected)
    self.tbl.selection_set(new_selections)
    self.mark_map(None)
    self.set_status()
def new_plot():
    """Create a PlotDialog for the current data and add it to ``self.plot_list``.

    When ``self.data_types`` has not been filled in yet, this first blocks on
    ``self.data_types_queue`` (showing the loading dialog meanwhile) and then
    joins and closes ``self.data_types_process``.
    """
    types_pending = self.data_types is None
    if types_pending:
        busy = self.loading_dlg
        busy.display("Evaluating data types")
        self.data_types = self.data_types_queue.get()
        worker = self.data_types_process
        worker.join()
        worker.close()
        busy.hide()
    plot_dlg = PlotDialog(self, self.data_types)
    self.plot_list.append(plot_dlg)
    # NOTE(review): this only looks up the attribute; show() is never called.
    # Confirm against PlotDialog whether a call was intended before changing it.
    plot_dlg.show
def online_help():
    """Open the online documentation in a new browser tab."""
    help_url = "https://mapdata.osdn.io"
    webbrowser.open(help_url, new=2, autoraise=True)
def show_config_files():
    """Show which configuration files were read at startup and afterwards.

    Reads the module-level lists ``config_files`` and ``config_files_user``.
    Each non-empty list contributes one section; the sections are separated
    by a blank line.  A fixed message is shown when both lists are empty.
    """
    # Building the sections independently avoids referencing the message
    # before assignment when only post-startup files have been read.
    sections = []
    if len(config_files) > 0:
        sections.append("Configuration files read on startup:\n %s" % "\n ".join(config_files))
    if len(config_files_user) > 0:
        sections.append("Configuration files read after startup, in sequence:\n %s" % "\n ".join(config_files_user))
    if sections:
        msg = "\n\n".join(sections)
    else:
        msg = "No configuration files have been read."
    dlg = MsgDialog("Config files", msg)
    dlg.show()
def show_about():
    """Display the program name, version, release date, copyright year, and license."""
    about_template = "\nmapdata.py\nversion: %s, %s\nCopyright %s, R Dreas Nielsen\nLicense: GNU GPL3"
    about_text = about_template % (version, vdate, copyright)
    MsgDialog("About", about_text).show()
# Attach the menu entries.  ``underline`` gives the index of the character
# in each label that is underlined as the entry's keyboard mnemonic.
# -- File menu
file_menu.add_command(label="Open CSV", command = self.new_data_file, underline=5)
file_menu.add_command(label="Open spreadsheet", command = self.new_spreadsheet_file, underline=5)
file_menu.add_command(label="Open database", command = self.new_db_table, underline=5)
file_menu.add_command(label="Import symbol", command = import_symbol_file, underline=0)
file_menu.add_command(label="Set editor", command = set_editor, underline=4)
file_menu.add_command(label="Read config", command = read_config_file, underline=0)
file_menu.add_command(label="Save config", command = save_config, underline=0)
file_menu.add_command(label="Quit", command = self.cancel, underline=0)
# -- Table menu
tbl_menu.add_command(label="Un-select all", command = self.unselect_map, underline=0)
tbl_menu.add_command(label="Export selected", command = save_table, underline=1)
tbl_menu.add_command(label="Data types", command = show_data_types, underline=5)
# -- Map menu
map_menu.add_command(label="Change marker", command = change_marker, underline=7)
map_menu.add_command(label="Change labeling", command = change_labeling, underline=7)
map_menu.add_command(label="Zoom selected", command = self.zoom_selected, underline=5)
map_menu.add_command(label="Zoom full", command = self.zoom_full, underline=5)
map_menu.add_command(label="Center on selection", command = self.focus_map, underline=0)
map_menu.add_command(label="Un-select all", command = self.unselect_map, underline=0)
map_menu.add_command(label="Change CRS", command = self.change_crs, underline=1)
map_menu.add_command(label="Export", command = save_map, underline=1)
# -- Selection menu
sel_menu.add_command(label="Invert", command = invert_selections, underline=0)
sel_menu.add_command(label="Un-select all", command = self.unselect_map, underline=0)
sel_menu.add_command(label="Set by query", command = run_query, underline=7)
# -- Plot menu
plot_menu.add_command(label="New", command = new_plot, underline=0)
plot_menu.add_command(label="Close all", command = self.close_all_plots, underline=0)
# -- Help menu
help_menu.add_command(label="Online help", command = online_help, underline=7)
help_menu.add_command(label="Config files", command = show_config_files, underline=0)
help_menu.add_command(label="About", command = show_about, underline=0)
class LoadingDialog(object):
    """Borderless gold "busy" window that shows a message while work is in progress.

    Pending messages are kept in ``self.messages`` so that overlapping
    requests can share the window; hide() withdraws it only once that list
    is empty.
    """
    def __init__(self, parent):
        """Build the window as a child of ``parent`` and leave it withdrawn."""
        self.parent = parent
        popup = tk.Toplevel(parent)
        self.dlg = popup
        popup.title("MapData")
        center_window(popup)
        popup.update_idletasks()
        popup.withdraw()
        # No title bar or border from the window manager
        popup.wm_overrideredirect(True)
        popup.configure(bg="Gold")
        self.messages = []
        self.var_lbl = tk.StringVar(popup, "")
        self.lbl_loading = tk.Label(popup, bg="Gold", textvariable=self.var_lbl)
        self.lbl_loading.place(relx=0.5, rely=0.5, anchor="center")
        popup.update()
        self.dots = 3

    def update_lbl(self):
        """Redraw the first message with one to three trailing dots, then reschedule in 250 ms."""
        if self.messages:
            self.dots = self.dots % 3 + 1
            self.var_lbl.set(self.messages[0] + '.' * self.dots)
            self.dlg.update()
        self.after_id = self.dlg.after(250, self.update_lbl)

    def display(self, message):
        """Queue ``message``, show it, and raise the window with a busy cursor."""
        self.messages.append(message)
        self.var_lbl.set(message)
        popup = self.dlg
        popup.deiconify()
        raise_window(popup)
        popup.config(cursor="watch")
        popup.update()

    def hide(self):
        """Drop the most recent message; withdraw the window if none remain."""
        self.var_lbl.set("")
        self.dlg.config(cursor="arrow")
        if self.messages:
            self.messages.pop()
            if self.messages:
                # Other work is still pending: keep the window up and show
                # the first remaining message.
                self.var_lbl.set(self.messages[0])
                return
        self.dlg.withdraw()

    def hide_all(self):
        """Discard every pending message and withdraw the window."""
        self.var_lbl.set("")
        self.messages = []
        self.dlg.config(cursor="arrow")
        self.dlg.withdraw()
class LabelDialog(object):
    """Modal dialog for choosing the location symbol, its color, and label settings.

    Initial values come from the module-level settings (``location_marker``,
    ``location_color``, ``use_data_marker``, ``use_data_color``,
    ``label_font``, ``label_bold``, ``label_size``, ``label_position``).
    Call get_labeling() to run the dialog and collect the result.
    """
    def __init__(self, parent, column_list, label_col):
        """Build the dialog.

        parent      -- accepted but not used; the Toplevel is created without it.
        column_list -- column names offered for labeling (a blank choice is prepended).
        label_col   -- currently selected label column, or None for no label.
        """
        columns = ['']
        columns.extend(column_list)
        label_col = '' if label_col is None else label_col
        self.dlg = tk.Toplevel()
        self.dlg.title("Change Labeling")
        prompt_frame = tk.Frame(self.dlg)
        prompt_frame.grid(row=0, column=0, sticky=tk.NSEW, pady=(3,3))
        btn_frame = tk.Frame(self.dlg, borderwidth=3, relief=tk.RIDGE)
        btn_frame.grid(row=1, column=0, sticky=tk.EW, pady=(3,3))
        btn_frame.columnconfigure(0, weight=1)
        #
        symbol_lbl = ttk.Label(prompt_frame, text="Location symbol:")
        symbol_lbl.grid(row=0, column=0, sticky=tk.E, padx=(6,3))
        self.symbol_var = tk.StringVar(self.dlg, location_marker)
        symbol_vals = sorted(icon_xbm.keys())
        self.symbol_opts = ttk.Combobox(prompt_frame, state="readonly", textvariable=self.symbol_var,
                values=symbol_vals, width=15)
        self.symbol_opts.grid(row=0, column=1, columnspan=3, sticky=tk.W, padx=(6,3))
        color_lbl = ttk.Label(prompt_frame, text="Color:")
        color_lbl.grid(row=1, column=0, sticky=tk.E, padx=(3,3), pady=(3,3))
        self.color_var = tk.StringVar(self.dlg, location_color)
        color_opts = ttk.Combobox(prompt_frame, state="readonly", textvariable=self.color_var,
                values=list(select_colors), width=15)
        color_opts.grid(row=1, column=1, columnspan=3, sticky=tk.W, padx=(6,3), pady=(3,3))
        #
        self.use_data_marker_var = tk.StringVar(prompt_frame, use_data_marker)
        ck_use_data_marker = ttk.Checkbutton(prompt_frame, text="Use data symbol", variable=self.use_data_marker_var)
        ck_use_data_marker.grid(row=0, column=3, columnspan=2, sticky=tk.W, padx=(3,6), pady=(3,3))
        self.use_data_color_var = tk.StringVar(prompt_frame, use_data_color)
        ck_use_color_marker = ttk.Checkbutton(prompt_frame, text="Use data color", variable=self.use_data_color_var)
        ck_use_color_marker.grid(row=1, column=3, columnspan=2, sticky=tk.W, padx=(3,6), pady=(3,3))
        #
        self.col_var = tk.StringVar(prompt_frame, label_col)
        col_lbl = ttk.Label(prompt_frame, text = "Data column:")
        col_lbl.grid(row=2, column=0, sticky=tk.E, padx=(6,3), pady=(6,3))
        col_opts = ttk.Combobox(prompt_frame, state="readonly", textvariable=self.col_var,
                values=columns, width=40)
        col_opts.grid(row=2, column=1, columnspan=4, sticky=tk.W, padx=(3,6), pady=(3,3))
        #
        self.font_var = tk.StringVar(prompt_frame, label_font)
        font_lbl = ttk.Label(prompt_frame, text="Font:")
        font_lbl.grid(row=3, column=0, sticky=tk.E, padx=(3,3), pady=(6,3))
        # Font families can be reported more than once; de-duplicate before sorting
        fonts = sorted(set(tkfont.families()))
        font_opts = ttk.Combobox(prompt_frame, state="readonly", textvariable=self.font_var,
                values=fonts, width=25)
        font_opts.grid(row=3, column=1, columnspan=3, sticky=tk.W, padx=(3,3), pady=(3,3))
        self.bold_var = tk.StringVar(prompt_frame, "0" if not label_bold else "1")
        ck_bold = ttk.Checkbutton(prompt_frame, text="Bold", variable=self.bold_var)
        ck_bold.grid(row=3, column=4, sticky=tk.W, padx=(3,6), pady=(3,3))
        self.size_var = tk.IntVar(prompt_frame, label_size)
        #
        size_lbl = ttk.Label(prompt_frame, text="Size:")
        size_lbl.grid(row=4, column=0, sticky=tk.E, padx=(6,3), pady=(3,3))
        # The size box is editable ("normal"), so it may hold arbitrary text
        size_opt = ttk.Combobox(prompt_frame, state="normal", textvariable=self.size_var,
                values=[8, 10, 12, 14, 16, 20, 24], width=3)
        size_opt.grid(row=4, column=1, sticky=tk.W, padx=(6,3), pady=(3,3))
        self.position_var = tk.StringVar(prompt_frame, label_position)
        position_lbl = ttk.Label(prompt_frame, text="Position:")
        position_lbl.grid(row=4, column=2, sticky=tk.E, padx=(3,3), pady=(3,3))
        position_sel = ttk.Combobox(prompt_frame, state="readonly", textvariable=self.position_var,
                values=["above", "below"], width=6)
        position_sel.grid(row=4, column=3, sticky=tk.W, padx=(3,6), pady=(3,3))
        #
        # Buttons.  The Alt-key bindings match the underlined letters.
        self.canceled = False
        help_btn = ttk.Button(btn_frame, text="Help", command=self.do_help, underline=0)
        help_btn.grid(row=0, column=0, sticky=tk.W, padx=(6,3))
        self.dlg.bind("<Alt-h>", self.do_help)
        ok_btn = ttk.Button(btn_frame, text="OK", command=self.do_select, underline=0)
        ok_btn.grid(row=0, column=1, sticky=tk.E, padx=(3,3))
        self.dlg.bind("<Alt-o>", self.do_select)
        cancel_btn = ttk.Button(btn_frame, text="Cancel", command=self.do_cancel)
        cancel_btn.grid(row=0, column=2, sticky=tk.E, padx=(3,6))
        self.dlg.bind("<Return>", self.do_select)
        self.dlg.bind("<Escape>", self.do_cancel)

    def do_help(self, *args):
        """Open the online help page for this dialog."""
        webbrowser.open("https://mapdata.osdn.io/dialogs.html#change-labeling", new=2, autoraise=True)

    def do_cancel(self, *args):
        """Close the dialog and mark it as canceled."""
        self.canceled = True
        self.dlg.destroy()

    def do_select(self, *args):
        """Close the dialog and mark it as accepted."""
        self.canceled = False
        self.dlg.destroy()

    def get_labeling(self):
        """Run the dialog modally and return the chosen settings.

        Returns a 9-tuple (symbol, color, use_data_marker, use_data_color,
        label column, font, size, bold, position), or a tuple of nine None
        values if the dialog was canceled.  If the size box does not hold an
        integer, the module-level ``label_size`` is returned for the size.
        """
        self.dlg.grab_set()
        center_window(self.dlg)
        self.dlg.resizable(False, False)
        self.symbol_opts.focus()
        self.dlg.wait_window(self.dlg)
        if self.canceled:
            return (None,None,None,None,None,None,None,None,None)
        try:
            size = self.size_var.get()
        except (tk.TclError, ValueError):
            # Non-numeric text was typed into the editable size box
            size = label_size
        return (self.symbol_var.get(), self.color_var.get(), self.use_data_marker_var.get(),
                self.use_data_color_var.get(), self.col_var.get(), self.font_var.get(), size,
                self.bold_var.get(), self.position_var.get())
class MarkerDialog(object):
    """Modal dialog for choosing the symbol and color used to mark selected locations.

    Initial values come from the module-level ``select_symbol`` and
    ``select_color``.  Call get_marker() to run the dialog.
    """
    def __init__(self, parent):
        """Build the dialog.  ``parent`` is accepted but not used."""
        self.dlg = tk.Toplevel()
        self.dlg.title("Change Marker")
        prompt_frame = tk.Frame(self.dlg)
        prompt_frame.grid(row=0, column=0, sticky=tk.NSEW, pady=(3,3))
        btn_frame = tk.Frame(self.dlg, borderwidth=3, relief=tk.RIDGE)
        btn_frame.grid(row=1, column=0, sticky=tk.EW, pady=(3,3))
        btn_frame.columnconfigure(0, weight=1)
        symbol_lbl = ttk.Label(prompt_frame, text="Marker symbol:")
        symbol_lbl.grid(row=0, column=0, sticky=tk.E, padx=(6,3))
        self.symbol_var = tk.StringVar(self.dlg, select_symbol)
        symbol_vals = sorted(icon_xbm.keys())
        # Full option name "textvariable", as in the other dialogs
        self.symbol_opts = ttk.Combobox(prompt_frame, state="readonly", textvariable=self.symbol_var,
                values=symbol_vals, width=15)
        self.symbol_opts.grid(row=0, column=1, sticky=tk.W, padx=(3,6))
        color_lbl = ttk.Label(prompt_frame, text="Color:")
        color_lbl.grid(row=1, column=0, sticky=tk.E, padx=(6,3))
        self.color_var = tk.StringVar(self.dlg, select_color)
        color_vals = list(select_colors)
        color_opts = ttk.Combobox(prompt_frame, state="readonly", textvariable=self.color_var,
                values=color_vals, width=15)
        color_opts.grid(row=1, column=1, sticky=tk.W, padx=(3,6))
        # Buttons.  The Alt-key bindings match the underlined letters.
        self.canceled = False
        help_btn = ttk.Button(btn_frame, text="Help", command=self.do_help, underline=0)
        help_btn.grid(row=0, column=0, sticky=tk.W, padx=(6,3))
        self.dlg.bind("<Alt-h>", self.do_help)
        ok_btn = ttk.Button(btn_frame, text="OK", command=self.do_select, underline=0)
        ok_btn.grid(row=0, column=1, sticky=tk.E, padx=(3,3))
        self.dlg.bind("<Alt-o>", self.do_select)
        cancel_btn = ttk.Button(btn_frame, text="Cancel", command=self.do_cancel)
        cancel_btn.grid(row=0, column=2, sticky=tk.E, padx=(3,6))
        self.dlg.bind("<Return>", self.do_select)
        self.dlg.bind("<Escape>", self.do_cancel)

    def do_help(self, *args):
        """Open the online help page for this dialog."""
        webbrowser.open("https://mapdata.osdn.io/dialogs.html#change-marker", new=2, autoraise=True)

    def do_cancel(self, *args):
        """Close the dialog and mark it as canceled."""
        self.canceled = True
        self.dlg.destroy()

    def do_select(self, *args):
        """Close the dialog and mark it as accepted."""
        self.canceled = False
        self.dlg.destroy()

    def get_marker(self):
        """Run the dialog modally; return (symbol, color), or (None, None) if canceled."""
        self.dlg.grab_set()
        center_window(self.dlg)
        self.dlg.resizable(False, False)
        self.symbol_opts.focus()
        self.dlg.wait_window(self.dlg)
        if self.canceled:
            return (None, None)
        return (self.symbol_var.get(), self.color_var.get())
class ImportSymbolDialog(object):
    """Modal dialog asking for a symbol name and an X11 bitmap (.xbm) file.

    The OK button is enabled only while both the name and the file name are
    non-empty.  Call run() to show the dialog.
    """
    def __init__(self):
        def get_fn(*args):
            """Let the user browse for an .xbm file and store the chosen path."""
            fn = tkfiledialog.askopenfilename(filetypes=([('X11 bitmaps', '.xbm')]))
            # A cancelled chooser returns an empty value (may be '' or ())
            if fn:
                self.fn_var.set(fn)
        def check_enable(*args):
            """Enable OK only when both a symbol name and a file name are present."""
            if self.fn_var.get() != '' and self.symbol_var.get() != '':
                self.ok_btn["state"] = tk.NORMAL
            else:
                self.ok_btn["state"] = tk.DISABLED
        self.dlg = tk.Toplevel()
        self.dlg.title("Import X11 Bitmap Symbol")
        prompt_frame = tk.Frame(self.dlg)
        prompt_frame.grid(row=0, column=0, sticky=tk.NSEW, pady=(3,3))
        button_frame = tk.Frame(self.dlg, borderwidth=3, relief=tk.RIDGE)
        button_frame.grid(row=1, column=0, columnspan=3, sticky=tk.EW, pady=(3,3))
        button_frame.columnconfigure(0, weight=1)
        btn_frame = tk.Frame(button_frame)
        btn_frame.grid(row=0, column=0, sticky=tk.EW)
        btn_frame.columnconfigure(0, weight=1)
        # Prompts
        symbol_lbl = ttk.Label(prompt_frame, text="Symbol name:")
        symbol_lbl.grid(row=0, column=0, sticky=tk.E, padx=(3,3))
        self.symbol_var = tk.StringVar(self.dlg, "")
        self.symbol_var.trace_add('write', check_enable)
        self.symbol_entry = ttk.Entry(prompt_frame, textvariable=self.symbol_var, width=12)
        self.symbol_entry.grid(row=0, column=1, sticky=tk.W, padx=(3,3))
        #
        fn_label = ttk.Label(prompt_frame, text="File:")
        fn_label.grid(row=1, column=0, sticky=tk.E, padx=(3,3))
        self.fn_var = tk.StringVar(prompt_frame, '')
        self.fn_var.trace_add('write', check_enable)
        fn_entry = ttk.Entry(prompt_frame, textvariable=self.fn_var)
        fn_entry.configure(width=64)
        fn_entry.grid(row=1, column=1, sticky=tk.EW, padx=(3,3))
        fn_btn = ttk.Button(prompt_frame, text="Browse", command=get_fn)
        fn_btn.grid(row=1, column=2, sticky=tk.W)
        # Buttons.  The Alt-key bindings match the underlined letters.
        self.canceled = False
        help_btn = ttk.Button(btn_frame, text="Help", command=self.do_help, underline=0)
        help_btn.grid(row=0, column=0, sticky=tk.W, padx=(6,3))
        self.dlg.bind("<Alt-h>", self.do_help)
        self.ok_btn = ttk.Button(btn_frame, text="OK", command=self.do_select, underline=0)
        self.ok_btn["state"] = tk.DISABLED
        self.ok_btn.grid(row=0, column=1, sticky=tk.E, padx=(3,3))
        self.dlg.bind("<Alt-o>", self.do_select)
        cancel_btn = ttk.Button(btn_frame, text="Cancel", command=self.do_cancel)
        cancel_btn.grid(row=0, column=2, sticky=tk.E, padx=(3,6))
        self.dlg.bind("<Return>", self.do_select)
        self.dlg.bind("<Escape>", self.do_cancel)

    def do_help(self, *args):
        """Open the online help page for this dialog."""
        webbrowser.open("https://mapdata.osdn.io/dialogs.html#import-symbol", new=2, autoraise=True)

    def do_cancel(self, *args):
        """Close the dialog and mark it as canceled."""
        self.canceled = True
        self.dlg.destroy()

    def do_select(self, *args):
        """Accept and close the dialog, but only while the OK button is enabled."""
        # ttk may hand back the option value as a Tcl object rather than a
        # str, which never compares equal to a str; convert before comparing.
        if str(self.ok_btn["state"]) != tk.DISABLED:
            self.canceled = False
            self.dlg.destroy()

    def run(self):
        """Run the dialog modally; return (symbol name, file name), or (None, None) if canceled."""
        self.dlg.grab_set()
        center_window(self.dlg)
        self.dlg.resizable(False, False)
        self.symbol_entry.focus()
        self.dlg.wait_window(self.dlg)
        if self.canceled:
            return (None, None)
        return (self.symbol_var.get(), self.fn_var.get())
class DataFileDialog(object):
    """Modal dialog for choosing a CSV file and the columns to map.

    A file, a latitude column, and a longitude column are required; the
    label, symbol, and color columns, the CRS, and a description are
    optional.  Call get_datafile() to run the dialog.
    """
    def __init__(self):
        def get_fn():
            """Browse for a CSV file and load its header row into the column selectors."""
            fn = tkfiledialog.askopenfilename(filetypes=([('CSV files', '.csv')]), parent=self.dlg)
            # A cancelled chooser returns an empty value (may be '' or ())
            if fn:
                self.fn_var.set(fn)
                csvreader = CsvFile(fn)
                self.header_list = csvreader.next()
                for selector in (self.id_sel, self.lat_sel, self.lon_sel, self.sym_sel, self.col_sel):
                    selector["values"] = self.header_list
        def check_enable(*args):
            """Enable OK only when the file, latitude, and longitude are all set."""
            if self.fn_var.get() != '' and self.lat_var.get() != '' and self.lon_var.get() != '':
                ok_btn["state"] = tk.NORMAL
            else:
                ok_btn["state"] = tk.DISABLED
        def new_fn(*args):
            """Trace callback for the file name variable."""
            check_enable()
        self.header_list = []
        self.dlg = tk.Toplevel()
        self.dlg.title("Open CSV Data File for Map Display")
        # Main frames
        prompt_frame = tk.Frame(self.dlg)
        prompt_frame.grid(row=0, column=0, sticky=tk.NSEW, padx=(3,3), pady=(3,3))
        dir_frame = tk.Frame(prompt_frame)
        dir_frame.grid(row=0, column=0, sticky=tk.EW, padx=(3,3), pady=(3,3))
        dir_frame.rowconfigure(0, weight=1)
        dir_frame.columnconfigure(0, weight=1)
        req_frame = ttk.LabelFrame(prompt_frame, text="Required")
        req_frame.grid(row=1, column=0, sticky=tk.EW, padx=(6,3), pady=(3,3))
        req_frame.columnconfigure(0, weight=1)
        opt_frame = ttk.LabelFrame(prompt_frame, text="Optional")
        opt_frame.grid(row=2, column=0, sticky=tk.EW, padx=(6,3), pady=(9,3))
        opt_frame.columnconfigure(0, weight=1)
        btn_frame = tk.Frame(self.dlg, borderwidth=3, relief=tk.RIDGE)
        btn_frame.grid(row=1, column=0, sticky=tk.EW, padx=(3,3), pady=(3,3))
        btn_frame.columnconfigure(0, weight=1)
        # Prompts
        #-- Directions
        dir_lbl = ttk.Label(dir_frame,
                text="Select a CSV file with columns containing latitude and longitude values, and optionally other information.",
                width=80, justify=tk.LEFT, wraplength=600)
        dir_lbl.grid(row=0, column=0, padx=(3,3), pady=(3,3))
        def wrap_msg(event):
            """Re-wrap the directions to the label's new width."""
            dir_lbl.configure(wraplength=event.width - 5)
        dir_lbl.bind("<Configure>", wrap_msg)
        #-- Filename
        fn_frame = tk.Frame(req_frame)
        fn_frame.grid(row=0, column=0, sticky=tk.EW, pady=(5,5))
        fn_label = ttk.Label(fn_frame, text="File:")
        fn_label.grid(row=0, column=0, sticky=tk.E, padx=(3,3))
        self.fn_var = tk.StringVar(fn_frame, '')
        self.fn_var.trace_add('write', new_fn)
        fn_entry = ttk.Entry(fn_frame, textvariable=self.fn_var)
        fn_entry.configure(width=64)
        fn_entry.grid(row=0, column=1, sticky=tk.EW, padx=(3,3))
        fn_btn = ttk.Button(fn_frame, text="Browse", command=get_fn)
        fn_btn.grid(row=0, column=2, sticky=tk.W, padx=(3,3))
        #-- Required columns
        req_col_frame = tk.Frame(req_frame)
        req_col_frame.grid(row=1, column=0, sticky=tk.EW, pady=(3,3))
        lat_label = ttk.Label(req_col_frame, text="Latitude column:")
        lat_label.grid(row=0, column=0, sticky=tk.E, padx=(3,3), pady=(3,3))
        self.lat_var = tk.StringVar(req_col_frame, '')
        self.lat_var.trace_add('write', check_enable)
        self.lat_sel = ttk.Combobox(req_col_frame, state="readonly", textvariable=self.lat_var, values=self.header_list, width=12)
        self.lat_sel.grid(row=0, column=1, sticky=tk.W, padx=(3,3), pady=(3,3))
        #
        lon_label = ttk.Label(req_col_frame, text="Longitude column:")
        lon_label.grid(row=0, column=2, sticky=tk.E, padx=(20,3), pady=(3,3))
        self.lon_var = tk.StringVar(req_frame, '')
        self.lon_var.trace_add('write', check_enable)
        self.lon_sel = ttk.Combobox(req_col_frame, state="readonly", textvariable=self.lon_var, values=self.header_list, width=12)
        self.lon_sel.grid(row=0, column=3, sticky=tk.W, padx=(3,3), pady=(3,3))
        #-- Optional columns
        opt_col_frame = tk.Frame(opt_frame)
        opt_col_frame.grid(row=2, column=0, sticky=tk.EW, pady=(3,3))
        id_label = ttk.Label(opt_col_frame, text="Label column:")
        id_label.grid(row=0, column=0, sticky=tk.E, padx=(3,3), pady=(3,3))
        self.id_var = tk.StringVar(opt_col_frame, '')
        self.id_sel = ttk.Combobox(opt_col_frame, state="readonly", textvariable=self.id_var, values=self.header_list, width=12)
        self.id_sel.grid(row=0, column=1, sticky=tk.W, padx=(3,20), pady=(3,3))
        #
        crs_label = ttk.Label(opt_col_frame, text="CRS:")
        crs_label.grid(row=0, column=2, sticky=tk.E, padx=(3,3), pady=(3,3))
        self.crs_var = tk.IntVar(opt_col_frame, 4326)
        self.crs_var.trace_add('write', check_enable)
        self.crs_sel = ttk.Entry(opt_col_frame, width=8, textvariable=self.crs_var)
        self.crs_sel.grid(row=0, column=3, sticky=tk.W, padx=(3,20), pady=(3,3))
        #
        sym_label = ttk.Label(opt_col_frame, text="Symbol column:")
        sym_label.grid(row=1, column=0, sticky=tk.E, padx=(3,3), pady=(3,3))
        self.sym_var = tk.StringVar(opt_col_frame, '')
        self.sym_sel = ttk.Combobox(opt_col_frame, state="readonly", textvariable=self.sym_var, values=self.header_list, width=12)
        self.sym_sel.grid(row=1, column=1, sticky=tk.W, padx=(3,20), pady=(3,3))
        #
        col_label = ttk.Label(opt_col_frame, text="Color column:")
        col_label.grid(row=1, column=2, sticky=tk.E, padx=(3,3), pady=(3,3))
        self.col_var = tk.StringVar(opt_col_frame, '')
        self.col_sel = ttk.Combobox(opt_col_frame, state="readonly", textvariable=self.col_var, values=self.header_list, width=12)
        self.col_sel.grid(row=1, column=3, sticky=tk.W, padx=(3,20), pady=(3,3))
        #-- Description
        title_label = ttk.Label(opt_col_frame, text="Description:")
        title_label.grid(row=2, column=0, sticky=tk.E, padx=(6,3), pady=(3,3))
        self.title_var = tk.StringVar(opt_col_frame, '')
        title_entry = ttk.Entry(opt_col_frame, width=60, textvariable=self.title_var)
        title_entry.grid(row=2, column=1, columnspan=3, sticky=tk.EW, padx=(3,6), pady=(3,3))
        # Buttons.  The Alt-key bindings match the underlined letters.
        self.canceled = False
        help_btn = ttk.Button(btn_frame, text="Help", command=self.do_help, underline=0)
        help_btn.grid(row=0, column=0, sticky=tk.W, padx=(6,3))
        self.dlg.bind("<Alt-h>", self.do_help)
        ok_btn = ttk.Button(btn_frame, text="OK", command=self.do_select, underline=0)
        ok_btn.grid(row=0, column=1, sticky=tk.E, padx=3)
        self.dlg.bind("<Alt-o>", self.do_select)
        cancel_btn = ttk.Button(btn_frame, text="Cancel", command=self.do_cancel)
        cancel_btn.grid(row=0, column=2, sticky=tk.E, padx=(3,6))
        ok_btn["state"] = tk.DISABLED
        self.dlg.bind("<Return>", self.do_select)
        self.dlg.bind("<Escape>", self.do_cancel)
        self.dlg.resizable(False, False)

    def do_help(self, *args):
        """Open the online help page for this dialog."""
        webbrowser.open("https://mapdata.osdn.io/dialogs.html#open-csv-data-file", new=2, autoraise=True)

    def do_cancel(self, *args):
        """Close the dialog and mark it as canceled."""
        self.canceled = True
        self.dlg.destroy()

    def do_select(self, *args):
        """Accept and close the dialog if the file, latitude, and longitude are all set."""
        if self.fn_var.get() != '' and self.lat_var.get() != '' and self.lon_var.get() != '':
            self.canceled = False
            self.dlg.destroy()

    def get_datafile(self):
        """Run the dialog modally and read the chosen file.

        Returns a 10-tuple (file name, label column, latitude column,
        longitude column, CRS, symbol column, color column, description,
        headers, rows), where headers and rows come from file_data(); or a
        tuple of ten None values if the dialog was canceled.
        """
        self.dlg.grab_set()
        center_window(self.dlg)
        self.dlg.resizable(False, False)
        self.dlg.wait_window(self.dlg)
        self.dlg = None
        if self.canceled:
            return (None, None, None, None, None, None, None, None, None, None)
        headers, rows = file_data(self.fn_var.get())
        return (self.fn_var.get(), self.id_var.get(), self.lat_var.get(), self.lon_var.get(),
                self.crs_var.get(), self.sym_var.get(), self.col_var.get(), self.title_var.get(),
                headers, rows)
class ImportSpreadsheetDialog(object):
def __init__(self, parent, mapui):
self.parent = parent
self.mapui = mapui
def get_fn(*args):
fn = tkfiledialog.askopenfilename(filetypes=([('Spreadsheets', '.ods .xlsx .xls')]), parent=self.dlg)
if fn != '':
self.fn_var.set(fn)
def check_w1enable(*args):
if self.fn_var.get() != '':
if os.path.isfile(self.fn_var.get()):
w1next_btn["state"] = tk.NORMAL
else:
w1next_btn["state"] = tk.DISABLED
else:
w1next_btn["state"] = tk.DISABLED
def check_w2enable(*args):
if self.fn_var.get() != '' and self.sheet_var.get() != '':
w2next_btn["state"] = tk.NORMAL
else:
w2next_btn["state"] = tk.DISABLED
def check_w3enable(*args):
if self.fn_var.get() != '' and self.sheet_var.get() != '' and self.lat_var.get() != '' and self.lon_var.get() != '':
w3ok_btn["state"] = tk.NORMAL
else:
w3ok_btn["state"] = tk.DISABLED
def new_fn(*args):
check_w1enable()
self.sheet_list = []
self.header_list = []
self.dlg = tk.Toplevel(parent)
self.dlg.title("Open Spreadsheet File for Map Display")
# Main frames
prompt_frame = tk.Frame(self.dlg)
prompt_frame.grid(row=0, column=0, sticky=tk.EW, padx=(3,3), pady=(3,3))
# Wizard frames 1, 2, and 3 are gridded in the same cell to make a wizard.
self.dlg.rowconfigure(0, weight=0)
wiz1_frame = tk.Frame(self.dlg) # For description, filename, and sheet name
wiz1_frame.grid(row=1, column=0, sticky=tk.NSEW, padx=(3,3), pady=(3,3))
wiz1_frame.rowconfigure(0, weight=1)
wiz2_frame = tk.Frame(self.dlg) # For sheet selections
wiz2_frame.grid(row=1, column=0, sticky=tk.NSEW, padx=(3,3), pady=(3,3))
wiz2_frame.rowconfigure(0, weight=1)
wiz2_frame.columnconfigure(0, weight=1)
wiz3_frame = tk.Frame(self.dlg) # For column selections
wiz3_frame.grid(row=1, column=0, sticky=tk.NSEW, padx=(3,3), pady=(3,3))
wiz3_frame.columnconfigure(0, weight=1)
self.dlg.rowconfigure(1, weight=0)
self.dlg.resizable(False, False)
wiz1_frame.lift()
# Populate directions frame
dir_lbl = ttk.Label(prompt_frame,
text="Select a spreadsheet file with columns containing latitude and longitude values, and optionally other information.",
width=80, justify=tk.LEFT, wraplength=600)
dir_lbl.grid(row=0, column=0, padx=(6,6), pady=(3,3))
def wrap_msg(event):
dir_lbl.configure(wraplength=event.width - 5)
dir_lbl.bind("", wrap_msg)
# Populate wiz1_frame
w1req_frame = ttk.LabelFrame(wiz1_frame, text="Required")
w1req_frame.grid(row=0, column=0, sticky=tk.EW, padx=(6,6), pady=(3,3))
w1req_frame.columnconfigure(0, weight=1)
fn_frame = tk.Frame(w1req_frame)
fn_frame.grid(row=0, column=0, sticky=tk.EW, pady=(3,3))
fn_label = ttk.Label(fn_frame, text="File:")
fn_label.grid(row=0, column=0, sticky=tk.E, padx=(3,3))
self.fn_var = tk.StringVar(fn_frame, '')
self.fn_var.trace('w', new_fn)
fn_entry = ttk.Entry(fn_frame, textvariable=self.fn_var)
fn_entry.configure(width=64)
fn_entry.grid(row=0, column=1, sticky=tk.EW, padx=(3,3))
fn_btn = ttk.Button(fn_frame, text="Browse", command=get_fn, underline=0)
fn_btn.grid(row=0, column=2, sticky=tk.W, padx=(3,3))
self.dlg.bind("", get_fn)
w1opt_frame = ttk.LabelFrame(wiz1_frame, text="Optional")
w1opt_frame.grid(row=1, column=0, sticky=tk.EW, padx=(6,3), pady=(9,3))
w1opt_frame.columnconfigure(0, weight=1)
desc_label = ttk.Label(w1opt_frame, text="Description:")
desc_label.grid(row=0, column=0, sticky=tk.E, padx=(6,3), pady=(3,3))
self.desc_var = tk.StringVar(w1opt_frame, '')
desc_entry = ttk.Entry(w1opt_frame, width=60, textvariable=self.desc_var)
desc_entry.grid(row=0, column=1, sticky=tk.W, padx=(3,6), pady=(3,3))
def w1_next(*args):
if self.fn_var.get() != '':
# Open spreadsheet, get sheet names
self.mapui.loading_dlg.display("Importing spreadsheet")
fn, ext = os.path.splitext(self.fn_var.get())
ext = ext.lower()
try:
if ext == '.ods':
sso = OdsFile()
elif ext == '.xlsx':
sso = XlsxFile()
else:
sso = XlsFile()
except:
warning("Could not open %s" % self.fn_var.get(), kwargs={'parent': self.dlg})
else:
sso.open(self.fn_var.get())
self.sheet_list = sso.sheetnames()
self.sheet_sel["values"] = self.sheet_list
if ext in ('.ods', '.xlsx'):
try:
sso.close()
except:
pass
else:
try:
sso.release_resources()
del sso
except:
pass
self.dlg.bind("")
self.dlg.bind("")
wiz2_frame.lift()
self.dlg.bind("", w2_back)
self.dlg.bind("", w2_next)
self.mapui.loading_dlg.hide()
w1btn_frame = tk.Frame(wiz1_frame, borderwidth=3, relief=tk.RIDGE)
w1btn_frame.grid(row=2, column=0, sticky=tk.EW, padx=(3,3), pady=(3,3))
w1btn_frame.columnconfigure(0, weight=1)
self.canceled = False
#
w1help_btn = ttk.Button(w1btn_frame, text="Help", command=self.do_help, underline=0)
w1help_btn.grid(row=0, column=0, sticky=tk.W, padx=(6,3))
self.dlg.bind("", self.do_help)
w1next_btn = ttk.Button(w1btn_frame, text="Next", command=w1_next, underline=0)
w1next_btn.grid(row=0, column=1, sticky=tk.E, padx=3)
self.dlg.bind("", w1_next)
w1cancel_btn = ttk.Button(w1btn_frame, text="Cancel", command=self.do_cancel, underline=0)
w1cancel_btn.grid(row=0, column=2, sticky=tk.E, padx=(3,6))
w1next_btn["state"] = tk.DISABLED
self.dlg.bind("", self.do_cancel)
self.dlg.bind("", self.do_cancel)
# Populate wiz2_frame
w2req_frame = ttk.LabelFrame(wiz2_frame, text="Required")
w2req_frame.grid(row=0, column=0, sticky=tk.EW, padx=(6,6), pady=(3,3))
w2req_frame.columnconfigure(0, weight=1)
w2inner_frame = tk.Frame(w2req_frame)
w2inner_frame.grid(row=0, column=0, sticky=tk.W)
#
sheet_label = ttk.Label(w2inner_frame, text="Sheet:")
sheet_label.grid(row=0, column=0, sticky=tk.E, padx=(3,3))
self.sheet_var = tk.StringVar(w2req_frame, '')
self.sheet_var.trace('w', check_w2enable)
self.sheet_sel = ttk.Combobox(w2inner_frame, state="readonly", textvariable=self.sheet_var, values=self.sheet_list, width=16)
self.sheet_sel.grid(row=0, column=1, sticky=tk.W, padx=(3,3))
#
xrows_label = ttk.Label(w2inner_frame, text="Initial rows to skip:")
xrows_label.grid(row=1, column=0, sticky=tk.E, padx=(3,3))
self.xrows_var = tk.IntVar(w2req_frame, 0)
self.xrows_var.trace('w', check_w2enable)
xrows_entry = ttk.Entry(w2inner_frame, textvariable=self.xrows_var, width=6)
xrows_entry.grid(row=1, column=1, sticky=tk.W, padx=(3,3))
def w2_back(*args):
self.dlg.bind("")
self.dlg.bind("")
wiz1_frame.lift()
self.dlg.bind("", w1_next)
self.dlg.bind("", get_fn)
def w2_next(*args):
# Open spreadsheet, get column names
if self.fn_var.get() != '' and self.sheet_var.get() != '':
fn, ext = os.path.splitext(self.fn_var.get())
try:
if ext.lower() == '.ods':
hdrs, data = ods_data(self.fn_var.get(), self.sheet_var.get(), junk_header_rows=self.xrows_var.get())
else:
hdrs, data = xls_data(self.fn_var.get(), self.sheet_var.get(), junk_header_rows=self.xrows_var.get())
except:
warning("Could not read table from %s, sheet %s" % (self.fn_var.get(), self.sheet_var.get()),
kwargs={'parent': self.dlg})
else:
self.headers = hdrs
self.header_list = list(hdrs)
self.rows = data
# Set list box values
self.id_sel["values"] = self.header_list
self.lat_sel["values"] = self.header_list
self.lon_sel["values"] = self.header_list
self.sym_sel["values"] = self.header_list
self.col_sel["values"] = self.header_list
self.dlg.bind("")
self.dlg.bind("")
wiz3_frame.lift()
self.dlg.bind("", w3_back)
w2btn_frame = tk.Frame(wiz2_frame, borderwidth=3, relief=tk.RIDGE)
w2btn_frame.grid(row=2, column=0, sticky=tk.EW, padx=(3,3), pady=(3,3))
w2btn_frame.columnconfigure(0, weight=1)
#
w2help_btn = ttk.Button(w2btn_frame, text="Help", command=self.do_help, underline=0)
w2help_btn.grid(row=0, column=0, sticky=tk.W, padx=(6,3))
self.dlg.bind("", self.do_help)
w2prev_btn = ttk.Button(w2btn_frame, text="Back", command=w2_back, underline=0)
w2prev_btn.grid(row=0, column=1, sticky=tk.E, padx=3)
w2next_btn = ttk.Button(w2btn_frame, text="Next", command=w2_next, underline=0)
w2next_btn.grid(row=0, column=2, sticky=tk.E, padx=3)
w2cancel_btn = ttk.Button(w2btn_frame, text="Cancel", command=self.do_cancel, underline=0)
w2cancel_btn.grid(row=0, column=3, sticky=tk.E, padx=(3,6))
w2next_btn["state"] = tk.DISABLED
# Populate wiz3_frame
w3req_frame = ttk.LabelFrame(wiz3_frame, text="Required")
w3req_frame.grid(row=0, column=0, sticky=tk.EW, padx=(6,6), pady=(3,3))
w3req_frame.columnconfigure(0, weight=1)
#
lat_label = ttk.Label(w3req_frame, text="Latitude column:")
lat_label.grid(row=0, column=0, sticky=tk.E, padx=(3,3), pady=(3,3))
self.lat_var = tk.StringVar(w3req_frame, '')
self.lat_var.trace('w', check_w3enable)
self.lat_sel = ttk.Combobox(w3req_frame, state="readonly", textvariable=self.lat_var, values=self.header_list, width=15)
self.lat_sel.grid(row=0, column=1, sticky=tk.W, padx=(3,30), pady=(3,3))
#
lon_label = ttk.Label(w3req_frame, text="Longitude column:")
lon_label.grid(row=0, column=2, sticky=tk.E, padx=(3,3), pady=(3,3))
self.lon_var = tk.StringVar(w3req_frame, '')
self.lon_var.trace('w', check_w3enable)
self.lon_sel = ttk.Combobox(w3req_frame, state="readonly", textvariable=self.lon_var, values=self.header_list, width=15)
self.lon_sel.grid(row=0, column=3, sticky=tk.W, padx=(3,6), pady=(3,3))
w3opt_frame = ttk.LabelFrame(wiz3_frame, text="Optional")
w3opt_frame.grid(row=1, column=0, sticky=tk.EW, padx=(6,6), pady=(9,3))
w3opt_frame.columnconfigure(0, weight=1)
#
id_label = ttk.Label(w3opt_frame, text="Label column:")
id_label.grid(row=0, column=0, sticky=tk.E, padx=(3,3), pady=(3,3))
self.id_var = tk.StringVar(w3opt_frame, '')
self.id_sel = ttk.Combobox(w3opt_frame, state="readonly", textvariable=self.id_var, values=self.header_list, width=12)
self.id_sel.grid(row=0, column=1, sticky=tk.W, padx=(3,30), pady=(3,3))
#
crs_label = ttk.Label(w3opt_frame, text="CRS:")
crs_label.grid(row=0, column=2, sticky=tk.E, padx=(3,3), pady=(3,3))
self.crs_var = tk.IntVar(w3opt_frame, 4326)
self.crs_var.trace('w', check_w2enable)
self.crs_sel = ttk.Entry(w3opt_frame, width=8, textvariable=self.crs_var)
self.crs_sel.grid(row=0, column=3, sticky=tk.W, padx=(3,6), pady=(3,3))
#
sym_label = ttk.Label(w3opt_frame, text="Symbol column:")
sym_label.grid(row=1, column=0, sticky=tk.E, padx=(3,3), pady=(3,3))
self.sym_var = tk.StringVar(w3opt_frame, '')
self.sym_sel = ttk.Combobox(w3opt_frame, state="readonly", textvariable=self.sym_var, values=self.header_list, width=12)
self.sym_sel.grid(row=1, column=1, sticky=tk.W, padx=(3,30), pady=(3,3))
#
col_label = ttk.Label(w3opt_frame, text="Color column:")
col_label.grid(row=1, column=2, sticky=tk.E, padx=(3,3), pady=(3,3))
self.col_var = tk.StringVar(w3opt_frame, '')
self.col_sel = ttk.Combobox(w3opt_frame, state="readonly", textvariable=self.col_var, values=self.header_list, width=12)
self.col_sel.grid(row=1, column=3, sticky=tk.W, padx=(3,6), pady=(3,3))
def w3_back(*args):
self.dlg.bind("")
wiz2_frame.lift()
self.dlg.bind("", w2_back)
self.dlg.bind("", w2_next)
w3btn_frame = tk.Frame(wiz3_frame, borderwidth=3, relief=tk.RIDGE)
w3btn_frame.grid(row=2, column=0, sticky=tk.EW, padx=(3,3), pady=(3,3))
w3btn_frame.columnconfigure(0, weight=1)
#
w3help_btn = ttk.Button(w3btn_frame, text="Help", command=self.do_help, underline=0)
w3help_btn.grid(row=0, column=0, sticky=tk.W, padx=(6,3))
self.dlg.bind("", self.do_help)
w3prev_btn = ttk.Button(w3btn_frame, text="Back", command=w3_back)
w3prev_btn.grid(row=0, column=1, sticky=tk.E, padx=3)
w3ok_btn = ttk.Button(w3btn_frame, text="OK", command=self.do_select, underline=0)
w3ok_btn.grid(row=0, column=2, sticky=tk.E, padx=3)
self.dlg.bind('', self.do_select)
w3cancel_btn = ttk.Button(w3btn_frame, text="Cancel", command=self.do_cancel, underline=0)
w3cancel_btn.grid(row=0, column=3, sticky=tk.E, padx=(3,6))
w3ok_btn["state"] = tk.DISABLED
def do_help(self, *args):
    """Open the online help page for this dialog in the web browser.

    Any positional arguments (such as a Tk event object passed by a key
    binding) are accepted and ignored.
    """
    help_url = "https://mapdata.osdn.io/dialogs.html#open-spreadsheet-data-file"
    # new=2 asks for a new browser tab where possible.
    webbrowser.open(help_url, new=2, autoraise=True)
def do_cancel(self, *args):
    """Record that the dialog was canceled and close its window.

    Any positional arguments (such as a Tk event object) are ignored.
    """
    window = self.dlg
    self.canceled = True
    window.destroy()
def do_select(self, *args):
    """Accept the dialog if every required field has a value.

    The file name, sheet, latitude column and longitude column must all be
    non-empty; otherwise the call does nothing and the dialog stays open.
    Any positional arguments (such as a Tk event object) are ignored.
    """
    required = (self.fn_var, self.sheet_var, self.lat_var, self.lon_var)
    if any(var.get() == '' for var in required):
        return
    self.canceled = False
    self.dlg.destroy()
def get_datafile(self):
    """Run the dialog modally and return the user's selections.

    Returns a 10-tuple of (file name, label column, latitude column,
    longitude column, CRS, symbol column, color column, description,
    headers, rows).  If the dialog was canceled, every element is None.
    """
    window = self.dlg
    window.grab_set()
    center_window(window)
    window.resizable(False, False)
    # Blocks until the dialog window has been destroyed.
    window.wait_window(window)
    self.dlg = None
    if self.canceled:
        return (None,) * 10
    return (
        self.fn_var.get(),
        self.id_var.get(),
        self.lat_var.get(),
        self.lon_var.get(),
        self.crs_var.get(),
        self.sym_var.get(),
        self.col_var.get(),
        self.desc_var.get(),
        self.headers,
        self.rows,
    )
class NewCrsDialog(object):
    """Modal dialog that prompts for a new integer CRS code.

    Construct it with the current CRS and call get_crs() to run it.
    get_crs() returns the entered integer, or None if the dialog was
    canceled or the entry does not contain an integer.
    """

    def __init__(self, current_crs):
        """Build the dialog widgets; current_crs is the initial entry value."""
        self.dlg = tk.Toplevel()
        self.dlg.title("Change CRS")
        prompt_frame = tk.Frame(self.dlg)
        prompt_frame.grid(row=0, column=0, sticky=tk.NSEW, pady=(3,3))
        btn_frame = tk.Frame(self.dlg, borderwidth=3, relief=tk.RIDGE)
        btn_frame.grid(row=1, column=0, sticky=tk.EW, pady=(3,3))
        btn_frame.rowconfigure(0, weight=1)
        crs_lbl = ttk.Label(prompt_frame, text="New CRS:")
        crs_lbl.grid(row=0, column=0, sticky=tk.E, padx=(3,3))
        self.crs_var = tk.IntVar(self.dlg, current_crs)
        self.crs_entry = ttk.Entry(prompt_frame, width=12, textvariable=self.crs_var)
        self.crs_entry.grid(row=0, column=1, sticky=tk.W, padx=(3,3))
        # Buttons
        # NOTE(review): the event-sequence strings in the bind() calls are
        # empty in the reviewed text -- confirm the intended key bindings.
        self.canceled = False
        help_btn = ttk.Button(btn_frame, text="Help", command=self.do_help, underline=0)
        help_btn.grid(row=0, column=0, sticky=tk.W, padx=(6,3))
        self.dlg.bind("", self.do_help)
        ok_btn = ttk.Button(btn_frame, text="OK", command=self.do_select, underline=0)
        ok_btn.grid(row=0, column=1, sticky=tk.E, padx=(3,3))
        self.dlg.bind('', self.do_select)
        cancel_btn = ttk.Button(btn_frame, text="Cancel", command=self.do_cancel)
        cancel_btn.grid(row=0, column=2, sticky=tk.E, padx=(3,6))
        self.dlg.bind("", self.do_select)
        self.dlg.bind("", self.do_cancel)

    def _entered_crs(self):
        """Return the entry value as an int, or None if it is not an integer."""
        try:
            return self.crs_var.get()
        except (tk.TclError, ValueError):
            # IntVar.get() fails when the entry text is empty or non-numeric.
            return None

    def do_help(self, *args):
        """Open the online help page for this dialog in the web browser."""
        webbrowser.open("https://mapdata.osdn.io/dialogs.html#change-crs", new=2, autoraise=True)

    def do_cancel(self, *args):
        """Mark the dialog as canceled and close it."""
        self.canceled = True
        self.dlg.destroy()

    def do_select(self, *args):
        """Accept the entry and close the dialog.

        If the entry does not hold an integer the dialog is left open and
        focus returns to the entry, so the caller never receives an
        unreadable value.
        """
        if self._entered_crs() is None:
            self.crs_entry.focus()
            return
        self.canceled = False
        self.dlg.destroy()

    def get_crs(self):
        """Show the dialog modally; return the integer CRS or None."""
        self.dlg.grab_set()
        center_window(self.dlg)
        self.dlg.resizable(False, False)
        self.crs_entry.focus()
        self.dlg.wait_window(self.dlg)
        if self.canceled:
            return None
        # Guarded read: the window may have been closed by the window
        # manager while the entry held non-integer text.
        return self._entered_crs()
class QueryDialog(object):
    """Dialog for entering a SQL 'WHERE' expression that selects data rows.

    The dialog shows a text box for the expression, a column selector that
    lists the distinct values of the chosen column (clicking a value
    appends it to the expression), and a selector for how the new
    selection is combined with the existing one.  Call get_where() to run
    the dialog.
    """

    def __init__(self, column_headers, db_conn, init_sql=""):
        """Build the dialog.

        column_headers -- column names offered in the "Column values" list.
        db_conn        -- database connection providing cursor(); queried
                          against the 'mapdata' table for distinct values.
        init_sql       -- optional expression pre-loaded into the text box.
        """
        self.dlg = tk.Toplevel()
        self.dlg.title("Query Data")
        # The dialog counts as canceled until do_select() accepts a
        # non-empty expression, so closing the window is a cancel.
        self.canceled = True
        self.dlg.columnconfigure(0, weight=1)
        self.dlg.rowconfigure(1, weight=1)
        # Frames
        prompt_frame = tk.Frame(self.dlg)
        prompt_frame.grid(row=0, column=0, sticky=tk.NSEW, padx=(3,3), pady=(3,3))
        prompt_frame.columnconfigure(0, weight=1)
        query_frame = tk.Frame(self.dlg)
        query_frame.grid(row=1, column=0, sticky=tk.NSEW, padx=(3,3), pady=(3,3))
        query_frame.rowconfigure(0, weight=1)
        query_frame.columnconfigure(0, weight=1)
        query_frame.columnconfigure(1, weight=3)
        sql_frame = tk.Frame(query_frame)
        sql_frame.grid(row=0, column=0, sticky=tk.NSEW, padx=(3,3), pady=(3,3))
        sql_frame.rowconfigure(0, weight=1)
        sql_frame.columnconfigure(0, weight=1)
        col_frame = tk.Frame(query_frame)
        col_frame.grid(row=0, column=1, rowspan=2, sticky=tk.NS, padx=(3,3), pady=(3,3))
        col_frame.rowconfigure(0, weight=1)
        act_frame = tk.Frame(query_frame)
        act_frame.grid(row=1, column=0, sticky=tk.EW, padx=(3,3), pady=(3,3))
        btn_frame = tk.Frame(self.dlg, borderwidth=3, relief=tk.RIDGE)
        btn_frame.grid(row=2, column=0, sticky=tk.EW, pady=(3,3))
        btn_frame.columnconfigure(0, weight=1)
        # Prompt
        # NOTE(review): the event-sequence strings in the bind() calls of
        # this method are empty (or "<>") in the reviewed text -- confirm
        # the intended sequences.
        prompt_lbl = ttk.Label(prompt_frame, wraplength=300, justify=tk.LEFT, text="Enter an expression below to identify the data rows that you want to select. The syntax of this expression should correspond to a SQL 'WHERE' clause. Column names with non-alphanumeric characters should be double-quoted. String literals should be single-quoted. The '%' character is a wildcard. Ctrl-Enter completes entry.")
        prompt_lbl.grid(row=0, column=0, sticky=tk.EW, padx=(3,3))
        def wrap_prompt(event):
            # Re-wrap the prompt text to the label's current width.
            prompt_lbl.configure(wraplength=event.width - 5)
        prompt_lbl.bind("", wrap_prompt)
        # SQL entry
        self.sql = init_sql
        self.sql_text = tk.Text(sql_frame, width=60, height=10)
        if init_sql is not None and init_sql != "":
            self.sql_text.insert(tk.END, init_sql)
        self.sql_text.grid(row=0, column=0, sticky=tk.NSEW, padx=(3,0), pady=(3,3))
        self.sql_text.bind("", self.check_enable)
        sbar = tk.Scrollbar(sql_frame)
        sbar.grid(row=0, column=1, sticky=tk.NS, padx=(0,3), pady=(3,3))
        sbar.config(command=self.sql_text.yview)
        self.sql_text.config(yscrollcommand = sbar.set)
        # Column values
        col_lbl = ttk.Label(col_frame, text="Column values:")
        col_lbl.grid(row=0, column=0, sticky=tk.NW, padx=(3,3), pady=(3,3))
        col_var = tk.StringVar(col_frame, "")
        colsel = ttk.Combobox(col_frame, state="readonly", textvariable=col_var, values=column_headers, width=20)
        colsel.grid(row=1, column=0, sticky=tk.NW, padx=(3,3), pady=(3,3))
        tv_frame = tk.Frame(col_frame)
        tv_frame.grid(row=2, column=0, sticky=tk.NS, padx=(3,3), pady=(3,3))
        col_frame.rowconfigure(0, weight=0)
        col_frame.rowconfigure(1, weight=0)
        col_frame.rowconfigure(2, weight=1)
        col_frame.columnconfigure(2, weight=1)
        def colval_to_sql(event):
            # Append the clicked column value to the SQL expression.
            item_iid = self.tv_tbl.identify('item', event.x, event.y)
            if not item_iid:
                # The click was not on a data row.
                return
            item_vals = self.tv_tbl.item(item_iid, "values")
            if not item_vals:
                return
            item_val = str(item_vals[0])
            if not isfloat(item_val):
                # Quote as a SQL string literal; embedded single quotes
                # are escaped by doubling them.
                item_val = "'%s'" % item_val.replace("'", "''")
            self.sql_text.insert(tk.END, " "+item_val)
            # The text was changed programmatically, so refresh the OK
            # button state explicitly.
            self.check_enable()
        def list_col_vals(event):
            # Show the distinct values of the selected column in a table.
            curs = db_conn.cursor()
            colname = dquote(col_var.get())
            res = curs.execute('SELECT DISTINCT %s FROM mapdata ORDER BY %s' % (colname, colname))
            rowset = res.fetchall()
            for widget in tv_frame.winfo_children():
                widget.destroy()
            tblframe, self.tv_tbl = treeview_table(tv_frame, rowset, [col_var.get()])
            tblframe.grid(column=0, row=0, sticky=tk.NSEW)
            tv_frame.rowconfigure(0, weight=1)
            tv_frame.columnconfigure(0, weight=1)
            curs.close()
            self.tv_tbl.bind("", colval_to_sql)
        colsel.bind("<>", list_col_vals)
        # Action selection
        self.act_var = tk.StringVar(act_frame, "Replace")
        act_lbl = ttk.Label(act_frame, text="Action:")
        act_lbl.grid(row=0, column=0, sticky=tk.E, padx=(6,3))
        act_sel = ttk.Combobox(act_frame, state="readonly", textvariable=self.act_var, values=["Replace", "Union", "Intersection", "Difference O-N", "Difference N-O"], width=15)
        act_sel.grid(row=0, column=1, sticky=tk.W, padx=(3,6))
        # Buttons
        help_btn = ttk.Button(btn_frame, text="Help", command=self.do_help, underline=0)
        help_btn.grid(row=0, column=0, sticky=tk.W, padx=(6,3))
        self.dlg.bind("", self.do_help)
        self.ok_btn = ttk.Button(btn_frame, text="OK", command=self.do_select, underline=0)
        self.ok_btn.grid(row=0, column=1, sticky=tk.E, padx=(3,3))
        # Disable OK only when there is no initial expression.  (The
        # previous test, 'init_sql or "" == ""', was always true.)
        if init_sql is None or init_sql == "":
            self.ok_btn["state"] = tk.DISABLED
        self.dlg.bind('', self.do_select)
        cancel_btn = ttk.Button(btn_frame, text="Cancel", command=self.do_cancel, underline=0)
        cancel_btn.grid(row=0, column=2, sticky=tk.E, padx=(3,6))
        self.dlg.bind("", self.do_cancel)
        self.dlg.bind("", self.do_cancel)
        self.dlg.bind("", self.do_select)
        self.dlg.bind("", self.do_select)

    def check_enable(self, *args):
        """Enable the OK button only while the SQL text box is non-empty."""
        if self.sql_text.get("1.0", "end-1c") != '':
            self.ok_btn["state"] = tk.NORMAL
        else:
            self.ok_btn["state"] = tk.DISABLED

    def do_help(self, *args):
        """Open the online help page for this dialog in the web browser."""
        webbrowser.open("https://mapdata.osdn.io/dialogs.html#query-data", new=2, autoraise=True)

    def do_cancel(self, *args):
        """Mark the dialog as canceled and close it."""
        self.canceled = True
        self.dlg.destroy()

    def do_select(self, *args):
        """Store the entered expression and close the dialog if it is non-empty."""
        self.sql = self.sql_text.get("1.0", "end-1c")
        if len(self.sql) > 0:
            self.canceled = False
            self.dlg.destroy()

    def get_where(self):
        """Show the dialog modally.

        Returns (expression, action) when an expression was accepted, or
        (None, None) when the dialog was canceled or closed.
        """
        self.dlg.grab_set()
        center_window(self.dlg)
        self.sql_text.focus()
        self.dlg.wait_window(self.dlg)
        if not self.canceled:
            return (self.sql, self.act_var.get())
        else:
            return (None, None)
class PlotDialog(object):
    """Non-modal window that plots one or two columns of the map data.

    The user picks a plot type and X/Y columns; the data are fetched from
    the parent object (get_all_data / get_sel_data), cleaned, optionally
    log-transformed, summarized as required by the plot type, and drawn
    on an embedded matplotlib figure.

    Attributes set by get_data():
      dataset          -- cleaned column-wise source data (list of lists).
      data_labels      -- one label per column of dataset.
      plot_data        -- data summarized for the current plot type.
      plot_data_labels -- labels for plot_data.
    """

    def __init__(self, parent, column_specs):
        """Build the plot window.

        parent       -- object providing get_all_data(), get_sel_data(),
                        clone_plot() and remove_plot().
        column_specs -- sequence whose items hold a column name at index 0
                        and a data type name at index 1.
        """
        self.parent = parent
        self.column_specs = column_specs
        self.dataset = None
        self.data_labels = None
        self.plot_data = None
        self.plot_data_labels = None
        self.dlg = tk.Toplevel()
        self.dlg.title("Plot")
        self.dlg.columnconfigure(0, weight=1)
        self.auto_update = True
        self.plot_title = None
        self.bins = 10
        def set_autoupdate():
            # Track the Auto-update checkbox; redraw when it is switched on.
            if self.autoupdate_var.get() == "1":
                self.auto_update = True
                self.q_redraw()
            else:
                self.auto_update = False
        # Message
        # NOTE(review): the event-sequence strings in the bind() calls of
        # this class are empty (or "<>") in the reviewed text -- confirm
        # the intended sequences.
        prompt_frame = tk.Frame(self.dlg)
        prompt_frame.grid(row=0, column=0, sticky=tk.NSEW, pady=(3,3))
        prompt_frame.columnconfigure(0, weight=1)
        msg_lbl = ttk.Label(prompt_frame, width=70, text="Select the type of plot, columns for X and Y data, and whether to show all data or only selected data.")
        msg_lbl.grid(row=0, column=0, sticky=tk.W, padx=(6,6), pady=(3,3))
        def wrap_msg(event):
            # Re-wrap the message to the label's current width.
            msg_lbl.configure(wraplength=event.width - 5)
        msg_lbl.bind("", wrap_msg)
        # Controls
        ctrl_frame = tk.Frame(self.dlg)
        ctrl_frame.grid(row=1, column=0, sticky=tk.N+tk.EW)
        self.type_var = tk.StringVar(ctrl_frame, "")
        type_lbl = ttk.Label(ctrl_frame, text="Plot type:")
        type_lbl.grid(row=0, column=0, sticky=tk.E, padx=(6,3), pady=(3,3))
        self.type_sel = ttk.Combobox(ctrl_frame, state="readonly", textvariable=self.type_var, width=20,
                values=["Box plot", "Category counts", "Empirical CDF", "Histogram", "Line plot", "Scatter plot", "Y range plot"])
        self.type_sel.grid(row=0, column=1, columnspan=2, sticky=tk.W, padx=(3,6), pady=(3,3))
        self.type_sel.bind("<>", self.set_xy)
        self.sel_only_var = tk.StringVar(ctrl_frame, "0")
        self.sel_only_ck = ttk.Checkbutton(ctrl_frame, text="Selected data only", command=self.q_redraw, variable=self.sel_only_var,
                onvalue="1", offvalue="0")
        self.sel_only_ck.grid(row=1, column=0, columnspan=2, sticky=tk.W, padx=(6,3), pady=(3,3))
        self.autoupdate_var = tk.StringVar(ctrl_frame, "1")
        self.autoupdate_ck = ttk.Checkbutton(ctrl_frame, text="Auto-update", command=set_autoupdate, variable=self.autoupdate_var,
                onvalue="1", offvalue="0")
        self.autoupdate_ck.grid(row=1, column=2, sticky=tk.W, padx=(3,3), pady=(3,3))
        self.x_var = tk.StringVar(ctrl_frame, "")
        x_lbl = ttk.Label(ctrl_frame, text="X column:")
        x_lbl.grid(row=0, column=3, sticky=tk.E, padx=(6,3), pady=(3,3))
        self.x_sel = ttk.Combobox(ctrl_frame, state="disabled", textvariable=self.x_var, width=20)
        self.x_sel.grid(row=0, column=4, sticky=tk.W, padx=(3,6), pady=(3,3))
        self.x_sel.bind("<>", self.x_changed)
        self.y_var = tk.StringVar(ctrl_frame, "")
        y_lbl = ttk.Label(ctrl_frame, text="Y column:")
        y_lbl.grid(row=1, column=3, sticky=tk.E, padx=(6,3), pady=(3,3))
        self.y_sel = ttk.Combobox(ctrl_frame, state="disabled", textvariable=self.y_var, width=20)
        self.y_sel.grid(row=1, column=4, sticky=tk.W, padx=(3,6), pady=(3,3))
        self.y_sel.bind("<>", self.y_changed)
        self.xlog_var = tk.StringVar(ctrl_frame, "0")
        self.xlog_ck = ttk.Checkbutton(ctrl_frame, text="Log X", state="disabled", command=self.q_redraw, variable=self.xlog_var,
                onvalue="1", offvalue="0")
        self.xlog_ck.grid(row=0, column=5, sticky=tk.W, padx=(6,6), pady=(3,3))
        self.ylog_var = tk.StringVar(ctrl_frame, "0")
        self.ylog_ck = ttk.Checkbutton(ctrl_frame, text="Log Y", state="disabled", command=self.q_redraw, variable=self.ylog_var,
                onvalue="1", offvalue="0")
        self.ylog_ck.grid(row=1, column=5, sticky=tk.W, padx=(6,6), pady=(3,3))
        # Plot
        self.content_frame = tk.Frame(self.dlg, borderwidth=3, relief=tk.RIDGE)
        self.content_frame.grid(row=2, column=0, sticky=tk.NSEW)
        self.dlg.rowconfigure(2, weight=1)
        self.dlg.columnconfigure(0, weight=1)
        self.content_frame.rowconfigure(0, weight=1)
        self.content_frame.columnconfigure(0, weight=1)
        self.plotfig = Figure(dpi=100)
        self.plotfig.set_figheight(5)
        self.plotfig_canvas = FigureCanvasTkAgg(self.plotfig, self.content_frame)
        self.plot_nav = NavigationToolbar2Tk(self.plotfig_canvas, self.content_frame)
        self.plot_axes = self.plotfig.add_subplot(111)
        self.plotfig_canvas.get_tk_widget().pack(side=tk.TOP, fill=tk.BOTH, expand=1)
        self.plot_nav.update()
        # Buttons
        btn_frame = tk.Frame(self.dlg, borderwidth=3, relief=tk.RIDGE)
        btn_frame.columnconfigure(0, weight=1)
        btn_frame.grid(row=3, column=0, sticky=tk.EW, pady=(3,3))
        btn_frame.columnconfigure(0, weight=0)
        btn_frame.columnconfigure(1, weight=0)
        btn_frame.columnconfigure(2, weight=0)
        btn_frame.columnconfigure(3, weight=0)
        btn_frame.columnconfigure(4, weight=1)
        self.canceled = False
        self.help_btn = ttk.Button(btn_frame, text="Help", command=self.do_help, underline=0)
        self.help_btn.grid(row=0, column=0, sticky=tk.W, padx=(6,3))
        self.dlg.bind("", self.do_help)
        self.data_btn = ttk.Button(btn_frame, text="Source Data", state="disabled", command=self.show_data, underline=0)
        self.data_btn.grid(row=0, column=1, sticky=tk.W, padx=(3,3))
        self.plot_data_btn = ttk.Button(btn_frame, text="Plot Data", state="disabled", command=self.show_plot_data, underline=0)
        self.plot_data_btn.grid(row=0, column=2, sticky=tk.W, padx=(3,3))
        self.clone_btn = ttk.Button(btn_frame, text="Clone", command=self.clone_plot, underline=3)
        self.clone_btn.grid(row=0, column=3, sticky=tk.W, padx=(3,6))
        close_btn = ttk.Button(btn_frame, text="Close", command=self.do_close, underline=0)
        close_btn.grid(row=0, column=4, sticky=tk.E, padx=(6,6))
        self.dlg.bind("", self.clone_plot)
        self.dlg.bind("", self.do_close)
        self.dlg.bind("", self.do_close)
        self.dlg.bind("", self.set_title)
        self.dlg.bind("", self.set_xlabel)
        self.dlg.bind("", self.set_ylabel)
        center_window(self.dlg)
        raise_window(self.dlg)

    def do_help(self, *args):
        """Open the online help page for this dialog in the web browser."""
        webbrowser.open("https://mapdata.osdn.io/dialogs.html#plot-dialog", new=2, autoraise=True)

    def show_data(self, *args):
        """Show the cleaned source data (not summarized) in a table dialog."""
        if self.dataset is not None:
            dlg = MsgDialog2("Source Data", "Original data:")
            # dataset is stored column-wise; the table needs rows.
            rowwise_data = [list(row) for row in zip(*self.dataset)]
            tframe, tdata = treeview_table(dlg.content_frame, rowwise_data, self.data_labels)
            tframe.grid(row=0, column=0, sticky=tk.NSEW)
            dlg.show()

    def show_plot_data(self, *args):
        """Show the data as summarized for the current plot type."""
        if self.plot_data is not None:
            dlg = MsgDialog2("Data for Plotting", "Data to be plotted:")
            max_data_len = max((len(col) for col in self.plot_data), default=0)
            # Boxplot data are not necessarily a full matrix, so short
            # columns are padded with None.
            rowwise_data = [
                [col[i] if i < len(col) else None for col in self.plot_data]
                for i in range(max_data_len)
            ]
            tframe, tdata = treeview_table(dlg.content_frame, rowwise_data, self.plot_data_labels)
            tframe.grid(row=0, column=0, sticky=tk.NSEW)
            dlg.show()

    def clone_plot(self, *args):
        """Ask the parent to create a copy of this plot."""
        self.parent.clone_plot(self)

    def x_changed(self, *args):
        """Handle a new X column: update the Log X checkbox and redraw."""
        plot_type = self.type_var.get()
        if plot_type in ("Category counts", "Box plot"):
            self.xlog_ck["state"] = "disabled"
        else:
            self.xlog_ck["state"] = "normal"
        # A different column always requires the data to be re-fetched.
        # (The event arguments must not be passed on: they would be taken
        # as q_redraw's get_data flag.)
        self.q_redraw()

    def y_changed(self, *args):
        """Handle a new Y column: update the Log Y checkbox and redraw."""
        plot_type = self.type_var.get()
        if plot_type in ("Category counts", "Histogram", "Empirical CDF"):
            self.ylog_ck["state"] = "disabled"
        else:
            self.ylog_ck["state"] = "normal"
        self.q_redraw()

    def set_xy(self, *args):
        """Reset the plot and configure the X/Y selectors for the plot type.

        The selectable columns depend on the plot type and on the data
        type recorded for each column in column_specs.
        """
        self.plotfig.clear()
        self.plot_title = None
        self.dlg.bind("")
        self.plot_axes = self.plotfig.add_subplot(111)
        self.plotfig_canvas.draw()
        self.dataset = None
        self.data_labels = None
        self.plot_data = None
        self.plot_data_labels = None
        self.data_btn["state"] = "disabled"
        self.plot_data_btn["state"] = "disabled"
        categ_columns = sorted(c[0] for c in self.column_specs if c[1] in ("string", "boolean"))
        # quant_columns includes date and timestamp columns
        quant_columns = sorted(c[0] for c in self.column_specs if c[1] in ("int", "float", "date", "timestamp", "timestamptz"))
        numeric_columns = sorted(c[0] for c in self.column_specs if c[1] in ("int", "float"))
        date_columns = sorted(c[0] for c in self.column_specs if c[1] in ("date", "timestamp", "timestamptz"))
        self.x_var.set('')
        self.y_var.set('')
        self.xlog_ck["state"] = "normal"
        self.ylog_ck["state"] = "normal"
        self.x_sel["state"] = "readonly"
        self.y_sel["state"] = "readonly"
        plot_type = self.type_var.get()
        if plot_type == "Category counts":
            self.x_sel["values"] = categ_columns
            self.xlog_ck["state"] = "disabled"
            self.y_sel["state"] = "disabled"
            self.ylog_ck["state"] = "disabled"
        elif plot_type in ("Histogram", "Empirical CDF"):
            self.x_sel["values"] = numeric_columns
            self.y_sel["state"] = "disabled"
            self.ylog_ck["state"] = "disabled"
            if plot_type == "Histogram":
                self.dlg.bind("", self.set_bins)
        elif plot_type == "Box plot":
            # Sorted so the choices appear in a stable order.
            self.x_sel["values"] = sorted(set(categ_columns) | set(date_columns))
            self.xlog_ck["state"] = "disabled"
            self.y_sel["values"] = numeric_columns
        else:
            self.x_sel["values"] = quant_columns
            self.y_sel["values"] = quant_columns

    def q_redraw(self, get_data=True, *args):
        """Redraw the plot if the required columns have been chosen.

        get_data -- when true (the default), or when no data have been
                    collected yet, the data are re-fetched first.
        """
        plot_type = self.type_var.get()
        can_redraw = (plot_type in ("Category counts", "Empirical CDF", "Histogram") and self.x_var.get() != '') \
                or (plot_type in ("Scatter plot", "Line plot", "Box plot", "Y range plot") and self.x_var.get() != '' and self.y_var.get() != '')
        if can_redraw:
            self.plotfig.clear()
            self.plot_axes = self.plotfig.add_subplot(111)
            self.plotfig_canvas.draw()
            if get_data or self.dataset is None or self.plot_data is None:
                self.get_data()
            if self.dataset is not None:
                self.redraw()

    def get_data(self):
        """Fetch, clean, transform and summarize the data for the plot.

        On success dataset, data_labels, plot_data and plot_data_labels
        are set and the two data buttons are enabled.  If there are no
        usable rows, all four attributes are set to None and the buttons
        stay disabled.
        """
        self.data_btn["state"] = "disabled"
        self.plot_data_btn["state"] = "disabled"
        self.dataset = None
        plot_type = self.type_var.get()
        column_list = [self.x_var.get()]
        if self.y_var.get() != '':
            column_list.append(self.y_var.get())
        if self.sel_only_var.get() == "1":
            dataset = self.parent.get_sel_data(column_list)
        else:
            dataset = self.parent.get_all_data(column_list)
        clean_data = None
        if dataset and len(dataset[0]) > 0:
            clean_data = self._drop_missing(dataset)
        if not clean_data or len(clean_data[0]) == 0:
            # Nothing to plot (no rows, or every row had a missing value).
            self.dataset = None
            self.data_labels = None
            self.plot_data = None
            self.plot_data_labels = None
            return
        self._cast_columns(clean_data, plot_type)
        # Set data labels
        if self.y_var.get() != '':
            self.data_labels = [self.x_var.get(), self.y_var.get()]
        else:
            self.data_labels = [self.x_var.get()]
        self._log_transform(clean_data)
        self.dataset = clean_data
        self.data_btn["state"] = "normal"
        self._summarize_for_plot(plot_type)
        self.plot_data_btn["state"] = "normal"

    @staticmethod
    def _drop_missing(dataset):
        """Return column-wise data without rows containing None or ''."""
        rows = [row for row in zip(*dataset)
                if all(v is not None and v != '' for v in row)]
        if not rows:
            return [[] for _ in dataset]
        return [list(col) for col in zip(*rows)]

    def _cast_columns(self, clean_data, plot_type):
        """Convert the X and Y columns in place to their declared data types."""
        if plot_type != "Category counts":
            x_data_type = [cs[1] for cs in self.column_specs if cs[0] == self.x_var.get()][0]
            cast_fn = data_type_cast_fn(x_data_type)
            clean_data[0] = [cast_fn(v) for v in clean_data[0]]
        # str() because a widget option may not compare equal to a plain
        # string directly.
        if str(self.y_sel["state"]) != "disabled" and self.y_var.get() != "" and len(clean_data) > 1:
            y_data_type = [cs[1] for cs in self.column_specs if cs[0] == self.y_var.get()][0]
            cast_fn = data_type_cast_fn(y_data_type)
            clean_data[1] = [cast_fn(v) for v in clean_data[1]]

    def _log_transform(self, clean_data):
        """Apply the requested log10 transforms to clean_data in place.

        If any value of a column cannot be log-transformed, that column is
        left untouched and its log checkbox is cleared and disabled.
        """
        axes = ((0, self.xlog_ck, self.xlog_var, self.x_var),
                (1, self.ylog_ck, self.ylog_var, self.y_var))
        for idx, check, flag, col_var in axes:
            if idx >= len(clean_data):
                continue
            if str(check["state"]) == "disabled" or flag.get() != "1":
                continue
            try:
                logged = [math.log10(v) for v in clean_data[idx]]
            except (ValueError, TypeError):
                # Non-positive or non-numeric values.
                flag.set("0")
                check["state"] = "disabled"
                continue
            clean_data[idx] = logged
            self.data_labels[idx] = "Log10 of " + col_var.get()

    def _summarize_for_plot(self, plot_type):
        """Set plot_data and plot_data_labels from dataset for plot_type."""
        if plot_type == "Category counts":
            # Count of values for each X, ordered by X
            counter = collections.Counter(self.dataset[0])
            x_vals = sorted(counter)
            self.plot_data = [x_vals, [counter[k] for k in x_vals]]
            self.plot_data_labels = [self.data_labels[0], "Count"]
        elif plot_type == "Box plot":
            # A list of Y values for each X value, in order of first
            # appearance of the X value.
            groups = {}
            for x, y in zip(self.dataset[0], self.dataset[1]):
                groups.setdefault(x, []).append(y)
            self.plot_data = list(groups.values())
            self.plot_data_labels = list(groups.keys())
        elif plot_type == "Empirical CDF":
            # Y is the cumulative fraction of data points at or below each X value
            x_counts = np.unique(self.dataset[0], return_counts=True)
            y_vals = list(np.cumsum(x_counts[1]/np.sum(x_counts[1])))
            self.plot_data = [list(x_counts[0]), y_vals]
            self.plot_data_labels = [self.data_labels[0], "Cumulative frequency"]
        elif plot_type == "Y range plot":
            # Min and max Y for each X.  Each X gets its own [min, max]
            # pair; sharing one list between keys would yield the global
            # range for every X.
            ranges = {}
            for x, y in zip(self.dataset[0], self.dataset[1]):
                bounds = ranges.get(x)
                if bounds is None:
                    ranges[x] = [y, y]
                else:
                    if y < bounds[0]:
                        bounds[0] = y
                    if y > bounds[1]:
                        bounds[1] = y
            x_vals = sorted(ranges)
            y1 = [ranges[x][0] for x in x_vals]
            y2 = [ranges[x][1] for x in x_vals]
            self.plot_data = [x_vals, y1, y2]
            # Use data_labels so that a log transform shows in the labels.
            self.plot_data_labels = [self.data_labels[0], self.data_labels[1] + " min", self.data_labels[1] + " max"]
        elif plot_type == "Line plot":
            # Sort by X
            pairs = sorted(zip(self.dataset[0], self.dataset[1]))
            self.plot_data = [[p[0] for p in pairs], [p[1] for p in pairs]]
            self.plot_data_labels = self.data_labels
        elif plot_type in ("Histogram", "Scatter plot"):
            # No special preparation
            self.plot_data = self.dataset
            self.plot_data_labels = self.data_labels

    def redraw(self):
        """Draw plot_data on the axes according to the selected plot type."""
        plot_type = self.type_var.get()
        if self.plot_data is not None and len(self.plot_data) > 0 and len(self.plot_data[0]) > 0:
            if plot_type == "Category counts":
                self.plot_axes.bar(self.plot_data[0], self.plot_data[1])
                self.plot_axes.set_xlabel(self.plot_data_labels[0])
                self.plot_axes.set_ylabel(self.plot_data_labels[1])
            elif plot_type == "Histogram":
                self.plot_axes.hist(self.plot_data[0], bins=self.bins)
                # The label reflects a log transform, as for other plots.
                self.plot_axes.set_xlabel(self.plot_data_labels[0])
                self.plot_axes.set_ylabel("Counts")
            elif plot_type == "Scatter plot":
                self.plot_axes.scatter(self.plot_data[0], self.plot_data[1])
                self.plot_axes.set_xlabel(self.plot_data_labels[0])
                self.plot_axes.set_ylabel(self.plot_data_labels[1])
            elif plot_type == "Line plot":
                self.plot_axes.plot(self.plot_data[0], self.plot_data[1])
                self.plot_axes.set_xlabel(self.plot_data_labels[0])
                self.plot_axes.set_ylabel(self.plot_data_labels[1])
            elif plot_type == "Empirical CDF":
                self.plot_axes.stackplot(self.plot_data[0], self.plot_data[1])
                self.plot_axes.set_xlabel(self.plot_data_labels[0])
                self.plot_axes.set_ylabel(self.plot_data_labels[1])
            elif plot_type == "Y range plot":
                self.plot_axes.fill_between(self.plot_data[0], self.plot_data[1], self.plot_data[2])
                self.plot_axes.set_xlabel(self.data_labels[0])
                self.plot_axes.set_ylabel(self.data_labels[1])
            elif plot_type == "Box plot":
                self.plot_axes.boxplot(self.plot_data, labels=self.plot_data_labels)
                self.plot_axes.set_xlabel(self.x_var.get())
                self.plot_axes.set_ylabel(self.data_labels[1])
            if self.plot_title is not None:
                self.plot_axes.set_title(self.plot_title)
            self.plotfig_canvas.draw()
            self.plot_nav.update()

    def set_title(self, *args):
        """Prompt for a plot title and apply it."""
        dlg = OneEntryDialog(self.dlg, "Plot Title", "Enter a title for the plot:")
        title = dlg.show()
        if title is not None:
            self.plot_title = title
            self.plot_axes.set_title(title)
            self.plotfig_canvas.draw()

    def set_xlabel(self, *args):
        """Prompt for an X-axis label and apply it."""
        dlg = OneEntryDialog(self.dlg, "X-Axis Label", "Enter a title for the X-axis label:")
        xlabel = dlg.show()
        if xlabel is not None:
            self.plot_axes.set_xlabel(xlabel)
            self.plotfig_canvas.draw()

    def set_ylabel(self, *args):
        """Prompt for a Y-axis label and apply it."""
        dlg = OneEntryDialog(self.dlg, "Y-Axis Label", "Enter a title for the Y-axis label:")
        ylabel = dlg.show()
        if ylabel is not None:
            self.plot_axes.set_ylabel(ylabel)
            self.plotfig_canvas.draw()

    def set_bins(self, *args):
        """Prompt for the number of histogram bins; redraw a histogram."""
        dlg = OneIntDialog(self.dlg, "Histogram Bins", "Enter the number of histogram bins", min_value=2, max_value=50, initial=self.bins)
        num_bins = dlg.show()
        if num_bins is not None:
            self.bins = num_bins
            if self.type_var.get() == "Histogram":
                self.q_redraw()

    def do_close(self, *args):
        """Unregister this plot from the parent and close the window."""
        self.parent.remove_plot(self)
        try:
            self.dlg.destroy()
        except tk.TclError:
            # The window has already been destroyed.
            pass

    def show(self):
        """Set the minimum window size and wait until the window is closed."""
        self.dlg.update_idletasks()
        self.dlg.minsize(width=500, height=500)
        self.dlg.wait_window(self.dlg)
class MsgDialog(object):
    """Simple message window with a single "Close" button."""

    def __init__(self, title, message, parent=None, bgcolor=None, can_resize=True):
        """Build the message window.

        title      -- window title.
        message    -- text shown in the window; re-wrapped on resize.
        parent     -- optional parent window for the Toplevel.
        bgcolor    -- optional background color for the window.
        can_resize -- whether the window may be resized in both directions.
        """
        self.dlg = tk.Toplevel(parent) if parent is not None else tk.Toplevel()
        if bgcolor is not None:
            self.dlg.configure(bg=bgcolor)
        self.dlg.title(title)
        self.dlg.rowconfigure(0, weight=1)
        self.dlg.columnconfigure(0, weight=2)
        # Message area
        # NOTE(review): the event-sequence strings in the bind() calls are
        # empty in the reviewed text -- confirm the intended sequences.
        text_frame = tk.Frame(self.dlg)
        text_frame.grid(row=0, column=0, sticky=tk.EW, padx=(6,6), pady=(6,6))
        text_frame.rowconfigure(0, weight=1)
        text_frame.columnconfigure(0, weight=2)
        text_lbl = ttk.Label(text_frame, wraplength=100, text=message)
        text_lbl.grid(row=0, column=0, sticky=tk.EW, padx=(6,6), pady=(3,3))
        def rewrap(event):
            # Keep the wrap width just inside the label's current width.
            text_lbl.configure(wraplength=event.width - 5)
        text_lbl.bind("", rewrap)
        # Button bar
        button_bar = tk.Frame(self.dlg, borderwidth=3, relief=tk.RIDGE)
        button_bar.columnconfigure(0, weight=2)
        button_bar.grid(row=1, column=0, sticky=tk.EW, pady=(0,0))
        button_bar.columnconfigure(0, weight=1)
        self.canceled = False
        close_btn = ttk.Button(button_bar, text="Close", command=self.do_select, underline=0)
        close_btn.grid(row=0, column=0, sticky=tk.E, padx=(12,6))
        self.dlg.bind("", self.do_select)
        self.dlg.bind("", self.do_select)
        self.dlg.bind("", self.do_select)
        self.dlg.resizable(can_resize, can_resize)
        self.dlg.minsize(width=300, height=50)
        close_btn.focus()

    def do_select(self, *args):
        """Close the window; any event argument is ignored."""
        self.dlg.destroy()

    def show(self, grab=False):
        """Display the window and wait until it is closed.

        grab -- when true, the window takes the input grab first.
        """
        window = self.dlg
        if grab:
            window.grab_set()
        center_window(window)
        raise_window(window)
        # Switch 'topmost' on and straight off again.
        window.attributes('-topmost', 'true')
        window.attributes('-topmost', 'false')
        window.wait_window(window)
class MsgDialog2(object):
    """Message window with an extra content frame and a "Close" button.

    Callers place their own widgets in the content_frame attribute before
    calling show().
    """

    def __init__(self, title, message, parent=None, bgcolor=None, can_resize=True):
        """Build the window.

        title      -- window title.
        message    -- text shown above the content frame; re-wrapped on resize.
        parent     -- optional parent window for the Toplevel.
        bgcolor    -- optional background color for the window.
        can_resize -- whether the window may be resized in both directions.
        """
        self.dlg = tk.Toplevel(parent) if parent is not None else tk.Toplevel()
        if bgcolor is not None:
            self.dlg.configure(bg=bgcolor)
        self.dlg.title(title)
        self.dlg.columnconfigure(0, weight=1)
        # Message area
        # NOTE(review): the event-sequence strings in the bind() calls are
        # empty in the reviewed text -- confirm the intended sequences.
        text_frame = tk.Frame(self.dlg)
        text_frame.grid(row=0, column=0, sticky=tk.EW, padx=(6,6), pady=(6,6))
        text_frame.columnconfigure(0, weight=1)
        text_lbl = ttk.Label(text_frame, wraplength=80, text=message)
        text_lbl.grid(row=0, column=0, sticky=tk.EW, padx=(6,6), pady=(3,3))
        def rewrap(event):
            # Keep the wrap width just inside the label's current width.
            text_lbl.configure(wraplength=event.width - 5)
        text_lbl.bind("", rewrap)
        # Content area: the only row that stretches vertically.
        self.content_frame = tk.Frame(self.dlg)
        self.content_frame.grid(row=1, column=0, sticky=tk.NSEW)
        self.dlg.rowconfigure(1, weight=1)
        self.content_frame.rowconfigure(0, weight=1)
        self.content_frame.columnconfigure(0, weight=1)
        # Button bar
        button_bar = tk.Frame(self.dlg, borderwidth=3, relief=tk.RIDGE)
        button_bar.columnconfigure(0, weight=1)
        button_bar.grid(row=2, column=0, sticky=tk.EW, pady=(0,0))
        button_bar.columnconfigure(0, weight=1)
        self.canceled = False
        close_btn = ttk.Button(button_bar, text="Close", command=self.do_select, underline=0)
        close_btn.grid(row=0, column=0, sticky=tk.E, padx=(12,6))
        self.dlg.bind("", self.do_select)
        self.dlg.bind("", self.do_select)
        self.dlg.bind("", self.do_select)
        self.dlg.resizable(can_resize, can_resize)
        self.dlg.minsize(width=300, height=50)
        close_btn.focus()

    def do_select(self, *args):
        """Close the window; any event argument is ignored."""
        self.dlg.destroy()

    def show(self, grab=False):
        """Display the window and wait until it is closed.

        grab -- when true, the window takes the input grab first.
        """
        window = self.dlg
        if grab:
            window.grab_set()
        center_window(window)
        raise_window(window)
        # Switch 'topmost' on and straight off again.
        window.attributes('-topmost', 'true')
        window.attributes('-topmost', 'false')
        window.wait_window(window)
class OneEntryDialog(object):
    """Modal dialog that asks for a single line of text."""

    def __init__(self, parent, title, prompt):
        """Build the dialog.

        parent -- parent window for the Toplevel.
        title  -- window title.
        prompt -- text shown above the entry field.
        """
        self.dlg = tk.Toplevel(parent)
        self.dlg.title(title)
        # Prompt and entry
        # NOTE(review): the event-sequence strings in the bind() calls are
        # empty in the reviewed text -- confirm the intended sequences.
        entry_frame = tk.Frame(self.dlg)
        entry_frame.grid(row=0, column=0, sticky=tk.NSEW, pady=(3,3))
        entry_frame.columnconfigure(0, weight=1)
        prompt_lbl = ttk.Label(entry_frame, text=prompt)
        prompt_lbl.grid(row=0, column=0, sticky=tk.W, padx=(6,6), pady=(6,6))
        self.entry_var = tk.StringVar(self.dlg, "")
        # Every change to the entry re-evaluates the OK button state.
        self.entry_var.trace('w', self.check_enable)
        self.val_entry = ttk.Entry(entry_frame, width=50, textvariable=self.entry_var)
        self.val_entry.grid(row=1, column=0, sticky=tk.EW, padx=(6,6), pady=(3,3))
        button_bar = tk.Frame(self.dlg, borderwidth=3, relief=tk.RIDGE)
        button_bar.columnconfigure(0, weight=1)
        button_bar.grid(row=2, column=0, sticky=tk.EW, pady=(3,3))
        button_bar.columnconfigure(0, weight=1)
        self.val_entry.focus()
        # Buttons
        self.canceled = False
        self.ok_btn = ttk.Button(button_bar, text="OK", command=self.do_select, underline=0)
        self.ok_btn.grid(row=0, column=0, sticky=tk.E, padx=(6,3))
        self.ok_btn["state"] = tk.DISABLED
        self.dlg.bind('', self.do_select)
        abort_btn = ttk.Button(button_bar, text="Cancel", command=self.do_cancel, underline=0)
        abort_btn.grid(row=0, column=1, sticky=tk.E, padx=(3,6))
        self.dlg.bind("", self.do_cancel)
        self.dlg.bind("", self.do_cancel)
        self.dlg.bind("", self.do_select)
        self.dlg.bind("", self.do_select)

    def check_enable(self, *args):
        """Enable the OK button only while the entry is non-empty."""
        has_text = self.entry_var.get() != ''
        self.ok_btn["state"] = tk.NORMAL if has_text else tk.DISABLED

    def do_cancel(self, *args):
        """Mark the dialog as canceled and close it."""
        self.canceled = True
        self.dlg.destroy()

    def do_select(self, *args):
        """Accept the entry and close the dialog; ignored while the entry is empty."""
        if self.entry_var.get() == '':
            return
        self.canceled = False
        self.dlg.destroy()

    def show(self):
        """Show the dialog modally; return the entered text, or None if canceled."""
        window = self.dlg
        window.grab_set()
        center_window(window)
        raise_window(window)
        window.resizable(True, False)
        window.attributes('-topmost', 'true')
        window.wait_window(window)
        return None if self.canceled else self.entry_var.get()
class OneIntDialog(object):
    """Modal dialog that asks for a single integer within a range.

    The value is entered in a spinbox.  Because a spinbox also accepts
    typed text, the value is validated before the dialog closes: it must
    be an integer between min_value and max_value (inclusive).
    """

    def __init__(self, parent, title, prompt, min_value, max_value, initial):
        """Build the dialog.

        parent    -- parent window for the Toplevel.
        title     -- window title.
        prompt    -- text shown above the spinbox.
        min_value -- smallest accepted value (None for no lower limit).
        max_value -- largest accepted value (None for no upper limit).
        initial   -- value shown initially.
        """
        self.min_value = min_value
        self.max_value = max_value
        self.dlg = tk.Toplevel(parent)
        self.dlg.title(title)
        prompt_frame = tk.Frame(self.dlg)
        prompt_frame.grid(row=0, column=0, sticky=tk.NSEW, pady=(3,3))
        prompt_frame.columnconfigure(0, weight=1)
        msg_lbl = ttk.Label(prompt_frame, text=prompt)
        msg_lbl.grid(row=0, column=0, sticky=tk.W, padx=(6,6), pady=(6,6))
        self.entry_var = tk.IntVar(self.dlg, initial)
        self.val_entry = ttk.Spinbox(prompt_frame, textvariable=self.entry_var, from_=min_value, to=max_value)
        self.val_entry.grid(row=1, column=0, sticky=tk.EW, padx=(6,6), pady=(3,3))
        btn_frame = tk.Frame(self.dlg, borderwidth=3, relief=tk.RIDGE)
        btn_frame.columnconfigure(0, weight=1)
        btn_frame.grid(row=2, column=0, sticky=tk.EW, pady=(3,3))
        btn_frame.columnconfigure(0, weight=1)
        self.val_entry.focus()
        # Buttons
        # NOTE(review): the event-sequence strings in the bind() calls are
        # empty in the reviewed text -- confirm the intended sequences.
        self.canceled = False
        self.ok_btn = ttk.Button(btn_frame, text="OK", command=self.do_select, underline=0)
        self.ok_btn.grid(row=0, column=0, sticky=tk.E, padx=(6,3))
        self.dlg.bind('', self.do_select)
        cancel_btn = ttk.Button(btn_frame, text="Cancel", command=self.do_cancel, underline=0)
        cancel_btn.grid(row=0, column=1, sticky=tk.E, padx=(3,6))
        self.dlg.bind("", self.do_cancel)
        self.dlg.bind("", self.do_cancel)
        self.dlg.bind("", self.do_select)
        self.dlg.bind("", self.do_select)

    def _valid_value(self):
        """Return the entered integer if it is in range, otherwise None."""
        try:
            value = self.entry_var.get()
        except (tk.TclError, ValueError):
            # IntVar.get() fails when the text is empty or non-numeric.
            return None
        if self.min_value is not None and value < self.min_value:
            return None
        if self.max_value is not None and value > self.max_value:
            return None
        return value

    def do_cancel(self, *args):
        """Mark the dialog as canceled and close it."""
        self.canceled = True
        self.dlg.destroy()

    def do_select(self, *args):
        """Accept the value and close the dialog.

        If the value is not an integer in the allowed range, the dialog is
        left open and focus returns to the spinbox.
        """
        if self._valid_value() is None:
            self.val_entry.focus()
            return
        self.canceled = False
        self.dlg.destroy()

    def show(self):
        """Show the dialog modally; return the integer, or None.

        None is returned when the dialog was canceled, or when it was
        closed while holding a value that is not a valid in-range integer.
        """
        self.dlg.grab_set()
        center_window(self.dlg)
        raise_window(self.dlg)
        self.dlg.resizable(True, False)
        self.dlg.attributes('-topmost', 'true')
        self.dlg.wait_window(self.dlg)
        if self.canceled:
            return None
        return self._valid_value()
class GetEditorDialog(object):
    # A dialog that prompts for the text editor to use, with an entry
    # field, a Browse button that opens a file chooser, and OK/Cancel
    # buttons.  Call show() to run it and obtain the result.
    def __init__(self, parent, current_editor):
        # parent: widget passed to tk.Toplevel as the dialog's master.
        # current_editor: initial text of the entry; a false value (such as
        # None) is shown as an empty string.
        self.dlg = tk.Toplevel(parent)
        self.dlg.title("Set Text Editor")
        self.dlg.columnconfigure(0, weight=1)
        prompt_frame = tk.Frame(self.dlg)
        prompt_frame.grid(row=0, column=0, sticky=tk.NSEW, pady=(3,3))
        prompt_frame.columnconfigure(0, weight=1)
        msg_lbl = ttk.Label(prompt_frame, wraplength=600, text="Choose the text editor to be used to edit SQL commands when pulling data from a database. The editor is set from the EDITOR environment variable on startup, and may also be changed using a configuration file.")
        msg_lbl.grid(row=0, column=0, columnspan=2, sticky=tk.EW, padx=(6,6), pady=(6,3))
        def wrap_msg(event):
            # Re-wrap the prompt text to slightly less than the event's width.
            msg_lbl.configure(wraplength=event.width - 5)
        # NOTE(review): every event sequence passed to bind() in this method
        # is an empty string.  The intended sequences are not visible here
        # and look as if they were lost -- confirm against the original
        # source before relying on these bindings.
        msg_lbl.bind("", wrap_msg)
        self.entry_var = tk.StringVar(self.dlg, current_editor or "")
        # Re-evaluate the OK button state whenever the entry text is written.
        self.entry_var.trace('w', self.check_enable)
        self.val_entry = ttk.Entry(prompt_frame, width=60, textvariable=self.entry_var)
        self.val_entry.grid(row=1, column=0, sticky=tk.EW, padx=(6,6), pady=(3,6))
        fn_btn = ttk.Button(prompt_frame, text="Browse", command=self.get_fn, underline=0)
        fn_btn.grid(row=1, column=1, sticky=tk.W, padx=(3,3))
        self.dlg.bind("", self.get_fn)
        # Buttons
        btn_frame = tk.Frame(self.dlg, borderwidth=3, relief=tk.RIDGE)
        btn_frame.columnconfigure(0, weight=1)
        btn_frame.grid(row=2, column=0, sticky=tk.EW, pady=(3,3))
        # Repeats the column configuration already applied above.
        btn_frame.columnconfigure(0, weight=1)
        self.val_entry.focus()
        self.canceled = False
        self.ok_btn = ttk.Button(btn_frame, text="OK", command=self.do_select, underline=0)
        self.ok_btn.grid(row=0, column=0, sticky=tk.E, padx=(6,3))
        # Start disabled; check_enable() below enables it if there is text.
        self.ok_btn["state"] = tk.DISABLED
        cancel_btn = ttk.Button(btn_frame, text="Cancel", command=self.do_cancel, underline=0)
        cancel_btn.grid(row=0, column=1, sticky=tk.E, padx=(3,6))
        self.dlg.bind("", self.do_cancel)
        self.dlg.bind("", self.do_cancel)
        self.dlg.bind("", self.do_select)
        self.dlg.bind('', self.do_select)
        self.check_enable()
    def get_fn(self, *args):
        # Open a file chooser and, if a file was chosen, put its name in
        # the entry field.
        fn = tkfiledialog.askopenfilename(parent=self.dlg)
        # Skip the "nothing chosen" results: None, empty string, empty tuple.
        if fn is not None and fn != '' and fn != ():
            self.entry_var.set(fn)
    def check_enable(self, *args):
        # Enable the OK button only while the entry is non-empty.
        if self.entry_var.get() != '':
            self.ok_btn["state"] = tk.NORMAL
        else:
            self.ok_btn["state"] = tk.DISABLED
    def do_cancel(self, *args):
        # Mark the dialog as canceled and destroy the window.
        self.canceled = True
        self.dlg.destroy()
    def do_select(self, *args):
        # Accept the entry and destroy the window; ignored while the entry
        # is empty.
        if self.entry_var.get() != '':
            self.canceled = False
            self.dlg.destroy()
    def show(self):
        # Run the dialog modally and block until the window is destroyed.
        # Returns None if canceled, otherwise the entry text.
        self.dlg.grab_set()
        center_window(self.dlg)
        raise_window(self.dlg)
        # Resizable horizontally only.
        self.dlg.resizable(True, False)
        self.dlg.attributes('-topmost', 'true')
        self.dlg.wait_window(self.dlg)
        if self.canceled:
            return None
        else:
            return self.entry_var.get()
class SelDataSrcDialog(object):
    # A dialog offering a choice of data source type (CSV file, spreadsheet,
    # or database).  Each choice opens a further dialog; this dialog closes
    # when that dialog returns a result whose first element is not None.
    # Call select() to run it and obtain the result.
    def __init__(self, parent, mapui):
        # parent: widget passed to tk.Toplevel as the dialog's master.
        # mapui: passed through to the spreadsheet and database dialogs.
        self.parent = parent
        self.mapui = mapui
        self.canceled = False
        self.dlg = tk.Toplevel(parent)
        self.dlg.title("Select Mapping Data")
        # Closing the window via the window manager counts as a cancel.
        self.dlg.protocol("WM_DELETE_WINDOW", self.do_cancel)
        self.dlg.columnconfigure(0, weight=1)
        # Result returned by select(); stays a 10-tuple of None until one of
        # the sel_* methods replaces it.
        self.rv = (None, None, None, None, None, None, None, None, None, None)
        # Prompt
        prompt_frame = tk.Frame(self.dlg)
        prompt_frame.grid(row=0, column=0, sticky=tk.NSEW, padx=(6,6), pady=(6,3))
        msg_lbl = ttk.Label(prompt_frame, width=30, wraplength=100, anchor=tk.W, justify=tk.LEFT, text="Select the type of data source to use. You will then be prompted for details about the selected data source.")
        msg_lbl.grid(row=0, column=0, padx=(6,6), pady=(3,3))
        def wrap_msg(event):
            # Re-wrap the prompt text to slightly less than the event's width.
            msg_lbl.configure(wraplength=event.width - 5)
        # NOTE(review): the event sequences passed to bind() in this method
        # are empty strings.  The intended sequences are not visible here
        # and look as if they were lost -- confirm against the original
        # source before relying on these bindings.
        msg_lbl.bind("", wrap_msg)
        # Select buttons
        sel_frame = tk.Frame(self.dlg)
        sel_frame.grid(row=1, column=0, sticky=tk.NSEW, pady=(6,9))
        csv_btn = ttk.Button(sel_frame, text=" CSV file ", command=self.sel_csv)
        csv_btn.grid(row=0, column=0, sticky=tk.EW, padx=(3,3), pady=(3,3))
        ss_btn = ttk.Button(sel_frame, text="Spreadsheet", command=self.sel_spreadsheet)
        ss_btn.grid(row=0, column=1, sticky=tk.EW, padx=(3,3), pady=(3,3))
        db_btn = ttk.Button(sel_frame, text=" Database ", command=self.sel_database)
        db_btn.grid(row=0, column=2, sticky=tk.EW, padx=(3,3), pady=(3,3))
        # Help and Cancel buttons
        btn_frame = tk.Frame(self.dlg, borderwidth=3, relief=tk.RIDGE)
        btn_frame.columnconfigure(0, weight=1)
        btn_frame.grid(row=2, column=0, sticky=tk.S+tk.EW, padx=(3,3), pady=(3,3))
        # Repeats the column configuration already applied above.
        btn_frame.columnconfigure(0, weight=1)
        self.canceled = False
        help_btn = ttk.Button(btn_frame, text="Help", command=self.do_help, underline=0)
        help_btn.grid(row=0, column=0, sticky=tk.W, padx=(6,3))
        cancel_btn = ttk.Button(btn_frame, text="Cancel", command=self.do_cancel)
        # Help and Cancel share grid cell (0, 0); they are stuck to the west
        # and east edges of that cell respectively.
        cancel_btn.grid(row=0, column=0, sticky=tk.E, padx=(3,6))
        self.dlg.bind("", self.do_cancel)
    def do_help(self, *args):
        # Open the online documentation in a new browser tab.
        webbrowser.open("https://mapdata.osdn.io/", new=2, autoraise=True)
    def sel_csv(self):
        # Prompt for a CSV data file; close this dialog if one was selected.
        dfd = DataFileDialog()
        self.rv = dfd.get_datafile()
        if self.rv[0] is not None:
            self.dlg.destroy()
    def sel_spreadsheet(self):
        # Prompt for a spreadsheet; close this dialog if one was selected.
        dfd = ImportSpreadsheetDialog(self.dlg, self.mapui)
        self.rv = dfd.get_datafile()
        if self.rv[0] is not None:
            self.dlg.destroy()
    def sel_database(self):
        # Prompt for a database source; close this dialog if data were
        # obtained.
        dbd = DbConnectDialog(self.dlg, self.mapui)
        self.rv = dbd.get_data()
        if self.rv[0] is not None:
            self.dlg.destroy()
    def do_cancel(self, *args):
        # Mark the dialog as canceled and destroy the window.  self.rv is
        # left as it was.
        self.canceled = True
        self.dlg.destroy()
    def select(self):
        # Run the dialog modally and block until the window is destroyed.
        # Returns self.rv: the tuple produced by the chosen source dialog,
        # or the initial tuple of None values if no source dialog was run.
        self.dlg.grab_set()
        center_window(self.dlg)
        raise_window(self.dlg)
        self.dlg.resizable(False, False)
        self.dlg.focus()
        self.dlg.wait_window(self.dlg)
        return self.rv
class EncodedFile(object):
    # A class providing an open method for an encoded file, allowing reading
    # and writing using unicode, without explicit decoding or encoding.
    #
    # The encoding used is the one given to the constructor unless the file
    # already exists and starts with a UTF-8, UTF-16, or UTF-32 byte order
    # mark, in which case the encoding indicated by the BOM takes precedence.
    def __repr__(self):
        return u"EncodedFile(%r, %r)" % (self.filename, self.encoding)
    def __init__(self, filename, file_encoding):
        # filename: path of the file to be opened later with open().
        # file_encoding: encoding to use when no BOM is found in the file.
        self.filename = filename
        self.encoding = file_encoding
        self.bom_length = 0
        self.fo = None
        def detect_by_bom(path, default_enc):
            # Return (encoding, BOM length) for the file at 'path', falling
            # back to (default_enc, 0) when the file has no recognized BOM.
            with io.open(path, 'rb') as f:
                raw = f.read(4)
            # UTF-32 must be tested before UTF-16: the UTF-32-LE BOM
            # (FF FE 00 00) begins with the UTF-16-LE BOM (FF FE).
            for enc, boms, bom_len in (
                ('utf-8-sig', (codecs.BOM_UTF8,), 3),
                ('utf_32', (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE), 4),
                ('utf_16', (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE), 2)):
                if any(raw.startswith(bom) for bom in boms):
                    return enc, bom_len
            return default_enc, 0
        if os.path.exists(filename):
            self.encoding, self.bom_length = detect_by_bom(filename, file_encoding)
    def open(self, mode='r'):
        # Open the file in text mode with the requested or detected
        # encoding and return the file object.  UTF-8 is used only when no
        # encoding was specified.
        self.fo = io.open(file=self.filename, mode=mode, encoding=self.encoding or "UTF8", newline=None)
        return self.fo
    def close(self):
        # Close the file object returned by the last open(), if any.
        if self.fo is not None:
            self.fo.close()
class LineDelimiter(object):
    # Formats a sequence of values as one line of delimited text, quoting
    # string values where necessary.
    def __init__(self, delim, quote, escchar):
        # delim: column delimiter; a false value means values are joined
        #        with nothing between them.
        # quote: quote character; a false value disables quoting.
        # escchar: character placed before an embedded quote character; when
        #          false, embedded quotes are doubled instead.
        self.delimiter = delim
        self.joinchar = delim or u""
        self.quotechar = quote
        self.quotedquote = ((escchar or quote) + quote) if quote else None
    def _needs_quoting(self, text):
        # True if 'text' contains the quote character, the delimiter, or a
        # line break.
        if self.quotechar in text:
            return True
        if self.delimiter is not None and self.delimiter in text:
            return True
        return u'\n' in text or u'\r' in text
    def delimited(self, datarow, add_newline=True):
        # Return the values of 'datarow' joined into one string, with None
        # written as an empty string and a trailing newline unless
        # 'add_newline' is false.
        quoting = bool(self.quotechar)
        fields = []
        for value in datarow:
            if value is None:
                fields.append(u"")
            elif quoting and isinstance(value, str) and self._needs_quoting(value):
                escaped = value.replace(self.quotechar, self.quotedquote)
                fields.append(self.quotechar + escaped + self.quotechar)
            else:
                fields.append(str(value))
        line = self.joinchar.join(fields)
        return line + u"\n" if add_newline else line
def write_delimited_file(outfile, filefmt, column_headers, rowsource, file_encoding='utf8', append=False):
    # Write the rows of 'rowsource' to 'outfile' as delimited text.
    #
    # outfile: path of the file to write.
    # filefmt: format name (case-insensitive): 'csv', 'tab'/'tsv',
    #          'tabq'/'tsvq', 'unitsep'/'us', 'plain', or 'tex'.  Any other
    #          name results in no delimiter and no quoting.
    # column_headers: header values, written first unless the format is
    #          'plain' or the file is being appended to.
    # rowsource: iterable of row sequences.
    # file_encoding: encoding passed to EncodedFile.
    # append: if true, append to the file instead of overwriting it.
    fmt = filefmt.lower()
    # (delimiter, quote character) for each recognized format name.
    formats = {
        'csv': (",", '"'),
        'tab': ("\t", None),
        'tsv': ("\t", None),
        'tabq': ("\t", '"'),
        'tsvq': ("\t", '"'),
        'unitsep': (chr(31), None),
        'us': (chr(31), None),
        'plain': (" ", ''),
        'tex': ("&", ''),
        }
    delim, quote = formats.get(fmt, (None, None))
    line_delimiter = LineDelimiter(delim, quote, None)
    fmode = "a" if append else "w"
    ofile = EncodedFile(outfile, file_encoding).open(mode=fmode)
    # Close the file even if formatting, writing, or iterating over
    # 'rowsource' raises, so that the handle is not leaked.
    try:
        if not (fmt == 'plain' or append):
            ofile.write(line_delimiter.delimited(column_headers))
        for rec in rowsource:
            ofile.write(line_delimiter.delimited(rec))
    finally:
        ofile.close()
class OdsFile(object):
    # Reads and writes OpenDocument spreadsheets using the odfpy library.
    # A workbook is loaded or created with open(); sheets can then be read
    # with sheet_data(), or built with new_sheet(), add_row_to_sheet() and
    # add_sheet() and written with save_close().
    def __repr__(self):
        return u"OdsFile()"
    def __init__(self):
        self.filename = None
        self.wbk = None
        # Names of the table-cell styles known to exist in the workbook.
        self.cell_style_names = []
    def open(self, filename):
        # Load the workbook from 'filename' if that file exists; otherwise
        # create a new, empty spreadsheet document to be saved under that name.
        self.filename = filename
        if os.path.isfile(filename):
            self.wbk = odf.opendocument.load(filename)
            # Get a list of all cell style names used, so as not to re-define them.
            # Adapted from http://www.pbertrand.eu/reading-an-odf-document-with-odfpy/
            for sty in self.wbk.automaticstyles.childNodes:
                try:
                    fam = sty.getAttribute("family")
                    if fam == "table-cell":
                        name = sty.getAttribute("name")
                        if name not in self.cell_style_names:
                            self.cell_style_names.append(name)
                except Exception:
                    # Best effort: nodes that cannot be queried for these
                    # attributes are skipped.
                    pass
        else:
            self.wbk = odf.opendocument.OpenDocumentSpreadsheet()
    def define_body_style(self):
        # Add the "body" cell style to the workbook if not already defined.
        st_name = "body"
        if st_name not in self.cell_style_names:
            body_style = odf.style.Style(name=st_name, family="table-cell")
            body_style.addElement(odf.style.TableCellProperties(attributes={"verticalalign":"top"}))
            self.wbk.styles.addElement(body_style)
            self.cell_style_names.append(st_name)
    def define_header_style(self):
        # Add the "header" cell style to the workbook if not already defined.
        st_name = "header"
        if st_name not in self.cell_style_names:
            header_style = odf.style.Style(name=st_name, family="table-cell")
            header_style.addElement(odf.style.TableCellProperties(attributes={"borderbottom":"1pt solid #000000",
                "verticalalign":"bottom"}))
            self.wbk.styles.addElement(header_style)
            self.cell_style_names.append(st_name)
    def define_iso_datetime_style(self):
        # Add the "iso_datetime" cell style (and the "iso-datetime" date
        # style it refers to) to the workbook if not already defined.
        st_name = "iso_datetime"
        if st_name not in self.cell_style_names:
            dt_style = odf.number.DateStyle(name="iso-datetime")
            dt_style.addElement(odf.number.Year(style="long"))
            dt_style.addElement(odf.number.Text(text=u"-"))
            dt_style.addElement(odf.number.Month(style="long"))
            dt_style.addElement(odf.number.Text(text=u"-"))
            dt_style.addElement(odf.number.Day(style="long"))
            # odfpy collapses text elements that have only spaces, so trying to insert just a space between the date
            # and time actually results in no space between them.  Other Unicode invisible characters
            # are also trimmed.  The delimiter "T" is used instead, and conforms to ISO-8601 specifications.
            dt_style.addElement(odf.number.Text(text=u"T"))
            dt_style.addElement(odf.number.Hours(style="long"))
            dt_style.addElement(odf.number.Text(text=u":"))
            dt_style.addElement(odf.number.Minutes(style="long"))
            dt_style.addElement(odf.number.Text(text=u":"))
            dt_style.addElement(odf.number.Seconds(style="long", decimalplaces="3"))
            self.wbk.styles.addElement(dt_style)
            self.define_body_style()
            dts = odf.style.Style(name=st_name, datastylename="iso-datetime", parentstylename="body", family="table-cell")
            self.wbk.automaticstyles.addElement(dts)
            self.cell_style_names.append(st_name)
    def define_iso_date_style(self):
        # Add the "iso_date" cell style (and the "iso-date" date style it
        # refers to) to the workbook if not already defined.
        st_name = "iso_date"
        if st_name not in self.cell_style_names:
            dt_style = odf.number.DateStyle(name="iso-date")
            dt_style.addElement(odf.number.Year(style="long"))
            dt_style.addElement(odf.number.Text(text=u"-"))
            dt_style.addElement(odf.number.Month(style="long"))
            dt_style.addElement(odf.number.Text(text=u"-"))
            dt_style.addElement(odf.number.Day(style="long"))
            self.wbk.styles.addElement(dt_style)
            self.define_body_style()
            dts = odf.style.Style(name=st_name, datastylename="iso-date", parentstylename="body", family="table-cell")
            self.wbk.automaticstyles.addElement(dts)
            self.cell_style_names.append(st_name)
    def sheetnames(self):
        # Returns a list of the worksheet names in the specified ODS spreadsheet.
        return [sheet.getAttribute("name") for sheet in self.wbk.spreadsheet.getElementsByType(odf.table.Table)]
    def sheet_named(self, sheetname):
        # Return the sheet with the matching name.  If the name is actually an integer,
        # return that sheet number (1-based).  If there is no sheet with that number,
        # the value is looked up as a name instead (case-insensitively).
        # Returns None if no sheet matches.
        if isinstance(sheetname, int):
            sheet_no = sheetname
        else:
            try:
                sheet_no = int(sheetname)
            except (TypeError, ValueError):
                sheet_no = None
            else:
                if sheet_no < 1:
                    sheet_no = None
        tables = self.wbk.spreadsheet.getElementsByType(odf.table.Table)
        if sheet_no is not None and 1 <= sheet_no <= len(tables):
            return tables[sheet_no - 1]
        # Compare as text, so that an integer sheet number that is out of
        # range does not fail on a call to .lower().
        wanted = str(sheetname).lower()
        for sheet in tables:
            if sheet.getAttribute("name").lower() == wanted:
                return sheet
        return None
    def sheet_data(self, sheetname, junk_header_rows=0):
        # Return the contents of the named (or numbered) sheet as a list of
        # rows, each a list of cell text values or None, after skipping the
        # first 'junk_header_rows' rows.
        # Raises RuntimeError if there is no such sheet.
        sheet = self.sheet_named(sheetname)
        if not sheet:
            msg = "There is no sheet named %s" % sheetname
            warning(msg, kwargs={})
            # Previously a bare 'raise' with no active exception, which
            # also produced a RuntimeError, but without a useful message.
            raise RuntimeError(msg)
        def row_data(sheetrow):
            # Adapted from http://www.marco83.com/work/wp-content/uploads/2011/11/odf-to-array.py
            cells = sheetrow.getElementsByType(odf.table.TableCell)
            rowdata = []
            for cell in cells:
                p_content = []
                repeat = cell.getAttribute("numbercolumnsrepeated")
                if not repeat:
                    repeat = 1
                    spanned = int(cell.getAttribute("numbercolumnsspanned") or 0)
                    if spanned > 1:
                        repeat = spanned
                ps = cell.getElementsByType(odf.text.P)
                if len(ps) == 0:
                    for rr in range(int(repeat)):
                        p_content.append(None)
                else:
                    for p in ps:
                        pval = type(u"")(p)
                        if len(pval) == 0:
                            for rr in range(int(repeat)):
                                p_content.append(None)
                        else:
                            for rr in range(int(repeat)):
                                p_content.append(pval)
                if len(p_content) == 0:
                    for rr in range(int(repeat)):
                        rowdata.append(None)
                elif p_content[0] != u'#':
                    rowdata.extend(p_content)
            return rowdata
        rows = sheet.getElementsByType(odf.table.TableRow)
        if junk_header_rows > 0:
            rows = rows[junk_header_rows: ]
        return [row_data(r) for r in rows]
    def new_sheet(self, sheetname):
        # Returns a sheet (a named Table) that has not yet been added to the workbook
        return odf.table.Table(name=sheetname)
    def add_row_to_sheet(self, datarow, odf_table, header=False):
        # Append one row of values to 'odf_table', choosing each cell's value
        # type and style from the Python type of the value.  Cells use the
        # "header" style if 'header' is true and the "body" style otherwise,
        # except for date and time values, which use the ISO styles.
        if header:
            self.define_header_style()
            style_name = "header"
        else:
            self.define_body_style()
            style_name = "body"
        tr = odf.table.TableRow()
        odf_table.addElement(tr)
        for item in datarow:
            if isinstance(item, bool):
                # Booleans must be evaluated before numbers.
                # Neither of the first two commented-out lines actually work (a bug in odfpy?).
                # Booleans *can* be written as either integers or strings; integers are chosen below.
                #tc = odf.table.TableCell(booleanvalue='true' if item else 'false')
                #tc = odf.table.TableCell(valuetype="boolean", value='true' if item else 'false')
                tc = odf.table.TableCell(valuetype="boolean", value=1 if item else 0, stylename=style_name)
                #tc = odf.table.TableCell(valuetype="string", stringvalue='True' if item else 'False')
            elif isinstance(item, float) or isinstance(item, int):
                tc = odf.table.TableCell(valuetype="float", value=item, stylename=style_name)
            elif isinstance(item, datetime.datetime):
                self.define_iso_datetime_style()
                tc = odf.table.TableCell(valuetype="date", datevalue=item.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3], stylename="iso_datetime")
            elif isinstance(item, datetime.date):
                self.define_iso_date_style()
                tc = odf.table.TableCell(valuetype="date", datevalue=item.strftime("%Y-%m-%d"), stylename="iso_date")
            elif isinstance(item, datetime.time):
                self.define_iso_datetime_style()
                timeval = datetime.datetime(1899, 12, 30, item.hour, item.minute, item.second, item.microsecond, item.tzinfo)
                tc = odf.table.TableCell(timevalue=timeval.strftime("PT%HH%MM%S.%fS"), stylename="iso_datetime")
                tc.addElement(odf.text.P(text=timeval.strftime("%H:%M:%S.%f")))
            elif isinstance(item, str):
                item = item.replace(u'\n', u' ').replace(u'\r', u' ')
                tc = odf.table.TableCell(valuetype="string", stringvalue=item, stylename=style_name)
            else:
                tc = odf.table.TableCell(value=item, stylename=style_name)
            if item is not None:
                tc.addElement(odf.text.P(text=item))
            tr.addElement(tc)
    def add_sheet(self, odf_table):
        # Add a sheet created with new_sheet() to the workbook.
        self.wbk.spreadsheet.addElement(odf_table)
    def save_close(self):
        # Write the workbook to self.filename and release it.  The output
        # file is closed even if writing fails.
        with io.open(self.filename, "wb") as ofile:
            self.wbk.write(ofile)
        self.filename = None
        self.wbk = None
    def close(self):
        # Release the workbook without saving it.
        self.filename = None
        self.wbk = None
def ods_data(filename, sheetname, junk_header_rows=0):
    # Returns the data from the specified worksheet as a list of headers and a list of lists of rows.
    #
    # filename: path of the OpenDocument spreadsheet.
    # sheetname: sheet name or sheet number, as accepted by OdsFile.sheet_data().
    # junk_header_rows: number of rows to skip before the header row.
    #
    # Missing (None or blank) column headers are handled according to the
    # global configuration: the columns are deleted if conf.del_empty_cols
    # is set; otherwise headers are generated if conf.create_col_hdrs is
    # set; otherwise a warning is issued and RuntimeError is raised.
    # Errors from opening the file or reading the sheet are re-raised after
    # a warning is issued.
    def is_blank(hdr):
        return hdr is None or len(hdr.strip()) == 0
    wbk = OdsFile()
    try:
        wbk.open(filename)
    except Exception:
        warning("%s is not a valid OpenDocument spreadsheet." % filename, kwargs={})
        raise
    try:
        alldata = wbk.sheet_data(sheetname, junk_header_rows)
    except Exception:
        warning("%s is not a worksheet in %s." % (sheetname, filename), kwargs={})
        raise
    colhdrs = alldata[0]
    if any(is_blank(x) for x in colhdrs):
        if conf.del_empty_cols:
            blanks = [i for i, hdr in enumerate(colhdrs) if is_blank(hdr)]
            # Delete from the right so that the remaining indexes stay valid.
            for b in reversed(blanks):
                for row in alldata:
                    # Rows may be shorter than the header row.
                    if b < len(row):
                        del row[b]
            colhdrs = alldata[0]
        else:
            if conf.create_col_hdrs:
                # Use the same blank test as above, so that whitespace-only
                # headers are replaced too.
                for i in range(len(colhdrs)):
                    if is_blank(colhdrs[i]):
                        colhdrs[i] = "Col%s" % str(i+1)
            else:
                msg = "The input file %s, sheet %s has missing column headers." % (filename, sheetname)
                warning(msg, kwargs={})
                # Previously a bare 'raise' with no active exception, which
                # also produced a RuntimeError, but without a useful message.
                raise RuntimeError(msg)
    #if conf.clean_col_hdrs:
    #    colhdrs = clean_words(colhdrs)
    #if conf.trim_col_hdrs != 'none':
    #    colhdrs = trim_words(colhdrs, conf.trim_col_hdrs)
    #if conf.fold_col_hdrs != 'no':
    #    colhdrs = fold_words(colhdrs, conf.fold_col_hdrs)
    #if conf.dedup_col_hdrs:
    #    colhdrs = dedup_words(colhdrs)
    return colhdrs, alldata[1:]
def export_ods(outfile, hdrs, rows, append=False, querytext=None, sheetname=None, desc=None):
    # Write a header row and data rows to a new sheet of an OpenDocument
    # spreadsheet.
    #
    # outfile: path of the spreadsheet file.
    # hdrs: column headers, written as the first row of the sheet.
    # rows: iterable of data rows.
    # append: if true and 'outfile' exists, the sheet is added to that
    #         workbook; otherwise any existing file is deleted first.
    # sheetname: name for the new sheet.  When appending, a number is added
    #         to the name if necessary to make it unique.
    # querytext, desc: not used by this function.
    #
    # If not given, determine the worksheet name to use.  The pattern is "Sheetx", where x is
    # the first integer for which there is not already a sheet name.
    if append and os.path.isfile(outfile):
        existing = OdsFile()
        existing.open(outfile)
        taken = existing.sheetnames()
        base = sheetname or u"Sheet"
        sheet_name = base
        suffix = 1
        while sheet_name in taken:
            suffix += 1
            sheet_name = u"%s%d" % (base, suffix)
        existing.close()
    else:
        sheet_name = sheetname or u"Sheet1"
        if os.path.isfile(outfile):
            os.unlink(outfile)
    wbk = OdsFile()
    wbk.open(outfile)
    # Build the new sheet: headers first, then the data.
    tbl = wbk.new_sheet(sheet_name)
    wbk.add_row_to_sheet(hdrs, tbl, header=True)
    for datarow in rows:
        wbk.add_row_to_sheet(datarow, tbl)
    # Attach the sheet to the workbook, then save and close the workbook.
    wbk.add_sheet(tbl)
    wbk.save_close()
class XlsFile(object):
    # Reads data from Excel .xls workbooks using the xlrd library.
    def __repr__(self):
        return u"XlsFile()"
    def __init__(self):
        self.filename = None
        self.encoding = None
        self.wbk = None
        self.datemode = 0
        self.read_only = False
    def open(self, filename, encoding=None, read_only=False):
        # Open the workbook in 'filename'.  'encoding' is passed to xlrd as
        # an encoding override; 'read_only' is only recorded.
        self.filename = filename
        self.encoding = encoding
        self.read_only = read_only
        self.wbk = xlrd.open_workbook(filename, encoding_override=self.encoding)
        self.datemode = self.wbk.datemode
    def sheetnames(self):
        # Return the names of the sheets in the workbook.
        return self.wbk.sheet_names()
    def sheet_named(self, sheetname):
        # Return the sheet with the matching name.  If the name is actually an integer,
        # return that sheet number.
        if isinstance(sheetname, int):
            sheet_no = sheetname
        else:
            try:
                sheet_no = int(sheetname)
            except (TypeError, ValueError):
                sheet_no = None
            else:
                if sheet_no < 1:
                    sheet_no = None
        if sheet_no is None:
            sheet = self.wbk.sheet_by_name(sheetname)
        else:
            # User-specified sheet numbers should be 1-based; xlrd sheet indexes are 0-based
            sheet = self.wbk.sheet_by_index(max(0, sheet_no-1))
        return sheet
    def sheet_data(self, sheetname, junk_header_rows=0):
        # Return the sheet's contents as a list of rows (lists of values),
        # starting with the header row that follows 'junk_header_rows'
        # skipped rows.  Values are converted to Python types according to
        # the xlrd cell type.
        sheet = self.sheet_named(sheetname)
        # Don't rely on sheet.ncols and sheet.nrows, because Excel will count columns
        # and rows that have ever been filled, even if they are now empty.  Base the column count
        # on the number of contiguous non-empty cells in the first row, and process the data up to nrows until
        # a row is entirely empty.
        def row_data(sheetrow, columns=None):
            # Return the values in row number 'sheetrow', limited to the
            # first 'columns' cells if that is given and non-zero.
            cells = sheet.row_slice(sheetrow)
            if columns:
                d = [cells[c] for c in range(columns)]
            else:
                d = [cell for cell in cells]
            datarow = []
            for c in d:
                if c.ctype == 0:
                    # empty
                    datarow.append(None)
                elif c.ctype == 1:
                    datarow.append(c.value)
                elif c.ctype == 2:
                    # float, but maybe should be int
                    if c.value - int(c.value) == 0:
                        datarow.append(int(c.value))
                    else:
                        datarow.append(c.value)
                elif c.ctype == 3:
                    # date
                    dt = xlrd.xldate_as_tuple(c.value, self.datemode)
                    # Convert to time or datetime
                    if not any(dt[:3]):
                        # No date values
                        datarow.append(datetime.time(*dt[3:]))
                    else:
                        datarow.append(datetime.datetime(*dt))
                elif c.ctype == 4:
                    # Boolean
                    datarow.append(bool(c.value))
                elif c.ctype == 5:
                    # Error code
                    datarow.append(xlrd.error_text_from_code(c.value))
                elif c.ctype == 6:
                    # blank
                    datarow.append(None)
                else:
                    datarow.append(c.value)
            return datarow
        hdr_row = row_data(junk_header_rows)
        ncols = 0
        for c in range(len(hdr_row)):
            if not hdr_row[c]:
                break
            ncols += 1
        sheet_data = []
        # Rows run from the header row through the last row of the sheet.
        # (The upper limit was previously reduced by junk_header_rows, which
        # dropped that many rows from the end of the data.)
        for r in range(junk_header_rows, sheet.nrows):
            datarow = row_data(r, ncols)
            if datarow.count(None) == len(datarow):
                break
            sheet_data.append(datarow)
        return sheet_data
class XlsxFile(object):
    # Reads data from Excel .xlsx workbooks using the openpyxl library.
    def __repr__(self):
        return u"XlsxFile()"
    def __init__(self):
        self.filename = None
        self.encoding = None
        self.wbk = None
        self.read_only = False
    def open(self, filename, encoding=None, read_only=False):
        # Load the workbook in 'filename' if that file exists; otherwise
        # self.wbk is left unchanged.  'encoding' is only recorded.
        self.filename = filename
        self.encoding = encoding
        self.read_only = read_only
        if os.path.isfile(filename):
            if read_only:
                self.wbk = openpyxl.load_workbook(filename, read_only=True)
            else:
                self.wbk = openpyxl.load_workbook(filename)
    def close(self):
        # Close the workbook, if one is open, and reset the file attributes.
        if self.wbk is not None:
            self.wbk.close()
            self.wbk = None
            self.filename = None
            self.encoding = None
    def sheetnames(self):
        # Return the names of the sheets in the workbook.
        return self.wbk.sheetnames
    def sheet_named(self, sheetname):
        # Return the sheet with the matching name.  If the name is actually an integer,
        # return that sheet number.
        if isinstance(sheetname, int):
            sheet_no = sheetname
        else:
            try:
                sheet_no = int(sheetname)
            except (TypeError, ValueError):
                sheet_no = None
            else:
                if sheet_no < 1:
                    sheet_no = None
        if sheet_no is not None:
            # User-specified sheet numbers should be 1-based
            sheet = self.wbk[self.wbk.sheetnames[sheet_no - 1]]
        else:
            sheet = self.wbk[sheetname]
        return sheet
    def sheet_data(self, sheetname, junk_header_rows=0):
        # Return the sheet's contents as a list of rows (lists of values),
        # starting with the header row that follows 'junk_header_rows'
        # skipped rows and ending before the first entirely empty row.
        # Rows are returned at their full width.
        sheet = self.sheet_named(sheetname)
        # Don't rely on sheet.max_row, because Excel will count
        # rows that have ever been filled, even if they are now empty.
        # Process the data until a row is entirely empty.
        sheet_data = []
        rowsrc = sheet.iter_rows(min_row = junk_header_rows + 1, values_only = True)
        for r in rowsrc:
            # A row is empty only if every cell is None or an empty string;
            # a row of zeros or False values is data and must not end the scan.
            if all(v is None or v == '' for v in r):
                break
            rd = list(r)
            # Convert Boolean formulas to Boolean values.
            for c, v in enumerate(rd):
                if isinstance(v, str):
                    if v == '=FALSE()':
                        rd[c] = False
                    elif v == '=TRUE()':
                        rd[c] = True
            sheet_data.append(rd)
        return sheet_data
def xls_data(filename, sheetname, junk_header_rows, encoding=None):
    # Returns the data from the specified worksheet as a list of headers and a list of lists of rows.
    #
    # filename: path of the Excel file; a ".xls" extension selects XlsFile,
    #           anything else selects XlsxFile.
    # sheetname: sheet name or 1-based sheet number.
    # junk_header_rows: number of rows to skip before the header row.
    # encoding: passed to the workbook's open() method.
    #
    # Columns whose header is None or a blank string are deleted.
    # Raises ErrInfo if the worksheet contains no data; errors from opening
    # the workbook are re-raised after a warning is issued.
    def is_blank(hdr):
        return hdr is None or (isinstance(hdr, str) and len(hdr.strip()) == 0)
    root, ext = os.path.splitext(filename)
    ext = ext.lower()
    if ext == ".xls":
        wbk = XlsFile()
    else:
        wbk = XlsxFile()
    try:
        wbk.open(filename, encoding, read_only=True)
    except Exception:
        warning("%s is not a valid Excel spreadsheet." % filename, kwargs={})
        raise
    try:
        alldata = wbk.sheet_data(sheetname, junk_header_rows)
    finally:
        # Close the workbook whether or not reading succeeded.  The
        # extension includes the leading dot, so the former test against
        # 'xlsx' never matched and the workbook was never closed.  Not
        # every workbook class provides a close() method.
        if hasattr(wbk, "close"):
            wbk.close()
    if len(alldata) == 0:
        raise ErrInfo(type="cmd", other_msg="There are no data on worksheet %s of file %s." % (sheetname, filename))
    if len(alldata) == 1:
        return alldata[0], []
    colhdrs = alldata[0]
    # Delete columns with missing headers.  Non-string headers (such as
    # numbers) are never blank and must not be passed to strip().
    blanks = [i for i, hdr in enumerate(colhdrs) if is_blank(hdr)]
    # Delete from the right so that the remaining indexes stay valid.
    for b in reversed(blanks):
        for row in alldata:
            # Rows may be shorter than the header row.
            if b < len(row):
                del row[b]
    colhdrs = alldata[0]
    #if conf.clean_col_hdrs:
    #    colhdrs = clean_words(colhdrs)
    #if conf.trim_col_hdrs != 'none':
    #    colhdrs = trim_words(colhdrs, conf.trim_col_hdrs)
    #if conf.fold_col_hdrs != 'no':
    #    colhdrs = fold_words(colhdrs, conf.fold_col_hdrs)
    #if conf.dedup_col_hdrs:
    #    colhdrs = dedup_words(colhdrs)
    return colhdrs, alldata[1:]
def file_data(filename, junk_headers=0):
    # Read a CSV file and return a tuple of (headers, rows): the first line
    # produced by the reader, and a list of all remaining lines.
    # 'junk_headers' is passed to CsvFile as the number of junk header lines.
    reader = CsvFile(filename, junk_header_lines=junk_headers)
    headers = reader.next()
    return headers, [line for line in reader]
#***************************************************************************************************
#*************************** SQL Scripting Extensions ********************************************
#***************************************************************************************************
# Support for SQL scripts used to obtain a data table from a database.
# These are a subset of features in execsql.py.
#===============================================================================================
#----- GLOBAL VARIABLES FOR SQL INTERPRETER
# Other variables are defined in the context of further class and function definitions.
# A list of errors found while processing the SQL script.  Each item in this list is
# a two-element list consisting of a) a description of the error, and b) the line number of the error.
script_errors = []
# The last command run.  This should be a ScriptCmd object.
last_command = None
# A compiled regex to match prefixed regular expressions, used to check
# for unsubstituted variables.  This is global rather than local to SqlStmt and
# MetacommandStmt objects because Python 2 can't deepcopy a compiled regex.
varlike = re.compile(r'!![$@&~#]?\w+!!', re.I)
# A ScriptExecSpec object for a script to be executed when the program halts due to an error.
# This is initially None, but may be set and re-set by metacommands.
err_halt_exec = None
# A ScriptExecSpec object for a script to be executed when the program halts due
# to user cancellation.
# This is initially None, but may be set and re-set by metacommands.
cancel_halt_exec = None
# A stack of the CommandList objects currently in the queue to be executed.
# The list on the top of the stack is the currently executing script.
commandliststack = []
# A dictionary of CommandList objects (ordinarily created by
# BEGIN/END SCRIPT metacommands) that may be inserted into the
# commandliststack.
savedscripts = {}
# A stack of CommandList objects that are used when compiling the
# statements within a loop (between LOOP and END LOOP metacommands).
loopcommandstack = []
# A global flag to indicate that commands should be compiled into
# the topmost entry in the loopcommandstack rather than executed.
compiling_loop = False
# Compiled regex for END LOOP metacommand, which is immediate.
endloop_rx = re.compile(r'^\s*END\s+LOOP\s*$', re.I)
# Compiled regex for *start of* LOOP metacommand, for testing
# while compiling commands within a loop.
loop_rx = re.compile(r'\s*LOOP\s+', re.I)
# Nesting counter, to ensure loops are only ended when nesting
# level is zero.
loop_nest_level = 0
# A count of all of the commands run.
cmds_run = 0
# Pattern for deferred substitution, e.g.: "!{somevar}!"
defer_rx = re.compile(r'(!{([$@&~#]?[a-z0-9_]+)}!)', re.I)
# End of global variables (1) for execsql interpreter
#===============================================================================================
#===============================================================================================
#----- CONFIGURATION
class ConfigData(object):
    # Holds the configuration settings used by the SQL script interpreter
    # and the data import/export routines, initialized to their defaults.
    def __init__(self):
        # Encodings.
        self.db_encoding = None
        self.script_encoding = self.output_encoding = self.import_encoding = 'utf8'
        # Handling of rows and columns in imported data.
        self.empty_rows = True
        self.del_empty_cols = False
        self.create_col_hdrs = False
        # Clean-up of column headers.
        self.trim_col_hdrs = 'none'
        self.clean_col_hdrs = False
        self.fold_col_hdrs = 'no'
        self.dedup_col_hdrs = False
        # Clean-up of data values.
        self.trim_strings = False
        self.replace_newlines = False
        # Export settings.
        self.export_row_buffer = 1000
# End of configuration for execsql interpreter
#===============================================================================================
#===============================================================================================
#----- SUPPORT FUNCTIONS AND CLASSES (1)
def ins_rxs(rx_list, fragment1, fragment2):
    # Returns a tuple of all strings consisting of elements of the 'rx_list' tuple
    # inserted between 'fragment1' and 'fragment2'.  The fragments may themselves
    # be tuples, in which case every combination is produced; 'fragment2' may be
    # None, meaning that nothing follows the inserted element.
    # The result is ordered by 'rx_list' element, then 'fragment1', then 'fragment2'.
    prefixes = fragment1 if type(fragment1) == tuple else (fragment1, )
    if fragment2 is None:
        suffixes = ('', )
    elif type(fragment2) == tuple:
        suffixes = fragment2
    else:
        suffixes = (fragment2, )
    return tuple(pre + rx + post
                 for rx in rx_list
                 for pre in prefixes
                 for post in suffixes)
def ins_quoted_rx(fragment1, fragment2, rx):
    # Returns the strings produced by ins_rxs() for 'rx' both as given and
    # enclosed in double quotes.
    alternatives = (rx, r'"%s"' % rx)
    return ins_rxs(alternatives, fragment1, fragment2)
def ins_schema_rxs(fragment1, fragment2, suffix=None):
    # Returns the strings produced by ins_rxs() for each of the schema-name
    # expressions (double-quoted, unquoted, and bracketed).  If 'suffix' is
    # given, every occurrence of "schema" in the expressions is replaced by
    # "schema" + suffix.
    # NOTE(review): the groups below begin with "(?P[", i.e. a named-group
    # opener with no name.  The names look as if they were lost -- confirm
    # against the original source.
    patterns = (r'"(?P[A-Za-z0-9_\- ]+)"',
                r'(?P[A-Za-z0-9_\-]+)',
                r'\[(?P[A-Za-z0-9_\- ]+)\]'
                )
    if suffix:
        renamed = "schema" + suffix
        patterns = tuple(p.replace("schema", renamed) for p in patterns)
    return ins_rxs(patterns, fragment1, fragment2)
def ins_table_rxs(fragment1, fragment2, suffix=None):
    # Returns the strings produced by ins_rxs() for each of the expressions
    # matching an optionally schema-qualified table name, in the various
    # combinations of double-quoted, bracketed, and unquoted names.  If
    # 'suffix' is given, every occurrence of "schema" and "table" in the
    # expressions has the suffix appended.
    # NOTE(review): the groups below begin with "(?P[", i.e. a named-group
    # opener with no name.  The names look as if they were lost -- confirm
    # against the original source.
    patterns = (r'(?:"(?P[A-Za-z0-9_\- ]+)"\.)?"(?P[A-Za-z0-9_\-\# ]+)"',
                r'(?:(?P[A-Za-z0-9_\-]+)\.)?(?P[A-Za-z0-9_\-\#]+)',
                r'(?:"(?P[A-Za-z0-9_\- ]+)"\.)?(?P[A-Za-z0-9_\-\#]+)',
                r'(?:(?P[A-Za-z0-9_\-]+)\.)?"(?P[A-Za-z0-9_\-\# ]+)"',
                r'(?:\[(?P[A-Za-z0-9_\- ]+)\]\.)?\[(?P[A-Za-z0-9_\-\# ]+)\]',
                r'(?:(?P[A-Za-z0-9_\-]+)\.)?(?P[A-Za-z0-9_\-\#]+)',
                r'(?:\[(?P[A-Za-z0-9_\- ]+)\]\.)?(?P[A-Za-z0-9_\-\#]+)',
                r'(?:(?P[A-Za-z0-9_\-]+)\.)?\[(?P[A-Za-z0-9_\-\# ]+)\]'
                )
    if suffix:
        schema_name = "schema" + suffix
        table_name = "table" + suffix
        patterns = tuple(p.replace("schema", schema_name).replace("table", table_name)
                         for p in patterns)
    return ins_rxs(patterns, fragment1, fragment2)
def ins_table_list_rxs(fragment1, fragment2):
    # Returns the strings produced by ins_rxs() for the expressions matching
    # a comma-separated list of optionally schema-qualified table names,
    # either all double-quoted or all unquoted.
    # NOTE(review): the groups below begin with "(?P(", i.e. a named-group
    # opener with no name.  The names look as if they were lost -- confirm
    # against the original source.
    quoted_list = r'(?:(?P(?:"[A-Za-z0-9_\- ]+"\.)?"[A-Za-z0-9_\-\# ]+"(?:\s*,\s*(?:"[A-Za-z0-9_\- ]+"\.)?"[A-Za-z0-9_\-\# ]+")*))'
    unquoted_list = r'(?:(?P(?:[A-Za-z0-9_\-]+\.)?[A-Za-z0-9_\-\#]+(?:\s*,\s*(?:[A-Za-z0-9_\-]+\.)?[A-Za-z0-9_\-\#]+)*))'
    return ins_rxs((quoted_list, unquoted_list), fragment1, fragment2)
def ins_fn_rxs(fragment1, fragment2, symbolicname="filename"):
    # Returns the strings produced by ins_rxs() for the expressions matching
    # a file name, unquoted or enclosed in double quotes.  The matched name
    # is captured in a group named by 'symbolicname'.  The expressions used
    # depend on whether os.name is 'posix'.
    if os.name == 'posix':
        unquoted = r'(?P<%s>[\w\.\-\\\/\'~`!@#$^&()+={}\[\]:;,]*[\w\.\-\\\/\'~`!@#$^&(+={}\[\]:;,])'
        quoted = r'"(?P<%s>[\w\s\.\-\\\/\'~`!@#$^&()+={}\[\]:;,]+)"'
    else:
        unquoted = r'(?P<%s>([A-Z]\:)?[\w+\,()!@#$^&\+=;\'{}\[\]~`\.\-\\\/]*[\w+\,(!@#$^&\+=;\'{}\[\]~`\.\-\\\/])'
        quoted = r'"(?P<%s>([A-Z]\:)?[\w+\,()!@#$^&\+=;\'{}\[\]~`\s\.\-\\\/]+)"'
    fns = (unquoted % symbolicname, quoted % symbolicname)
    return ins_rxs(fns, fragment1, fragment2)
# Date/time formats tried, in order, by parse_datetime().  The deque is
# reordered at run time: a format that matches is moved to the front so
# that it is tried first on the next call.
dt_fmts = collections.deque((
    "%x %X",
    "%m/%d/%Y %H:%M",
    "%m/%d/%Y %H%M",
    "%m/%d/%Y %H:%M:%S",
    "%Y-%m-%d %H:%M:%S",
    "%Y-%m-%dT%H:%M:%S",
    "%Y-%m-%d %H%M",
    "%Y-%m-%d %H:%M",
    "%Y-%m-%d %I:%M%p",
    "%Y-%m-%d %I:%M %p",
    "%Y-%m-%d %I:%M:%S%p",
    "%Y-%m-%d %I:%M:%S %p",
    "%m/%d/%Y %I:%M%p",
    "%m/%d/%Y %I:%M %p",
    "%m/%d/%Y %I:%M:%S%p",
    "%m/%d/%Y %I:%M:%S %p",
    "%Y/%m/%d %H%M",
    "%Y/%m/%d %H:%M",
    "%Y/%m/%d %H:%M:%S",
    "%Y/%m/%d %I:%M%p",
    "%Y/%m/%d %I:%M %p",
    "%Y/%m/%d %I:%M:%S%p",
    "%Y/%m/%d %I:%M:%S %p",
    "%Y/%m/%d %X",
    "%c",
    "%b %d, %Y %X",
    "%b %d, %Y %I:%M %p",
    "%b %d %Y %X",
    "%b %d %Y %I:%M %p",
    "%d %b, %Y %X",
    "%d %b, %Y %I:%M %p",
    "%d %b %Y %X",
    "%d %b %Y %I:%M %p",
    "%b. %d, %Y %X",
    "%b. %d, %Y %I:%M %p",
    "%b. %d %Y %X",
    "%b. %d %Y %I:%M %p",
    "%d %b., %Y %X",
    "%d %b., %Y %I:%M %p",
    "%d %b. %Y %X",
    "%d %b. %Y %I:%M %p",
    "%B %d, %Y %X",
    "%B %d, %Y %I:%M %p",
    "%B %d %Y %X",
    "%B %d %Y %I:%M %p",
    "%d %B, %Y %X",
    "%d %B, %Y %I:%M %p",
    "%d %B %Y %X",
    "%d %B %Y %I:%M %p"
    ))
def parse_datetime(datestr):
    # Convert 'datestr' to a datetime.datetime using the first format in
    # dt_fmts that matches.  A datetime argument is returned as is; any
    # other non-string argument is converted to a string first.
    # Returns None if no format matches.
    if isinstance(datestr, datetime.datetime):
        return datestr
    if not isinstance(datestr, str):
        try:
            datestr = str(datestr)
        except Exception:
            return None
    for i, f in enumerate(dt_fmts):
        try:
            dt = datetime.datetime.strptime(datestr, f)
        except ValueError:
            continue
        # Move the matching format to the front so that it is tried first
        # next time.  This is done only on a successful match; previously a
        # failed parse moved the last format in the list to the front.
        if i:
            del dt_fmts[i]
            dt_fmts.appendleft(f)
        return dt
    return None
# Splits a timestamp with a trailing numeric UTC offset into the timestamp
# text, the sign, the hours and the minutes.  The timestamp group is lazy so
# that whitespace between the timestamp and the offset is consumed by '\s*'
# rather than being left on the end of the timestamp text.
dtzrx = re.compile(r"(.+?)\s*([+-])(\d{1,2}):?(\d{2})$")

# Formats with an alphabetic time zone name, tried in order by
# parse_datetimetz().  Each format appears without and with a blank before the
# zone name.  The most recently successful format is moved to the front.
timestamptz_fmts = collections.deque((
    "%Y-%m-%d %H%M%Z", "%Y-%m-%d %H%M %Z",
    "%m/%d/%Y%Z", "%m/%d/%Y %Z",
    "%m/%d/%y%Z", "%m/%d/%y %Z",
    "%m/%d/%Y %H%M%Z", "%m/%d/%Y %H%M %Z",
    "%m/%d/%Y %H:%M%Z", "%m/%d/%Y %H:%M %Z",
    "%Y-%m-%dT%H%M%Z", "%Y-%m-%dT%H%M %Z",
    "%Y-%m-%d %H:%M%Z", "%Y-%m-%d %H:%M %Z",
    "%Y-%m-%dT%H:%M%Z", "%Y-%m-%dT%H:%M %Z",
    "%Y-%m-%d %H:%M:%S%Z", "%Y-%m-%d %H:%M:%S %Z",
    "%Y-%m-%dT%H:%M:%S%Z", "%Y-%m-%dT%H:%M:%S %Z",
    "%Y-%m-%d %I:%M%p%Z", "%Y-%m-%d %I:%M%p %Z",
    "%Y-%m-%dT%I:%M%p%Z", "%Y-%m-%dT%I:%M%p %Z",
    "%Y-%m-%d %I:%M %p%Z", "%Y-%m-%d %I:%M %p %Z",
    "%Y-%m-%dT%I:%M %p%Z", "%Y-%m-%dT%I:%M %p %Z",
    "%Y-%m-%d %I:%M:%S%p%Z", "%Y-%m-%d %I:%M:%S%p %Z",
    "%Y-%m-%dT%I:%M:%S%p%Z", "%Y-%m-%dT%I:%M:%S%p %Z",
    "%Y-%m-%d %I:%M:%S %p%Z", "%Y-%m-%d %I:%M:%S %p %Z",
    "%Y-%m-%dT%I:%M:%S %p%Z", "%Y-%m-%dT%I:%M:%S %p %Z",
    "%c%Z", "%c %Z",
    "%x %X%Z", "%x %X %Z",
    "%m/%d/%Y %H:%M:%S%Z", "%m/%d/%Y %H:%M:%S %Z",
    "%m/%d/%Y %I:%M%p%Z", "%m/%d/%Y %I:%M%p %Z",
    "%m/%d/%Y %I:%M %p%Z", "%m/%d/%Y %I:%M %p %Z",
    "%m/%d/%Y %I:%M:%S%p%Z", "%m/%d/%Y %I:%M:%S%p %Z",
    "%m/%d/%Y %I:%M:%S %p%Z", "%m/%d/%Y %I:%M:%S %p %Z",
    "%Y/%m/%d %H%M%Z", "%Y/%m/%d %H%M %Z",
    "%Y/%m/%d %H:%M%Z", "%Y/%m/%d %H:%M %Z",
    "%Y/%m/%d %H:%M:%S%Z", "%Y/%m/%d %H:%M:%S %Z",
    "%Y/%m/%d %I:%M%p%Z", "%Y/%m/%d %I:%M%p %Z",
    "%Y/%m/%d %I:%M %p%Z", "%Y/%m/%d %I:%M %p %Z",
    "%Y/%m/%d %I:%M:%S%p%Z", "%Y/%m/%d %I:%M:%S%p %Z",
    "%Y/%m/%d %I:%M:%S %p%Z", "%Y/%m/%d %I:%M:%S %p %Z",
    "%Y/%m/%d %X%Z", "%Y/%m/%d %X %Z",
    "%b %d, %Y %X%Z", "%b %d, %Y %X %Z",
    "%b %d, %Y %I:%M %p%Z", "%b %d, %Y %I:%M %p %Z",
    "%b %d %Y %X%Z", "%b %d %Y %X %Z",
    "%b %d %Y %I:%M %p%Z", "%b %d %Y %I:%M %p %Z",
    "%d %b, %Y %X%Z", "%d %b, %Y %X %Z",
    "%d %b, %Y %I:%M %p%Z", "%d %b, %Y %I:%M %p %Z",
    "%d %b %Y %X%Z", "%d %b %Y %X %Z",
    "%d %b %Y %I:%M %p%Z", "%d %b %Y %I:%M %p %Z",
    "%b. %d, %Y %X%Z", "%b. %d, %Y %X %Z",
    "%b. %d, %Y %I:%M %p%Z", "%b. %d, %Y %I:%M %p %Z",
    "%b. %d %Y %X%Z", "%b. %d %Y %X %Z",
    "%b. %d %Y %I:%M %p%Z", "%b. %d %Y %I:%M %p %Z",
    "%d %b., %Y %X%Z", "%d %b., %Y %X %Z",
    "%d %b., %Y %I:%M %p%Z", "%d %b., %Y %I:%M %p %Z",
    "%d %b. %Y %X%Z", "%d %b. %Y %X %Z",
    "%d %b. %Y %I:%M %p%Z", "%d %b. %Y %I:%M %p %Z",
    "%B %d, %Y %X%Z", "%B %d, %Y %X %Z",
    "%B %d, %Y %I:%M %p%Z", "%B %d, %Y %I:%M %p %Z",
    "%B %d %Y %X%Z", "%B %d %Y %X %Z",
    "%B %d %Y %I:%M %p%Z", "%B %d %Y %I:%M %p %Z",
    "%d %B, %Y %X%Z", "%d %B, %Y %X %Z",
    "%d %B, %Y %I:%M %p%Z", "%d %B, %Y %I:%M %p %Z",
    "%d %B %Y %X%Z", "%d %B %Y %X %Z",
    "%d %B %Y %I:%M %p%Z", "%d %B %Y %I:%M %p %Z"
    ))


def parse_datetimetz(data):
    # Converts 'data' to a datetime with time zone information.
    #   data : a datetime object or a string.
    # A datetime argument is returned unchanged if it carries a usable UTC
    # offset; otherwise None is returned.  Non-string, non-datetime arguments
    # yield None.
    # Strings are first checked for a trailing numeric offset ("+05:00",
    # "-0800"); the remainder is parsed with parse_datetime() and the offset is
    # attached as a Tz object.  If the remainder cannot be parsed, None is
    # returned.  Strings without a numeric offset are tried against the
    # formats in timestamptz_fmts; the result of strptime() is returned as is,
    # or None if no format matches.
    # Side effect: a matching format that is not already first in
    # timestamptz_fmts is moved to the front; on failure the order is unchanged.
    if isinstance(data, datetime.datetime):
        if data.tzinfo is None or data.tzinfo.utcoffset(data) is None:
            return None
        return data
    if not isinstance(data, str):
        return None
    # Check for numeric timezone
    tzmatch = dtzrx.match(data)
    if tzmatch is not None:
        try:
            datestr, sign, hr, minute = tzmatch.groups()
            dt = parse_datetime(datestr)
            if not dt:
                return None
            sign = -1 if sign == '-' else 1
            return datetime.datetime(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second,
                                     tzinfo=Tz(sign, int(hr), int(minute)))
        except Exception:
            # An unusable offset falls through to the alphabetic formats,
            # as any failure in this step always has.
            pass
    # Check for alphabetic timezone
    for i, f in enumerate(timestamptz_fmts):
        try:
            dt = datetime.datetime.strptime(data, f)
        except Exception:
            continue
        break
    else:
        # Nothing matched; do not promote the last (failed) format.
        return None
    if i:
        del timestamptz_fmts[i]
        timestamptz_fmts.appendleft(f)
    return dt
# Date-only formats tried, in order, by parse_date().  The format that matched
# most recently is kept at the front of the deque.
date_fmts = collections.deque((
    "%x",
    "%Y-%m-%d",
    "%Y/%m/%d",
    "%m/%d/%Y",
    "%d/%m/%Y",
    "%b %d, %Y",
    "%b %d %Y",
    "%d %b, %Y",
    "%d %b %Y",
    "%b. %d, %Y",
    "%b. %d %Y",
    "%d %b., %Y",
    "%d %b. %Y",
    "%B %d, %Y",
    "%B %d %Y",
    "%d %B, %Y",
    "%d %B %Y"
    ))


def parse_date(data):
    # Converts 'data' to a datetime.date.
    # None and date objects (including datetimes, which are dates) are
    # returned unchanged; anything else that is not a string yields None.
    # Strings are tried against each format in date_fmts; None is returned if
    # none of them matches.  A matching format that is not already first is
    # moved to the front of date_fmts.
    if data is None or isinstance(data, datetime.date):
        return data
    if not isinstance(data, str):
        return None
    matched_at = None
    for pos, fmt in enumerate(date_fmts):
        try:
            result = datetime.datetime.strptime(data, fmt).date()
        except:
            continue
        matched_at = pos
        break
    if matched_at is None:
        return None
    if matched_at:
        del date_fmts[matched_at]
        date_fmts.appendleft(fmt)
    return result
def parse_boolean(data):
    # Interprets 'data' as a Boolean value.
    #   bool          -> returned unchanged
    #   int 0 or 1    -> False or True
    #   str           -> 'yes', 'true', '1' give True; 'no', 'false', '0' give
    #                    False (case-insensitive, no whitespace stripping)
    # Everything else, including None, yields None.
    if data is None:
        return None
    if isinstance(data, bool):
        return data
    if isinstance(data, int):
        return (data == 1) if data in (0, 1) else None
    if isinstance(data, str):
        lowered = data.lower()
        if lowered in ('yes', 'true', '1'):
            return True
        if lowered in ('no', 'false', '0'):
            return False
    return None
# End of support functions (1)
#===============================================================================================
#===============================================================================================
#----- STATUS RECORDING
class StatObj(object):
    # Holder for status indicators.  These indicators are primarily the ones
    # used in the metacommand environment rather than for the program as a
    # whole.
    def __init__(self):
        # Halt-on-error switches, all initially on.
        self.halt_on_err = True
        self.halt_on_metacommand_err = True
        self.cancel_halt = True
        # Error flags, initially clear.
        self.sql_error = False
        self.metacommand_error = False
        # Batch nesting state.
        self.batch = BatchLevels()
# End of status recording class.
#===============================================================================================
#===============================================================================================
#----- ERROR HANDLING
class ErrInfo(Exception):
    # Exception that carries a categorized description of an error together
    # with the script context (line number and command) in which it occurred.
    # Creating an ErrInfo also stores its message in the "$ERROR_MESSAGE"
    # substitution variable.

    # First line of the error message for each recognized error type.
    _type_headers = {
        'db': u"**** Error in SQL statement.",
        'cmd': u"**** Error in metacommand.",
        'log': u"**** Error in logging.",
        'error': u"**** General error.",
        'systemexit': u"**** Exit.",
        'exception': u"**** Exception.",
        }

    def __repr__(self):
        return u"ErrInfo(%r, %r, %r, %r)" % (self.type, self.command, self.exception, self.other)

    def __init__(self, type, command_text=None, exception_msg=None, other_msg=None):
        # Argument 'type' should be "db", "cmd", "log", "error", "systemexit",
        # or "exception".
        # Arguments for each type are as follows:
        # "db" : command_text, exception_msg
        # "cmd" : command_text,
        # "log" : other_msg [, exception_msg]
        # "error" : other_msg [, exception_msg]
        # "systemexit" : other_msg
        # "exception" : exception_msg [, other_msg]
        self.type = type
        self.command = command_text
        # Continuation lines of multi-line messages are indented.
        self.exception = None if not exception_msg else exception_msg.replace(u'\n', u'\n ')
        self.other = None if not other_msg else other_msg.replace(u'\n', u'\n ')
        # Script context.  Every attribute is always defined, so that code
        # inspecting an ErrInfo never hits a missing attribute regardless of
        # whether a command was active when the error occurred.
        self.script_file = None
        self.script_line_no = None
        self.cmd = None
        self.cmdtype = None
        if last_command is not None:
            self.script_line_no = current_script_line()
            self.cmd = last_command.command.statement
            self.cmdtype = last_command.command_type
        self.error_message = None
        subvars.add_substitution("$ERROR_MESSAGE", self.errmsg())

    def script_info(self):
        # Returns a description of the script line, or None if it is unknown.
        if self.script_line_no:
            return u"Line %d of script" % self.script_line_no
        else:
            return None

    def cmd_info(self):
        # Returns a description of the command that was running, or None if
        # no command type was recorded.
        if self.cmdtype:
            if self.cmdtype == "cmd":
                em = u"Metacommand: %s" % self.cmd
            else:
                em = u"SQL statement: \n %s" % self.cmd.replace(u'\n', u'\n ')
            return em
        else:
            return None

    def eval_err(self):
        # Builds the complete error message, stores it in self.error_message,
        # and returns it.  The message ends with the current UTC time, so it
        # is rebuilt on every call.
        header = self._type_headers.get(self.type)
        if header is None:
            header = u"**** Error of unknown type: %s" % self.type
        self.error_message = header
        sinfo = self.script_info()
        cinfo = self.cmd_info()
        for detail in (sinfo, self.exception, self.other, self.command, cinfo):
            if detail:
                self.error_message += u"\n %s" % detail
        self.error_message += u"\n Error occurred at %s UTC." % time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
        return self.error_message

    def write(self):
        # Writes the error message to the error output and returns it.
        errmsg = self.eval_err()
        output.write_err(errmsg)
        return errmsg

    def errmsg(self):
        # Returns the freshly evaluated error message.
        return self.eval_err()
def exception_info():
    # Describes the exception currently being handled.
    # Returns a 5-tuple: the exception type name, the exception message, the
    # source file name and source line text of the innermost traceback entry,
    # and (as the fourth item) the line number of the last traceback entry
    # whose file name contains "mapdata" (0 if there is none).
    exc_class, exc_param, exc_tb = sys.exc_info()
    traces = traceback.extract_tb(exc_tb)
    # Line number of the deepest frame that lies within this program.
    xline = 0
    for trace in traces:
        if u"mapdata" in trace[0]:
            xline = trace[1]
    # Pick the most specific message text available.
    if isinstance(exc_param, str):
        exc_message = exc_param
    elif hasattr(exc_param, 'message') and isinstance(exc_param.message, str) and len(exc_param.message) > 0:
        exc_message = exc_param.message
    elif hasattr(exc_param, 'value') and isinstance(exc_param.value, str) and len(exc_param.value) > 0:
        exc_message = exc_param.value
    else:
        exc_message = str(exc_param)
    try:
        exc_message = str(exc_message)
    except:
        exc_message = repr(exc_message)
    xname = getattr(exc_class, "__name__", "")
    innermost = traces[-1:][0]
    return xname, exc_message, innermost[0], xline, innermost[3]
def exception_desc():
    # Returns a one-line description of the exception currently being
    # handled, built from the values reported by exception_info().  The
    # source line text is not included.
    details = exception_info()
    exc_type, exc_strval, exc_filename, exc_lineno = details[0], details[1], details[2], details[3]
    return u"%s: %s in %s on line %s of program." % (exc_type, exc_strval, exc_filename, exc_lineno)
#----- End of ERROR HANDLING
#===============================================================================================
#===============================================================================================
#----- DATABASE TYPES
class DbTypeError(Exception):
    # Raised when a DBMS type definition cannot handle a data type.
    #   dbms_id   : name of the DBMS.
    #   data_type : the data type concerned (an object with a
    #               'data_type_name' attribute), or None.
    #   error_msg : description of the problem; a default text is used if
    #               it is empty or None.
    def __init__(self, dbms_id, data_type, error_msg):
        self.dbms_id = dbms_id
        self.data_type = data_type
        self.error_msg = error_msg or "Unspecified error"

    def __repr__(self):
        # One placeholder per constructor argument.
        return u"DbTypeError(%r, %r, %r)" % (self.dbms_id, self.data_type, self.error_msg)

    def __str__(self):
        if self.data_type:
            return "%s DBMS type error with data type %s: %s" % (self.dbms_id, self.data_type.data_type_name, self.error_msg)
        else:
            return "%s DBMS type error: %s" % (self.dbms_id, self.error_msg)
class DbType(object):
    # Describes one DBMS: its name, how identifiers are quoted, and how each
    # data type is spelled in that DBMS's SQL dialect.
    def __init__(self, DBMS_id, db_obj_quotes=u'""'):
        # DBMS_id is the name by which this DBMS is identified.
        # db_obj_quotes is a string of two characters: the opening and closing
        # quotes for identifiers (schema, table, and column names) that need
        # to be quoted.
        self.dbms_id = DBMS_id
        self.quotechars = db_obj_quotes
        # The dialect is a dictionary of DBMS-specific information for each
        # column type.  Keys are DataType classes; values are 6-tuples of:
        #   0. the data type name (str)
        #   1. a Boolean indicating whether the length is part of the data
        #      type definition (e.g., for varchar)
        #   2. a name to use with the 'cast' operator instead of the data
        #      type name (may be None)
        #   3. a function that applies a DBMS-specific modification to the
        #      result of the data type's 'from_data()' method (may be None)
        #   4. the precision for numeric data types
        #   5. the scale for numeric data types
        # It stays None until the first call to name_datatype().
        self.dialect = None
        # dt_xlate translates one data type to another.  It is needed for
        # DBMSs that lack a type (e.g. Access before v. 4.0 has no numeric
        # type, so numeric must be treated as float).
        self.dt_xlate = {}

    def __repr__(self):
        return u"DbType(%r, %r)" % (self.dbms_id, self.quotechars)

    def name_datatype(self, data_type, dbms_name, length_required=False, casting_name=None, conv_mod_fn=None, precision=None, scale=None):
        # Registers the dialect entry for one data type.
        #   data_type       : a DataType class object.
        #   dbms_name       : the DBMS-specific name for this data type.
        #   length_required : whether length information is required.
        #   casting_name    : alternate name for SQL "cast(x as ...)" expressions.
        #   conv_mod_fn     : function modifying the result of
        #                     data_type().from_data(x).
        #   precision, scale: stored as given for numeric types.
        entry = (dbms_name, length_required, casting_name, conv_mod_fn, precision, scale)
        if self.dialect is None:
            self.dialect = {}
        self.dialect[data_type] = entry

    def datatype_name(self, data_type):
        # Returns the DBMS-specific name of 'data_type'.
        # Raises DbTypeError if the lookup fails for any reason.
        try:
            return self.dialect[data_type][0]
        except:
            raise DbTypeError(self.dbms_id, data_type, "%s DBMS type has no specification for data type %s" % (self.dbms_id, data_type.data_type_name))

    def quoted(self, dbms_object):
        # Returns 'dbms_object' wrapped in the identifier quote characters if
        # it contains any non-word character, and unchanged otherwise.  When
        # the opening and closing quote characters are the same, embedded
        # quote characters are doubled.
        if not re.search(r'\W', dbms_object):
            return dbms_object
        opener, closer = self.quotechars[0], self.quotechars[1]
        if opener == closer and opener in dbms_object:
            dbms_object = dbms_object.replace(opener, opener + opener)
        return opener + dbms_object + closer

    def spec_type(self, data_type):
        # Returns the translated data type, or the original if there is no
        # translation.
        if data_type in self.dt_xlate:
            return self.dt_xlate[data_type]
        return data_type

    def column_spec(self, column_name, data_type, max_len=None, is_nullable=False, precision=None, scale=None):
        # Returns a column specification as it would be used in a CREATE TABLE
        # statement.  Raises DbTypeError if the (translated) data type has no
        # dialect entry.
        data_type = self.spec_type(data_type)
        try:
            dts = self.dialect[data_type]
        except:
            raise DbTypeError(self.dbms_id, data_type, "%s DBMS type has no specification for data type %s" % (self.dbms_id, data_type.data_type_name))
        if max_len and max_len > 0 and dts[1]:
            # Types that take a length, such as varchar.
            type_decl = "%s(%d)" % (dts[0], max_len)
        elif data_type.precspec and precision and scale:
            # numeric
            type_decl = "%s(%s,%s)" % (dts[0], precision, scale)
        else:
            type_decl = dts[0]
        spec = "%s %s" % (self.quoted(column_name), type_decl)
        if not is_nullable:
            spec += " NOT NULL"
        return spec
# Create a DbType object for each DBMS supported by execsql.
# Each object is created with the default identifier quote characters (a pair
# of double quotes), no dialect entries, and an empty type-translation table.
dbt_postgres = DbType("PostgreSQL")
dbt_sqlite = DbType("SQLite")
dbt_duckdb = DbType("DuckDB")
dbt_sqlserver = DbType("SQL Server")
dbt_mysql = DbType("MySQL")
dbt_firebird = DbType("Firebird")
dbt_oracle = DbType("Oracle")
#----- End of DATABASE TYPES
#===============================================================================================
#===============================================================================================
#----- DATABASE CONNECTIONS
class DatabaseNotImplementedError(Exception):
    # Raised when a Database method is not available for a DBMS.
    #   db_name : description of the database.
    #   method  : name of the method that is not implemented.
    def __init__(self, db_name, method):
        self.db_name, self.method = db_name, method

    def __repr__(self):
        return u"DatabaseNotImplementedError(%r, %r)" % (self.db_name, self.method)

    def __str__(self):
        template = "Method %s is not implemented for database %s"
        return template % (self.method, self.db_name)
class Database(object):
    # Base class for database connections.  It holds the connection
    # parameters and provides DBMS-independent implementations of the common
    # operations; DBMS-specific subclasses supply open_db() and override
    # whatever differs.
    def __init__(self, server_name, db_name, user_name=None, need_passwd=None, port=None, encoding=None):
        # Connection parameters as given by the caller.
        self.server_name = server_name
        self.db_name = db_name
        self.user = user_name
        self.need_passwd = need_passwd
        self.port = port
        self.encoding = encoding
        # Initial state.
        self.type = None
        self.password = None
        self.encode_commands = True
        self.paramstr = '?'
        self.conn = None
        self.autocommit = True

    def __repr__(self):
        parts = (self.server_name, self.db_name, self.user, self.need_passwd, self.port, self.encoding)
        return u"Database(%r, %r, %r, %r, %r, %r)" % parts

    def name(self):
        # Returns a description of the database: server-based if a server
        # name is set, file-based otherwise.
        if not self.server_name:
            return "%s(file %s)" % (self.type.dbms_id, self.db_name)
        return "%s(server %s; database %s)" % (self.type.dbms_id, self.server_name, self.db_name)

    def open_db(self):
        raise DatabaseNotImplementedError(self.name(), 'open_db')

    def cursor(self):
        # Returns a new cursor, opening the connection first if necessary.
        if self.conn is None:
            self.open_db()
        return self.conn.cursor()

    def close(self):
        if not self.conn:
            return
        self.conn.close()
        self.conn = None

    def paramsubs(self, paramcount):
        # Returns 'paramcount' parameter placeholders separated by commas.
        return ",".join([self.paramstr] * paramcount)

    def execute(self, sql, paramlist=None):
        # A shortcut to self.cursor().execute() that handles encoding.
        # Whether or not encoding is needed depends on the DBMS.
        # 'sql' may be a string or a list/tuple of strings, which are joined
        # with blanks.  On any failure a rollback is attempted and the
        # exception is re-raised.
        global subvars
        if type(sql) in (list, tuple):
            sql = u" ".join(sql)
        try:
            curs = self.cursor()
            needs_encoding = self.encoding and self.encode_commands and sys.version_info < (3,)
            if needs_encoding:
                curs.execute(sql.encode(self.encoding))
            elif paramlist is None:
                curs.execute(sql)
            else:
                curs.execute(sql, paramlist)
            try:
                # DuckDB does not support the 'rowcount' attribute
                subvars.add_substitution("$LAST_ROWCOUNT", curs.rowcount)
            except:
                pass
        except Exception as e:
            try:
                self.rollback()
            except:
                pass
            raise e

    def autocommit_on(self):
        self.autocommit = True

    def autocommit_off(self):
        self.autocommit = False

    def commit(self):
        # Commits only when a connection is open and autocommit is on.
        if self.conn and self.autocommit:
            self.conn.commit()

    def rollback(self):
        # Rolls back, ignoring any error raised by the rollback itself.
        if not self.conn:
            return
        try:
            self.conn.rollback()
        except:
            pass

    def schema_qualified_table_name(self, schema_name, table_name):
        # Returns the table name, quoted as needed and prefixed with the
        # (quoted) schema name if one is given.
        quoted_table = self.type.quoted(table_name)
        if not schema_name:
            return quoted_table
        return u'%s.%s' % (self.type.quoted(schema_name), quoted_table)

    def select_data(self, sql):
        # Returns the results of the sql select statement: a list of column
        # names and the list of all rows.
        curs = self.cursor()
        try:
            curs.execute(sql)
        except:
            self.rollback()
            raise
        try:
            subvars.add_substitution("$LAST_ROWCOUNT", curs.rowcount)
        except:
            pass
        rows = curs.fetchall()
        headers = [col[0] for col in curs.description]
        return headers, rows

    def select_rowsource(self, sql):
        # Return 1) a list of column names, and 2) an iterable that yields rows.
        # When an encoding is set, byte-string values in each row are decoded.
        curs = self.cursor()
        try:
            # DuckDB cursors have no 'arraysize' attribute
            curs.arraysize = conf.export_row_buffer
        except:
            pass
        try:
            curs.execute(sql)
        except:
            self.rollback()
            raise
        try:
            subvars.add_substitution("$LAST_ROWCOUNT", curs.rowcount)
        except:
            pass

        def decode_row():
            batch = curs.fetchmany()
            while batch:
                for row in batch:
                    if not self.encoding:
                        yield row
                    elif sys.version_info < (3,):
                        yield [c.decode(self.encoding, "backslashreplace") if type(c) == type("") else c for c in row]
                    else:
                        yield [c.decode(self.encoding, "backslashreplace") if type(c) == type(b'') else c for c in row]
                batch = curs.fetchmany()

        return [col[0] for col in curs.description], decode_row()

    def _checked_rows(self, sql, describe_failure):
        # Runs 'sql' on a new cursor, fetches all rows, closes the cursor and
        # returns the rows.  An ErrInfo raised during execution is passed
        # through; any other failure triggers a rollback and is re-raised as
        # a "db" ErrInfo.  'describe_failure' is called, only on failure, to
        # produce the message text.
        curs = self.cursor()
        try:
            curs.execute(sql)
        except ErrInfo:
            raise
        except:
            self.rollback()
            raise ErrInfo(type="db", command_text=sql, exception_msg=exception_desc(),
                          other_msg=describe_failure())
        rows = curs.fetchall()
        curs.close()
        return rows

    def schema_exists(self, schema_name):
        curs = self.cursor()
        curs.execute(u"SELECT schema_name FROM information_schema.schemata WHERE schema_name = '%s';" % schema_name)
        found = curs.fetchall()
        curs.close()
        return len(found) > 0

    def table_exists(self, table_name, schema_name=None):
        schema_clause = (" and table_schema='%s'" % schema_name) if schema_name else ""
        sql = "select table_name from information_schema.tables where table_name = '%s'%s;" % (table_name, schema_clause)
        rows = self._checked_rows(sql, lambda: u"Failed test for existence of table %s in %s" % (table_name, self.name()))
        return len(rows) > 0

    def column_exists(self, table_name, column_name, schema_name=None):
        schema_clause = (" and table_schema='%s'" % schema_name) if schema_name else ""
        sql = "select column_name from information_schema.columns where table_name='%s'%s and column_name='%s';" % (table_name, schema_clause, column_name)
        rows = self._checked_rows(sql, lambda: u"Failed test for existence of column %s in table %s of %s" % (column_name, table_name, self.name()))
        return len(rows) > 0

    def table_columns(self, table_name, schema_name=None):
        # Returns the column names of the table in ordinal order.
        schema_clause = (" and table_schema='%s'" % schema_name) if schema_name else ""
        sql = "select column_name from information_schema.columns where table_name='%s'%s order by ordinal_position;" % (table_name, schema_clause)
        rows = self._checked_rows(sql, lambda: u"Failed to get column names for table %s of %s" % (table_name, self.name()))
        return [row[0] for row in rows]

    def view_exists(self, view_name, schema_name=None):
        schema_clause = (" and table_schema='%s'" % schema_name) if schema_name else ""
        sql = "select table_name from information_schema.views where table_name = '%s'%s;" % (view_name, schema_clause)
        rows = self._checked_rows(sql, lambda: u"Failed test for existence of view %s in %s" % (view_name, self.name()))
        return len(rows) > 0

    def role_exists(self, rolename):
        raise DatabaseNotImplementedError(self.name(), 'role_exists')

    def drop_table(self, tablename):
        # The 'tablename' argument should be schema-qualified and quoted as necessary.
        self.execute(u"drop table if exists %s cascade;" % tablename)
        self.commit()
class SqlServerDatabase(Database):
    # Connection to a SQL Server database through pyodbc.
    def __init__(self, server_name, db_name, user_name, need_passwd=False, port=1433, encoding='latin1', password=None):
        # Imports pyodbc (a fatal error if unavailable), records the
        # connection parameters, and opens the connection immediately.
        global pyodbc
        try:
            import pyodbc
        except:
            fatal_error(u"The pyodbc module is required. See http://github.com/mkleehammer/pyodbc", kwargs={})
        self.type = dbt_sqlserver
        # Connection parameters.
        self.server_name = server_name
        self.db_name = db_name
        self.user = user_name
        self.need_passwd = need_passwd
        self.password = password
        self.port = port if port else 1433
        self.encoding = encoding or 'latin1' # Default on installation of SQL Server
        # Behavior settings and initial state.
        self.encode_commands = True
        self.paramstr = '?'
        self.autocommit = True
        self.conn = None
        self.open_db()

    def __repr__(self):
        parts = (self.server_name, self.db_name, self.user, self.need_passwd, self.port, self.encoding)
        return u"SqlServerDatabase(%r, %r, %r, %r, %r, %r)" % parts

    def open_db(self):
        # Opens the connection if there is none, then applies the session
        # settings.  Raises ErrInfo if a required password is missing or if
        # no ODBC driver produces a connection.
        if self.conn is not None:
            return
        if self.user and self.need_passwd and not self.password:
            raise ErrInfo("error", other_msg="Password required but not provided")
        # Use pyodbc to connect. Try different driver versions from newest to oldest.
        ssdrivers = ('ODBC Driver 17 for SQL Server', 'ODBC Driver 13.1 for SQL Server',
                     'ODBC Driver 13 for SQL Server', 'ODBC Driver 11 for SQL Server',
                     'SQL Server Native Client 11.0', 'SQL Server Native Client 10.0',
                     'SQL Native Client', 'SQL Server')
        for drv in ssdrivers:
            if not self.user:
                connstr = "DRIVER={%s};SERVER=%s;MARS_Connection=Yes; DATABASE=%s;Trusted_Connection=yes" % (drv, self.server_name, self.db_name)
            elif self.password:
                connstr = "DRIVER={%s};SERVER=%s;MARS_Connection=Yes; DATABASE=%s;Uid=%s;Pwd=%s" % (drv, self.server_name, self.db_name, self.user, self.password)
            else:
                connstr = "DRIVER={%s};SERVER=%s;MARS_Connection=Yes; DATABASE=%s;Uid=%s" % (drv, self.server_name, self.db_name, self.user)
            try:
                self.conn = pyodbc.connect(connstr)
            except:
                # This driver did not work; try the next one.
                pass
            else:
                break
        if not self.conn:
            raise ErrInfo(type="error", other_msg=u"Can't open SQL Server database %s on %s" % (self.db_name, self.server_name))
        curs = self.conn.cursor()
        session_settings = (
            "SET IMPLICIT_TRANSACTIONS OFF;",
            "SET ANSI_NULLS ON;",
            "SET ANSI_PADDING ON;",
            "SET ANSI_WARNINGS ON;",
            "SET QUOTED_IDENTIFIER ON;",
            )
        for setting in session_settings:
            curs.execute(setting)
        self.conn.commit()

    def schema_exists(self, schema_name):
        curs = self.cursor()
        curs.execute(u"select * from sys.schemas where name = '%s';" % schema_name)
        found = curs.fetchall()
        curs.close()
        return len(found) > 0

    def role_exists(self, rolename):
        curs = self.cursor()
        curs.execute(u"select name from sys.database_principals where type in ('R', 'S') and name = '%s';" % rolename)
        found = curs.fetchall()
        curs.close()
        return len(found) > 0

    def drop_table(self, tablename):
        # SQL Server and Firebird will throw an error if there are foreign keys to the table.
        quoted_name = self.type.quoted(tablename)
        self.execute(u"drop table %s;" % quoted_name)
class PostgresDatabase(Database):
    # Connection to a PostgreSQL database through psycopg2.
    def __init__(self, server_name, db_name, user_name, need_passwd=False, port=5432, new_db=False, encoding='UTF8', password=None):
        # Imports psycopg2 (a fatal error if unavailable), records the
        # connection parameters, and opens the connection immediately.
        # If 'new_db' is true the database is created before connecting.
        global psycopg2
        try:
            import psycopg2
        except:
            fatal_error(u"The psycopg2 module is required to connect to PostgreSQL.", kwargs={})
        self.type = dbt_postgres
        self.server_name = server_name
        self.db_name = db_name
        self.user = user_name
        self.need_passwd = need_passwd
        self.password = password
        self.port = port if port else 5432
        self.new_db = new_db
        self.encoding = encoding or 'UTF8'
        self.encode_commands = False
        self.paramstr = '%s'
        self.conn = None
        self.autocommit = True
        self.open_db()

    def __repr__(self):
        return u"PostgresDatabase(%r, %r, %r, %r, %r, %r, %r)" % (self.server_name, self.db_name, self.user,
                self.need_passwd, self.port, self.new_db, self.encoding)

    def open_db(self):
        # Opens the connection if there is none, creating the database first
        # when self.new_db is set, and adopts the connection's encoding.
        # Raises ErrInfo if a required password is missing or the connection
        # cannot be made.
        def db_conn(db, db_name):
            # Credentials are passed only when both user and password are set.
            if db.user and db.password:
                return psycopg2.connect(host=str(db.server_name), database=str(db_name), port=db.port, user=db.user, password=db.password)
            else:
                return psycopg2.connect(host=str(db.server_name), database=db_name, port=db.port)

        def create_db(db):
            # Creates the database through a connection to 'postgres'; the
            # connection is closed even if the statement fails.
            conn = db_conn(db, 'postgres')
            try:
                conn.autocommit = True
                curs = conn.cursor()
                curs.execute("create database %s encoding '%s';" % (db.db_name, db.encoding))
            finally:
                conn.close()

        if self.conn is None:
            try:
                if self.user and self.need_passwd and not self.password:
                    # The text is passed as 'other_msg', which is the
                    # argument used for "error"-type ErrInfo objects.
                    raise ErrInfo("error", other_msg="Password required but not provided")
                if self.new_db:
                    create_db(self)
                self.conn = db_conn(self, self.db_name)
            except ErrInfo:
                raise
            except:
                msg = u"Failed to open PostgreSQL database %s on %s" % (self.db_name, self.server_name)
                raise ErrInfo(type="exception", exception_msg=exception_desc(), other_msg=msg)
            # (Re)set the encoding to match the database.
            self.encoding = self.conn.encoding

    def _pg_checked_rows(self, sql, describe_failure):
        # Runs 'sql' on a new cursor, fetches all rows, closes the cursor and
        # returns the rows.  An ErrInfo is passed through; any other failure
        # triggers a rollback and is re-raised as a "db" ErrInfo whose message
        # comes from calling describe_failure() (called only on failure).
        curs = self.cursor()
        try:
            curs.execute(sql)
        except ErrInfo:
            raise
        except:
            self.rollback()
            raise ErrInfo(type="db", command_text=sql, exception_msg=exception_desc(),
                          other_msg=describe_failure())
        rows = curs.fetchall()
        curs.close()
        return rows

    def role_exists(self, rolename):
        curs = self.cursor()
        curs.execute(u"select rolname from pg_roles where rolname = '%s';" % rolename)
        rows = curs.fetchall()
        curs.close()
        return len(rows) > 0

    def table_exists(self, table_name, schema_name=None):
        # With a schema name the table is looked up in that schema; without
        # one it is looked up in the temporary schema and the search path.
        if schema_name is not None:
            sql = "select table_name from information_schema.tables where table_name = '%s'%s;" % (table_name, "" if not schema_name else " and table_schema='%s'" % schema_name)
        else:
            sql = """select table_name from information_schema.tables where table_name = '%s' and
table_schema in (select nspname from pg_namespace where oid = pg_my_temp_schema()
union
select trim(unnest(string_to_array(replace(setting, '"$user"', CURRENT_USER), ',')))
from pg_settings where name = 'search_path');""" % table_name
        rows = self._pg_checked_rows(sql, lambda: u"Failed test for existence of table %s in %s" % (table_name, self.name()))
        return len(rows) > 0

    def view_exists(self, view_name, schema_name=None):
        # With a schema name the view is looked up in that schema; without
        # one it is looked up in the temporary schema and the search path.
        if schema_name is not None:
            sql = "select table_name from information_schema.views where table_name = '%s'%s;" % (view_name, "" if not schema_name else " and table_schema='%s'" % schema_name)
        else:
            sql = """select table_name from information_schema.views where table_name = '%s' and
table_schema in (select nspname from pg_namespace where oid = pg_my_temp_schema()
union
select trim(unnest(string_to_array(replace(setting, '"$user"', CURRENT_USER), ',')))
from pg_settings where name = 'search_path');""" % view_name
        rows = self._pg_checked_rows(sql, lambda: u"Failed test for existence of view %s in %s" % (view_name, self.name()))
        return len(rows) > 0

    def vacuum(self, argstring):
        # Runs VACUUM with the given arguments.  The session is switched to
        # autocommit for the statement and always switched back afterwards,
        # even if the statement fails.
        self.commit()
        self.conn.set_session(autocommit=True)
        try:
            self.conn.cursor().execute("VACUUM %s;" % argstring)
        finally:
            self.conn.set_session(autocommit=False)
class OracleDatabase(Database):
    # Connection to an Oracle database through cx_Oracle.
    def __init__(self, server_name, db_name, user_name, need_passwd=False, port=1521, encoding='UTF8', password=None):
        # Imports cx_Oracle (a fatal error if unavailable), records the
        # connection parameters, and opens the connection immediately.
        # The default port is Oracle's listener port, 1521, which is also
        # what a false 'port' value falls back to.
        global cx_Oracle
        try:
            import cx_Oracle
        except:
            fatal_error(u"The cx-Oracle module is required to connect to Oracle. See https://pypi.org/project/cx-Oracle/", kwargs={})
        self.type = dbt_oracle
        self.server_name = server_name
        self.db_name = db_name
        self.user = user_name
        self.need_passwd = need_passwd
        self.password = password
        self.port = port if port else 1521
        self.encoding = encoding or 'UTF8'
        self.encode_commands = False
        self.paramstr = ':1'
        self.conn = None
        self.autocommit = True
        self.open_db()

    def __repr__(self):
        return u"OracleDatabase(%r, %r, %r, %r, %r, %r)" % (self.server_name, self.db_name, self.user,
                self.need_passwd, self.port, self.encoding)

    def open_db(self):
        # Opens the connection if there is none.  Raises ErrInfo if a
        # required password is missing or the connection cannot be made.
        def db_conn(db, db_name):
            dsn = cx_Oracle.makedsn(db.server_name, db.port, service_name=db_name)
            # Credentials are passed only when both user and password are set.
            if db.user and db.password:
                return cx_Oracle.connect(user=db.user, password=db.password, dsn=dsn)
            else:
                return cx_Oracle.connect(dsn=dsn)

        if self.conn is None:
            try:
                if self.user and self.need_passwd and not self.password:
                    raise ErrInfo("error", other_msg="Password required but not provided")
                self.conn = db_conn(self, self.db_name)
            except ErrInfo:
                raise
            except:
                msg = u"Failed to open Oracle database %s on %s" % (self.db_name, self.server_name)
                raise ErrInfo(type="exception", exception_msg=exception_desc(), other_msg=msg)

    def execute(self, sql, paramlist=None):
        # Strip any semicolon off the end and pass to the parent method.
        if sql[-1:] == ";":
            super(OracleDatabase, self).execute(sql[:-1], paramlist)
        else:
            super(OracleDatabase, self).execute(sql, paramlist)

    def select_data(self, sql):
        # Strip any semicolon off the end and pass to the parent method.
        if sql[-1:] == ";":
            return super(OracleDatabase, self).select_data(sql[:-1])
        else:
            return super(OracleDatabase, self).select_data(sql)

    def select_rowsource(self, sql):
        # Strip any semicolon off the end and pass to the parent method.
        if sql[-1:] == ";":
            return super(OracleDatabase, self).select_rowsource(sql[:-1])
        else:
            return super(OracleDatabase, self).select_rowsource(sql)

    def schema_exists(self, schema_name):
        raise DatabaseNotImplementedError(self.name(), 'schema_exists')

    def _ora_checked_rows(self, sql, describe_failure):
        # Runs 'sql' on a new cursor, fetches all rows, closes the cursor and
        # returns the rows.  An ErrInfo is passed through; any other failure
        # triggers a rollback and is re-raised as a "db" ErrInfo whose message
        # comes from calling describe_failure() (called only on failure).
        curs = self.cursor()
        try:
            curs.execute(sql)
        except ErrInfo:
            raise
        except:
            self.rollback()
            raise ErrInfo(type="db", command_text=sql, exception_msg=exception_desc(),
                          other_msg=describe_failure())
        rows = curs.fetchall()
        curs.close()
        return rows

    def table_exists(self, table_name, schema_name=None):
        sql = "select table_name from sys.all_tables where table_name = '%s'%s" % (table_name, "" if not schema_name else " and owner ='%s'" % schema_name)
        rows = self._ora_checked_rows(sql, lambda: u"Failed test for existence of table %s in %s" % (table_name, self.name()))
        return len(rows) > 0

    def column_exists(self, table_name, column_name, schema_name=None):
        sql = "select column_name from all_tab_columns where table_name='%s'%s and column_name='%s'" % (table_name, "" if not schema_name else " and owner ='%s'" % schema_name, column_name)
        rows = self._ora_checked_rows(sql, lambda: u"Failed test for existence of column %s in table %s of %s" % (column_name, table_name, self.name()))
        return len(rows) > 0

    def table_columns(self, table_name, schema_name=None):
        # Returns the column names of the table in column-id order.
        sql = "select column_name from all_tab_columns where table_name='%s'%s order by column_id" % (table_name, "" if not schema_name else " and owner='%s'" % schema_name)
        rows = self._ora_checked_rows(sql, lambda: u"Failed to get column names for table %s of %s" % (table_name, self.name()))
        return [row[0] for row in rows]

    def view_exists(self, view_name, schema_name=None):
        sql = "select view_name from sys.all_views where view_name = '%s'%s" % (view_name, "" if not schema_name else " and owner ='%s'" % schema_name)
        rows = self._ora_checked_rows(sql, lambda: u"Failed test for existence of view %s in %s" % (view_name, self.name()))
        return len(rows) > 0

    def role_exists(self, rolename):
        # The statement goes straight to the cursor, bypassing execute(), so
        # it must not end with a semicolon (every other statement sent by
        # this class has it stripped or omitted).
        curs = self.cursor()
        curs.execute(u"select role from dba_roles where role = '%s' union " \
                     " select username from all_users where username = '%s'" % (rolename, rolename))
        rows = curs.fetchall()
        curs.close()
        return len(rows) > 0

    def drop_table(self, tablename):
        tablename = self.type.quoted(tablename)
        self.execute(u"drop table %s cascade constraints" % tablename)

    def paramsubs(self, paramcount):
        # Returns numbered placeholders: ":1,:2,...".
        return ",".join(":"+str(d) for d in range(1, paramcount+1))
class SQLiteDatabase(Database):
    """A connection to a SQLite database file.

    SQLite has no server, user, or password, so those attributes are fixed
    at None/False. ``schema_name`` arguments are accepted for parity with
    the other database classes but are not used.
    """

    def __init__(self, SQLite_fn):
        """Record the connection settings and open the file ``SQLite_fn``."""
        self.type = dbt_sqlite
        self.server_name = None
        self.db_name = SQLite_fn
        self.user = None
        self.need_passwd = False
        self.encoding = 'UTF-8'
        self.encode_commands = False
        self.paramstr = '?'
        self.conn = None
        self.autocommit = True
        self.open_db()

    def __repr__(self):
        return u"SQLiteDatabase(%r)" % self.db_name

    def open_db(self):
        """Connect if not already connected, then read the database's text encoding.

        Raises ErrInfo if the database file cannot be opened.
        """
        if self.conn is None:
            try:
                self.conn = sqlite3.connect(self.db_name)
            except ErrInfo:
                raise
            except Exception:
                raise ErrInfo(type="exception", exception_msg=exception_desc(), other_msg=u"Can't open SQLite database %s" % self.db_name)
        # The encoding reported by the database replaces the 'UTF-8' default.
        pragma_cols, pragma_data = self.select_data("pragma encoding;")
        self.encoding = pragma_data[0][0]

    def table_exists(self, table_name, schema_name=None):
        """Return True if ``sqlite_master`` lists a table named ``table_name``.

        Raises ErrInfo (after rolling back) if the catalog query fails.
        """
        curs = self.cursor()
        # The name is bound as a parameter so that a name containing a quote
        # character cannot break the statement.
        sql = "select name from sqlite_master where type='table' and name=?;"
        try:
            curs.execute(sql, (table_name,))
        except ErrInfo:
            raise
        except Exception:
            self.rollback()
            raise ErrInfo(type="db", command_text=sql, exception_msg=exception_desc(), other_msg=u'Failed test for existence of SQLite table "%s";' % table_name)
        rows = curs.fetchall()
        curs.close()
        return len(rows) > 0

    def column_exists(self, table_name, column_name, schema_name=None):
        """Return True if selecting ``column_name`` from ``table_name`` succeeds.

        Any failure of the probe query (including a missing table) is
        reported as False rather than raised.
        """
        curs = self.cursor()
        sql = "select %s from %s limit 1;" % (column_name, table_name)
        try:
            curs.execute(sql)
        except Exception:
            return False
        finally:
            curs.close()
        return True

    def table_columns(self, table_name, schema_name=None):
        """Return the column names of ``table_name`` in table order.

        Raises ErrInfo (after rolling back) if the table cannot be queried.
        """
        curs = self.cursor()
        # A query that returns no rows still populates the cursor description.
        sql = "select * from %s where 1=0;" % table_name
        try:
            curs.execute(sql)
        except ErrInfo:
            raise
        except Exception:
            self.rollback()
            raise ErrInfo(type="db", command_text=sql, exception_msg=exception_desc(),
                          other_msg=u"Failed to get column names for table %s of %s" % (table_name, self.name()))
        names = [d[0] for d in curs.description]
        curs.close()
        return names

    def view_exists(self, view_name, schema_name=None):
        """Return True if ``sqlite_master`` lists a view named ``view_name``.

        ``schema_name`` is optional and ignored; it is accepted so that this
        method can be called the same way as on other database classes.
        Raises ErrInfo (after rolling back) if the catalog query fails.
        """
        curs = self.cursor()
        sql = "select name from sqlite_master where type='view' and name=?;"
        try:
            curs.execute(sql, (view_name,))
        except ErrInfo:
            raise
        except Exception:
            self.rollback()
            raise ErrInfo(type="db", command_text=sql, exception_msg=exception_desc(), other_msg=u'Failed test for existence of SQLite view "%s";' % view_name)
        rows = curs.fetchall()
        curs.close()
        return len(rows) > 0

    def schema_exists(self, schema_name):
        """Always False: this class does not support schemas."""
        return False

    def drop_table(self, tablename):
        """Drop the named table if it exists; the name is quoted first."""
        tablename = self.type.quoted(tablename)
        self.execute(u"drop table if exists %s;" % tablename)
class DuckDBDatabase(Database):
    """A connection to a DuckDB database file.

    The ``duckdb`` module is imported when the first instance is created,
    so it is only required if a DuckDB database is actually used.
    """

    def __init__(self, DuckDB_fn):
        """Record the connection settings and open the file ``DuckDB_fn``."""
        global duckdb
        try:
            import duckdb
        except Exception:
            fatal_error(u"The duckdb module is required.", kwargs={})
        self.type = dbt_duckdb
        self.server_name = None
        self.db_name = DuckDB_fn
        # The file name without its extension is used as the catalog name
        # when filtering information_schema queries.
        self.catalog_name = os.path.splitext(DuckDB_fn)[0]
        self.user = None
        self.need_passwd = False
        self.encoding = 'UTF-8'
        self.encode_commands = False
        self.paramstr = '?'
        self.conn = None
        self.autocommit = True
        self.open_db()

    def __repr__(self):
        return u"DuckDBDatabase(%r)" % self.db_name

    def open_db(self):
        """Connect (read/write) if not already connected.

        Raises ErrInfo if the database file cannot be opened.
        """
        if self.conn is None:
            try:
                self.conn = duckdb.connect(self.db_name, read_only=False)
            except ErrInfo:
                raise
            except Exception:
                raise ErrInfo(type="exception", exception_msg=exception_desc(), other_msg=u"Can't open DuckDB database %s" % self.db_name)

    def view_exists(self, view_name):
        # DuckDB information_schema has no 'views' table; views are listed in 'tables'
        return self.table_exists(view_name)

    def schema_exists(self, schema_name):
        """Return True if ``schema_name`` exists in this database's catalog."""
        # In DuckDB, the 'schemata' view is not limited to the current database.
        curs = self.cursor()
        curs.execute(u"SELECT schema_name FROM information_schema.schemata WHERE schema_name = '%s' and catalog_name = '%s';" % (schema_name, self.catalog_name))
        rows = curs.fetchall()
        curs.close()
        return len(rows) > 0

    def execute(self, sql, paramlist=None):
        """Execute a SQL statement, optionally with bound parameters.

        ``sql`` may be a string or a tuple/list of strings, which are joined
        with spaces. If execution fails, a rollback is attempted and the
        original exception is re-raised so the caller can see the failure.
        """
        if type(sql) in (tuple, list):
            sql = u" ".join(sql)
        try:
            curs = self.cursor()
            if paramlist is None:
                curs.execute(sql)
            else:
                curs.execute(sql, paramlist)
            # DuckDB does not support the 'rowcount' attribute, so $LAST_ROWCOUNT is not set
        except Exception:
            try:
                self.rollback()
            except Exception:
                # The rollback is best-effort; the execution error below is
                # the one that matters to the caller.
                pass
            raise
class MySQLDatabase(Database):
    """A connection to a MySQL database made through the pymysql driver.

    The driver is imported (as the module-level name ``mysql_lib``) when an
    instance is created.
    """

    def __init__(self, server_name, db_name, user_name, need_passwd=False, port=3306, encoding='latin1', password=None):
        """Record the connection settings and open the connection.

        A false ``port`` falls back to 3306 and a false ``encoding`` to 'latin1'.
        """
        global mysql_lib
        try:
            import pymysql as mysql_lib
        except:
            fatal_error(u"The pymysql module is required to connect to MySQL. See https://pypi.python.org/pypi/PyMySQL", kwargs={})
        self.type = dbt_mysql
        self.server_name = str(server_name)
        self.db_name = str(db_name)
        self.user = str(user_name)
        self.need_passwd = need_passwd
        self.password = password
        self.port = port or 3306
        self.encoding = encoding or 'latin1'
        self.encode_commands = True
        self.paramstr = '%s'
        self.conn = None
        self.autocommit = True
        self.open_db()

    def __repr__(self):
        settings = (self.server_name, self.db_name, self.user,
                    self.need_passwd, self.port, self.encoding)
        return u"MySQLDatabase(%r, %r, %r, %r, %r, %r)" % settings

    def open_db(self):
        """Connect if not already connected and switch the session to ANSI SQL mode.

        Raises ErrInfo if a required password is missing or the connection fails.
        """
        if self.conn is not None:
            return
        try:
            if self.user and self.need_passwd and not self.password:
                raise ErrInfo("error", other_msg="Password required but not provided")
            connect_args = {
                "host": self.server_name,
                "database": self.db_name,
                "port": self.port,
                "charset": self.encoding,
                "local_infile": True,
            }
            # Credentials are passed only when both a user and a password are set.
            if self.user and self.password:
                connect_args["user"] = self.user
                connect_args["password"] = self.password
            self.conn = mysql_lib.connect(**connect_args)
            self.execute("set session sql_mode='ANSI';")
        except ErrInfo:
            raise
        except:
            msg = u"Failed to open MySQL database %s on %s" % (self.db_name, self.server_name)
            raise ErrInfo(type="exception", exception_msg=exception_desc(), other_msg=msg)

    def schema_exists(self, schema_name):
        """Always False: this class does not support schemas."""
        return False

    def role_exists(self, rolename):
        """Return True if ``rolename`` is a user in ``mysql.user`` or an applicable role."""
        query = (u"select distinct user as role from mysql.user where user = '%s'"
                 " union select distinct role_name as role from information_schema.applicable_roles"
                 " where role_name = '%s'") % (rolename, rolename)
        cursor = self.cursor()
        cursor.execute(query)
        matches = cursor.fetchall()
        cursor.close()
        return len(matches) > 0
class FirebirdDatabase(Database):
    """A connection to a Firebird database made through the fdb driver.

    The driver is imported (as the module-level name ``firebird_lib``) when
    an instance is created. ``schema_name`` arguments are accepted for
    parity with the other database classes but are not used.
    """

    def __init__(self, server_name, db_name, user_name, need_passwd=False, port=3050, encoding='latin1', password=None):
        """Record the connection settings and open the connection.

        A false ``port`` falls back to 3050 and a false ``encoding`` to 'latin1'.
        """
        global firebird_lib
        try:
            import fdb as firebird_lib
        except Exception:
            fatal_error(u"The fdb module is required to connect to Firebird. See https://pypi.python.org/pypi/fdb/", kwargs={})
        self.type = dbt_firebird
        self.server_name = str(server_name)
        self.db_name = str(db_name)
        self.user = str(user_name)
        self.need_passwd = need_passwd
        self.password = password
        self.port = 3050 if not port else port
        self.encoding = encoding or 'latin1'
        self.encode_commands = True
        self.paramstr = '?'
        self.conn = None
        self.autocommit = True
        self.open_db()

    def __repr__(self):
        return u"FirebirdDatabase(%r, %r, %r, %r, %r, %r)" % (self.server_name, self.db_name, self.user,
                                                            self.need_passwd, self.port, self.encoding)

    def open_db(self):
        """Connect if not already connected.

        Raises ErrInfo if a required password is missing or the connection fails.
        """
        def db_conn():
            # Credentials are passed only when both a user and a password are set.
            if self.user and self.password:
                return firebird_lib.connect(host=self.server_name, database=self.db_name, port=self.port, user=self.user, password=self.password, charset=self.encoding)
            else:
                return firebird_lib.connect(host=self.server_name, database=self.db_name, port=self.port, charset=self.encoding)
        if self.conn is None:
            try:
                if self.user and self.need_passwd and not self.password:
                    raise ErrInfo("error", other_msg="Password required but not provided")
                self.conn = db_conn()
            except ErrInfo:
                raise
            except Exception:
                msg = u"Failed to open Firebird database %s on %s" % (self.db_name, self.server_name)
                raise ErrInfo(type="exception", exception_msg=exception_desc(), other_msg=msg)

    def table_exists(self, table_name, schema_name=None):
        """Return True if a non-system, non-view relation with this name exists.

        The name is upper-cased before it is compared with RDB$RELATIONS.
        Raises ErrInfo if the catalog query fails; a rollback is attempted first.
        """
        curs = self.cursor()
        sql = "SELECT RDB$RELATION_NAME FROM RDB$RELATIONS WHERE RDB$SYSTEM_FLAG=0 AND RDB$VIEW_BLR IS NULL AND RDB$RELATION_NAME='%s';" % table_name.upper()
        try:
            curs.execute(sql)
        except ErrInfo:
            raise
        except Exception:
            # Build the error first so the description reflects the query
            # failure rather than any failure of the rollback itself.
            e = ErrInfo(type="db", command_text=sql, exception_msg=exception_desc(), other_msg=u"Failed test for existence of Firebird table %s" % table_name)
            try:
                self.rollback()
            except Exception:
                pass
            raise e
        rows = curs.fetchall()
        self.conn.commit()
        curs.close()
        return len(rows) > 0

    def column_exists(self, table_name, column_name, schema_name=None):
        """Return True if selecting ``column_name`` from ``table_name`` succeeds.

        Any failure of the probe query (including a missing table) is
        reported as False rather than raised.
        """
        curs = self.cursor()
        sql = "select first 1 %s from %s;" % (column_name, table_name)
        try:
            curs.execute(sql)
        except Exception:
            return False
        finally:
            curs.close()
        return True

    def table_columns(self, table_name, schema_name=None):
        """Return the column names of ``table_name`` as reported by the cursor description.

        Raises ErrInfo (after rolling back) if the table cannot be queried.
        """
        curs = self.cursor()
        sql = "select first 1 * from %s;" % table_name
        try:
            curs.execute(sql)
        except ErrInfo:
            raise
        except Exception:
            self.rollback()
            raise ErrInfo(type="db", command_text=sql, exception_msg=exception_desc(),
                          other_msg=u"Failed to get column names for table %s of %s" % (table_name, self.name()))
        return [d[0] for d in curs.description]

    def view_exists(self, view_name, schema_name=None):
        """Return True if ``view_name`` appears in RDB$VIEW_RELATIONS.

        The name is compared as given (it is not upper-cased here).
        Raises ErrInfo (after rolling back) if the catalog query fails.
        """
        curs = self.cursor()
        sql = "select distinct rdb$view_name from rdb$view_relations where rdb$view_name = '%s';" % view_name
        try:
            curs.execute(sql)
        except ErrInfo:
            raise
        except Exception:
            self.rollback()
            raise ErrInfo(type="db", command_text=sql, exception_msg=exception_desc(), other_msg=u"Failed test for existence of Firebird view %s" % view_name)
        rows = curs.fetchall()
        curs.close()
        return len(rows) > 0

    def schema_exists(self, schema_name):
        """Always False: this class does not support schemas."""
        return False

    def role_exists(self, rolename):
        """Return True if ``rolename`` is a user holding privileges or a defined role."""
        curs = self.cursor()
        curs.execute(u"SELECT DISTINCT USER FROM RDB$USER_PRIVILEGES WHERE USER = '%s' union " \
                     " SELECT DISTINCT RDB$ROLE_NAME FROM RDB$ROLES WHERE RDB$ROLE_NAME = '%s';" % (rolename, rolename))
        rows = curs.fetchall()
        curs.close()
        return len(rows) > 0

    def drop_table(self, tablename):
        """Drop the named table (quoted first) and commit."""
        # Firebird will throw an error if there are foreign keys into the table.
        tablename = self.type.quoted(tablename)
        self.execute(u"DROP TABLE %s;" % tablename)
        self.conn.commit()
class DatabasePool(object):
    """A set of database connection objects, each registered under an alias.

    Aliases are stored in lower case, and every method that takes an alias
    lower-cases it before use. The pool also tracks which alias was added
    first (the initial database) and which is currently in use.
    """

    def __init__(self):
        self.pool = {}
        self.initial_db = None
        self.current_db = None
        # When True, closeall() rolls back each connection before closing it.
        self.do_rollback = True

    def __repr__(self):
        return u"DatabasePool()"

    def add(self, db_alias, db_obj):
        """Register ``db_obj`` under ``db_alias``, replacing any existing entry.

        The first database added becomes both the initial and the current
        database. A replaced database is closed first.

        Raises ErrInfo if the alias is 'initial' while the pool is not
        empty, or if the alias is already bound to a database that is in
        use in a batch.
        """
        db_alias = db_alias.lower()
        if db_alias == 'initial' and len(self.pool) > 0:
            raise ErrInfo(type="error", other_msg="You may not use the name 'INITIAL' as a database alias.")
        if len(self.pool) == 0:
            self.initial_db = db_alias
            self.current_db = db_alias
        if db_alias in self.pool:
            # Don't allow reassignment of a database that is used in any batch.
            if status.batch.uses_db(self.pool[db_alias]):
                raise ErrInfo(type="error", other_msg="You may not reassign the alias of a database that is currently used in a batch.")
            self.pool[db_alias].close()
        self.pool[db_alias] = db_obj

    def aliases(self):
        """Return a list of the currently defined aliases."""
        return list(self.pool)

    def current(self):
        """Return the current database object."""
        return self.pool[self.current_db]

    def current_alias(self):
        """Return the alias of the current database object."""
        return self.current_db

    def initial(self):
        """Return the database object that was added first."""
        return self.pool[self.initial_db]

    def aliased_as(self, db_alias):
        """Return the database registered under ``db_alias`` (case-insensitive).

        Raises KeyError if the alias is not defined.
        """
        return self.pool[db_alias.lower()]

    def make_current(self, db_alias):
        """Make the database registered under ``db_alias`` the current one.

        Raises ErrInfo if the alias is not defined.
        """
        db_alias = db_alias.lower()
        if not db_alias in self.pool:
            raise ErrInfo(type="error", other_msg=u"Database alias '%s' is unrecognized; cannnot use it." % db_alias)
        self.current_db = db_alias

    def disconnect(self, alias):
        """Close and remove the database registered under ``alias`` (case-insensitive).

        An alias that is not defined is ignored. Raises ErrInfo if the
        alias is the current database or is 'initial'.
        """
        # Keys are stored in lower case, so compare in lower case too.
        key = alias.lower()
        if key == self.current_db or (key == 'initial' and 'initial' in self.pool):
            raise ErrInfo(type="error", other_msg=u"Database alias %s can't be removed or redefined while it is in use." % alias)
        if key in self.pool:
            self.pool[key].close()
            del self.pool[key]

    def closeall(self):
        """Close every database and reset the pool to its initial, empty state.

        Each connection is rolled back first when ``do_rollback`` is set.
        Errors from rollback or close are ignored so that one failing
        connection does not prevent the others from being closed. The reset
        also restores ``do_rollback`` to True.
        """
        for db in self.pool.values():
            if self.do_rollback:
                try:
                    db.rollback()
                except Exception:
                    # Best effort; still try to close the connection below.
                    pass
            try:
                db.close()
            except Exception:
                pass
        self.__init__()
# End of database connections
#===============================================================================================
#===============================================================================================
#----- SCRIPTING
class BatchLevels(object):
    """A stack recording which databases are used in each nested batch."""

    class Batch(object):
        """One batch level: the list of database objects used within it."""
        def __init__(self):
            self.dbs_used = []

    def __init__(self):
        self.batchlevels = []

    def in_batch(self):
        """Return True if at least one batch is open."""
        return len(self.batchlevels) > 0

    def new_batch(self):
        """Open a new (innermost) batch level."""
        self.batchlevels.append(self.Batch())

    def using_db(self, db):
        """Record ``db`` as used by the innermost batch; no-op outside a batch."""
        if len(self.batchlevels) > 0 and not db in self.batchlevels[-1].dbs_used:
            self.batchlevels[-1].dbs_used.append(db)

    def uses_db(self, db):
        """Return True if ``db`` is used by any open batch, otherwise False."""
        return any(db in batch.dbs_used for batch in self.batchlevels)

    def rollback_batch(self):
        """Roll back every database used by the innermost batch, if any."""
        if len(self.batchlevels) > 0:
            b = self.batchlevels[-1]
            for db in b.dbs_used:
                db.rollback()

    def end_batch(self):
        """Close the innermost batch and commit every database it used.

        Raises IndexError if no batch is open.
        """
        b = self.batchlevels.pop()
        for db in b.dbs_used:
            db.commit()
class IfItem(object):
    """One level of IF nesting: a true/false state plus the script line that opened it."""

    def __init__(self, tf_value):
        # Remember where in the script this level was opened.
        self.scriptline = current_script_line()
        self.tf_value = tf_value

    def value(self):
        """Return the current true/false state of this level."""
        return self.tf_value

    def invert(self):
        """Flip the state of this level."""
        self.change_to(not self.value())

    def change_to(self, tf_value):
        """Replace the state of this level."""
        self.tf_value = tf_value

    def script_line(self):
        """Return the script location recorded when this level was created."""
        return self.scriptline
class IfLevels(object):
    """A stack of IfItem states for nested conditionals.

    Used by the IF metacommand and related metacommands to track, change,
    and query the nesting of conditional states.
    """

    def __init__(self):
        self.if_levels = []

    def nest(self, tf_value):
        """Push a new IF level with the given state."""
        self.if_levels.append(IfItem(tf_value))

    def unnest(self):
        """Pop the innermost IF level; raises ErrInfo if none is active."""
        if not len(self.if_levels):
            raise ErrInfo(type="error", other_msg="Can't exit an IF block; no IF block is active.")
        self.if_levels.pop()

    def invert(self):
        """Flip the innermost IF level; raises ErrInfo if none is active."""
        if not len(self.if_levels):
            raise ErrInfo(type="error", other_msg="Can't change the IF state; no IF block is active.")
        innermost = self.if_levels[-1]
        innermost.invert()

    def replace(self, tf_value):
        """Set the innermost IF level's state; raises ErrInfo if none is active."""
        if not len(self.if_levels):
            raise ErrInfo(type="error", other_msg="Can't change the IF state; no IF block is active.")
        innermost = self.if_levels[-1]
        innermost.change_to(tf_value)

    def current(self):
        """Return the innermost IF level's state; raises ErrInfo if none is active."""
        if not len(self.if_levels):
            raise ErrInfo(type="error", other_msg="No IF block is active.")
        return self.if_levels[-1].value()

    def all_true(self):
        """Return True if every level is true (including when there are no levels)."""
        return all([level.value() for level in self.if_levels])

    def only_current_false(self):
        """Return True if the innermost level is false and all outer levels are true.

        Metacommands such as ELSE and ENDIF are executed in this state.
        With no active levels the result is False.
        """
        if len(self.if_levels) == 0:
            return False
        outer = self.if_levels[:-1]
        innermost = self.if_levels[-1]
        return not innermost.value() and all([level.value() for level in outer])

    def script_lines(self, top_n):
        """Return the recorded script locations of the innermost ``top_n`` levels.

        The list is ordered from the outermost of those levels to the
        innermost. Raises ErrInfo if fewer than ``top_n`` levels are active.
        """
        depth = len(self.if_levels)
        if depth < top_n:
            raise ErrInfo(type="error", other_msg="Invalid IF stack depth reference.")
        locations = []
        for level in self.if_levels[depth - top_n:]:
            locations.append(level.script_line())
        return locations
class CounterVars(object):
    """Dynamically created, auto-incrementing counter variables.

    A counter is referenced in text as ``!!$COUNTER_n!!`` (case-insensitive).
    It is created on first reference, and each substitution increments it
    and inserts the new value, so the first reference yields 1 unless the
    counter was preset with set_counter().
    """

    _COUNTER_RX = re.compile(r'!!\$(COUNTER_\d+)!!', re.I)

    def __init__(self):
        # Maps 'counter_<n>' (lower case) to the last value handed out.
        self.counters = {}

    def _ctrid(self, ctr_no):
        return u'counter_%d' % ctr_no

    def set_counter(self, ctr_no, ctr_val):
        """Set counter ``ctr_no`` so that its next reference yields ``ctr_val + 1``."""
        self.counters[self._ctrid(ctr_no)] = ctr_val

    def remove_counter(self, ctr_no):
        """Forget counter ``ctr_no``; no-op if it does not exist."""
        ctr_id = self._ctrid(ctr_no)
        if ctr_id in self.counters:
            del self.counters[ctr_id]

    def remove_all_counters(self):
        """Forget every counter."""
        self.counters = {}

    def substitute(self, command_str):
        """Replace the first counter reference found in ``command_str``.

        The counter is incremented once, and every occurrence spelled
        exactly like the matched reference is replaced with the new value.

        Returns a 2-tuple: the (possibly modified) string and a flag
        indicating whether a replacement was made.
        """
        # The flags are already part of the compiled pattern. The second
        # positional argument of Pattern.search() is a start position, so
        # passing re.I there would skip the first two characters.
        m = self._COUNTER_RX.search(command_str)
        if m is None:
            return command_str, False
        ctr_id = m.group(1).lower()
        new_count = self.counters.get(ctr_id, 0) + 1
        self.counters[ctr_id] = new_count
        return command_str.replace(u'!!$' + m.group(1) + u'!!', str(new_count)), True

    def substitute_all(self, any_text):
        """Apply substitute() repeatedly until no counter reference remains.

        Returns the resulting text and a flag indicating whether any
        replacement was made.
        """
        subbed = True
        any_subbed = False
        while subbed:
            any_text, subbed = self.substitute(any_text)
            if subbed:
                any_subbed = True
        return any_text, any_subbed
class SubVarSet(object):
    """A pool of substitution variables, each a (name, value) pair.

    All variable names are stored as lowercase text. The pool is a list of
    tuples rather than a dictionary to enforce ordered substitution.
    """

    def __init__(self):
        self.substitutions = []
        # List of acceptable single-character variable name prefixes
        self.prefix_list = ['$','&','@']
        # Regex for validating variable names. It is compiled lazily (see
        # var_name_ok) rather than here because deepcopy() can't handle
        # compiled regexes.
        self.var_rx = None

    def compile_var_rx(self):
        """Compile the name-validation regex from the current prefix list.

        A valid name is an optional single prefix character followed by one
        or more word characters.
        """
        self.var_rx_str = r'^[' + "".join(self.prefix_list) + r']?\w+$'
        self.var_rx = re.compile(self.var_rx_str, re.I)

    def var_name_ok(self, varname):
        """Return True if ``varname`` is a valid name for this pool."""
        if self.var_rx is None:
            self.compile_var_rx()
        return self.var_rx.match(varname) is not None

    def check_var_name(self, varname):
        """Raise ErrInfo if ``varname`` is not a valid name for this pool."""
        if not self.var_name_ok(varname.lower()):
            raise ErrInfo("error", other_msg="Invalid variable name (%s) in this context." % varname)

    def remove_substitution(self, template_str):
        """Remove the named variable, if present."""
        self.check_var_name(template_str)
        old_sub = template_str.lower()
        self.substitutions = [sub for sub in self.substitutions if sub[0] != old_sub]

    def add_substitution(self, varname, repl_str):
        """Define (or redefine) a variable; a redefined variable moves to the end."""
        self.check_var_name(varname)
        varname = varname.lower()
        self.remove_substitution(varname)
        self.substitutions.append((varname, repl_str))

    def append_substitution(self, varname, repl_str):
        """Append ``repl_str`` to a variable's value on a new line, or define it."""
        self.check_var_name(varname)
        varname = varname.lower()
        oldsub = [x for x in self.substitutions if x[0] == varname]
        if len(oldsub) == 0:
            self.add_substitution(varname, repl_str)
        else:
            self.add_substitution(varname, "%s\n%s" % (oldsub[0][1], repl_str))

    def varvalue(self, varname):
        """Return the value of the named variable, or None if it is not defined."""
        self.check_var_name(varname)
        vname = varname.lower()
        for vardef in self.substitutions:
            if vardef[0] == vname:
                return vardef[1]
        return None

    def increment_by(self, varname, numeric_increment):
        """Add ``numeric_increment`` to a variable, defining it as "0" first if needed.

        If either the current value or the increment is not numeric, the new
        value is the text "<value>+<increment>" instead of a sum.
        """
        self.check_var_name(varname)
        varvalue = self.varvalue(varname)
        if varvalue is None:
            varvalue = "0"
            self.add_substitution(varname, varvalue)
        numvalue = as_numeric(varvalue)
        numinc = as_numeric(numeric_increment)
        if numvalue is None or numinc is None:
            newval = "%s+%s" % (varvalue, numeric_increment)
        else:
            newval = str(numvalue + numinc)
        self.add_substitution(varname, newval)

    def sub_exists(self, template_str):
        """Return True if the named variable is defined."""
        self.check_var_name(template_str)
        test_str = template_str.lower()
        return test_str in [s[0] for s in self.substitutions]

    def merge(self, other_subvars):
        """Return this pool merged with ``other_subvars``, which takes precedence.

        The result is a new SubVarSet whose prefix list is the union of both
        prefix lists. If ``other_subvars`` is None, this object itself is
        returned.
        """
        if other_subvars is not None:
            newsubs = SubVarSet()
            # Copy the list so the merged pool never shares state with this one.
            newsubs.substitutions = list(self.substitutions)
            # Union of the prefixes, keeping a stable order.
            merged_prefixes = list(self.prefix_list)
            for prefix in other_subvars.prefix_list:
                if prefix not in merged_prefixes:
                    merged_prefixes.append(prefix)
            newsubs.prefix_list = merged_prefixes
            newsubs.compile_var_rx()
            for vardef in other_subvars.substitutions:
                newsubs.add_substitution(vardef[0], vardef[1])
            return newsubs
        return self

    def substitute(self, command_str):
        """Replace references to the first matching variable in ``command_str``.

        Three reference forms are recognized, case-insensitively:
        ``!!name!!`` inserts the value as is; ``!'!name!'!`` inserts it with
        single quotes doubled; ``!"!name!"!`` inserts it wrapped in double
        quotes.

        This does only one round of replacements: variable references
        produced by the replacement are not themselves replaced. The second
        value returned indicates whether any substitution was made, so that
        this method can be called repeatedly. Non-string input is returned
        unchanged.
        """
        if isinstance(command_str, str):
            for match, sub in self.substitutions:
                if sub is None:
                    sub = ''
                sub = str(sub)
                if match[0] == "$":
                    # '$' is a regex metacharacter and must be escaped.
                    match = "\\"+match
                if os.name != 'posix':
                    # Double backslashes so the regex replacement keeps them literal.
                    sub = sub.replace("\\", "\\\\")
                pat = "!!%s!!" % match
                patq = "!'!%s!'!" % match
                patdq = '!"!%s!"!' % match
                if re.search(pat, command_str, re.I):
                    return re.sub(pat, sub, command_str, flags=re.I), True
                if re.search(patq, command_str, re.I):
                    sub = sub.replace("'", "''")
                    return re.sub(patq, sub, command_str, flags=re.I), True
                if re.search(patdq, command_str, re.I):
                    sub = '"' + sub + '"'
                    return re.sub(patdq, sub, command_str, flags=re.I), True
        return command_str, False

    def substitute_all(self, any_text):
        """Apply substitute() repeatedly until nothing more is replaced.

        Returns the resulting text and a flag indicating whether any
        replacement was made.
        """
        subbed = True
        any_subbed = False
        while subbed:
            any_text, subbed = self.substitute(any_text)
            if subbed:
                any_subbed = True
        return any_text, any_subbed
class LocalSubVarSet(SubVarSet):
    """A pool of local substitution variables.

    Behaves like SubVarSet except for name validation: a local variable
    name MUST begin with '~', which is the only prefix allowed.
    """

    def __init__(self):
        super(LocalSubVarSet, self).__init__()
        self.prefix_list = ['~']

    def compile_var_rx(self):
        # Unlike the base class, the prefix character is required, not optional.
        pattern = r'^[%s]\w+$' % "".join(self.prefix_list)
        self.var_rx_str = pattern
        self.var_rx = re.compile(pattern, re.I)
class ScriptArgSubVarSet(SubVarSet):
    """A pool of script argument names.

    Behaves like SubVarSet except for name validation: a script argument
    name MUST begin with '#', which is the only prefix allowed.
    """

    def __init__(self):
        super(ScriptArgSubVarSet, self).__init__()
        self.prefix_list = ['#']

    def compile_var_rx(self):
        # Unlike the base class, the prefix character is required, not optional.
        pattern = r'^[%s]\w+$' % "".join(self.prefix_list)
        self.var_rx_str = pattern
        self.var_rx = re.compile(pattern, re.I)
class MetaCommand(object):
    """A compiled metacommand: a regex paired with the function that carries it out.

    Instances are nodes of the linked list managed by MetaCommandList.
    """

    def __init__(self, rx, exec_func, description=None, run_in_batch=False, run_when_false=False, set_error_flag=True):
        """Create a metacommand node.

        rx             : a compiled regular expression.
        exec_func      : the function that does the work. It is called with
                         keyword arguments named after the regex's groups,
                         plus 'metacommandline'; its return value is used
                         only for conditional metacommands.
        description    : optional descriptive text.
        run_in_batch   : whether the metacommand should be run inside a
                         batch. Only 'END BATCH' should be.
        run_when_false : whether the metacommand should be run when the
                         exec state is False. Only 'ELSE', 'ELSEIF', 'ORIF',
                         and 'ENDIF' should be, and only when all higher
                         levels are True; the script processor evaluates
                         that condition.
        set_error_flag : when run, sets or clears status.metacommand_error.
        """
        self.rx = rx
        self.exec_fn = exec_func
        self.description = description
        self.run_in_batch = run_in_batch
        self.run_when_false = run_when_false
        self.set_error_flag = set_error_flag
        # Number of successful runs of this metacommand.
        self.hitcount = 0
        # Link to the following node of the list.
        self.next_node = None

    def __repr__(self):
        shown = (self.rx.pattern, self.exec_fn, self.description,
                 self.run_in_batch, self.run_when_false)
        return u"MetaCommand(%r, %r, %r, %r, %r)" % shown

    def run(self, cmd_str):
        """Run the metacommand if ``cmd_str`` (stripped) matches its regex.

        Returns a 2-tuple:
          0. True or False indicating whether the metacommand applies. If
             False, the second value is None and has no meaning.
          1. The return value of the metacommand function, or None if the
             function failed and errors do not halt processing.
        Exceptions other than ErrInfo are converted to ErrInfo; the error is
        raised only when status.halt_on_metacommand_err is set.
        """
        matched = self.rx.match(cmd_str.strip())
        if not matched:
            return False, None
        kwargs = matched.groupdict()
        kwargs['metacommandline'] = cmd_str
        failure = None
        try:
            result = self.exec_fn(**kwargs)
        except ErrInfo as errinf:
            # The exception is copied to another name because the name bound
            # by "except ... as" is deleted when the handler ends.
            failure = errinf
        except:
            failure = ErrInfo("cmd", command_text=cmd_str, exception_msg=exception_desc())
        if failure:
            if status.halt_on_metacommand_err:
                raise failure
            if self.set_error_flag:
                status.metacommand_error = True
            return True, None
        if self.set_error_flag:
            status.metacommand_error = False
        self.hitcount += 1
        return True, result
class MetaCommandList(object):
    """The head node of a linked list of MetaCommand objects."""

    def __init__(self):
        self.next_node = None

    def __iter__(self):
        """Yield each node of the list, starting at the head."""
        node = self.next_node
        while node is not None:
            yield node
            node = node.next_node

    def insert_node(self, new_node):
        """Link ``new_node`` in as the new first node of the list."""
        new_node.next_node = self.next_node
        self.next_node = new_node

    def add(self, matching_regexes, exec_func, description=None, run_in_batch=False, run_when_false=False, set_error_flag=True):
        """Create a MetaCommand for each regex and add it at the head of the list.

        ``matching_regexes`` is either one regex string or a tuple/list of
        them; each is compiled case-insensitively. The remaining arguments
        are passed to every MetaCommand created.
        """
        if type(matching_regexes) in (tuple, list):
            patterns = tuple(matching_regexes)
        else:
            patterns = (matching_regexes,)
        self.regexes = [re.compile(pattern, re.I) for pattern in patterns]
        for compiled in self.regexes:
            node = MetaCommand(compiled, exec_func, description, run_in_batch, run_when_false, set_error_flag)
            self.insert_node(node)

    def eval(self, cmd_str):
        """Run the first metacommand in the list that matches ``cmd_str``.

        A node is tried only if all IF levels are true or the node is
        flagged to run when false. The node that runs is moved to the head
        of the list.

        Returns a 2-tuple:
          0. True or False indicating whether a metacommand applied. If
             False, the second value is None and has no meaning.
          1. The return value of the metacommand function.
        """
        previous = self
        node = previous.next_node
        while node is not None:
            if if_stack.all_true() or node.run_when_false:
                success, value = node.run(cmd_str)
                if success:
                    # Unlink the node and re-insert it at the head of the list.
                    previous.next_node = node.next_node
                    node.next_node = self.next_node
                    self.next_node = node
                    return True, value
            previous = node
            node = node.next_node
        return False, None

    def get_match(self, cmd):
        """Return (MetaCommand, match object) for the first node matching ``cmd``.

        ``cmd`` is stripped before matching. Returns None if nothing matches.
        """
        for node in self:
            found = node.rx.match(cmd.strip())
            if found is not None:
                return (node, found)
        return None
# Global linked lists of MetaCommand objects: one for metacommands and one
# for conditional tests.
# These are filled in the 'MetaCommand Functions' and 'Conditional Tests for Metacommands' sections.
metacommandlist = MetaCommandList()
conditionallist = MetaCommandList()
class SqlStmt(object):
    """A SQL statement to be passed to a database for execution.

    The statement is wrapped in an object rather than kept as a plain
    string so that it can offer a run() method parallel to, but different
    from, MetacommandStmt.run(). The two classes act like subclasses of a
    common statement class that is not implemented because it would be
    trivial.
    """

    def __init__(self, sql_statement):
        # Collapse a run of two or more trailing semicolons (with any
        # whitespace around them) into a single semicolon.
        trailing_semicolons = re.compile(r'\s*;(\s*;\s*)+$')
        self.statement = trailing_semicolons.sub(';', sql_statement)

    def __repr__(self):
        return u"SqlStmt(%s)" % self.statement

    def run(self, localvars=None, commit=True):
        """Run the statement on the current database of the global pool "dbs".

        Nothing is done unless all IF levels are true.

        localvars : a SubVarSet object used for variable substitution.
        commit    : whether to commit after executing.

        On failure, $LAST_ERROR is set to the command and status.sql_error
        is set; the error is raised only when status.halt_on_err is set.
        On success, $LAST_SQL is set to the command.
        """
        if not if_stack.all_true():
            return
        failure = None
        status.sql_error = False
        cmd = substitute_vars(self.statement, localvars)
        if varlike.search(cmd):
            line_no = current_script_line()
            script_errors.append(["There is a potential un-substituted variable in the command %s" % cmd, line_no])
        try:
            database = dbs.current()
            database.execute(cmd)
            if commit:
                database.commit()
        except ErrInfo as errinfo:
            # The exception is copied to another name because the name bound
            # by "except ... as" is deleted when the handler ends.
            failure = errinfo
        except:
            failure = ErrInfo(type="exception", exception_msg=exception_desc())
        if failure:
            subvars.add_substitution("$LAST_ERROR", cmd)
            status.sql_error = True
            if status.halt_on_err:
                raise failure
            return
        subvars.add_substitution("$LAST_SQL", cmd)

    def commandline(self):
        """Return the SQL statement text."""
        return self.statement
class MetacommandStmt(object):
    """A metacommand to be handled by execsql."""

    def __init__(self, metacommand_statement):
        self.statement = metacommand_statement

    def __repr__(self):
        return u"MetacommandStmt(%s)" % self.statement

    def run(self, localvars=None, commit=False):
        """Try the metacommands in "metacommandlist" until one runs.

        localvars : a SubVarSet object used for variable substitution.
        commit    : not used; present so the interface matches SqlStmt.run().

        Returns the result of the metacommand that ran, or None. If an
        error occurs, status.metacommand_error and $LAST_ERROR are set and
        the error is raised only when status.halt_on_metacommand_err is
        set. If no metacommand returned a result and all IF levels are
        true, the command is recorded in script_errors as an unknown
        metacommand.
        """
        unknown_msg = "Unknown metacommand"
        cmd = substitute_vars(self.statement, localvars)
        if if_stack.all_true() and varlike.search(cmd):
            line_no = current_script_line()
            script_errors.append(["There is a potential un-substituted variable in the command %s" % cmd, line_no])
        failure = None
        try:
            applies, result = metacommandlist.eval(cmd)
            if applies:
                return result
        except ErrInfo as errinfo:
            # The exception is copied to another name because the name bound
            # by "except ... as" is deleted when the handler ends.
            failure = errinfo
        except:
            failure = ErrInfo(type="exception", exception_msg=exception_desc())
        if failure:
            status.metacommand_error = True
            subvars.add_substitution("$LAST_ERROR", cmd)
            if status.halt_on_metacommand_err:
                raise failure
        if if_stack.all_true():
            # Nothing applied, because we got here.
            status.metacommand_error = True
            line_no = current_script_line()
            script_errors.append(["%s: %s" % (unknown_msg, cmd), line_no])
        return None

    def commandline(self):
        """Return the metacommand as it would be written in a script."""
        prefix = u"-- !x! "
        return prefix + self.statement
class ScriptCmd(object):
    """A single command from an execsql script, with its source location.

    This is the uniform internal representation of one script command,
    which is either a SQL statement (SqlStmt object) or an execsql
    metacommand (MetacommandStmt object).
    """

    def __init__(self, command_source_name, command_line_no, command_type, script_command):
        # command_type is "sql" or "cmd"; script_command is the matching
        # SqlStmt or MetacommandStmt object.
        self.command = script_command
        self.command_type = command_type
        self.source = command_source_name
        self.line_no = command_line_no

    def __repr__(self):
        parts = (self.source, self.line_no, self.command_type, repr(self.command))
        return u"ScriptCmd(%r, %r, %r, %r)" % parts

    def current_script_line(self):
        """Return the line number recorded for this command."""
        return self.line_no

    def commandline(self):
        """Return the SQL or metacommand as it would be written in a script."""
        text = self.command.statement
        if self.command_type == "sql":
            return text
        return u"-- !x! " + text
class CommandList(object):
# A list of ScriptCmd objects, including an index into the list, an
# optional list of parameter names, and an optional set of parameter
# values (SubvarSet). This is the basic internal representation of
# a list of interspersed SQL commands and metacommands.
def __init__(self, cmdlist, listname, paramnames=None):
    """Create a command list.

    cmdlist    : a Python list of ScriptCmd objects. May be an empty list,
                 but not None.
    listname   : a string to identify the list (e.g., a source file name
                 or SCRIPT name).
    paramnames : a list of strings identifying parameters the script
                 expects. These are used to check the names of actual
                 arguments if given, but are optional: a sub-script may
                 take arguments even if no parameter names were specified.

    Raises ErrInfo if ``cmdlist`` is None.
    """
    if cmdlist is None:
        raise ErrInfo("error", other_msg="Initiating a command list without any commands.")
    self.cmdlist = cmdlist
    self.listname = listname
    self.paramnames = paramnames
    # Argument values are supplied later through set_paramvals().
    self.paramvals = None
    # Index of the next command to run.
    self.cmdptr = 0
    self.init_if_level = None
    # Local variables must start with a tilde. Other types are not allowed.
    self.localvars = LocalSubVarSet()
def add(self, script_command):
# Adds the given ScriptCmd object to the end of the command list.
self.cmdlist.append(script_command)
def set_paramvals(self, paramvals):
# Parameter values should ordinarily set immediately before the script
# (command list) is run.
# Arguments:
# paramvals : A SubVarSet object.
self.paramvals = paramvals
if self.paramnames is not None:
# Check that all named parameters are provided.
# Strip '#' off passed parameter names
passed_paramnames = [p[0][1:] if p[0][0]=='#' else p[0][1:] for p in paramvals.substitutions]
if not all([p in passed_paramnames for p in self.paramnames]):
raise ErrInfo("error", other_msg="Formal and actual parameter name mismatch in call to %s." % self.listname)
def current_command(self):
if self.cmdptr > len(self.cmdlist) - 1:
return None
return self.cmdlist[self.cmdptr]
def check_iflevels(self):
if_excess = len(if_stack.if_levels) - self.init_if_level
if if_excess > 0:
sources = if_stack.script_lines(if_excess)
src_msg = ", ".join(["input line %s" % src for src in sources])
raise ErrInfo(type="error", other_msg="IF level mismatch at beginning and end of script; origin at or after: %s." % src_msg)
def run_and_increment(self):
global last_command
global loop_nest_level
cmditem = self.cmdlist[self.cmdptr]
if compiling_loop:
# Don't run this command, but save it or complete the loop and add the loop's set of commands to the stack.
if cmditem.command_type == 'cmd' and loop_rx.match(cmditem.command.statement):
loop_nest_level += 1
# Substitute any deferred substitution variables with regular substition var flags, e.g.: "!!somevar!!"
m = defer_rx.findall(cmditem.command.statement)
if m is not None:
for dv in m:
rep = "!!" + dv[1] + "!!"
cmditem.command.statement = cmditem.command.statement.replace(dv[0], rep)
loopcommandstack[-1].add(cmditem)
elif cmditem.command_type == 'cmd' and endloop_rx.match(cmditem.command.statement):
if loop_nest_level == 0:
endloop()
else:
loop_nest_level -= 1
loopcommandstack[-1].add(cmditem)
else:
loopcommandstack[-1].add(cmditem)
else:
last_command = cmditem
if cmditem.command_type == "sql" and status.batch.in_batch():
status.batch.using_db(dbs.current())
subvars.add_substitution("$CURRENT_SCRIPT", cmditem.source)
subvars.add_substitution("$CURRENT_SCRIPT_PATH", os.path.dirname(os.path.abspath(cmditem.source)) + os.sep)
subvars.add_substitution("$CURRENT_SCRIPT_NAME", os.path.basename(cmditem.source))
subvars.add_substitution("$CURRENT_SCRIPT_LINE", str(cmditem.line_no))
subvars.add_substitution("$SCRIPT_LINE", str(cmditem.line_no))
cmditem.command.run(self.localvars.merge(self.paramvals), not status.batch.in_batch())
self.cmdptr += 1
def run_next(self):
global last_command
if self.cmdptr == 0:
self.init_if_level = len(if_stack.if_levels)
if self.cmdptr > len(self.cmdlist) - 1:
self.check_iflevels()
raise StopIteration
self.run_and_increment()
def __iter__(self):
return self
def __next__(self):
if self.cmdptr > len(self.cmdlist) - 1:
raise StopIteration
scriptcmd = self.cmdlist[self.cmdptr]
self.cmdptr += 1
return scriptcmd
class CommandListWhileLoop(CommandList):
    # A CommandList that is run repeatedly WHILE a condition holds.
    # Additional argument:
    #    loopcondition : A string containing the conditional for continuing the WHILE loop.
    def __init__(self, cmdlist, listname, paramnames, loopcondition):
        super().__init__(cmdlist, listname, paramnames)
        self.loopcondition = loopcondition

    def run_next(self):
        # The loop condition is tested at the top of every pass, i.e. whenever
        # the command pointer is at the first command.
        if self.cmdptr == 0:
            self.init_if_level = len(if_stack.if_levels)
            keep_going = CondParser(substitute_vars(self.loopcondition)).parse().eval()
            if not keep_going:
                raise StopIteration
        if self.cmdptr < len(self.cmdlist):
            self.run_and_increment()
            return
        # End of the body: verify IF balance, then rewind for the next pass.
        self.check_iflevels()
        self.cmdptr = 0
class CommandListUntilLoop(CommandList):
    # A CommandList that is run repeatedly UNTIL a condition is met.
    # Additional argument:
    #    loopcondition : A string containing the conditional for terminating the UNTIL loop.
    def __init__(self, cmdlist, listname, paramnames, loopcondition):
        super().__init__(cmdlist, listname, paramnames)
        self.loopcondition = loopcondition

    def run_next(self):
        # The loop condition is tested at the bottom of every pass, after the
        # last command of the body has been run.
        if self.cmdptr == 0:
            self.init_if_level = len(if_stack.if_levels)
        if self.cmdptr < len(self.cmdlist):
            self.run_and_increment()
            return
        self.check_iflevels()
        finished = CondParser(substitute_vars(self.loopcondition)).parse().eval()
        if finished:
            raise StopIteration
        self.cmdptr = 0
class ScriptFile(EncodedFile):
    # A file reader that returns lines and records the line number.
    def __init__(self, scriptfname, file_encoding):
        # Arguments:
        #    scriptfname   : Name of the script file to read.
        #    file_encoding : Encoding passed on to EncodedFile.
        super(ScriptFile, self).__init__(scriptfname, file_encoding)
        # Number of lines returned so far.
        self.lno = 0
        self.f = self.open("r")

    def __repr__(self):
        # The attributes are read through 'self' rather than through a super()
        # proxy: a super() proxy only resolves names found on the classes in
        # the MRO, so attributes stored on the instance are not reachable
        # through it.
        return u"ScriptFile(%r, %r)" % (self.filename, self.encoding)

    def __iter__(self):
        return self

    def __next__(self):
        # Returns the next line of the file and counts it; StopIteration from
        # the underlying file object propagates at end of file.
        line = next(self.f)
        self.lno += 1
        return line
class ScriptExecSpec(object):
    # An object that stores the specifications for executing a SCRIPT,
    # for later use.  This is specifically intended to be used by
    # ON ERROR_HALT EXECUTE SCRIPT and ON CANCEL_HALT EXECUTE SCRIPT.
    #
    # Matches one "name = value" argument; the value may be bare, or quoted
    # with double quotes, single quotes, or square brackets.
    args_rx = re.compile(r'(?P<param>#?\w+)\s*=\s*(?P<arg>(?:(?:[^"\'\[][^,\)]*)|(?:"[^"]*")|(?:\'[^\']*\')|(?:\[[^\]]*\])))', re.I)

    def __init__(self, **kwargs):
        # Keyword arguments:
        #    script_id : Name of a saved SCRIPT (case-insensitive).  Required.
        #    argexp    : Argument expression string, or None.  Required.
        #    looptype  : "WHILE" or "UNTIL" (any case), or None/absent.
        #    loopcond  : Loop condition string, or absent.
        # Raises ErrInfo if no SCRIPT of the given name has been saved.
        self.script_id = kwargs["script_id"].lower()
        if self.script_id not in savedscripts:
            raise ErrInfo("cmd", other_msg="There is no SCRIPT named %s." % self.script_id)
        self.arg_exp = kwargs["argexp"]
        looptype = kwargs.get("looptype")
        self.looptype = looptype.upper() if looptype is not None else None
        self.loopcond = kwargs.get("loopcond")

    def execute(self):
        # Pushes a fresh copy of the saved script onto the command list stack.
        # Copy the saved script because otherwise the memory-recovery nullification
        # of completed commands will erase the saved script commands.
        cl = copy.deepcopy(savedscripts[self.script_id])
        # If looping is specified, redirect to appropriate CommandList() subclass
        if self.looptype is not None:
            loop_class = CommandListWhileLoop if self.looptype == 'WHILE' else CommandListUntilLoop
            cl = loop_class(cl.cmdlist, cl.listname, cl.paramnames, self.loopcond)
        if self.arg_exp is not None:
            # Parse the argument expression into (name, value) pairs.
            scriptvarset = ScriptArgSubVarSet()
            for param, arg in self.args_rx.findall(self.arg_exp):
                # Prepend '#' on each param name if the user did not include one
                if not param.startswith('#'):
                    param = '#' + param
                scriptvarset.add_substitution(param, wo_quotes(arg))
            cl.set_paramvals(scriptvarset)
        elif cl.paramnames is not None:
            # No arguments were given although the script declares named parameters.
            raise ErrInfo("error", other_msg="Missing expected parameters (%s) in call to %s." % (", ".join(cl.paramnames), cl.listname))
        commandliststack.append(cl)
# End of scripting classes.
#===============================================================================================
#===============================================================================================
#----- Parsers
#
# Parsers for conditional and numeric expressions.
#-------------------------------------------------------------------------------------
# Source string objects.  These are strings (metacommand arguments) with
# a pointer to the current position in the string.
#-------------------------------------------------------------------------------------
class SourceString(object):
    # A string plus a pointer ('currpos') to the next unconsumed character.
    def __init__(self, source_string):
        self.str = source_string
        self.currpos = 0

    def eoi(self):
        # Returns True or False indicating whether or not there is any of
        # the source string left to be consumed.
        return self.currpos >= len(self.str)

    def eat_whitespace(self):
        # Advances the pointer past any spaces, tabs, and newlines.
        while not self.eoi() and self.str[self.currpos] in (' ', '\t', '\n'):
            self.currpos += 1

    def match_str(self, str):
        # Tries to match the 'str' argument at the current position in the
        # source string.  Matching is case-insensitive.  If matching succeeds,
        # the matched text (as it appears in the source) is returned and the
        # internal pointer is moved past it.  If matching fails, None is
        # returned.  Leading whitespace is consumed in either case.
        self.eat_whitespace()
        if self.eoi():
            return None
        # Compare only the candidate slice.  Lower-casing the whole source
        # string can change its length for some Unicode characters, which
        # would make 'currpos' point at the wrong place in the lowered copy.
        candidate = self.str[self.currpos:self.currpos + len(str)]
        if candidate.lower() != str.lower():
            return None
        self.currpos += len(str)
        return candidate

    def match_regex(self, regex):
        # Tries to match the compiled 'regex' argument at the current position
        # in the source string.  If it succeeds, a dictionary of all of the
        # named groups is returned, and the internal pointer is moved past the
        # match.  If it fails, None is returned.  Leading whitespace is
        # consumed in either case.
        self.eat_whitespace()
        if self.eoi():
            return None
        # Match against the remainder so that '^' in a pattern anchors at the
        # current position.
        m = regex.match(self.str[self.currpos:])
        if m is None:
            return None
        self.currpos += m.end(0)
        return m.groupdict()

    def match_metacommand(self, commandlist):
        # Tries to match text at the current position to any metacommand
        # in the specified commandlist.
        # If it succeeds, the return value is a tuple of the MetaCommand object
        # and a dictionary of all of the named groups.  The internal pointer is
        # moved past the match.  If it fails, None is returned.
        self.eat_whitespace()
        if self.eoi():
            return None
        found = commandlist.get_match(self.str[self.currpos:])
        if found is None:
            return None
        metacommand, m = found[0], found[1]
        self.currpos += m.end(0)
        return (metacommand, m.groupdict())

    def remainder(self):
        # Returns the part of the source string that has not been consumed.
        return self.str[self.currpos:]
#-------------------------------------------------------------------------------------
# Classes for AST operator types.
#-------------------------------------------------------------------------------------
class CondTokens(object):
    # Node-type constants for the conditional-expression AST.
    AND = 0
    OR = 1
    NOT = 2
    CONDITIONAL = 3
class NumTokens(object):
    # Node-type constants for the numeric-expression AST.
    MUL = 0
    DIV = 1
    ADD = 2
    SUB = 3
    NUMBER = 4
#-------------------------------------------------------------------------------------
# AST for conditional expressions
#-------------------------------------------------------------------------------------
class CondAstNode(CondTokens, object):
    def __init__(self, type, cond1, cond2):
        # 'type' should be one of the constants AND, OR, NOT, CONDITIONAL.
        # For AND and OR types, 'cond1' and 'cond2' should each be a subtree (a CondAstNode).
        # For NOT type, 'cond1' should be a CondAstNode and 'cond2' should be None.
        # For CONDITIONAL type, 'cond1' should be a tuple consisting of a metacommand object
        # and its dictionary of named groups (mcmd, groupdict) and 'cond2' should be None.
        self.type = type
        self.left = cond1
        # Single-operand node types never keep a right-hand operand.
        self.right = None if type in (self.CONDITIONAL, self.NOT) else cond2

    def eval(self):
        # Evaluates the subtrees and/or conditional value for this node.
        # AND and OR short-circuit: the right subtree is evaluated only when
        # the left one does not already decide the result.
        kind = self.type
        if kind == self.CONDITIONAL:
            exec_fn = self.left[0].exec_fn
            groupdict = self.left[1]
            return exec_fn(**groupdict)
        if kind == self.NOT:
            return not self.left.eval()
        lcond = self.left.eval()
        if kind == self.AND:
            return self.right.eval() if lcond else False
        if kind == self.OR:
            return True if lcond else self.right.eval()
#-------------------------------------------------------------------------------------
# AST for numeric expressions
#-------------------------------------------------------------------------------------
class NumericAstNode(NumTokens, object):
    def __init__(self, type, value1, value2):
        # 'type' should be one of the constants MUL, DIV, ADD, SUB, or NUMBER.
        # 'value1' and 'value2' should each be either a subtree (a
        # NumericAstNode) or (only 'value1' should be) a number.
        self.type = type
        self.left = value1
        # A NUMBER node is a leaf and has no right-hand operand.
        self.right = None if type == self.NUMBER else value2

    def eval(self):
        # Evaluates the subtrees and/or numeric value for this node,
        # returning a numeric value.
        if self.type == self.NUMBER:
            return self.left
        lnum = self.left.eval()
        rnum = self.right.eval()
        if self.type == self.MUL:
            return lnum * rnum
        if self.type == self.DIV:
            return lnum / rnum
        if self.type == self.ADD:
            return lnum + rnum
        # Any remaining type is treated as subtraction.
        return lnum - rnum
#-------------------------------------------------------------------------------------
# Conditional Parser
#-------------------------------------------------------------------------------------
class CondParserError(Exception):
    # Raised when a conditional expression cannot be parsed.
    # The message is available as the 'value' attribute.
    def __init__(self, msg):
        self.value = msg

    def __repr__(self):
        template = "ConditionalParserError(%r)"
        return template % self.value
class CondParser(CondTokens, object):
    # Recursive-descent parser for a conditional expression string.
    # The grammar implemented by the methods below is:
    #    expression := term [ OR expression ]
    #    term       := factor [ AND term ]
    #    factor     := NOT factor | <conditional metacommand> | "(" expression ")"
    def __init__(self, condexpr):
        self.condexpr = condexpr
        self.cond_expr = SourceString(condexpr)

    def _match_keyword(self, keyword, token):
        # Returns 'token' if 'keyword' is next in the source; otherwise None.
        if self.cond_expr.match_str(keyword) is None:
            return None
        return token

    def match_not(self):
        # Try to match 'NOT' operator.  If not found, return None
        return self._match_keyword('NOT', self.NOT)

    def match_andop(self):
        # Try to match 'AND' operator.  If not found, return None
        return self._match_keyword('AND', self.AND)

    def match_orop(self):
        # Try to match 'OR' operator.  If not found, return None
        return self._match_keyword('OR', self.OR)

    def factor(self):
        # Parses a factor and returns its AST node.
        # Raises CondParserError if no factor can be parsed.
        if self.match_not() is not None:
            return CondAstNode(self.NOT, self.factor(), None)
        # Find the matching metacommand -- get a tuple consisting of (metacommand, groupdict)
        found = self.cond_expr.match_metacommand(conditionallist)
        if found is not None:
            found[1]["metacommandline"] = self.condexpr
            return CondAstNode(self.CONDITIONAL, found, None)
        src = self.cond_expr
        if src.match_str("(") is None:
            raise CondParserError("Can't parse a factor at position %s of %s." % (src.currpos, src.str))
        inner = self.expression()
        if src.match_str(")") is None:
            raise CondParserError("Expected closing parenthesis at position %s of %s." % (src.currpos, src.str))
        return inner

    def term(self):
        # Parses a term and returns its AST node.
        left = self.factor()
        andop = self.match_andop()
        if andop is None:
            return left
        return CondAstNode(andop, left, self.term())

    def expression(self):
        # Parses an expression and returns its AST node.
        left = self.term()
        orop = self.match_orop()
        if orop is None:
            return left
        return CondAstNode(orop, left, self.expression())

    def parse(self):
        # Parses the whole string and returns the root AST node.
        # Raises CondParserError if any of the string is left unconsumed.
        tree = self.expression()
        if self.cond_expr.eoi():
            return tree
        raise CondParserError("Conditional expression parser did not consume entire string; remainder = %s." % self.cond_expr.remainder())
#-------------------------------------------------------------------------------------
# Numeric Parser
#-------------------------------------------------------------------------------------
class NumericParserError(Exception):
    # Raised when a numeric expression cannot be parsed.
    # The message is available as the 'value' attribute.
    def __init__(self, msg):
        self.value = msg

    def __repr__(self):
        template = "NumericParserError(%r)"
        return template % self.value
class NumericParser(NumTokens, object):
    # Recursive-descent parser for a numeric expression string.
    # The grammar implemented by the methods below is:
    #    expression := term { ("+" | "-") term }
    #    term       := factor { ("*" | "/") factor }
    #    factor     := number | "(" expression ")"
    # Operators of equal precedence group left to right, so that
    # "10 - 2 - 3" evaluates as (10 - 2) - 3.
    rxint = re.compile(r'(?P<int_num>[+-]?[0-9]+)')
    rxfloat = re.compile(r'(?P<float_num>[+-]?(?:(?:[0-9]*\.[0-9]+)|(?:[0-9]+\.[0-9]*)))')

    def __init__(self, numexpr):
        # Takes a numeric expression string.
        self.num_expr = SourceString(numexpr)

    def match_number(self):
        # Try to match a number in the source string.
        # Return it if matched, return None if unmatched.
        # Floats are tried first so that "1.5" is not read as the integer 1.
        m1 = self.num_expr.match_regex(self.rxfloat)
        if m1 is not None:
            return float(m1['float_num'])
        m2 = self.num_expr.match_regex(self.rxint)
        if m2 is not None:
            return int(m2['int_num'])
        return None

    def match_mulop(self):
        # Try to match a multiplication or division operator in the source string.
        # If found, return the matching operator type.  If not found, return None.
        if self.num_expr.match_str("*") is not None:
            return self.MUL
        if self.num_expr.match_str("/") is not None:
            return self.DIV
        return None

    def match_addop(self):
        # Try to match an addition or subtraction operator in the source string.
        # If found, return the matching operator type.  If not found, return None.
        if self.num_expr.match_str("+") is not None:
            return self.ADD
        if self.num_expr.match_str("-") is not None:
            return self.SUB
        return None

    def factor(self):
        # Parses a factor out of the source string and returns the
        # AST node that is created.
        # Raises NumericParserError if no factor can be parsed.
        m1 = self.match_number()
        if m1 is not None:
            return NumericAstNode(self.NUMBER, m1, None)
        if self.num_expr.match_str("(") is None:
            raise NumericParserError("Can't parse a factor at position %s of %s." % (self.num_expr.currpos, self.num_expr.str))
        inner = self.expression()
        if self.num_expr.match_str(")") is None:
            raise NumericParserError("Expected closing parenthesis at position %s of %s." % (self.num_expr.currpos, self.num_expr.str))
        return inner

    def term(self):
        # Parses a term out of the source string and returns the
        # AST node that is created.  Each further factor is folded into the
        # left operand so that "*" and "/" are left-associative.
        node = self.factor()
        mulop = self.match_mulop()
        while mulop is not None:
            node = NumericAstNode(mulop, node, self.factor())
            mulop = self.match_mulop()
        return node

    def expression(self):
        # Parses an expression out of the source string and returns the
        # AST node that is created.  Each further term is folded into the
        # left operand so that "+" and "-" are left-associative.
        node = self.term()
        addop = self.match_addop()
        while addop is not None:
            node = NumericAstNode(addop, node, self.term())
            addop = self.match_addop()
        return node

    def parse(self):
        # Parses the whole string and returns the root AST node.
        # Raises NumericParserError if any of the string is left unconsumed.
        exp = self.expression()
        if not self.num_expr.eoi():
            raise NumericParserError("Numeric expression parser did not consume entire string; remainder = %s." % self.num_expr.remainder())
        return exp
# End of Parser classes
#===============================================================================================
#===============================================================================================
#----- METACOMMAND FUNCTIONS
#**** DEBUG WRITE METACOMMANDLIST
# Undocumented; used to acquire data used to set the ordering of metacommands.
def x_debug_write_metacommands(**kwargs):
    # Writes the hit count and regex pattern of every metacommand, one per
    # line, to kwargs['filename'], or to 'output' if the name is None or 'stdout'.
    output_dest = kwargs['filename']
    to_stdout = output_dest is None or output_dest == 'stdout'
    ofile = output if to_stdout else EncodedFile(output_dest, conf.output_encoding).open("w")
    try:
        for m in metacommandlist:
            ofile.write(u"(%s) %s\n" % (m.hitcount, m.rx.pattern))
    finally:
        # Only close a file that was opened here; the shared 'output' stream
        # is left alone.
        # NOTE(review): assumes EncodedFile.open() returns a file-like object
        # with close() -- confirm.
        if not to_stdout:
            ofile.close()
metacommandlist.add(ins_fn_rxs(r'^\s*DEBUG\s+WRITE\s+METACOMMANDLIST\s+TO\s+', r'\s*$'), x_debug_write_metacommands)
#**** BREAK
def x_break(**kwargs):
    # Abandons the command list currently being run and restores the IF stack
    # to the depth it had when that list started.  If only one command list is
    # on the stack there is nothing to break out of, and a script error is
    # recorded instead.
    # (No 'global' declarations are needed: the module-level stacks are only
    # mutated in place, never rebound.)
    if len(commandliststack) == 1:
        line_no = current_script_line()
        script_errors.append(["BREAK metacommand with no command nesting", line_no])
    else:
        if_stack.if_levels = if_stack.if_levels[:commandliststack[-1].init_if_level]
        commandliststack.pop()
    return None
metacommandlist.add(r'^\s*BREAK\s*$', x_break)
#**** CD
def x_cd(**kwargs):
    # Changes the current working directory to kwargs['dir'] (after unquoted()
    # has been applied to it).
    # Raises ErrInfo if the directory does not exist.
    new_dir = unquoted(kwargs['dir'])
    if not os.path.isdir(new_dir):
        raise ErrInfo(type="cmd", command_text=kwargs['metacommandline'], other_msg='Directory does not exist')
    os.chdir(new_dir)
    return None
metacommandlist.add(r'^\s*CD\s+(?P<dir>.+)\s*$', x_cd)
#**** SUB_LOCAL
# Define a local variable. Local variables must start with a tilde. As a convenience, one
# will be added if missing.
def x_sub_local(**kwargs):
    # Defines a local variable on the command list currently being run.
    # kwargs['match'] is the variable name (a leading '~' is added if missing)
    # and kwargs['repl'] is its replacement text.
    varname = kwargs['match']
    if not varname.startswith('~'):
        varname = '~' + varname
    commandliststack[-1].localvars.add_substitution(varname, kwargs['repl'])
    return None
metacommandlist.add(r'^\s*SUB_LOCAL\s+(?P<match>~?\w+)\s+(?P<repl>.+)$', x_sub_local, "SUB", "Define a local variable consisting of a string to match and a replacement for it.")
#**** WAIT_UNTIL
def x_wait_until(**kwargs):
    # Tests the conditional expression kwargs['condition'] once per second,
    # returning as soon as it is true or after kwargs['seconds'] tests have
    # been made, whichever comes first.
    countdown = int(kwargs['seconds'])
    while countdown > 0:
        if xcmd_test(kwargs['condition']):
            return
        time.sleep(1)
        countdown -= 1
    if kwargs['end'].lower() == 'halt':
        # NOTE(review): HALT currently behaves exactly like CONTINUE -- nothing
        # is done to stop script processing after the timeout.  Confirm whether
        # a halt action is intended here.
        return None
    return None
metacommandlist.add(r'^\s*WAIT_UNTIL\s+(?P<condition>.+)\s+(?P<end>HALT|CONTINUE)\s+AFTER\s+(?P<seconds>\d+)\s+SECONDS\s*$', x_wait_until)
#**** ON ERROR_HALT EXECUTE SCRIPT CLEAR
def x_error_halt_exec_clear(**kwargs):
    # Discards whatever is currently stored in the module-level 'err_halt_exec'.
    global err_halt_exec
    err_halt_exec = None
metacommandlist.add(r'^\s*ON\s+ERROR_HALT\s+EXEC\s+CLEAR\s*$', x_error_halt_exec_clear)
#**** RESET COUNTER
def x_reset_counter(**kwargs):
    # Removes the counter whose number is given by kwargs["counter_no"].
    ctr_no = int(kwargs["counter_no"])
    counters.remove_counter(ctr_no)
metacommandlist.add(r'^\s*RESET\s+COUNTER\s+(?P<counter_no>\d+)\s*$', x_reset_counter)
#**** RESET COUNTERS
def x_reset_counters(**kwargs):
    # Removes every counter.
    counters.remove_all_counters()
metacommandlist.add(r'^\s*RESET\s+COUNTERS\s*$', x_reset_counters)
#**** SET COUNTER
def x_set_counter(**kwargs):
    # Sets counter number kwargs["counter_no"] to the value of the numeric
    # expression kwargs["value"], rounded down to an integer.
    ctr_no = int(kwargs["counter_no"])
    ctr_value = NumericParser(kwargs["value"]).parse().eval()
    counters.set_counter(ctr_no, int(math.floor(ctr_value)))
metacommandlist.add(r'^\s*SET\s+COUNTER\s+(?P<counter_no>\d+)\s+TO\s+(?P<value>[0-9+\-*/() ]+)\s*$', x_set_counter)
#**** TIMER
def x_timer(**kwargs):
    # Starts the timer when kwargs["onoff"] is "on" (any case); stops it otherwise.
    if kwargs["onoff"].lower() == 'on':
        timer.start()
    else:
        timer.stop()
metacommandlist.add(r'^\s*TIMER\s+(?P<onoff>ON|OFF)\s*$', x_timer)
#**** BEGIN BATCH
def x_begin_batch(**kwargs):
    # Starts a new batch on the module-level status object.
    batch = status.batch
    batch.new_batch()
    return None
metacommandlist.add(r'^\s*BEGIN\s+BATCH\s*$', x_begin_batch)
#**** END BATCH
def x_end_batch(**kwargs):
    # Ends the current batch on the module-level status object.
    batch = status.batch
    batch.end_batch()
    return None
# Set a name so this can be found and evaluated during processing, when all other metacommands are ignored.
metacommandlist.add(r'^\s*END\s+BATCH\s*$', x_end_batch, "END BATCH", run_in_batch=True)
#**** ROLLBACK BATCH
def x_rollback(**kwargs):
    # Rolls back the current batch on the module-level status object.
    status.batch.rollback_batch()
# The optional " BATCH" suffix is a non-capturing group, '(?:...)'.
metacommandlist.add(r'^\s*ROLLBACK(?:\s+BATCH)?\s*$', x_rollback, "ROLLBACK BATCH", run_in_batch=True)
#**** ERROR_HALT
def x_error_halt(**kwargs):
    # Sets status.halt_on_err from kwargs['onoff']: 'on', 'yes', and 'true'
    # (any case) turn it on; 'off', 'no', and 'false' turn it off.
    # Raises ErrInfo for any other flag value.
    flag = kwargs['onoff'].lower()
    if flag not in ('on', 'off', 'yes', 'no', 'true', 'false'):
        raise ErrInfo(type="cmd", command_text=kwargs["metacommandline"], other_msg=u"Unrecognized flag for error handling: %s" % flag)
    status.halt_on_err = flag in ('on', 'yes', 'true')
    return None
metacommandlist.add(r'\s*ERROR_HALT\s+(?P<onoff>ON|OFF|YES|NO|TRUE|FALSE)\s*$', x_error_halt)
#**** METACOMMAND_ERROR_HALT
def x_metacommand_error_halt(**kwargs):
    # Sets status.halt_on_metacommand_err from kwargs['onoff']: 'on', 'yes',
    # and 'true' (any case) turn it on; 'off', 'no', and 'false' turn it off.
    # Raises ErrInfo for any other flag value.
    flag = kwargs['onoff'].lower()
    if flag not in ('on', 'off', 'yes', 'no', 'true', 'false'):
        raise ErrInfo(type="cmd", command_text=kwargs["metacommandline"], other_msg=u"Unrecognized flag for metacommand error handling: %s" % flag)
    status.halt_on_metacommand_err = flag in ('on', 'yes', 'true')
    return None
metacommandlist.add(r'\s*METACOMMAND_ERROR_HALT\s+(?P<onoff>ON|OFF|YES|NO|TRUE|FALSE)\s*$', x_metacommand_error_halt, set_error_flag=False)
#**** LOOP
def x_loop(**kwargs):
    # Begins compilation of a loop: sets the 'compiling_loop' flag and pushes
    # an empty WHILE or UNTIL command list, carrying the loop condition, onto
    # the loop command stack.
    #    kwargs["looptype"] : "WHILE" or "UNTIL" (any case).
    #    kwargs["loopcond"] : The loop's conditional expression.
    global compiling_loop
    compiling_loop = True
    looptype = kwargs["looptype"].upper()
    loopcond = kwargs["loopcond"]
    listname = 'loop' + str(len(loopcommandstack) + 1)
    loop_class = CommandListWhileLoop if looptype == 'WHILE' else CommandListUntilLoop
    loopcommandstack.append(loop_class([], listname, paramnames=None, loopcondition=loopcond))
metacommandlist.add(r'^\s*LOOP\s+(?P<looptype>WHILE|UNTIL)\s*\(\s*(?P<loopcond>.+)\s*\)\s*$', x_loop)
#**** END LOOP
def endloop():
    # Finishes compilation of the innermost loop: clears the 'compiling_loop'
    # flag and moves the loop's command list from the loop command stack to
    # the command list stack.
    # Raises ErrInfo if no loop is being compiled.
    global compiling_loop
    if not loopcommandstack:
        raise ErrInfo("error", other_msg="END LOOP metacommand without a matching preceding LOOP metacommand.")
    compiling_loop = False
    finished_loop = loopcommandstack[-1]
    commandliststack.append(finished_loop)
    loopcommandstack.pop()
#**** SUB_EMPTY
def x_sub_empty(**kwargs):
    # Sets the substitution variable named by kwargs['match'] to an empty string.
    # Get subvarset assignment and cleansed variable name
    subvarset, varname = get_subvarset(kwargs['match'], kwargs['metacommandline'])
    subvarset.add_substitution(varname, u'')
    return None
metacommandlist.add(r'^\s*SUB_EMPTY\s+(?P<match>[+~]?\w+)\s*$', x_sub_empty)
#**** SUB_ADD
def x_sub_add(**kwargs):
    # Increments the substitution variable named by kwargs["match"] by the
    # value of the numeric expression kwargs["increment"].
    # Get subvarset assignment and cleansed variable name
    subvarset, varname = get_subvarset(kwargs["match"], kwargs['metacommandline'])
    increment = NumericParser(kwargs["increment"]).parse().eval()
    subvarset.increment_by(varname, increment)
    return None
metacommandlist.add(r'^\s*SUB_ADD\s+(?P<match>[+~]?\w+)\s+(?P<increment>[+\-0-9\.*/() ]+)\s*$', x_sub_add)
#**** SUB_APPEND
def x_sub_append(**kwargs):
    # Appends kwargs['repl'] to the substitution variable named by kwargs["match"].
    # Get subvarset assignment and cleansed variable name
    subvarset, varname = get_subvarset(kwargs["match"], kwargs['metacommandline'])
    subvarset.append_substitution(varname, kwargs['repl'])
    return None
metacommandlist.add(r'^\s*SUB_APPEND\s+(?P<match>[+~]?\w+)\s(?P<repl>(.|\n)*)$', x_sub_append)
#**** BLOCK ORIF
def x_if_orif(**kwargs):
    # ORIF clause of an IF block.  The condition kwargs['condtest'] is tested,
    # and its result replaces the current IF level, only when the current
    # level is the only false one.
    if if_stack.all_true():
        return None        # Short-circuit evaluation
    if if_stack.only_current_false():
        if_stack.replace(xcmd_test(kwargs['condtest']))
    return None
metacommandlist.add(r'^\s*ORIF\s*\(\s*(?P<condtest>.+)\s*\)\s*$', x_if_orif, run_when_false=True)
#**** EXTEND SCRIPT WITH SCRIPT
#**** APPEND SCRIPT
def x_extendscript(**kwargs):
    # Appends the commands of saved script kwargs["script1"] to saved script
    # kwargs["script2"], and adds to script2's parameter names any of
    # script1's that it does not already have.
    # Raises ErrInfo if either script has not been saved.
    script1 = kwargs["script1"].lower()
    if script1 not in savedscripts:
        raise ErrInfo("cmd", other_msg="There is no SCRIPT named %s." % script1)
    script2 = kwargs["script2"].lower()
    if script2 not in savedscripts:
        raise ErrInfo("cmd", other_msg="There is no SCRIPT named %s." % script2)
    s1 = savedscripts[script1]
    s2 = savedscripts[script2]
    # Iterate over a snapshot: when a script is extended with itself, s2.add()
    # appends to the very list being read, and the loop would never end.
    for cmd in list(s1.cmdlist):
        s2.add(cmd)
    if s1.paramnames is not None:
        if s2.paramnames is None:
            s2.paramnames = []
        for param in s1.paramnames:
            if param not in s2.paramnames:
                s2.paramnames.append(param)
# In both forms 'script2' is the script that receives the commands of 'script1'.
metacommandlist.add(r'\s*EXTEND\s+SCRIPT\s+(?P<script2>\w+)\s+WITH\s+SCRIPT\s+(?P<script1>\w+)\s*$', x_extendscript)
metacommandlist.add(r'\s*APPEND\s+SCRIPT\s+(?P<script1>\w+)\s+TO\s+(?P<script2>\w+)\s*$', x_extendscript)
#**** EXTEND SCRIPT WITH METACOMMAND
def x_extendscript_metacommand(**kwargs):
    # Appends the metacommand text kwargs["cmd"] to the saved script named by
    # kwargs["script"], recording the current script file and line as its origin.
    # Raises ErrInfo if the script has not been saved.
    script = kwargs["script"].lower()
    if script not in savedscripts:
        raise ErrInfo("cmd", other_msg="There is no SCRIPT named %s." % script)
    script_line_no = current_script_line()
    target = savedscripts[script]
    new_command = ScriptCmd(script_file, script_line_no, 'cmd', MetacommandStmt(kwargs["cmd"]))
    target.add(new_command)
metacommandlist.add(r'\s*EXTEND\s+SCRIPT\s+(?P