Introduction

One of the common problems in software development is dealing with storage. When writing micro-services you often are required to store application data in some type of long term storage (filesystem, NFS, SQL, Cloud storage by X vendor, ...).

Instead of choosing a storage system and writing code to that storage system's API, it is often better to extract your use cases into a storage API and write concrete implementations that implement the API.

This provides multiple storage solution choices, simple migration strategies, multi-vendor support (in cloud or database vendors), ...

This is different than abstraction layers that your programming language may provide for a type of storage (Go for example has io.ReadWriter, io.Reader, io.Writer abstractions for filesystems and package sql for SQL storage systems). Our abstraction is for application specific data which may use these lower level abstractions when implemented.

NOTE: Like most articles I write, this article will use the Go language for all examples, but the methodology is valid for other languages as well.

A simplistic storage API

For this article we are going to implement a storage API for an application that reads and writes employee records. We will not detail the service itself, just the storage layer.

Below is our storage interface packaged as storage.Employee.
The Employee interface can be implemented by many storage systems.

package storage

// NotFoundError indicates that a record could not be located.
// This differentiates between not finding a record and the
// storage layer having an error.
type NotFoundError struct{
    error
}

func (n NotFoundError) isNotFound(){}

// NotFound indicates if the error is that the ID could
// not be found.
func NotFound(e error) bool {
	if _, ok := e.(NotFoundError); ok {
		return true
	}
	return false
}

// EmployeeRec represents an employee record.
type EmployeeRec struct {
	// ID is the employee ID.
	ID uint64
	// First and Last are the first and last name of the employee
	First, Last string
	// Title is the employee's title.
	Title string
	// Dept is the employee's department number.
	Dept uint8
}

// Validate validates the fields are valid.
func (e *EmployeeRec) Validate() error {
	if e.ID == 0 {
		return errors.New("ID field cannot be 0")
	}
	
	switch "" {
	case strings.TrimSpace(e.First):
		return errors.New("First field cannot be empty string")
	case strings.TrimSpace(e.Last):
		return errors.New("Last field cannot be empty string")
	case strings.TrimSpace(e.Title):
		return errors.New("Title field cannot be empty string")
	}
	
	if e.Dept == 0 {
		return errors.New("Dept field cannot be 0")
	}
	return nil
}

// EmployeeSearch returns a single result of a search of 
// employee records.
type EmployeeSearch struct {
	// Rec exists if a valid response was returned.
	Rec *EmployeeRec
	// Err exists if the storage system had an error mid search.
	Err error
}

// Employee allows access to the system storing employee records.	
type Employee interface {
	// Get retrieves an employee record by their employee ID.
	Get(id uint64) (*EmployeeRec, error)
	// Put stores a record.
	Put(r *EmployeeRec) error
	// Search searches for a record matching on all fields
	// that do not have the zero value for that field type.
	Search(r EmployeeRec) (chan *EmployeeSearch, error)
}

Development storage is not production storage

By abstracting storage into an API we now have the ability to use multiple storage implementations. For development purposes, the first implementation I write is an "in-memory" implementation.

The "in-memory" representation allows tests to use a storage implementation that provides automatic cleanup at the end of any tests. Other systems doing integration tests can spin up the service and not worry about system cleanup or storage setup.

Finally this method prevents a user running the system locally from the requirement to spin up the storage mechanisms, which might require access permissions or creation of databases and tables.

package inmemory

import ".../storage"

// Employee implements storage.Employee.
type Employee struct {
	store map[uint64]*storage.EmployeeRec
}

// New is the constructor for Employee.
func New() storage.Employee {
	return &Employee{store: map[uint64]*storage.EmployeeRec{}}
}

// Get implements storage.Employee.Get().
func (e *Employee) Get(id uint64) (*storage.EmployeeRec, error) {
	v, ok := e.store[id]
	if !ok {
		return nil, storage.NotFoundError{fmt.Errorf("could not find id %d", id)}
	}
	return v, nil
}

// Put implements storage.Employee.Put().
func (e *Employee) Put(r *storage.EmployeeRec) error {
	if err := r.Validate(); err != nil {
		return fmt.Errorf("cannot store record: %s", err)
	}
	e.store[r.ID] = r
	return nil
}

// Search implements storage.Employee.Search().
func (e *Employee) Search(s storage.EmployeeRec) (chan storage.EmployeeSearch, error) {
	ch := make(chan storage.EmploySearch, 10)
	go func() {
		defer close(ch)
		for _, v := range e.store {
			if s.ID != 0 {
				if s.ID != v.ID {
					continue
				}
			}
			if s.Last != "" {
				if s.Last != v.Last {
					continue
				}
			}
			if s.First != "" {
				if s.First != v.First {
					continue
				}
			}
			if s.Title != "" {
				if s.Title != v.Title{
					continue
				}
			}
			if s.Dept != 0 {
				if s.Dept != v.Dept{
					continue
				}
			}
			ch <- storage.EmployeeSearch{Rec: v}
		}
	}()
	return ch, nil
}

The above implements an "in-memory" storage implementation of storage.Employee. This implementation is not highly optimized, using O^n search for example, which is fine when n is small, such as the tests where this will be used.

Choosing storage based on flags

When starting our application it is easy to choose what type of storage. For example, say we have our "in-memory" implementation and a MySql implementation:

package main

import (
	"flag"
	"os"

	".../server"
	".../storage"
	".../storage/inmemory"
	".../storage/mysql"
)

