Module delta.subcommands.classify

Classify input images given a model.

Expand source code
# Copyright © 2020, United States Government, as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All rights reserved.
#
# The DELTA (Deep Earth Learning, Tools, and Analysis) platform is
# licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Classify input images given a model.
"""
import os.path

import sys
import time
import math
import csv

import shapely
from shapely import wkt
from osgeo import gdal

import numpy as np
import matplotlib
from packaging import version
import tensorflow as tf
import tensorflow.keras.metrics #pylint: disable=no-name-in-module

from delta.config import config
from delta.config.extensions import image_writer
from delta.imagery.rectangle import Rectangle
from delta.ml import predict
from delta.ml.io import load_model
from delta.ml.config_parser import metric_from_dict


matplotlib.use('Agg')
import matplotlib.pyplot as plt #pylint: disable=wrong-import-order,wrong-import-position,ungrouped-imports

def save_confusion(cm, class_labels, filename):
    f = plt.figure()
    ax = f.add_subplot(1, 1, 1)
    image = ax.imshow(cm, interpolation='nearest', cmap=plt.get_cmap('inferno'))
    ax.set_title('Confusion Matrix')
    f.colorbar(image)
    ax.set_xlim(-0.5, cm.shape[0] - 0.5)
    ax.set_ylim(-0.5, cm.shape[0] - 0.5)
    ax.set_xticks(range(cm.shape[0]))
    ax.set_yticks(range(cm.shape[0]))
    ax.set_xticklabels(class_labels)
    ax.set_yticklabels(class_labels)
    m = cm.max()
    total = cm.sum()

    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            ax.text(j, i, '%d\n%.2g%%' % (cm[i, j], cm[i, j] / total * 100), horizontalalignment='center',
                    color='white' if cm[i, j] < m / 2 else 'black')
    ax.set_ylabel('True Label')
    ax.set_xlabel('Predicted Label')
    f.savefig(filename)

def ae_convert(data):
    r = np.clip((data[:, :, [4, 2, 1]]  * np.float32(100.0)), 0.0, 255.0).astype(np.uint8)
    return r

def print_classes(output_file, cm, metrics, comment):

    if output_file is not None:
        file_handle = open(output_file, 'a')

    print(comment)
    if output_file is not None:
        file_handle.write(comment + '\n')
    for i in range(cm.shape[0]):
        name = config.dataset.classes[i].name if \
               len(config.dataset.classes) == cm.shape[0] else ('Class %d' % (i))
        with np.errstate(invalid='ignore'):
            precision_percent = np.nan_to_num(cm[i,i] / np.sum(cm[:, i]) * 100) # Column = predictions
            recall_percent = np.nan_to_num(cm[i,i] / np.sum(cm[i, :]) * 100) # Row = actual values
            s = ('%s--- Precision: %6.2f%%    Recall: %6.2f%%        Frequency: %6.2f%%' %
                 (name.ljust(20), precision_percent, recall_percent,
                  float(np.sum(cm[i, :] / np.sum(cm)) * 100)))
            print(s)
            if output_file is not None:
                file_handle.write(s + '\n')
    s = '%6.2f%% Accuracy' % (float(np.sum(np.diag(cm)) / np.sum(cm) * 100))
    print(s)
    if output_file is not None:
        file_handle.write(s + '\n')

    s = ''
    for m in metrics:
        s += '%s = %8.4f ' % (m.name, float(m.result()))
    print(s)
    if output_file is not None:
        file_handle.write(s + '\n')
        file_handle.close()


class LossToMetricWrapper(tensorflow.keras.metrics.Metric):
    """Wrap a Loss object to make it behave like a Metric object"""
    def __init__(self, loss_object):
        super().__init__(name=loss_object.name)
        self._loss_object = loss_object
        self._moving_average = 0.0
        self._total_count = 0

    def update_state(self, y_true, y_pred, sample_weight=None): #pylint: disable=unused-argument, arguments-differ
        this_loss = self._loss_object.call(y_true, y_pred)
        if isinstance(this_loss, tf.Tensor):
            this_loss = this_loss.numpy()
        elif not isinstance(this_loss, np.ndarray):
            this_loss = np.ndarray(this_loss)
        self._total_count += y_true.size
        self._moving_average += (this_loss.mean() - self._moving_average) * (y_true.size / self._total_count)

    def result(self):
        return self._moving_average

def get_metrics():
    """Returns a list of specified metrics, wrapping up losses as metrics"""
    if config.classify.metrics() is None:
        return []
    metrics = [metric_from_dict(m) for m in config.classify.metrics()]
    metrics = [LossToMetricWrapper(m) if issubclass(type(m), tf.keras.losses.Loss) else m for m in metrics]
    return metrics

def classify_image(model, image, label, path, net_name, options,
                   shapes=None, persistent_metrics=None):
    '''Classify an image and return the confusion matrix and metrics if labels were provided'''
    out_path, base_name = os.path.split(path)
    base_name = os.path.splitext(base_name)[0]
    base_out = (options.outprefix if options.outprefix else net_name + '_') + base_name + '.tiff'


    # check if is subdirectory
    if options.basedir and os.path.abspath(options.basedir) == \
            os.path.commonpath([os.path.abspath(options.basedir), os.path.abspath(out_path)]):
        out_path = os.path.relpath(out_path, options.basedir)
    else:
        out_path = ''
    if options.outdir:
        out_path = os.path.join(options.outdir, out_path)
    if out_path:
        os.makedirs(out_path, exist_ok=True)

    writer = image_writer('tiff')

    error_image = None
    if label:
        assert image.size() == label.size(), 'Image and label do not match.'
        if options.error_abs:
            error_image = writer(os.path.join(out_path, 'error_abs_' + base_out))
        elif options.error:
            error_image = writer(os.path.join(out_path, 'error_' + base_out))

    prob_image = writer(os.path.join(out_path, base_out)) if config.classify.prob_image() else None
    output_image = writer(os.path.join(out_path, base_out)) if not config.classify.prob_image() else None

    ts = config.io.tile_size()
    roi = get_roi_containing_shapes(shapes)

    metrics = None
    num_temp_metrics = 0
    if options.autoencoder:
        label = image
        predictor = predict.ImagePredictor(model, ts, output_image, True, base_name,
                                           None if options.noColormap else (ae_convert, np.uint8, 3))
    else:
        colors = list(map(lambda x: x.color, config.dataset.classes))
        if options.noColormap:
            colors=None # Forces raw one channel output
        if label:
            metrics = get_metrics() # For single images
            num_temp_metrics = len(metrics)
            if persistent_metrics is not None: # Persistent metrics over all images
                metrics = metrics + persistent_metrics
        predictor = predict.LabelPredictor(model, ts, output_image, True, base_name, colormap=colors,
                                           prob_image=prob_image, error_image=error_image, error_abs=options.error_abs,
                                           metrics=metrics)

    overlap = (config.classify.overlap(), config.classify.overlap())
    try:
        if config.general.gpus() == 0:
            with tf.device('/cpu:0'):
                predictor.predict(image, label, overlap=overlap, input_bounds=roi, roi_shapes=shapes)
        else:
            predictor.predict(image, label, overlap=overlap, input_bounds=roi, roi_shapes=shapes)
    except KeyboardInterrupt:
        print('\nAborted.')
        sys.exit(0)

    #if options.autoencoder:
    #    write_tiff('orig_' + net_name + '_' + base_name + '.tiff',
    #               image.read() if options.noColormap else ae_convert(image.read()),
    #               metadata=image.metadata())

    if label:
        cm = predictor.confusion_matrix()
        class_names = list(map(lambda x: x.name, config.dataset.classes))
        if len(config.dataset.classes) != cm.shape[0]:
            class_names = list(map(lambda x: 'Class %d' % (x), range(cm.shape[0])))
        if options.confusion and (not shapes):
            save_confusion(cm, class_names,
                           os.path.join(out_path, 'confusion_' + os.path.splitext(base_out)[0] + '.pdf'))
        metrics = predictor.metrics()
        metrics, persistent_metrics = (metrics[0:num_temp_metrics], metrics[num_temp_metrics:])
        return cm, metrics, persistent_metrics
    return None, None, None

def get_wkt_path(image_path, wkt_folder=None):
    '''Return the path to where the WKT file for an image should be'''
    WKT_EXTENSION = '.wkt.csv'
    if wkt_folder:
        p = os.path.join(wkt_folder, os.path.basename(image_path))
        path = os.path.splitext(p)[0] + WKT_EXTENSION
    else:
        path = os.path.splitext(image_path)[0] + WKT_EXTENSION
    return path

def load_shapes_matching_tag(wkt_path, tag):
    '''Returns a list of all the shapes defined for this tag in the WKT file.
       If tag is None, return untagged regions'''
    shapes = []
    with open(wkt_path, 'r') as f:
        reader = csv.reader(f, skipinitialspace=True)
        for row in reader:
            raw_line = ', '.join(row)
            if ('POLYGON' not in raw_line) or (len(row) != 2):
                continue
            names = row[1].strip().split(',')
            if not tag and (names == ['']): # Match non-tagged regions
                s = wkt.loads(row[0])
                shapes.append(s)
            else:
                for n in names: # Check each tag of the region
                    if tag == n.strip():
                        s = wkt.loads(row[0])
                        shapes.append(s)
                        continue
    return shapes

# MOVE
def get_roi_containing_shapes(shapes) -> Rectangle:
    '''Return a Rectangle containing all the shapes or None if none were passed in'''
    if not shapes:
        return None
    roi = Rectangle(*shapes[0].bounds)
    for i in range(1,len(shapes)):
        new_roi = Rectangle(*shapes[i].bounds)
        roi.expand_to_contain_rect(new_roi)
    # Convert to integer values
    return Rectangle(int(math.floor(roi.min_x)),
                     int(math.floor(roi.min_y)),
                     int(math.ceil(roi.max_x)),
                     int(math.ceil(roi.max_y)))

def shapes_to_pixel_coordinates(shapes, image_path):
    '''Convert any shapes not in pixel coordinates to pixel coordinates.
       Always returns a list of Polygon objects, even if the input list
       contains MultiPolygon objects.'''
    handle = gdal.Open(image_path)
    transform = handle.GetGeoTransform()

    def apply_transform(coord, transform):
        return ((coord[0] - transform[0]) / transform[1],
                (coord[1] - transform[3]) / transform[5])

    output_shapes = []
    for s in shapes:
        if s.geom_type == 'Polygon':
            coord_list = [apply_transform(c, transform) for c in s.exterior.coords]
            interior_coord_lists = []
            for i in s.interiors:
                these_coords = [apply_transform(c, transform) for c in i.coords]
                interior_coord_lists.append(these_coords)
            output_shapes.append(shapely.geometry.Polygon(coord_list, interior_coord_lists))
            continue
        if s.geom_type == 'MultiPolygon':
            for g in s.geoms:
                coord_list = [apply_transform(c, transform) for c in g.exterior.coords]
                interior_coord_lists = []
                for i in g.interiors:
                    these_coords = [apply_transform(c, transform) for c in i.coords]
                    interior_coord_lists.append(these_coords)
                output_shapes.append(shapely.geometry.Polygon(coord_list, interior_coord_lists))
            continue
        raise Exception('Unrecognized shape type: ' + s.geom_type)
    return output_shapes

def load_wkt_shapes(wkt_path, image_path, region_name):
    '''Loads shapes (in image coordinates) from an image's WKT file'''

    if os.path.isfile(wkt_path):
        geo_shapes = load_shapes_matching_tag(wkt_path, region_name)
        if geo_shapes:
            return shapes_to_pixel_coordinates(geo_shapes, image_path)
    return []

def main(options): #pylint: disable=R0912
    if version.parse(tf.__version__) < version.parse('2.2'): # eager execution not default
        tf.config.experimental_run_functions_eagerly(True)

    model = load_model(options.model)

    start_time = time.time()
    images = config.dataset.images()
    labels = config.dataset.labels()
    net_name = os.path.splitext(os.path.basename(options.model))[0]

    if len(images) == 0:
        print('No images specified.')
        return 0

    result_file = net_name + '_results.txt'
    if result_file and os.path.exists(result_file):
        os.remove(result_file)

    if options.autoencoder or not labels:
        labels = None
        regions = ['all'] # Each whole image
    else:
        regions = ['all', 'no_label'] # Also individual regions without labels
        specified_regions = config.classify.regions()
        if specified_regions:
            regions += specified_regions

    for region_name in regions:
        # If there are multiple images we need to maintain separate metric objects
        # to keep summary statistics over all the images
        full_cm = None
        full_metrics = get_metrics() if len(images) > 1 else None
        for (i, image_path) in enumerate(images):
            this_image = images.load(i)
            wkt_path = get_wkt_path(image_path, config.classify.wkt_dir())
            shapes = None

            if region_name == 'no_label':
                # Individually compute untagged regions
                shapes = load_wkt_shapes(wkt_path, image_path, None)
                for s in shapes:
                    cm, metrics, full_metrics = classify_image(model, this_image, labels.load(i),
                                                               image_path, net_name, options,
                                                               shapes=[s], persistent_metrics=None)
                    print_classes(result_file, cm, metrics, 'For image ' + image_path + ',  shape: ' + str(s))
                continue

            # Load shapes from file if they are specified for this image/region pair
            if region_name != 'all':
                shapes = load_wkt_shapes(wkt_path, image_path, region_name)
                if not shapes:
                    continue # This region name not specified for this image

            cm, metrics, full_metrics = classify_image(model, this_image,
                                                       labels.load(i) if labels else None,
                                                       image_path, net_name, options,
                                                       shapes, persistent_metrics=full_metrics)
            if cm is not None:
                if (region_name == 'all') and (len(images) > 1):
                    print_classes(result_file, cm, metrics, 'Image: %s' % (image_path))
                if full_cm is None:
                    full_cm = np.copy(cm).astype(np.int64)
                else:
                    full_cm += cm
                if len(images) == 1: # So the statistics are printed properly outside the loop
                    full_metrics = metrics
        if labels and (full_cm is not None):
            print_classes(result_file, full_cm, full_metrics, 'Overall:' if region_name == 'all' else region_name + ':')
    stop_time = time.time()
    print('Elapsed time = ', stop_time - start_time)
    return 0

Functions

def ae_convert(data)
Expand source code
def ae_convert(data):
    r = np.clip((data[:, :, [4, 2, 1]]  * np.float32(100.0)), 0.0, 255.0).astype(np.uint8)
    return r
def classify_image(model, image, label, path, net_name, options, shapes=None, persistent_metrics=None)

Classify an image and return the confusion matrix and metrics if labels were provided

Expand source code
def classify_image(model, image, label, path, net_name, options,
                   shapes=None, persistent_metrics=None):
    '''Classify an image and return the confusion matrix and metrics if labels were provided'''
    out_path, base_name = os.path.split(path)
    base_name = os.path.splitext(base_name)[0]
    base_out = (options.outprefix if options.outprefix else net_name + '_') + base_name + '.tiff'


    # check if is subdirectory
    if options.basedir and os.path.abspath(options.basedir) == \
            os.path.commonpath([os.path.abspath(options.basedir), os.path.abspath(out_path)]):
        out_path = os.path.relpath(out_path, options.basedir)
    else:
        out_path = ''
    if options.outdir:
        out_path = os.path.join(options.outdir, out_path)
    if out_path:
        os.makedirs(out_path, exist_ok=True)

    writer = image_writer('tiff')

    error_image = None
    if label:
        assert image.size() == label.size(), 'Image and label do not match.'
        if options.error_abs:
            error_image = writer(os.path.join(out_path, 'error_abs_' + base_out))
        elif options.error:
            error_image = writer(os.path.join(out_path, 'error_' + base_out))

    prob_image = writer(os.path.join(out_path, base_out)) if config.classify.prob_image() else None
    output_image = writer(os.path.join(out_path, base_out)) if not config.classify.prob_image() else None

    ts = config.io.tile_size()
    roi = get_roi_containing_shapes(shapes)

    metrics = None
    num_temp_metrics = 0
    if options.autoencoder:
        label = image
        predictor = predict.ImagePredictor(model, ts, output_image, True, base_name,
                                           None if options.noColormap else (ae_convert, np.uint8, 3))
    else:
        colors = list(map(lambda x: x.color, config.dataset.classes))
        if options.noColormap:
            colors=None # Forces raw one channel output
        if label:
            metrics = get_metrics() # For single images
            num_temp_metrics = len(metrics)
            if persistent_metrics is not None: # Persistent metrics over all images
                metrics = metrics + persistent_metrics
        predictor = predict.LabelPredictor(model, ts, output_image, True, base_name, colormap=colors,
                                           prob_image=prob_image, error_image=error_image, error_abs=options.error_abs,
                                           metrics=metrics)

    overlap = (config.classify.overlap(), config.classify.overlap())
    try:
        if config.general.gpus() == 0:
            with tf.device('/cpu:0'):
                predictor.predict(image, label, overlap=overlap, input_bounds=roi, roi_shapes=shapes)
        else:
            predictor.predict(image, label, overlap=overlap, input_bounds=roi, roi_shapes=shapes)
    except KeyboardInterrupt:
        print('\nAborted.')
        sys.exit(0)

    #if options.autoencoder:
    #    write_tiff('orig_' + net_name + '_' + base_name + '.tiff',
    #               image.read() if options.noColormap else ae_convert(image.read()),
    #               metadata=image.metadata())

    if label:
        cm = predictor.confusion_matrix()
        class_names = list(map(lambda x: x.name, config.dataset.classes))
        if len(config.dataset.classes) != cm.shape[0]:
            class_names = list(map(lambda x: 'Class %d' % (x), range(cm.shape[0])))
        if options.confusion and (not shapes):
            save_confusion(cm, class_names,
                           os.path.join(out_path, 'confusion_' + os.path.splitext(base_out)[0] + '.pdf'))
        metrics = predictor.metrics()
        metrics, persistent_metrics = (metrics[0:num_temp_metrics], metrics[num_temp_metrics:])
        return cm, metrics, persistent_metrics
    return None, None, None
def get_metrics()

Returns a list of specified metrics, wrapping up losses as metrics

Expand source code
def get_metrics():
    """Returns a list of specified metrics, wrapping up losses as metrics"""
    if config.classify.metrics() is None:
        return []
    metrics = [metric_from_dict(m) for m in config.classify.metrics()]
    metrics = [LossToMetricWrapper(m) if issubclass(type(m), tf.keras.losses.Loss) else m for m in metrics]
    return metrics
def get_roi_containing_shapes(shapes) ‑> Rectangle

Return a Rectangle containing all the shapes or None if none were passed in

Expand source code
def get_roi_containing_shapes(shapes) -> Rectangle:
    '''Return a Rectangle containing all the shapes or None if none were passed in'''
    if not shapes:
        return None
    roi = Rectangle(*shapes[0].bounds)
    for i in range(1,len(shapes)):
        new_roi = Rectangle(*shapes[i].bounds)
        roi.expand_to_contain_rect(new_roi)
    # Convert to integer values
    return Rectangle(int(math.floor(roi.min_x)),
                     int(math.floor(roi.min_y)),
                     int(math.ceil(roi.max_x)),
                     int(math.ceil(roi.max_y)))
def get_wkt_path(image_path, wkt_folder=None)

Return the path to where the WKT file for an image should be

Expand source code
def get_wkt_path(image_path, wkt_folder=None):
    '''Return the path to where the WKT file for an image should be'''
    WKT_EXTENSION = '.wkt.csv'
    if wkt_folder:
        p = os.path.join(wkt_folder, os.path.basename(image_path))
        path = os.path.splitext(p)[0] + WKT_EXTENSION
    else:
        path = os.path.splitext(image_path)[0] + WKT_EXTENSION
    return path
def load_shapes_matching_tag(wkt_path, tag)

Returns a list of all the shapes defined for this tag in the WKT file. If tag is None, return untagged regions

Expand source code
def load_shapes_matching_tag(wkt_path, tag):
    '''Returns a list of all the shapes defined for this tag in the WKT file.
       If tag is None, return untagged regions'''
    shapes = []
    with open(wkt_path, 'r') as f:
        reader = csv.reader(f, skipinitialspace=True)
        for row in reader:
            raw_line = ', '.join(row)
            if ('POLYGON' not in raw_line) or (len(row) != 2):
                continue
            names = row[1].strip().split(',')
            if not tag and (names == ['']): # Match non-tagged regions
                s = wkt.loads(row[0])
                shapes.append(s)
            else:
                for n in names: # Check each tag of the region
                    if tag == n.strip():
                        s = wkt.loads(row[0])
                        shapes.append(s)
                        continue
    return shapes
def load_wkt_shapes(wkt_path, image_path, region_name)

Loads shapes (in image coordinates) from an image's WKT file

Expand source code
def load_wkt_shapes(wkt_path, image_path, region_name):
    '''Loads shapes (in image coordinates) from an image's WKT file'''

    if os.path.isfile(wkt_path):
        geo_shapes = load_shapes_matching_tag(wkt_path, region_name)
        if geo_shapes:
            return shapes_to_pixel_coordinates(geo_shapes, image_path)
    return []
def main(options)
Expand source code
def main(options): #pylint: disable=R0912
    if version.parse(tf.__version__) < version.parse('2.2'): # eager execution not default
        tf.config.experimental_run_functions_eagerly(True)

    model = load_model(options.model)

    start_time = time.time()
    images = config.dataset.images()
    labels = config.dataset.labels()
    net_name = os.path.splitext(os.path.basename(options.model))[0]

    if len(images) == 0:
        print('No images specified.')
        return 0

    result_file = net_name + '_results.txt'
    if result_file and os.path.exists(result_file):
        os.remove(result_file)

    if options.autoencoder or not labels:
        labels = None
        regions = ['all'] # Each whole image
    else:
        regions = ['all', 'no_label'] # Also individual regions without labels
        specified_regions = config.classify.regions()
        if specified_regions:
            regions += specified_regions

    for region_name in regions:
        # If there are multiple images we need to maintain separate metric objects
        # to keep summary statistics over all the images
        full_cm = None
        full_metrics = get_metrics() if len(images) > 1 else None
        for (i, image_path) in enumerate(images):
            this_image = images.load(i)
            wkt_path = get_wkt_path(image_path, config.classify.wkt_dir())
            shapes = None

            if region_name == 'no_label':
                # Individually compute untagged regions
                shapes = load_wkt_shapes(wkt_path, image_path, None)
                for s in shapes:
                    cm, metrics, full_metrics = classify_image(model, this_image, labels.load(i),
                                                               image_path, net_name, options,
                                                               shapes=[s], persistent_metrics=None)
                    print_classes(result_file, cm, metrics, 'For image ' + image_path + ',  shape: ' + str(s))
                continue

            # Load shapes from file if they are specified for this image/region pair
            if region_name != 'all':
                shapes = load_wkt_shapes(wkt_path, image_path, region_name)
                if not shapes:
                    continue # This region name not specified for this image

            cm, metrics, full_metrics = classify_image(model, this_image,
                                                       labels.load(i) if labels else None,
                                                       image_path, net_name, options,
                                                       shapes, persistent_metrics=full_metrics)
            if cm is not None:
                if (region_name == 'all') and (len(images) > 1):
                    print_classes(result_file, cm, metrics, 'Image: %s' % (image_path))
                if full_cm is None:
                    full_cm = np.copy(cm).astype(np.int64)
                else:
                    full_cm += cm
                if len(images) == 1: # So the statistics are printed properly outside the loop
                    full_metrics = metrics
        if labels and (full_cm is not None):
            print_classes(result_file, full_cm, full_metrics, 'Overall:' if region_name == 'all' else region_name + ':')
    stop_time = time.time()
    print('Elapsed time = ', stop_time - start_time)
    return 0
def print_classes(output_file, cm, metrics, comment)
Expand source code
def print_classes(output_file, cm, metrics, comment):

    if output_file is not None:
        file_handle = open(output_file, 'a')

    print(comment)
    if output_file is not None:
        file_handle.write(comment + '\n')
    for i in range(cm.shape[0]):
        name = config.dataset.classes[i].name if \
               len(config.dataset.classes) == cm.shape[0] else ('Class %d' % (i))
        with np.errstate(invalid='ignore'):
            precision_percent = np.nan_to_num(cm[i,i] / np.sum(cm[:, i]) * 100) # Column = predictions
            recall_percent = np.nan_to_num(cm[i,i] / np.sum(cm[i, :]) * 100) # Row = actual values
            s = ('%s--- Precision: %6.2f%%    Recall: %6.2f%%        Frequency: %6.2f%%' %
                 (name.ljust(20), precision_percent, recall_percent,
                  float(np.sum(cm[i, :] / np.sum(cm)) * 100)))
            print(s)
            if output_file is not None:
                file_handle.write(s + '\n')
    s = '%6.2f%% Accuracy' % (float(np.sum(np.diag(cm)) / np.sum(cm) * 100))
    print(s)
    if output_file is not None:
        file_handle.write(s + '\n')

    s = ''
    for m in metrics:
        s += '%s = %8.4f ' % (m.name, float(m.result()))
    print(s)
    if output_file is not None:
        file_handle.write(s + '\n')
        file_handle.close()
def save_confusion(cm, class_labels, filename)
Expand source code
def save_confusion(cm, class_labels, filename):
    f = plt.figure()
    ax = f.add_subplot(1, 1, 1)
    image = ax.imshow(cm, interpolation='nearest', cmap=plt.get_cmap('inferno'))
    ax.set_title('Confusion Matrix')
    f.colorbar(image)
    ax.set_xlim(-0.5, cm.shape[0] - 0.5)
    ax.set_ylim(-0.5, cm.shape[0] - 0.5)
    ax.set_xticks(range(cm.shape[0]))
    ax.set_yticks(range(cm.shape[0]))
    ax.set_xticklabels(class_labels)
    ax.set_yticklabels(class_labels)
    m = cm.max()
    total = cm.sum()

    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            ax.text(j, i, '%d\n%.2g%%' % (cm[i, j], cm[i, j] / total * 100), horizontalalignment='center',
                    color='white' if cm[i, j] < m / 2 else 'black')
    ax.set_ylabel('True Label')
    ax.set_xlabel('Predicted Label')
    f.savefig(filename)
def shapes_to_pixel_coordinates(shapes, image_path)

Convert any shapes not in pixel coordinates to pixel coordinates. Always returns a list of Polygon objects, even if the input list contains MultiPolygon objects.

Expand source code
def shapes_to_pixel_coordinates(shapes, image_path):
    '''Convert any shapes not in pixel coordinates to pixel coordinates.
       Always returns a list of Polygon objects, even if the input list
       contains MultiPolygon objects.'''
    handle = gdal.Open(image_path)
    transform = handle.GetGeoTransform()

    def apply_transform(coord, transform):
        return ((coord[0] - transform[0]) / transform[1],
                (coord[1] - transform[3]) / transform[5])

    output_shapes = []
    for s in shapes:
        if s.geom_type == 'Polygon':
            coord_list = [apply_transform(c, transform) for c in s.exterior.coords]
            interior_coord_lists = []
            for i in s.interiors:
                these_coords = [apply_transform(c, transform) for c in i.coords]
                interior_coord_lists.append(these_coords)
            output_shapes.append(shapely.geometry.Polygon(coord_list, interior_coord_lists))
            continue
        if s.geom_type == 'MultiPolygon':
            for g in s.geoms:
                coord_list = [apply_transform(c, transform) for c in g.exterior.coords]
                interior_coord_lists = []
                for i in g.interiors:
                    these_coords = [apply_transform(c, transform) for c in i.coords]
                    interior_coord_lists.append(these_coords)
                output_shapes.append(shapely.geometry.Polygon(coord_list, interior_coord_lists))
            continue
        raise Exception('Unrecognized shape type: ' + s.geom_type)
    return output_shapes

Classes

class LossToMetricWrapper (loss_object)

Wrap a Loss object to make it behave like a Metric object

Expand source code
class LossToMetricWrapper(tensorflow.keras.metrics.Metric):
    """Wrap a Loss object to make it behave like a Metric object"""
    def __init__(self, loss_object):
        super().__init__(name=loss_object.name)
        self._loss_object = loss_object
        self._moving_average = 0.0
        self._total_count = 0

    def update_state(self, y_true, y_pred, sample_weight=None): #pylint: disable=unused-argument, arguments-differ
        this_loss = self._loss_object.call(y_true, y_pred)
        if isinstance(this_loss, tf.Tensor):
            this_loss = this_loss.numpy()
        elif not isinstance(this_loss, np.ndarray):
            this_loss = np.ndarray(this_loss)
        self._total_count += y_true.size
        self._moving_average += (this_loss.mean() - self._moving_average) * (y_true.size / self._total_count)

    def result(self):
        return self._moving_average

Ancestors

  • keras.metrics.base_metric.Metric
  • keras.engine.base_layer.Layer
  • tensorflow.python.module.module.Module
  • tensorflow.python.training.tracking.autotrackable.AutoTrackable
  • tensorflow.python.training.tracking.base.Trackable
  • keras.utils.version_utils.LayerVersionSelector

Methods

def result(self)

Computes and returns the scalar metric value tensor or a dict of scalars.

Result computation is an idempotent operation that simply calculates the metric value using the state variables.

Returns

A scalar tensor, or a dictionary of scalar tensors.

Expand source code
def result(self):
    return self._moving_average
def update_state(self, y_true, y_pred, sample_weight=None)

Accumulates statistics for the metric.

Note: This function is executed as a graph function in graph mode. This means: a) Operations on the same resource are executed in textual order. This should make it easier to do things like add the updated value of a variable to another, for example. b) You don't need to worry about collecting the update ops to execute. All update ops added to the graph by this function will be executed. As a result, code should generally work the same way with graph or eager execution.

Args

*args:
**kwargs
A mini-batch of inputs to the Metric.
Expand source code
def update_state(self, y_true, y_pred, sample_weight=None): #pylint: disable=unused-argument, arguments-differ
    this_loss = self._loss_object.call(y_true, y_pred)
    if isinstance(this_loss, tf.Tensor):
        this_loss = this_loss.numpy()
    elif not isinstance(this_loss, np.ndarray):
        this_loss = np.ndarray(this_loss)
    self._total_count += y_true.size
    self._moving_average += (this_loss.mean() - self._moving_average) * (y_true.size / self._total_count)