var (
	inmemoryStore = flag.Bool("inmemory", false, "Use the in-memory storage implementation, useful for tests and experimentation.")
	mysqlStore = flag.Bool("mysql", false, "Use a MySQL storage layer.  Must set certain env variables.")
)

func main() {
	var store storage.Employee

	switch true {
	case *inmemoryStore:
		store = inmemory.New()
	case *mysqlStore:
		var err error
		u := os.Getenv("mysqlUsr")
		p := os.Getenv("mysqlPass")
		a := os.Getenv("mysqlAddr")
		store, err = mysql.New(a, u, p)
		if err != nil {
			panic(err)
		}
	default:
		panic("must set either --inmemory or --mysql")
	}
	
	s, err := server.New(store)
	if err != nil {
		panic(err)
	}

	// Blocks forever unless system error.
	if err := s.Run(); err != nil {
		panic(err)
	}
}

Our application can simply choose what storage system to use based on a passed flag. Adding additional storage layers is also as simple as adding new case statements.

Add new storage with ease

Various scenarios can occur that require changing your storage system. This could be:

  • A new storage system with less support costs becomes available
  • Your storage system is being phased out for a new storage system
  • The storage system no longer meet your needs
  • Rising storage costs from a vendor
  • Switching cloud vendors or utilizing multiple cloud vendors

By implementing storage behind an API, you only need to write the new implementation. Once the implementation is completed, you can:

  • Create simple migration tools between any two storage systems
  • Create a unified benchmark suite to test performance of each implementation

A simple migration tool might look like:

package main

import (
	".../storage"
	".../storage/mysql"
	".../storage/postres"
)

func main() {
	// Grab the address, user, and password to the mysql storage
	// from an environmental variable.
	fu := os.Getenv("mysqlUsr")
	fp := os.Getenv("mysqlPass")
	fAddr := os.Getenv("mysqlAddr")
	
	// Grab the address, user, and password to the postgres storage
	// from an environmental variable.
	tu := os.Getenv("postgresUser")
	tp := os.Getenv("postgresPass")
	tAddr := os.Getenv("postgresAddr")

	// Let's copy from a mysql version of the storage.
	from, err := mysql.New(fAddr, fu, fp)
	if err != nil {
		panic(err)
	}
	
	// Let's copy to a postgres version of the storage.
	to, err := postgres.New(tAddr, tu, tp)
	if err != nil {
		panic(err)
	}
	
	// Search for all records.
	ch, err := from.Search(storage.Record{})
	if err != nil {
		panic(err)
	}
	
	// Write all records.
	for sr := range ch {
		if sr.Err != nil {
			panic(err)
		}
		if err := to.Put(sr.Rec); err != nil {
			panic(err)
		}
	}
}

NOTE: Not highly optimized and does not include any retries in case of errors.

Only write the tests once

Testing storage systems is a complicated subject because of the requirements needed to test an implementation.

  • Do you have a real integration test or mock implementations of the storage system?
  • How does test turnup/turndown work?
  • ...

While you still have to figure out how that process works for any storage systems, the tests only must be written once for any implementation.

You write tests for the Storage API calls once and simply add storage implementations to the test suite. This greatly simplifies your testing if your application supports multiple storage mechanisms. No tests for MySQL storage and CloudSQL, just a single unified test against the API.

...

var stores = map[string]storage.Employee{}

func init() {
	stores["in-memory"] = inmemory.New()
}

func TestGet(t *testing.T) {
	rec := &EmployeeRec{
		First: "John",
		Last: "Doe",
		ID: 1,
		Title: "unknown",
		Dept: 1,
	}

	tests := []struct{
		desc string
		id uint64
		want *EmployeeRec
		err bool
		notFound bool
	}{
		{
			desc: "BadID",
			id: 0,
			err: true, 
		},
		{
			desc: "Not found",
			id: 3,
			err: true,
			notFound: true,
		},
		{
			desc: "Success",
			id: 1,
			want: rec,
		},
	}

	for k, store := range stores {
		if err := store.Put(rec); err != nil {
			t.Errorf("TestGet(%s): %s", k, err)
			continue
		}
		for _, tc := range tests {
			r, err := store.Get(tc.id)
			switch {
			case tc.err && err == nil:
				t.Errorf("TestGet(%s)(%s): got err == nil, want err != nil", tc.desc, k)
				continue
			case !tc.err && err != nil:
				t.Errorf("TestGet(%s)(%s): got err == %s, want err == nil", tc.desc, k, err)
				continue 
			case tc.err && tc.notFound:
				if _, ok := err.(storage.NotFoundError); !ok {
					t.Errorf("TestGet(%s)(%s): got error, but it was not of type NotFoundError", tc.desc, k)
                }
                continue
			case tc.err:
				continue
			}
		}
		if diff := pretty.Compare(tc.want, r); diff != "" {
			t.Errorf("TestGet(%s)(%s): -want/+got:\n%s", tc.desc, k, diff)
		}
	}
}

The above example will test the Get() method of a storage implementation. When adding a new storage implementation that requires a test, you simply need to add a new storage.Employee to the store variable in the init() function.

Summary

Wrapping your storage layer in abstraction allows:

  • Adding new storage solutions quickly
  • Development and integration tests can use storage more suited to those needs
  • Write tests and benchmarks once to support multiple storage solutions
  • Reusable code that provides migration support