diff --git a/cmd/dcrtime_dumpdb/config.go b/cmd/dcrtime_dumpdb/config.go
new file mode 100644
index 0000000..d4a0065
--- /dev/null
+++ b/cmd/dcrtime_dumpdb/config.go
@@ -0,0 +1,67 @@
+// Copyright (c) 2020 The Decred developers
+// Use of this source code is governed by an ISC
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+	"path/filepath"
+
+	"github.com/jessevdk/go-flags"
+)
+
+const defaultConfigFilename = "dcrtimed.conf"
+
+var (
+	defaultConfigFile = filepath.Join(defaultHomeDir, defaultConfigFilename)
+	defaultBackend    = "filesystem"
+)
+
+// config defines the configuration options for dcrtime_dumpdb
+//
+// See loadConfig for details on the configuration load process.
+type config struct {
+	HomeDir           string   `short:"A" long:"appdata" description:"Path to application home directory"`
+	ShowVersion       bool     `short:"V" long:"version" description:"Display version information and exit"`
+	ConfigFile        string   `short:"C" long:"configfile" description:"Path to configuration file"`
+	DataDir           string   `short:"b" long:"datadir" description:"Directory to store data"`
+	LogDir            string   `long:"logdir" description:"Directory to log output."`
+	TestNet           bool     `long:"testnet" description:"Use the test network"`
+	SimNet            bool     `long:"simnet" description:"Use the simulation test network"`
+	Profile           string   `long:"profile" description:"Enable HTTP profiling on given port -- NOTE port must be between 1024 and 65536"`
+	CPUProfile        string   `long:"cpuprofile" description:"Write CPU profile to the specified file"`
+	MemProfile        string   `long:"memprofile" description:"Write mem profile to the specified file"`
+	DebugLevel        string   `short:"d" long:"debuglevel" description:"Logging level for all subsystems {trace, debug, info, warn, error, critical} -- You may also specify <subsystem>=<level>,<subsystem2>=<level>,... to set the log level for individual subsystems -- Use show to list available subsystems"`
+	Listeners         []string `long:"listen" description:"Add an interface/port to listen for connections (default all interfaces port: 49152, testnet: 59152)"`
+	WalletHost        string   `long:"wallethost" description:"Hostname for wallet server"`
+	WalletCert        string   `long:"walletcert" description:"Certificate path for wallet server"`
+	WalletPassphrase  string   `long:"walletpassphrase" description:"Passphrase for wallet server"`
+	Version           string
+	HTTPSCert         string   `long:"httpscert" description:"File containing the https certificate file"`
+	HTTPSKey          string   `long:"httpskey" description:"File containing the https certificate key"`
+	StoreHost         string   `long:"storehost" description:"Enable proxy mode - send requests to the specified ip:port"`
+	StoreCert         string   `long:"storecert" description:"File containing the https certificate file for storehost"`
+	EnableCollections bool     `long:"enablecollections" description:"Allow clients to query collection timestamps."`
+	APITokens         []string `long:"apitoken" description:"Token used to grant access to privileged API resources"`
+	APIVersions       string   `long:"apiversions" description:"Enables API versions on the daemon"`
+	Backend           string   `long:"backend" description:"Sets the cache layer type 'filesystem'/'postgres'"`
+	PostgresHost      string   `long:"postgreshost" description:"Postgres ip:port"`
+	PostgresRootCert  string   `long:"postgresrootcert" description:"File containing the CA certificate for postgres"`
+	PostgresCert      string   `long:"postgrescert" description:"File containing the dcrtimed client certificate for postgres"`
+	PostgresKey       string   `long:"postgreskey" description:"File containing the dcrtimed client certificate key for postgres"`
+}
+
+// loadConfig initializes and parses the config using a config file
+func loadConfig() (*config, error) {
+	// Default config.
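+	// Only the backend type is defaulted here; all other values come
+	// from dcrtimed.conf via the INI parse below.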
+	cfg := config{
+		Backend: defaultBackend,
+	}
+
+	err := flags.IniParse(defaultConfigFile, &cfg)
+	if err != nil {
+		return nil, err
+	}
+
+	return &cfg, nil
+}
diff --git a/cmd/dcrtime_dumpdb/dcrtime_dumpdb.go b/cmd/dcrtime_dumpdb/dcrtime_dumpdb.go
index 103af2c..326ae76 100644
--- a/cmd/dcrtime_dumpdb/dcrtime_dumpdb.go
+++ b/cmd/dcrtime_dumpdb/dcrtime_dumpdb.go
@@ -8,7 +8,9 @@ import (
 
 	"github.com/decred/dcrd/chaincfg/v2"
 	"github.com/decred/dcrd/dcrutil/v2"
+	"github.com/decred/dcrtime/dcrtimed/backend"
 	"github.com/decred/dcrtime/dcrtimed/backend/filesystem"
+	"github.com/decred/dcrtime/dcrtimed/backend/postgres"
 )
 
 var (
@@ -24,18 +26,9 @@ var (
 func _main() error {
 	flag.Parse()
 
-	if *restore {
-		if *destination == "" {
-			return fmt.Errorf("-destination must be set")
-		}
-
-		fs, err := filesystem.NewRestore(*destination)
-		if err != nil {
-			return err
-		}
-		defer fs.Close()
-
-		return fs.Restore(os.Stdin, true, *destination)
+	loadedCfg, err := loadConfig()
+	if err != nil {
+		return fmt.Errorf("Could not load configuration file: %v", err)
 	}
 
 	root := *fsRoot
@@ -48,17 +41,45 @@ func _main() error {
 		}
 	}
 
-	// Dump
-
-	fmt.Printf("=== Root: %v\n", root)
-
-	fs, err := filesystem.NewDump(root)
+	var b backend.Backend
+	switch loadedCfg.Backend {
+	case "filesystem":
+		if *restore {
+			if *destination == "" {
+				return fmt.Errorf("-destination must be set")
+			}
+			b, err = filesystem.NewRestore(*destination)
+			break
+		}
+		b, err = filesystem.NewDump(root)
+		if !*dumpJSON {
+			fmt.Printf("=== Root: %v\n", root)
+		}
+	case "postgres":
+		var net string
+		switch loadedCfg.TestNet {
+		case true:
+			net = "testnet"
+		default:
+			net = "mainnet"
+		}
+		b, err = postgres.NewDB(loadedCfg.PostgresHost,
+			net,
+			loadedCfg.PostgresRootCert,
+			loadedCfg.PostgresCert,
+			loadedCfg.PostgresKey)
+	default:
+		err = fmt.Errorf("Unsupported backend type: %v", loadedCfg.Backend)
+	}
 	if err != nil {
 		return err
 	}
-	defer fs.Close()
+	defer b.Close()
 
-	return fs.Dump(os.Stdout, !*dumpJSON)
+	if *restore {
+		return b.Restore(os.Stdin, true, *destination)
+	}
+	return b.Dump(os.Stdout, !*dumpJSON)
 }
 
 func main() {
diff --git a/cmd/dcrtime_fsck/README.md b/cmd/dcrtime_fsck/README.md
index 10814db..bfa52ff 100644
--- a/cmd/dcrtime_fsck/README.md
+++ b/cmd/dcrtime_fsck/README.md
@@ -4,6 +4,10 @@ dcrtime_fsck
 The filesystem backend can under rare circumstances become incoherent. This
 tool iterates over all timestamp directories and corrects known failures.
 
+The PostgreSQL backend is also supported: `dcrtime_fsck` reads the
+`dcrtimed.conf` file to determine which backend is in use, and uses the
+database parameters provided there when running against postgres.
+
 ## Flags
 
 ```
diff --git a/cmd/dcrtime_fsck/config.go b/cmd/dcrtime_fsck/config.go
new file mode 100644
index 0000000..d4a0065
--- /dev/null
+++ b/cmd/dcrtime_fsck/config.go
@@ -0,0 +1,67 @@
+// Copyright (c) 2020 The Decred developers
+// Use of this source code is governed by an ISC
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+	"path/filepath"
+
+	"github.com/jessevdk/go-flags"
+)
+
+const defaultConfigFilename = "dcrtimed.conf"
+
+var (
+	defaultConfigFile = filepath.Join(defaultHomeDir, defaultConfigFilename)
+	defaultBackend    = "filesystem"
+)
+
+// config defines the configuration options for dcrtime_fsck
+//
+// See loadConfig for details on the configuration load process.
+type config struct {
+	HomeDir           string   `short:"A" long:"appdata" description:"Path to application home directory"`
+	ShowVersion       bool     `short:"V" long:"version" description:"Display version information and exit"`
+	ConfigFile        string   `short:"C" long:"configfile" description:"Path to configuration file"`
+	DataDir           string   `short:"b" long:"datadir" description:"Directory to store data"`
+	LogDir            string   `long:"logdir" description:"Directory to log output."`
+	TestNet           bool     `long:"testnet" description:"Use the test network"`
+	SimNet            bool     `long:"simnet" description:"Use the simulation test network"`
+	Profile           string   `long:"profile" description:"Enable HTTP profiling on given port -- NOTE port must be between 1024 and 65536"`
+	CPUProfile        string   `long:"cpuprofile" description:"Write CPU profile to the specified file"`
+	MemProfile        string   `long:"memprofile" description:"Write mem profile to the specified file"`
+	DebugLevel        string   `short:"d" long:"debuglevel" description:"Logging level for all subsystems {trace, debug, info, warn, error, critical} -- You may also specify <subsystem>=<level>,<subsystem2>=<level>,... to set the log level for individual subsystems -- Use show to list available subsystems"`
+	Listeners         []string `long:"listen" description:"Add an interface/port to listen for connections (default all interfaces port: 49152, testnet: 59152)"`
+	WalletHost        string   `long:"wallethost" description:"Hostname for wallet server"`
+	WalletCert        string   `long:"walletcert" description:"Certificate path for wallet server"`
+	WalletPassphrase  string   `long:"walletpassphrase" description:"Passphrase for wallet server"`
+	Version           string
+	HTTPSCert         string   `long:"httpscert" description:"File containing the https certificate file"`
+	HTTPSKey          string   `long:"httpskey" description:"File containing the https certificate key"`
+	StoreHost         string   `long:"storehost" description:"Enable proxy mode - send requests to the specified ip:port"`
+	StoreCert         string   `long:"storecert" description:"File containing the https certificate file for storehost"`
+	EnableCollections bool     `long:"enablecollections" description:"Allow clients to query collection timestamps."`
+	APITokens         []string `long:"apitoken" description:"Token used to grant access to privileged API resources"`
+	APIVersions       string   `long:"apiversions" description:"Enables API versions on the daemon"`
+	Backend           string   `long:"backend" description:"Sets the cache layer type 'filesystem'/'postgres'"`
+	PostgresHost      string   `long:"postgreshost" description:"Postgres ip:port"`
+	PostgresRootCert  string   `long:"postgresrootcert" description:"File containing the CA certificate for postgres"`
+	PostgresCert      string   `long:"postgrescert" description:"File containing the dcrtimed client certificate for postgres"`
+	PostgresKey       string   `long:"postgreskey" description:"File containing the dcrtimed client certificate key for postgres"`
+}
+
+// loadConfig initializes and parses the config using a config file
+func loadConfig() (*config, error) {
+	// Default config.
+	cfg := config{
+		Backend: defaultBackend,
+	}
+
+	err := flags.IniParse(defaultConfigFile, &cfg)
+	if err != nil {
+		return nil, err
+	}
+
+	return &cfg, nil
+}
diff --git a/cmd/dcrtime_fsck/dcrtime_fsck.go b/cmd/dcrtime_fsck/dcrtime_fsck.go
index 194b9fd..8a725bd 100644
--- a/cmd/dcrtime_fsck/dcrtime_fsck.go
+++ b/cmd/dcrtime_fsck/dcrtime_fsck.go
@@ -11,6 +11,7 @@ import (
 	"github.com/decred/dcrd/dcrutil/v2"
 	"github.com/decred/dcrtime/dcrtimed/backend"
 	"github.com/decred/dcrtime/dcrtimed/backend/filesystem"
+	"github.com/decred/dcrtime/dcrtimed/backend/postgres"
 )
 
 var (
@@ -28,6 +29,11 @@ var (
 func _main() error {
 	flag.Parse()
 
+	loadedCfg, err := loadConfig()
+	if err != nil {
+		return fmt.Errorf("Could not load configuration file: %v", err)
+	}
+
 	root := *fsRoot
 	if root == "" {
 		root = filepath.Join(defaultHomeDir, "data")
@@ -52,13 +58,32 @@ func _main() error {
 
 	fmt.Printf("=== Root: %v\n", root)
 
-	fs, err := filesystem.NewDump(root)
+	var b backend.Backend
+	switch loadedCfg.Backend {
+	case "filesystem":
+		b, err = filesystem.NewDump(root)
+	case "postgres":
+		var net string
+		switch loadedCfg.TestNet {
+		case true:
+			net = "testnet"
+		default:
+			net = "mainnet"
+		}
+		b, err = postgres.NewDB(loadedCfg.PostgresHost,
+			net,
+			loadedCfg.PostgresRootCert,
+			loadedCfg.PostgresCert,
+			loadedCfg.PostgresKey)
+	default:
+		err = fmt.Errorf("Unsupported backend type: %v", loadedCfg.Backend)
+	}
 	if err != nil {
 		return err
 	}
-	defer fs.Close()
+	defer b.Close()
 
-	return fs.Fsck(&backend.FsckOptions{
+	return b.Fsck(&backend.FsckOptions{
 		Verbose:     *verbose,
 		PrintHashes: *printHashes,
 		Fix:         *fix,
diff --git a/cmd/dcrtime_timestamp/dcrtime_timestamp.go b/cmd/dcrtime_timestamp/dcrtime_timestamp.go
index c028cb9..f2c6a53 100644
--- a/cmd/dcrtime_timestamp/dcrtime_timestamp.go
+++ b/cmd/dcrtime_timestamp/dcrtime_timestamp.go
@@ -28,7 +28,7 @@ func _main() error {
 			continue
 		}
 
-		// Try timestam second
+		// Try timestamp second
 		timestamp, err := time.Parse(fStr, a)
 		if err == nil {
 			fmt.Printf("%v\n", timestamp.Unix())
diff --git a/dcrtimed/backend/filesystem/filesystem.go b/dcrtimed/backend/filesystem/filesystem.go
index b871c2b..3f8d07c 100644
--- a/dcrtimed/backend/filesystem/filesystem.go
+++ b/dcrtimed/backend/filesystem/filesystem.go
@@ -934,33 +934,34 @@ func (fs *FileSystem) LastAnchor() (*backend.LastAnchorResult, error) {
 	var fr *backend.FlushRecord
 	var me backend.LastAnchorResult
 	payload, err := db.Get([]byte(flushedKey), nil)
-	if err == nil {
-		fr, err = DecodeFlushRecord(payload)
-		if err != nil {
-			return &me, err
-		}
-		me.Tx = fr.Tx
+	if err != nil {
+		return &backend.LastAnchorResult{}, err
+	}
 
-		// Close db conection as we may
-		// write & update it
-		db.Close()
+	fr, err = DecodeFlushRecord(payload)
+	if err != nil {
+		return &me, err
+	}
+	me.Tx = fr.Tx
 
-		// Lookup anchored tx info,
-		// and update db if info changed.
-		txWalletInfo, err := fs.lazyFlush(flushedTs, fr)
+	// Close db connection as we may
+	// write & update it
+	db.Close()
 
-		// If no error, or no enough confirmations
-		// err continue, else return err.
-		if err != nil && err != errNotEnoughConfirmation {
-			return &backend.LastAnchorResult{}, err
-		}
-		me.ChainTimestamp = fr.ChainTimestamp
-		me.BlockHash = txWalletInfo.BlockHash.String()
-		me.BlockHeight = txWalletInfo.BlockHeight
-		return &me, nil
+	// Lookup anchored tx info,
+	// and update db if info changed.
+	txWalletInfo, err := fs.lazyFlush(flushedTs, fr)
+
+	// If the error is nil or errNotEnoughConfirmation,
+	// continue; otherwise return the error.
+	if err != nil && err != errNotEnoughConfirmation {
+		return &backend.LastAnchorResult{}, err
 	}
-	return &backend.LastAnchorResult{}, err
+	me.ChainTimestamp = fr.ChainTimestamp
+	me.BlockHash = txWalletInfo.BlockHash.String()
+	me.BlockHeight = txWalletInfo.BlockHeight
+	return &me, nil
 }
 
 // GetBalance provides the balance of the wallet and satisfies the
diff --git a/dcrtimed/backend/filesystem/filesystem_test.go b/dcrtimed/backend/filesystem/filesystem_test.go
index 46de9e2..697293e 100644
--- a/dcrtimed/backend/filesystem/filesystem_test.go
+++ b/dcrtimed/backend/filesystem/filesystem_test.go
@@ -178,7 +178,7 @@ func TestGetDigests(t *testing.T) {
 			gr.ErrorCode != foundGlobal) {
 			t.Fatalf("invalid digest got %x want %x ErrorCode "+
 				"got %v want %v", gr.Digest[:], hashes[i][:],
-				gr.ErrorCode, foundLocal)
+				gr.ErrorCode, foundGlobal)
 		}
 		if i >= count && gr.ErrorCode != backend.ErrorNotFound {
 			t.Fatalf("invalid ErrorCode got %x want %x",
diff --git a/dcrtimed/backend/postgres/dump.go b/dcrtimed/backend/postgres/dump.go
new file mode 100644
index 0000000..a811f6f
--- /dev/null
+++ b/dcrtimed/backend/postgres/dump.go
@@ -0,0 +1,290 @@
+// Copyright (c) 2017-2020 The Decred developers
+// Use of this source code is governed by an ISC
+// license that can be found in the LICENSE file.
+
+package postgres
+
+import (
+	"bytes"
+	"crypto/sha256"
+	"database/sql"
+	"encoding/hex"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/url"
+	"os"
+
+	"github.com/decred/dcrd/chaincfg/chainhash"
+	"github.com/decred/dcrtime/dcrtimed/backend"
+)
+
+// NewDB connects to the postgres database at the given host and returns a
+// Postgres context for it. It does not create tables or launch background
+// tasks; it is used by the dump, restore and fsck tools.
+func NewDB(host, net, rootCert, cert, key string) (*Postgres, error) {
+	// Connect to database
+	dbName := net + "_dcrtime"
+	h := "postgresql://" + dbUser + "@" + host + "/" + dbName
+	u, err := url.Parse(h)
+	if err != nil {
+		return nil, fmt.Errorf("parse url '%v': %v", h, err)
+	}
+
+	qs := buildQueryString(rootCert, cert, key)
+	addr := u.String() + "?" + qs
+
+	db, err := sql.Open("postgres", addr)
+	if err != nil {
+		return nil, fmt.Errorf("connect to database '%v': %v", addr, err)
+	}
+
+	pg := &Postgres{
+		db: db,
+	}
+
+	return pg, nil
+}
+
+// Dump dumps the database to the provided file descriptor. If the
+// verbose flag is set to true it pretty prints the database content,
+// otherwise it dumps a JSON stream.
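+//
+// Dump satisfies the backend interface.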
+func (pg *Postgres) Dump(f *os.File, verbose bool) error { + err := pg.dumpTimestamps(f, verbose) + if err != nil { + return err + } + return nil +} + +func (pg *Postgres) dumpTimestamps(f *os.File, verbose bool) error { + tss, err := pg.getAllRecordsTimestamps() + if err != nil { + return err + } + + for _, ts := range *tss { + if verbose { + fmt.Fprintf(f, "--- Timestamp: %v\n", ts) + } + err = pg.dumpTimestamp(f, verbose, ts) + if err != nil { + return err + } + } + return nil +} + +func (pg *Postgres) dumpTimestamp(f *os.File, verbose bool, ts int64) error { + exists, records, err := pg.getRecordsByServerTs(ts) + if err != nil { + return err + } + // Non fatal error if there is nothing to do + if !exists { + return nil + } + + var ( + anchored bool + fr backend.FlushRecord + digests = make([]backend.DigestReceived, 0, 10000) + ) + // Iterate over records in reverse to keep the order they were inserted in + for i := len(records) - 1; i >= 0; i-- { + ar := records[i] + if !bytes.Equal(ar.Anchor.Merkle, []byte{}) && !anchored { + anchored = true + copy(fr.Root[:], ar.Anchor.Merkle[:sha256.Size]) + tx, err := chainhash.NewHash(ar.Anchor.TxHash[:]) + if err != nil { + return err + } + fr.Tx = *tx + fr.ChainTimestamp = ar.Anchor.ChainTimestamp + fr.FlushTimestamp = ar.Anchor.FlushTimestamp + } + var digest [sha256.Size]byte + copy(digest[:], ar.Record.Digest[:]) + fr.Hashes = append(fr.Hashes, &digest) + digests = append(digests, backend.DigestReceived{ + Digest: hex.EncodeToString(ar.Record.Digest[:]), + Timestamp: ar.Record.CollectionTimestamp, + }) + } + + if anchored { + if verbose { + dumpFlushRecord(f, &fr) + } else { + e := json.NewEncoder(f) + rt := backend.RecordType{ + Version: backend.RecordTypeVersion, + Type: backend.RecordTypeFlushRecord, + } + err := e.Encode(rt) + if err != nil { + return err + } + frj := backend.FlushRecordJSON{ + Root: fr.Root, + Hashes: fr.Hashes, + Tx: fr.Tx, + ChainTimestamp: fr.ChainTimestamp, + FlushTimestamp: fr.FlushTimestamp, + Timestamp: ts, + } + err = e.Encode(frj) + if err != nil { + return err + } + } + } + + for _, v := range digests { + err := dumpDigestTimestamp(f, verbose, + backend.RecordTypeDigestReceived, v) + if err != nil { + return err + } + } + + return nil +} + +func dumpDigestTimestamp(f *os.File, verbose bool, recordType string, dr backend.DigestReceived) error { + if verbose { + fmt.Fprintf(f, "Digest : %v\n", dr.Digest) + fmt.Fprintf(f, "Timestamp : %v\n", dr.Timestamp) + } else { + e := json.NewEncoder(f) + rt := backend.RecordType{ + Version: backend.RecordTypeVersion, + Type: recordType, + } + err := e.Encode(rt) + if err != nil { + return err + } + r := backend.DigestReceived{ + Digest: dr.Digest, + Timestamp: dr.Timestamp, + } + err = e.Encode(r) + if err != nil { + return err + } + } + return nil +} + +func dumpFlushRecord(f *os.File, flushRecord *backend.FlushRecord) { + fmt.Fprintf(f, "Merkle root : %x\n", + flushRecord.Root) + fmt.Fprintf(f, "Tx : %v\n", flushRecord.Tx) + fmt.Fprintf(f, "Chain timestamp: %v\n", + flushRecord.ChainTimestamp) + fmt.Fprintf(f, "Flush timestamp: %v\n", + flushRecord.FlushTimestamp) + for _, v := range flushRecord.Hashes { + fmt.Fprintf(f, " Flushed : %x\n", *v) + } +} + +func (pg *Postgres) restoreFlushRecord(verbose bool, fr backend.FlushRecordJSON) error { + a := Anchor{ + Merkle: fr.Root[:], + TxHash: fr.Tx[:], + ChainTimestamp: fr.ChainTimestamp, + FlushTimestamp: fr.FlushTimestamp, + } + + err := pg.insertAnchor(a) + if err != nil { + return err + } + if verbose { + 
fmt.Printf("Restored flushed anchor: (merkle:%v)\n", hex.EncodeToString( + a.Merkle)) + } + return nil +} + +// Restore reads JSON encoded database contents and recreates the postgres +// database. +func (pg *Postgres) Restore(f *os.File, verbose bool, location string) error { + d := json.NewDecoder(f) + + // we store each flushed timestamp merkle root in order to insert it + // when restoring digests to the records table + tssMerkles := make(map[int64][sha256.Size]byte) + + state := 0 + for { + switch state { + case 0: + // Type + var t backend.RecordType + err := d.Decode(&t) + if err != nil { + return err + } + + // Check version we understand + if t.Version != backend.RecordTypeVersion { + return fmt.Errorf("unknown version %v", + t.Version) + } + + // Determine record type + switch t.Type { + case backend.RecordTypeDigestReceived: + state = 1 + case backend.RecordTypeFlushRecord: + state = 2 + default: + return fmt.Errorf("invalid record type: %v", + t.Type) + } + case 1: + // DigestReceived + var dr backend.DigestReceived + err := d.Decode(&dr) + if err != nil { + return err + } + // if digest' timestamp was anchored, get anchor' merkle root + // to insert it into records table + anchorRoot := tssMerkles[dr.Timestamp] + digest, err := hex.DecodeString(dr.Digest) + if err != nil { + return err + } + r := Record{ + CollectionTimestamp: dr.Timestamp, + Digest: digest, + AnchorMerkle: anchorRoot[:], + } + err = pg.insertRestoredDigest(r) + if err != nil { + return err + } + state = 0 + case 2: + // Flushrecord + var fr backend.FlushRecordJSON + err := d.Decode(&fr) + if err != nil { + return err + } + err = pg.restoreFlushRecord(verbose, fr) + if err != nil { + return err + } + _, ok := tssMerkles[fr.Timestamp] + if !ok { + tssMerkles[fr.Timestamp] = fr.Root + } + state = 0 + default: + // Illegal + return fmt.Errorf("invalid state %v", state) + } + } +} diff --git a/dcrtimed/backend/postgres/fsck.go b/dcrtimed/backend/postgres/fsck.go new file mode 100644 index 0000000..7adb8a4 --- /dev/null +++ b/dcrtimed/backend/postgres/fsck.go @@ -0,0 +1,310 @@ +// Copyright (c) 2020 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. 
+
+package postgres
+
+import (
+	"bytes"
+	"crypto/sha256"
+	"encoding/hex"
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"os"
+	"time"
+
+	"github.com/decred/dcrd/chaincfg/chainhash"
+	"github.com/decred/dcrd/txscript/v2"
+	"github.com/decred/dcrdata/api/types/v4"
+	"github.com/decred/dcrtime/dcrtimed/backend"
+	"github.com/decred/dcrtime/merkle"
+)
+
+const (
+	PostgresActionVersion = 1 // All structure versions
+
+	PostgresActionHeader          = "header"
+	PostgresActionDeleteTimestamp = "deletetimestamp"
+	PostgresActionDeleteDigest    = "deletedigest"
+	PostgresActionDeleteDuplicate = "deleteduplicate"
+)
+
+type PostgresAction struct {
+	Version   uint64 `json:"version"`   // Version of structure
+	Timestamp int64  `json:"timestamp"` // Timestamp of action
+	Action    string `json:"action"`    // Following JSON command
+}
+
+type PostgresHeader struct {
+	Version uint64 `json:"version"` // Version of structure
+	Start   int64  `json:"start"`   // Start of fsck
+	DryRun  bool   `json:"dryrun"`  // Dry run
+}
+
+type PostgresDeleteTimestamp struct {
+	Version   uint64 `json:"version"`   // Version of structure
+	Timestamp int64  `json:"timestamp"` // Timestamp
+	Directory string `json:"directory"` // Directory name of Timestamp
+}
+
+type PostgresDeleteDigest struct {
+	Version         uint64 `json:"version"`         // Version of structure
+	Timestamp       int64  `json:"timestamp"`       // Timestamp of digest
+	GlobalTimestamp int64  `json:"globaltimestamp"` // Global timestamp of digest
+	Digest          string `json:"digest"`          // Digest that was deleted
+}
+
+type PostgresDeleteDuplicate struct {
+	Version            uint64 `json:"version"`            // Version of structure
+	Digest             string `json:"digest"`             // Duplicate digest
+	Found              int64  `json:"found"`              // Original timestamp
+	FoundDirectory     string `json:"founddirectory"`     // Original directory
+	Duplicate          int64  `json:"duplicate"`          // Duplicate timestamp
+	DuplicateDirectory string `json:"duplicatedirectory"` // Duplicate directory
+}
+
+// validJournalAction returns true if the action is a valid PostgresAction.
+func validJournalAction(action string) bool {
+	switch action {
+	case PostgresActionHeader:
+	case PostgresActionDeleteTimestamp:
+	case PostgresActionDeleteDigest:
+	case PostgresActionDeleteDuplicate:
+	default:
+		return false
+	}
+	return true
+}
+
+// journal records what fix occurred at what time if filename != "".
+func journal(filename, action string, payload interface{}) error {
+	// See if we are journaling
+	if filename == "" {
+		return nil
+	}
+
+	// Sanity
+	if !validJournalAction(action) {
+		return fmt.Errorf("invalid journal action: %v", action)
+	}
+
+	f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0640)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+
+	// Write PostgresAction
+	e := json.NewEncoder(f)
+	rt := PostgresAction{
+		Version:   PostgresActionVersion,
+		Timestamp: time.Now().Unix(),
+		Action:    action,
+	}
+	err = e.Encode(rt)
+	if err != nil {
+		return err
+	}
+
+	// Write payload
+	err = e.Encode(payload)
+	if err != nil {
+		return err
+	}
+	fmt.Fprintf(f, "\n")
+
+	return err
+}
+
+// fsckTimestamp verifies that a timestamp is coherent by doing the following:
+// 1. Find the timestamp's digests in the records table & anchor db info if
+//    anchored
+// 2. Ensure there are no duplicates among the timestamp's digests
+// 3. If the timestamp was anchored:
+//    3.1 Generate the merkle root from the records table digests and verify
+//        it against the anchor's merkle root from the db
+//    3.2 Verify that the anchor merkle root on db exists on the blockchain.
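+//
+// The blockchain check queries dcrdata (options.URL) for the anchor tx
+// outputs and looks for a null-data script whose pushed data matches the
+// anchor's merkle root.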
+func (pg *Postgres) fsckTimestamp(options *backend.FsckOptions, ts int64) error {
+	exists, records, err := pg.getRecordsByServerTs(ts)
+	if err != nil {
+		return err
+	}
+	// Non fatal error if there is nothing to do
+	if !exists {
+		return nil
+	}
+
+	var (
+		anchored bool
+		fr       backend.FlushRecord
+	)
+	digests := make(map[string]int64)
+	for _, r := range records {
+		k := hex.EncodeToString(r.Record.Digest[:])
+		if _, ok := digests[k]; ok {
+			// This really can't happen but we check it so that we
+			// can equate lengths later to determine if the map and
+			// array are the same.
+			return fmt.Errorf(" *** ERROR duplicate key: %v", k)
+		}
+		digests[k] = ts
+		if !bytes.Equal(r.Anchor.Merkle, []byte{}) && !anchored {
+			anchored = true
+			copy(fr.Root[:], r.Anchor.Merkle[:sha256.Size])
+			tx, err := chainhash.NewHash(r.Anchor.TxHash[:])
+			if err != nil {
+				return err
+			}
+			fr.Tx = *tx
+			fr.FlushTimestamp = r.Anchor.FlushTimestamp
+		}
+		if options.PrintHashes {
+			fmt.Printf("Hash : %v\n",
+				hex.EncodeToString(r.Record.Digest[:]))
+		}
+		var digest [sha256.Size]byte
+		copy(digest[:], r.Record.Digest[:])
+		fr.Hashes = append(fr.Hashes, &digest)
+	}
+	fmt.Printf("No duplicates found\n")
+	fmt.Printf("Anchored : %v\n", anchored)
+	// If anchored generate merkle and compare against merkle in anchors
+	// table
+	if anchored {
+		// Generate merkle
+		mt := merkle.Tree(fr.Hashes)
+		// Last element is root
+		root := *mt[len(mt)-1]
+		if !bytes.Equal(root[:], fr.Root[:]) {
+			return fmt.Errorf(" *** ERROR mismatched merkle "+
+				"root: %x %x", root, fr.Root)
+		}
+		if options.PrintHashes {
+			fmt.Printf("Anchor's calculated merkle: %x\n", root[:])
+			fmt.Printf("Anchor's merkle from db: %x\n", fr.Root[:])
+		}
+		fmt.Printf("Verified anchor's merkle root\n")
+	}
+
+	// An unanchored timestamp has no tx to verify.
+	if !anchored {
+		return nil
+	}
+
+	// 3.2 Verify merkle root in tx
+	u := options.URL + fr.Tx.String() + "/out"
+	r, err := http.Get(u)
+	if err != nil {
+		return fmt.Errorf(" *** ERROR HTTP Get: %v", err)
+	}
+	defer r.Body.Close()
+
+	if r.StatusCode != http.StatusOK {
+		body, err := ioutil.ReadAll(r.Body)
+		if err != nil {
+			return fmt.Errorf(" *** ERROR invalid "+
+				"body: %v %v", r.StatusCode, body)
+		}
+		return fmt.Errorf(" *** ERROR invalid dcrdata "+
+			"answer: %v %s", r.StatusCode, body)
+	}
+
+	var txOuts []types.TxOut
+	d := json.NewDecoder(r.Body)
+	if err := d.Decode(&txOuts); err != nil {
+		return err
+	}
+
+	var done bool
+	for _, v := range txOuts {
+		if !types.IsNullDataScript(v.ScriptPubKeyDecoded.Type) {
+			continue
+		}
+		script, err := hex.DecodeString(v.ScriptPubKeyDecoded.Hex)
+		if err != nil {
+			return fmt.Errorf(" *** ERROR invalid "+
+				"dcrdata script: %v", err)
+		}
+		data, err := txscript.PushedData(script)
+		if err != nil {
+			return fmt.Errorf(" *** ERROR invalid "+
+				"script: %v", err)
+		}
+		if !bytes.Equal(data[0], fr.Root[:]) {
+			continue
+		}
+
+		// Everything is cool so mark it and break out
+		done = true
+		break
+	}
+
+	if !done {
+		return fmt.Errorf(" *** ERROR merkle root not "+
+			"found: tx %v merkle %x", fr.Tx, fr.Root)
+	}
+
+	if options.PrintHashes {
+		fmt.Printf("Anchor's tx: %v found on the blockchain\n", fr.Tx)
+	} else {
+		fmt.Printf("Anchor's tx found on the blockchain\n")
+	}
+
+	return nil
+}
+
+func (pg *Postgres) fsckTimestamps(options *backend.FsckOptions) error {
+	tss, err := pg.getAllRecordsTimestamps()
+	if err != nil {
+		return err
+	}
+
+	for _, ts := range *tss {
+		if options.Verbose || options.PrintHashes {
+			fmt.Printf("--- Checking: %v\n", ts)
+		}
+		err = pg.fsckTimestamp(options, ts)
+		if err != nil {
+			return err
+		}
+		if options.Verbose || options.PrintHashes {
+			fmt.Printf("=== Verified: %v\n",
+				ts)
+		}
+	}
+	return nil
+}
+
+// Fsck walks all db records and verifies that there is no apparent data
+// corruption and that the anchors indeed exist on the blockchain.
+func (pg *Postgres) Fsck(options *backend.FsckOptions) error {
+	ts := time.Now().Unix()
+	fmt.Printf("=== FSCK started %v\n", ts)
+	fmt.Printf("--- Phase 1: checking records table\n")
+
+	// Guard against nil options before their first use below.
+	if options == nil {
+		options = &backend.FsckOptions{}
+	}
+
+	if options.File != "" {
+		// Create journal file
+		f, err := os.OpenFile(options.File, os.O_RDWR|os.O_CREATE, 0640)
+		if err != nil {
+			return err
+		}
+		f.Close()
+	}
+
+	err := journal(options.File, PostgresActionHeader,
+		PostgresHeader{
+			Version: PostgresActionVersion,
+			Start:   ts,
+			DryRun:  !options.Fix,
+		})
+	if err != nil {
+		return fmt.Errorf(" *** ERROR journal: %v",
+			err)
+	}
+
+	return pg.fsckTimestamps(options)
+}
diff --git a/dcrtimed/backend/postgres/log.go b/dcrtimed/backend/postgres/log.go
new file mode 100644
index 0000000..6eeb2e2
--- /dev/null
+++ b/dcrtimed/backend/postgres/log.go
@@ -0,0 +1,25 @@
+// Copyright (c) 2020 The Decred developers
+// Use of this source code is governed by an ISC
+// license that can be found in the LICENSE file.
+
+package postgres
+
+import "github.com/decred/slog"
+
+// log is a logger that is initialized with no output filters. This
+// means the package will not perform any logging by default until the caller
+// requests it.
+var log = slog.Disabled
+
+// DisableLog disables all library log output. Logging output is disabled
+// by default until either UseLogger or SetLogWriter are called.
+func DisableLog() {
+	log = slog.Disabled
+}
+
+// UseLogger uses a specified Logger to output package logging info.
+// This should be used in preference to SetLogWriter if the caller is also
+// using slog.
+func UseLogger(logger slog.Logger) {
+	log = logger
+}
diff --git a/dcrtimed/backend/postgres/models.go b/dcrtimed/backend/postgres/models.go
new file mode 100644
index 0000000..94223a4
--- /dev/null
+++ b/dcrtimed/backend/postgres/models.go
@@ -0,0 +1,35 @@
+// Copyright (c) 2020 The Decred developers
+// Use of this source code is governed by an ISC
+// license that can be found in the LICENSE file.
+
+package postgres
+
+import "github.com/decred/dcrtime/merkle"
+
+// Record records a collected digest; it includes the collection timestamp
+// and the anchor merkle root if the digest was anchored
+type Record struct {
+	Digest              []byte
+	CollectionTimestamp int64
+	AnchorMerkle        []byte
+}
+
+// Anchor records anchor information; it includes the merkle root of all
+// digests included in a given anchor, the anchor's transaction hash, the flush
+// timestamp and the chain timestamp if the transaction has enough
+// confirmations.
+type Anchor struct {
+	Merkle         []byte
+	TxHash         []byte
+	ChainTimestamp int64
+	FlushTimestamp int64
+}
+
+// AnchoredRecord is a wrapper struct, used to return a record and its
+// corresponding anchor information if the digest was anchored.
+type AnchoredRecord struct {
+	Record Record
+	Anchor Anchor
+	// Not stored on db, calculated and returned to client for verification
+	MerklePath merkle.Branch
+}
diff --git a/dcrtimed/backend/postgres/postgres.go b/dcrtimed/backend/postgres/postgres.go
new file mode 100644
index 0000000..0ea41e9
--- /dev/null
+++ b/dcrtimed/backend/postgres/postgres.go
@@ -0,0 +1,591 @@
+// Copyright (c) 2020 The Decred developers
+// Use of this source code is governed by an ISC
+// license that can be found in the LICENSE file.
+
+package postgres
+
+import (
+	"bytes"
+	"crypto/sha256"
+	"database/sql"
+	"errors"
+	"fmt"
+	"net/url"
+	"path/filepath"
+	"sync"
+	"time"
+
+	"github.com/decred/dcrd/chaincfg/chainhash"
+	"github.com/decred/dcrtime/dcrtimed/backend"
+	"github.com/decred/dcrtime/dcrtimed/dcrtimewallet"
+	"github.com/decred/dcrtime/merkle"
+	"github.com/lib/pq"
+	"github.com/robfig/cron"
+)
+
+const (
+	tableRecords  = "records"
+	tableAnchors  = "anchors"
+	dbUser        = "dcrtimed"
+	confirmations = 6
+)
+
+var (
+	_ backend.Backend = (*Postgres)(nil)
+
+	// duration and flushSchedule must match or bad things will happen. By
+	// matching we mean both are hourly or every so many minutes.
+	//
+	// Seconds Minutes Hours Days Months DayOfWeek
+	// XXX XXX XXX revert minute cycles local dev change XXX XXX XXX
+	flushSchedule = "10 * * * * *" // On the hour + 10 seconds
+	duration      = time.Minute    // Default how often we combine digests
+
+	errEmptySet = errors.New("empty set")
+)
+
+// Postgres is a PostgreSQL implementation of a backend; it stores all
+// uploaded digests in the records table, and on flush it stores the anchor
+// info as well and links all anchored records with the corresponding anchor.
+type Postgres struct {
+	sync.RWMutex
+
+	cron     *cron.Cron    // Scheduler for periodic tasks
+	db       *sql.DB       // Postgres database
+	duration time.Duration // How often we combine digests
+	commit   uint          // Current version, incremented during flush
+
+	enableCollections bool // Set to true to enable collection query
+
+	wallet *dcrtimewallet.DcrtimeWallet // Wallet context.
+
+	myNow func() time.Time // Override time.Now()
+}
+
+// now returns current time stamp rounded down to 1 hour. All timestamps are
+// UTC.
+func (pg *Postgres) now() time.Time {
+	return pg.truncate(pg.myNow().UTC(), pg.duration)
+}
+
+// truncate rounds time down to the provided duration. This is split out in
+// order to test.
+func (pg *Postgres) truncate(t time.Time, d time.Duration) time.Time {
+	return t.Truncate(d)
+}
+
+var (
+	errInvalidConfirmations  = errors.New("invalid confirmations")
+	errNotEnoughConfirmation = errors.New("not enough confirmations")
+)
+
+// lazyFlush takes a pointer to a flush record, updates the chain anchor
+// timestamp of said record, writes it back to the database and returns
+// the result of the wallet's Lookup function.
+//
+// IMPORTANT NOTE: We *may* write to the anchors database in case of a lazy
+// timestamp update to the anchor timestamp while holding the READ lock. This
+// is OK because at worst we are racing multiple atomic writes.
+// This is suboptimal but beats taking a write lock for all get* calls.
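+//
+// On errNotEnoughConfirmation the wallet lookup result is still returned
+// alongside the error so callers can proceed without a chain timestamp.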
+func (pg *Postgres) lazyFlush(fr *backend.FlushRecord) (*dcrtimewallet.TxLookupResult, error) {
+	res, err := pg.wallet.Lookup(fr.Tx)
+	if err != nil {
+		return nil, err
+	}
+
+	log.Debugf("lazyFlush confirmations: %v", res.Confirmations)
+
+	if res.Confirmations == -1 {
+		return nil, errInvalidConfirmations
+	} else if res.Confirmations < confirmations {
+		// Return error & wallet lookup res
+		// for error handling
+		return res, errNotEnoughConfirmation
+	}
+
+	fr.ChainTimestamp = res.Timestamp
+
+	// Update anchor row in database
+	err = pg.updateAnchorChainTs(Anchor{
+		ChainTimestamp: fr.ChainTimestamp,
+		Merkle:         fr.Root[:],
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	log.Infof("Flushed anchor timestamp: %v %v", fr.Tx.String(),
+		res.Timestamp)
+
+	return res, nil
+}
+
+// Get returns timestamp information for given digests.
+func (pg *Postgres) Get(digests [][sha256.Size]byte) ([]backend.GetResult, error) {
+	gdmes := make([]backend.GetResult, 0, len(digests))
+
+	// We need to be read locked from here on out. Note that we are not
+	// locking/releasing. This is by design in order to let all readers
+	// finish before a potential write occurs.
+	pg.RLock()
+	defer pg.RUnlock()
+
+	// Iterate over digests and translate results to backend interface.
+	for _, d := range digests {
+		gdme := backend.GetResult{
+			Digest: d,
+		}
+		ar := AnchoredRecord{
+			Record: Record{
+				Digest: d[:],
+			},
+		}
+		found, err := pg.getRecordByDigest(&ar)
+		if err != nil {
+			return nil, err
+		}
+
+		if !found {
+			gdme.ErrorCode = backend.ErrorNotFound
+		} else {
+			gdme.ErrorCode = backend.ErrorOK
+			gdme.MerklePath = ar.MerklePath
+			gdme.AnchoredTimestamp = ar.Anchor.ChainTimestamp
+			// Anchor columns are NULL until the digest is
+			// anchored; only decode them when present since
+			// chainhash.NewHash rejects a nil tx hash.
+			if !bytes.Equal(ar.Anchor.Merkle, []byte{}) {
+				copy(gdme.MerkleRoot[:], ar.Anchor.Merkle[:])
+				tx, err := chainhash.NewHash(ar.Anchor.TxHash[:])
+				if err != nil {
+					return nil, err
+				}
+				gdme.Tx = *tx
+
+				// Lazy flush the record if it was anchored but
+				// the blockchain timestamp isn't available yet
+				if gdme.AnchoredTimestamp == 0 {
+					fr := backend.FlushRecord{
+						Tx:   gdme.Tx,
+						Root: gdme.MerkleRoot,
+					}
+					_, err = pg.lazyFlush(&fr)
+					if err != nil {
+						switch err {
+						case errNotEnoughConfirmation:
+							// All good, continue
+							// without blockchain
+							// timestamp
+						case errInvalidConfirmations:
+							log.Errorf("%v: Confirmations = -1",
+								gdme.Tx.String())
+							return nil, err
+						default:
+							return nil, err
+						}
+					}
+					gdme.AnchoredTimestamp = fr.ChainTimestamp
+				}
+			}
+		}
+		gdmes = append(gdmes, gdme)
+	}
+	return gdmes, nil
+}
+
+// GetTimestamps is a required interface function. In our case it retrieves
+// the digests for a given timestamp.
+//
+// GetTimestamps satisfies the backend interface.
+func (pg *Postgres) GetTimestamps(timestamps []int64) ([]backend.TimestampResult, error) {
+	gtmes := make([]backend.TimestampResult, 0, len(timestamps))
+
+	// We need to be read locked from here on out. Note that we are not
+	// locking/releasing. This is by design in order to let all readers
+	// finish before a potential write occurs.
+	pg.RLock()
+	defer pg.RUnlock()
+
+	// Iterate over timestamps and translate results to backend interface.
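+	// Collection queries are only served when enableCollections is set;
+	// otherwise each timestamp is answered with ErrorNotAllowed.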
+	for _, ts := range timestamps {
+		gtme := backend.TimestampResult{
+			Timestamp: ts,
+		}
+		if pg.enableCollections {
+			exists, records, err := pg.getRecordsByServerTs(ts)
+			if err != nil {
+				return nil, err
+			}
+			if !exists {
+				gtme.ErrorCode = backend.ErrorNotFound
+			} else {
+				gtme.ErrorCode = backend.ErrorOK
+				// copy ts digests
+				gtme.Digests = make([][sha256.Size]byte, 0, len(records))
+				for _, r := range records {
+					var d [sha256.Size]byte
+					copy(d[:], r.Record.Digest[:])
+					gtme.Digests = append(gtme.Digests, d)
+					// Anchor columns are NULL until the
+					// timestamp is anchored; skip them in
+					// that case.
+					if len(r.Anchor.Merkle) == 0 {
+						continue
+					}
+					tx, err := chainhash.NewHash(r.Anchor.TxHash)
+					if err != nil {
+						return nil, err
+					}
+					gtme.Tx = *tx
+					gtme.AnchoredTimestamp = r.Anchor.ChainTimestamp
+					copy(gtme.MerkleRoot[:], r.Anchor.Merkle[:sha256.Size])
+				}
+				// Lazy flush the record if it was anchored but
+				// the blockchain timestamp isn't available yet
+				if gtme.MerkleRoot != [sha256.Size]byte{} &&
+					gtme.AnchoredTimestamp == 0 {
+					fr := backend.FlushRecord{
+						Tx:   gtme.Tx,
+						Root: gtme.MerkleRoot,
+					}
+					_, err = pg.lazyFlush(&fr)
+					if err != nil {
+						switch err {
+						case errNotEnoughConfirmation:
+							// All good, continue
+							// without blockchain
+							// timestamp
+						case errInvalidConfirmations:
+							log.Errorf("%v: Confirmations = -1",
+								gtme.Tx.String())
+							return nil, err
+						default:
+							return nil, err
+						}
+					}
+					gtme.AnchoredTimestamp = fr.ChainTimestamp
+				}
+			}
+		} else {
+			gtme.ErrorCode = backend.ErrorNotAllowed
+		}
+		gtmes = append(gtmes, gtme)
+	}
+	return gtmes, nil
+}
+
+// Put stores hashes and returns the timestamp and associated errors. Put is
+// allowed to return transient errors.
+func (pg *Postgres) Put(hashes [][sha256.Size]byte) (int64, []backend.PutResult, error) {
+	// Two-phase commit.
+	pg.Lock()
+	commit := pg.commit
+	pg.Unlock()
+
+	// Get current time rounded down.
+	ts := pg.now().Unix()
+
+	// Prep return and unwind bits before taking mutex.
+	me := make([]backend.PutResult, 0, len(hashes))
+
+	// Create batch transaction
+	txn, err := pg.db.Begin()
+	if err != nil {
+		return 0, []backend.PutResult{}, err
+	}
+	// Roll back on any early return; this is a no-op once the
+	// transaction commits.
+	defer txn.Rollback()
+
+	stmt, err := txn.Prepare(pq.CopyIn("records", "digest",
+		"collection_timestamp"))
+	if err != nil {
+		return 0, []backend.PutResult{}, err
+	}
+	defer stmt.Close()
+
+	for _, hash := range hashes {
+		// Check if digest exists
+		exists, err := pg.isDigestExists(hash[:])
+		if err != nil {
+			return 0, []backend.PutResult{}, err
+		}
+		if exists {
+			me = append(me, backend.PutResult{
+				Digest:    hash,
+				ErrorCode: backend.ErrorExists,
+			})
+			continue
+		}
+		// Insert record
+		_, err = stmt.Exec(hash[:], ts)
+		if err != nil {
+			return 0, []backend.PutResult{}, err
+		}
+
+		// Mark as successful
+		me = append(me, backend.PutResult{
+			Digest:    hash,
+			ErrorCode: backend.ErrorOK,
+		})
+	}
+
+	// From this point on the operation must be atomic.
+	pg.Lock()
+	defer pg.Unlock()
+
+	// Make sure we are on the same commit.
+	if commit != pg.commit {
+		return 0, []backend.PutResult{}, backend.ErrTryAgainLater
+	}
+
+	// Write to db
+	_, err = stmt.Exec()
+	if err != nil {
+		return 0, []backend.PutResult{}, err
+	}
+	err = txn.Commit()
+	if err != nil {
+		return 0, []backend.PutResult{}, err
+	}
+
+	return ts, me, nil
+}
+
+// Close performs cleanup of the backend. In our case it closes the
+// postgres connection.
+func (pg *Postgres) Close() {
+	// Block until last command is complete.
+	pg.Lock()
+	defer pg.Unlock()
+	defer log.Infof("Exiting")
+
+	// We need nil tests when in dump/restore mode.
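+	// (NewDB, used by the dump/restore and fsck tools, creates neither
+	// the cron scheduler nor the wallet.)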
+	if pg.cron != nil {
+		pg.cron.Stop()
+	}
+	if pg.wallet != nil {
+		pg.wallet.Close()
+	}
+	pg.db.Close()
+}
+
+// GetBalance retrieves balance information for the wallet
+// backing this instance
+func (pg *Postgres) GetBalance() (*backend.GetBalanceResult, error) {
+	result, err := pg.wallet.GetWalletBalance()
+	if err != nil {
+		return nil, err
+	}
+	return &backend.GetBalanceResult{
+		Total:       result.Total,
+		Spendable:   result.Spendable,
+		Unconfirmed: result.Unconfirmed,
+	}, nil
+}
+
+// LastAnchor retrieves last successful anchor details
+func (pg *Postgres) LastAnchor() (*backend.LastAnchorResult, error) {
+	ts, a, err := pg.getLatestAnchoredTimestamp()
+	if err != nil {
+		return nil, err
+	}
+	if ts == 0 {
+		return &backend.LastAnchorResult{}, nil
+	}
+
+	var (
+		merkle [sha256.Size]byte
+		tx     *chainhash.Hash
+	)
+	copy(merkle[:], a.Merkle[:sha256.Size])
+	tx, err = chainhash.NewHash(a.TxHash)
+	if err != nil {
+		return nil, err
+	}
+	var me backend.LastAnchorResult
+	me.Tx = *tx
+
+	// Lookup anchored tx info,
+	// and update db if info changed.
+	fr := backend.FlushRecord{
+		Tx:   *tx,
+		Root: merkle,
+	}
+	txWalletInfo, err := pg.lazyFlush(&fr)
+
+	// If the error is nil or errNotEnoughConfirmation,
+	// continue; otherwise return the error.
+	if err != nil && err != errNotEnoughConfirmation {
+		return &backend.LastAnchorResult{}, err
+	}
+
+	me.ChainTimestamp = fr.ChainTimestamp
+	me.BlockHash = txWalletInfo.BlockHash.String()
+	me.BlockHeight = txWalletInfo.BlockHeight
+	return &me, nil
+}
+
+// buildQueryString returns the encoded TLS query parameters for the
+// postgres connection URL.
+func buildQueryString(rootCert, cert, key string) string {
+	v := url.Values{}
+	v.Set("sslmode", "require")
+	v.Set("sslrootcert", filepath.Clean(rootCert))
+	v.Set("sslcert", filepath.Clean(cert))
+	v.Set("sslkey", filepath.Clean(key))
+	return v.Encode()
+}
+
+// internalNew creates the Postgres context but does not launch background
+// bits. This is used by the test packages.
+func internalNew(host, net, rootCert, cert, key string) (*Postgres, error) {
+	// Connect to database
+	dbName := net + "_dcrtime"
+	h := "postgresql://" + dbUser + "@" + host + "/" + dbName
+	u, err := url.Parse(h)
+	if err != nil {
+		return nil, fmt.Errorf("parse url '%v': %v", h, err)
+	}
+
+	qs := buildQueryString(rootCert, cert, key)
+	addr := u.String() + "?" + qs
+
+	db, err := sql.Open("postgres", addr)
+	if err != nil {
+		return nil, fmt.Errorf("connect to database '%v': %v", addr, err)
+	}
+
+	pg := &Postgres{
+		cron:     cron.New(),
+		db:       db,
+		duration: duration,
+		myNow:    time.Now,
+	}
+
+	// Create tables
+	err = pg.createTables()
+	if err != nil {
+		return nil, err
+	}
+
+	return pg, nil
+}
+
+// doFlush gets all timestamps which have unflushed records and flushes them.
+// It skips the current timestamp.
+// It returns the number of timestamps that were flushed.
+//
+// This must be called with the WRITE lock held. We may have to consider
+// errors out of this function terminal.
+func (pg *Postgres) doFlush() (int, error) {
+	current := pg.now().Unix()
+
+	// Get timestamps with unflushed records.
+	// Exclude current timestamp.
+	tss, err := pg.getUnflushedTimestamps(current)
+	if err != nil {
+		return 0, err
+	}
+	count := 0
+	// Flush timestamps' records
+	for _, ts := range tss {
+		err = pg.flush(ts)
+		if err != nil {
+			e := fmt.Sprintf("flush %v: %v", ts, err)
+			log.Error(e)
+		} else {
+			count++
+		}
+	}
+
+	return count, nil
+}
+
+// flusher is called periodically to flush the current timestamp.
+func (pg *Postgres) flusher() {
+	// From this point on the operation must be atomic.
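+	// Hold the write lock for the entire flush; doFlush requires it.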
+	pg.Lock()
+	defer pg.Unlock()
+	start := time.Now()
+	count, err := pg.doFlush()
+	end := time.Since(start)
+	if err != nil {
+		log.Errorf("flusher: %v", err)
+	}
+
+	log.Infof("Flusher: timestamps %v in %v", count, end)
+}
+
+// flush flushes all records associated with given timestamp.
+// returns nil iff ts records flushed successfully
+//
+// This function must be called with the WRITE lock held
+func (pg *Postgres) flush(ts int64) error {
+	// Get timestamp's digests
+	digests, err := pg.getDigestsByTimestamp(ts)
+	if err != nil {
+		return err
+	}
+
+	if len(digests) == 0 {
+		// This really should not happen
+		return errEmptySet
+	}
+
+	// Generate merkle
+	mt := merkle.Tree(digests)
+	// Last element is root
+	root := *mt[len(mt)-1]
+	a := Anchor{
+		Merkle:         root[:],
+		FlushTimestamp: time.Now().Unix(),
+	}
+	tx, err := pg.wallet.Construct(root)
+	if err != nil {
+		// XXX do something with insufficient funds here.
+		return fmt.Errorf("flush Construct tx: %v", err)
+	}
+	log.Infof("Flush timestamp: %v digests %v merkle: %x tx: %v",
+		ts, len(digests), root, tx.String())
+	a.TxHash = (*tx)[:]
+
+	// Insert anchor data into db
+	err = pg.insertAnchor(a)
+	if err != nil {
+		return err
+	}
+
+	// Update timestamp's records merkle root
+	err = pg.updateRecordsAnchor(ts, a.Merkle)
+	if err != nil {
+		return err
+	}
+
+	// Update commit.
+	pg.commit++
+
+	return nil
+}
+
+// New creates a new backend instance. The caller should issue a Close once
+// the Postgres backend is no longer needed.
+func New(host, net, rootCert, cert, key, walletCert, walletHost string, enableCollections bool, walletPassphrase []byte) (*Postgres, error) {
+	log.Tracef("New: %v %v %v %v %v", host, net, rootCert, cert, key)
+
+	pg, err := internalNew(host, net, rootCert, cert, key)
+	if err != nil {
+		return nil, err
+	}
+	pg.enableCollections = enableCollections
+
+	// Runtime bits
+	dcrtimewallet.UseLogger(log)
+	pg.wallet, err = dcrtimewallet.New(walletCert, walletHost, walletPassphrase)
+	if err != nil {
+		return nil, err
+	}
+
+	// Flushing backend reconciles uncommitted work to the anchors table.
+	start := time.Now()
+	flushed, err := pg.doFlush()
+	end := time.Since(start)
+	if err != nil {
+		return nil, err
+	}
+
+	if flushed != 0 {
+		log.Infof("Startup flusher: timestamps %v in %v", flushed, end)
+	}
+
+	// Launch cron.
+	err = pg.cron.AddFunc(flushSchedule, func() {
+		pg.flusher()
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	pg.cron.Start()
+
+	return pg, nil
+}
diff --git a/dcrtimed/backend/postgres/schema.md b/dcrtimed/backend/postgres/schema.md
new file mode 100644
index 0000000..b931171
--- /dev/null
+++ b/dcrtimed/backend/postgres/schema.md
@@ -0,0 +1,31 @@
+# Dcrtime Relational Database
+
+This document describes the SQL relational tables used in dcrtime.
+
+Two tables store all timestamped digest information: `records` and
+`anchors`. The first is a key/value-like table used to store all
+timestamped digests; the second stores anchor information, where each
+successful anchor results in a new entry. Each `record` that was included
+in an `anchor` is connected to the corresponding entry in the anchors
+table via the `anchor_merkle` column, which is defined as a foreign key
+and indexed in the records table. Below is a detailed description of the
+two tables:
+
+### Tables
+
+**Records:**
+| Col Name             | Type   | Not Null | P. Key | F. Key | Indexed | Unique | Description                   |
+|----------------------|--------|----------|--------|--------|---------|--------|-------------------------------|
+| collection_timestamp | bigint | x        |        |        | x       |        | Unix timestamp of collection  |
+| digest               | bytea  | x        | x      |        | x       | x      | Timestamped digest            |
+| anchor_merkle        | bytea  |          |        | x      | x       |        | Anchor merkle root            |
+
+**Note:** `anchor_merkle` links to the anchors table; it is NULL if the
+record is not anchored yet.
+
+**Anchors:**
+| Col Name        | Type   | Not Null | P. Key | F. Key | Indexed | Unique | Description                    |
+|-----------------|--------|----------|--------|--------|---------|--------|--------------------------------|
+| merkle          | bytea  | x        | x      |        | x       | x      | Anchor merkle root             |
+| tx_hash         | bytea  |          |        |        | x       | x      | Anchor tx hash                 |
+| chain_timestamp | bigint |          |        |        |         |        | Anchor timestamp on blockchain |
+| flush_timestamp | bigint |          |        |        |         |        | When anchor actually flushed   |
+
diff --git a/dcrtimed/backend/postgres/sql.go b/dcrtimed/backend/postgres/sql.go
new file mode 100644
index 0000000..beeb4cd
--- /dev/null
+++ b/dcrtimed/backend/postgres/sql.go
@@ -0,0 +1,453 @@
+package postgres
+
+import (
+	"bytes"
+	"crypto/sha256"
+	"database/sql"
+
+	"github.com/decred/dcrtime/merkle"
+)
+
+// insertRestoredDigest accepts a Record model and inserts it to the db.
+//
+// This func is used when restoring a backup.
+func (pg *Postgres) insertRestoredDigest(r Record) error {
+	q := `INSERT INTO records (collection_timestamp, digest, anchor_merkle)
+		VALUES($1, $2, $3)`
+
+	err := pg.db.QueryRow(q, r.CollectionTimestamp, r.Digest, r.AnchorMerkle).Scan()
+	if err != nil {
+		// The insert command won't return any value, the following error is
+		// expected and means the record row was inserted successfully
+		if err.Error() == "sql: no rows in result set" {
+			return nil
+		}
+		return err
+	}
+	return nil
+}
+
+// getAllRecordsTimestamps returns all timestamps found in the records table
+func (pg *Postgres) getAllRecordsTimestamps() (*[]int64, error) {
+	q := `SELECT DISTINCT collection_timestamp FROM records`
+
+	rows, err := pg.db.Query(q)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	var tss []int64
+	var ts int64
+	for rows.Next() {
+		err = rows.Scan(&ts)
+		if err != nil {
+			return nil, err
+		}
+		tss = append(tss, ts)
+	}
+	return &tss, rows.Err()
+}
+
+// getLatestAnchoredTimestamp returns the latest anchor information - tx hash
+// and merkle root - along with the anchor's collection timestamp
+func (pg *Postgres) getLatestAnchoredTimestamp() (int64, Anchor, error) {
+	q := `SELECT r.collection_timestamp, r.anchor_merkle, an.tx_hash
+		FROM records as r
+		LEFT JOIN anchors as an
+		on r.anchor_merkle = an.merkle
+		WHERE r.anchor_merkle IS NOT NULL
+		ORDER BY r.collection_timestamp DESC
+		LIMIT 1`
+
+	rows, err := pg.db.Query(q)
+	a := Anchor{}
+	if err != nil {
+		return 0, a, err
+	}
+	defer rows.Close()
+
+	var (
+		serverTs   int64
+		txHash, mr []byte
+	)
+	for rows.Next() {
+		err = rows.Scan(&serverTs, &mr, &txHash)
+		if err != nil {
+			return 0, a, err
+		}
+		a.Merkle = mr
+		a.TxHash = txHash
+	}
+	return serverTs, a, rows.Err()
+}
+
+// updateAnchorChainTs accepts an anchor and updates its chain timestamp
+// in the db
+func (pg *Postgres) updateAnchorChainTs(a Anchor) error {
+	q := `UPDATE anchors SET chain_timestamp = $1
+		WHERE merkle = $2`
+
+	err := pg.db.QueryRow(q, a.ChainTimestamp, a.Merkle).Scan()
+	if err != nil {
+		// The update command won't return any value, the following error is
+		// expected and means anchor row updated successfully
+		if err.Error() == "sql: no rows in result set" {
+			return nil
+		}
+		return err
+	}
+	return nil
+}
+
+// updateRecordsAnchor accepts a timestamp and an anchor's merkle root and
+// updates all digests in the records table with the given merkle
+func (pg *Postgres) updateRecordsAnchor(ts int64, merkleRoot []byte) error {
+	q := `UPDATE records SET anchor_merkle = $1
+		WHERE collection_timestamp = $2`
+
+	err := pg.db.QueryRow(q, merkleRoot, ts).Scan()
+	if err != nil {
+		// The update command won't return any value, the following error is
+		// expected and means the record rows were updated successfully
+		if err.Error() == "sql: no rows in result set" {
+			return nil
+		}
+		return err
+	}
+	return nil
+}
+
+// insertAnchor accepts an anchor and inserts it to the db
+func (pg *Postgres) insertAnchor(a Anchor) error {
+	q := `INSERT INTO anchors (merkle, tx_hash, flush_timestamp, chain_timestamp)
+		VALUES($1, $2, $3, $4)`
+
+	err := pg.db.QueryRow(q, a.Merkle, a.TxHash,
+		a.FlushTimestamp, a.ChainTimestamp).Scan()
+	if err != nil {
+		// The insert command won't return any value, the following error is
+		// expected and means the anchor row was inserted successfully
+		if err.Error() == "sql: no rows in result set" {
+			return nil
+		}
+		return err
+	}
+	return nil
+}
+
+// getDigestsByMerkleRoot accepts a merkle root, selects all digests from the
+// records table using the given merkle, converts them to [sha256.Size]byte
+// arrays and finally returns the result as an array of pointers
+func (pg *Postgres) getDigestsByMerkleRoot(merkle []byte) ([]*[sha256.Size]byte, error) {
+	q := `SELECT digest from records WHERE anchor_merkle = $1`
+
+	rows, err := pg.db.Query(q, merkle)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	var digests []*[sha256.Size]byte
+	for rows.Next() {
+		var rawDigest []byte
+		err = rows.Scan(&rawDigest)
+		if err != nil {
+			return nil, err
+		}
+		var digest [sha256.Size]byte
+		copy(digest[:], rawDigest[:])
+		digests = append(digests, &digest)
+	}
+	return digests, rows.Err()
+}
+
+// getDigestsByTimestamp accepts a timestamp, selects all digests from the
+// records table using the given timestamp, converts them to [sha256.Size]byte
+// arrays and finally returns the result as an array of pointers
+func (pg *Postgres) getDigestsByTimestamp(ts int64) ([]*[sha256.Size]byte, error) {
+	q := `SELECT digest from records WHERE collection_timestamp = $1`
+
+	rows, err := pg.db.Query(q, ts)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	var digests []*[sha256.Size]byte
+	for rows.Next() {
+		var rawDigest []byte
+		err = rows.Scan(&rawDigest)
+		if err != nil {
+			return nil, err
+		}
+		var digest [sha256.Size]byte
+		copy(digest[:], rawDigest[:])
+		digests = append(digests, &digest)
+	}
+	return digests, rows.Err()
+}
+
+// getUnflushedTimestamps accepts the current server timestamp and queries the
+// records table to find all timestamps which aren't flushed yet - i.e. have
+// no anchoring information
+func (pg *Postgres) getUnflushedTimestamps(current int64) ([]int64, error) {
+	q := `SELECT DISTINCT collection_timestamp FROM records
+		WHERE collection_timestamp != $1 AND anchor_merkle IS NULL`
+
+	rows, err := pg.db.Query(q, current)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	var ts int64
+	tss := []int64{}
+	for rows.Next() {
+		err = rows.Scan(&ts)
+		if err != nil {
+			return nil, err
+		}
+		tss = append(tss, ts)
+	}
+	return tss, rows.Err()
+}
+
+// getRecordsByServerTs accepts a server collection timestamp and returns
+// all records timestamped during that timestamp cycle, additionally it
+// returns the anchor information in case the timestamp's digests were
+// anchored on the blockchain
+func (pg *Postgres) getRecordsByServerTs(ts int64) (bool, []*AnchoredRecord, error) {
+	q := `SELECT r.anchor_merkle, an.tx_hash, an.chain_timestamp, r.digest,
+		an.flush_timestamp
+		FROM records as r
+		LEFT JOIN anchors as an
+		on r.anchor_merkle = an.merkle
+		WHERE r.collection_timestamp = $1`
+
+	rows, err := pg.db.Query(q, ts)
+	if err != nil {
+		return false, nil, err
+	}
+	defer rows.Close()
+
+	var (
+		mr      []byte
+		digest  []byte
+		txHash  []byte
+		chainTs sql.NullInt64
+		flushTs sql.NullInt64
+	)
+	r := []*AnchoredRecord{}
+	for rows.Next() {
+		ar := AnchoredRecord{
+			Record: Record{
+				CollectionTimestamp: ts,
+			},
+		}
+		err = rows.Scan(&mr, &txHash, &chainTs, &digest, &flushTs)
+		if err != nil {
+			return false, nil, err
+		}
+		ar.Record.Digest = digest
+		ar.Anchor = Anchor{
+			Merkle: mr,
+			TxHash: txHash,
+		}
+		// chain_timestamp and flush_timestamp can be NULL when the
+		// timestamp is not anchored yet - handle safely
+		if chainTs.Valid {
+			ar.Anchor.ChainTimestamp = chainTs.Int64
+		}
+		if flushTs.Valid {
+			ar.Anchor.FlushTimestamp = flushTs.Int64
+		}
+
+		r = append(r, &ar)
+	}
+
+	return len(r) > 0, r, rows.Err()
+}
+
+// getRecordByDigest accepts a pointer to an AnchoredRecord which initially
+// includes only the record hash; it queries the db to get the digest's data,
+// including the anchor's data if the hash is anchored. It returns a bool to
+// indicate whether the digest was found in the db or not
+func (pg *Postgres) getRecordByDigest(ar *AnchoredRecord) (bool, error) {
+	q := `SELECT r.anchor_merkle, r.collection_timestamp, an.tx_hash,
+		an.chain_timestamp
+		FROM records as r
+		LEFT JOIN anchors as an
+		ON r.anchor_merkle = an.merkle
+		WHERE r.digest = $1`
+
+	rows, err := pg.db.Query(q, ar.Record.Digest)
+	if err != nil {
+		return false, err
+	}
+	defer rows.Close()
+
+	var chainTs sql.NullInt64
+	for rows.Next() {
+		err = rows.Scan(&ar.Anchor.Merkle, &ar.Record.CollectionTimestamp, &ar.Anchor.TxHash, &chainTs)
+		if err != nil {
+			return false, err
+		}
+		// chainTs can be NULL - handle safely
+		if chainTs.Valid {
+			ar.Anchor.ChainTimestamp = chainTs.Int64
+		}
+		if !bytes.Equal(ar.Anchor.Merkle, []byte{}) {
+			hashes, err := pg.getDigestsByMerkleRoot(ar.Anchor.Merkle)
+			if err != nil {
+				return false, err
+			}
+			var digest [sha256.Size]byte
+			copy(digest[:], ar.Record.Digest[:])
+
+			// That pointer better not be nil!
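+			// (It cannot be: the record's own digest is always
+			// among the rows selected by its anchor_merkle.)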
+
+// hasTable accepts a table name and checks whether that table has been
+// created.
+func (pg *Postgres) hasTable(name string) (bool, error) {
+	q := `SELECT EXISTS (SELECT
+       FROM information_schema.tables
+       WHERE table_schema = 'public' AND table_name = $1)`
+
+	rows, err := pg.db.Query(q, name)
+	if err != nil {
+		return false, err
+	}
+	defer rows.Close()
+
+	var exists bool
+	for rows.Next() {
+		err = rows.Scan(&exists)
+		if err != nil {
+			return false, err
+		}
+	}
+	return exists, rows.Err()
+}
+
+// isDigestExists accepts a digest and checks whether it already exists in
+// the records table.
+func (pg *Postgres) isDigestExists(hash []byte) (bool, error) {
+	q := `SELECT EXISTS
+       (SELECT FROM records WHERE digest = $1)`
+
+	rows, err := pg.db.Query(q, hash)
+	if err != nil {
+		return false, err
+	}
+	defer rows.Close()
+
+	var exists bool
+	for rows.Next() {
+		err = rows.Scan(&exists)
+		if err != nil {
+			return false, err
+		}
+	}
+	return exists, rows.Err()
+}
+
+// createAnchorsTable creates the anchors table.
+func (pg *Postgres) createAnchorsTable() error {
+	_, err := pg.db.Exec(`CREATE TABLE public.anchors
+(
+    merkle bytea NOT NULL UNIQUE,
+    tx_hash bytea UNIQUE,
+    chain_timestamp bigint,
+    flush_timestamp bigint,
+    CONSTRAINT anchors_pkey PRIMARY KEY (merkle)
+);
+-- Index: idx_chain_timestamp
+CREATE INDEX idx_chain_timestamp
+    ON public.anchors USING btree
+    (chain_timestamp ASC NULLS LAST)
+    TABLESPACE pg_default;
+-- Index: idx_flush_timestamp
+CREATE INDEX idx_flush_timestamp
+    ON public.anchors USING btree
+    (flush_timestamp ASC NULLS LAST)
+    TABLESPACE pg_default;
+-- Index: idx_merkle
+CREATE UNIQUE INDEX idx_merkle
+    ON public.anchors USING btree
+    (merkle ASC NULLS LAST)
+    TABLESPACE pg_default;
+-- Index: idx_tx_hash
+CREATE UNIQUE INDEX idx_tx_hash
+    ON public.anchors USING btree
+    (tx_hash ASC NULLS LAST)
+    TABLESPACE pg_default;
+`)
+	if err != nil {
+		return err
+	}
+	log.Infof("Anchors table created")
+	return nil
+}
+
+// createRecordsTable creates the records table.
+func (pg *Postgres) createRecordsTable() error {
+	_, err := pg.db.Exec(`CREATE TABLE public.records
+(
+    digest bytea NOT NULL UNIQUE,
+    anchor_merkle bytea,
+    collection_timestamp bigint NOT NULL,
+    CONSTRAINT records_pkey PRIMARY KEY (digest),
+    CONSTRAINT records_anchors_fkey FOREIGN KEY (anchor_merkle)
+        REFERENCES public.anchors (merkle) MATCH SIMPLE
+        ON UPDATE NO ACTION
+        ON DELETE NO ACTION
+        NOT VALID
+);
+
+-- Index: fki_records_anchors_fkey
+CREATE INDEX fki_records_anchors_fkey
+    ON public.records USING btree
+    (anchor_merkle ASC NULLS LAST)
+    TABLESPACE pg_default;
+-- Index: idx_collection_timestamp
+CREATE INDEX idx_collection_timestamp
+    ON public.records USING btree
+    (collection_timestamp ASC NULLS LAST)
+    TABLESPACE pg_default;
+-- Index: idx_digest
+CREATE UNIQUE INDEX idx_digest
+    ON public.records USING btree
+    (digest ASC NULLS LAST)
+    TABLESPACE pg_default;
+`)
+	if err != nil {
+		return err
+	}
+	log.Infof("Records table created")
+	return nil
+}
+
+// createTables creates the db tables needed for the postgres backend
+// implementation.
+func (pg *Postgres) createTables() error {
+	exists, err := pg.hasTable(tableAnchors)
+	if err != nil {
+		return err
+	}
+	if !exists {
+		err = pg.createAnchorsTable()
+		if err != nil {
+			return err
+		}
+	}
+	exists, err = pg.hasTable(tableRecords)
+	if err != nil {
+		return err
+	}
+	if !exists {
+		err = pg.createRecordsTable()
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
diff --git a/dcrtimed/backend/postgres/testpostgres/postgres_test.go b/dcrtimed/backend/postgres/testpostgres/postgres_test.go
new file mode 100644
index 0000000..e3db0d8
--- /dev/null
+++ b/dcrtimed/backend/postgres/testpostgres/postgres_test.go
@@ -0,0 +1,279 @@
+// Copyright (c) 2020 The Decred developers
+// Use of this source code is governed by an ISC
+// license that can be found in the LICENSE file.
+
+package testpostgres
+
+import (
+	"bytes"
+	"crypto/sha256"
+	"testing"
+	"time"
+
+	"github.com/decred/dcrtime/dcrtimed/backend"
+)
+
+func TestGetTimestamp(t *testing.T) {
+	tp := New()
+
+	// We want to verify collections as well.
+	tp.enableCollections = true
+
+	// Put batch success in current time.
+	var hashes [][sha256.Size]byte
+	count := 10
+	for i := 0; i < count; i++ {
+		hash := [sha256.Size]byte{}
+		hash[0] = byte(i)
+		hashes = append(hashes, hash)
+	}
+	timestamp, me, err := tp.Put(hashes)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(me) != count {
+		t.Fatalf("expected %v multi error", count)
+	}
+
+	// Get invalid timestamp+1, timestamp+2, timestamp+3.
+	gtmes, err := tp.GetTimestamps([]int64{timestamp + 1, timestamp + 2,
+		timestamp + 3})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(gtmes) != 3 {
+		t.Fatalf("expected 3 gtmes got %v", len(gtmes))
+	}
+	for _, gtme := range gtmes {
+		if gtme.ErrorCode != backend.ErrorNotFound {
+			t.Fatalf("expected ErrorNotFound got %v",
+				gtme.ErrorCode)
+		}
+	}
+
+	// Get invalid timestamp+1, timestamp+2, timestamp+3 and valid timestamp.
+	gtmes, err = tp.GetTimestamps([]int64{timestamp + 1, timestamp + 2,
+		timestamp + 3, timestamp})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(gtmes) != 4 {
+		t.Fatalf("expected 4 gtmes got %v", len(gtmes))
+	}
+	for i, gtme := range gtmes {
+		if i < len(gtmes)-1 && gtme.ErrorCode != backend.ErrorNotFound {
+			t.Fatalf("expected ErrorNotFound got %v",
+				gtme.ErrorCode)
+		}
+		if i == len(gtmes)-1 && gtme.ErrorCode != backend.ErrorOK {
+			t.Fatalf("expected ErrorOK got %v", gtme.ErrorCode)
+		}
+	}
+
+	// Get with timestamp.
+	gtmes, err = tp.GetTimestamps([]int64{timestamp})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(gtmes) != 1 {
+		t.Fatalf("expected 1 gtmes got %v", len(gtmes))
+	}
+	gtme := gtmes[0]
+	// Verify we got all the bits back.
+	if len(gtme.Digests) != count {
+		t.Fatalf("expected %v digests got %v", count, len(gtme.Digests))
+	}
+	exists := make(map[byte]struct{})
+	for _, digest := range gtme.Digests {
+		if _, ok := exists[digest[0]]; ok {
+			t.Fatalf("dup %v", digest[0])
+		}
+		exists[digest[0]] = struct{}{}
+	}
+	if len(exists) != count {
+		t.Fatalf("expected %v exists got %v", count, len(exists))
+	}
+
+	// Move time forward and flush.
+	tp.myNow = func() time.Time {
+		return time.Unix(timestamp, 0).Add(tp.duration)
+	}
+
+	// Flush current container to global database.
+	err = tp.flush(timestamp)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Get timestamp again despite not being current.
+	gtmes, err = tp.GetTimestamps([]int64{timestamp})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(gtmes) != 1 {
+		t.Fatalf("expected 1 gtmes got %v", len(gtmes))
+	}
+	gtme = gtmes[0]
+
+	// Verify we got all the bits back.
+	if len(gtme.Digests) != count {
+		t.Fatalf("expected %v digests got %v", count, len(gtme.Digests))
+	}
+	if bytes.Equal(gtme.MerkleRoot[:], make([]byte, sha256.Size)) {
+		t.Fatalf("expected non empty merkle root got %x", gtme.MerkleRoot)
+	}
+	exists = make(map[byte]struct{})
+	for _, digest := range gtme.Digests {
+		if _, ok := exists[digest[0]]; ok {
+			t.Fatalf("dup %v", digest[0])
+		}
+		exists[digest[0]] = struct{}{}
+	}
+	if len(exists) != count {
+		t.Fatalf("expected %v exists got %v", count, len(exists))
+	}
+}
+
+func TestGetDigests(t *testing.T) {
+	tp := New()
+
+	timestamp := tp.now().Unix()
+	tp.myNow = func() time.Time {
+		return time.Unix(timestamp, 0)
+	}
+
+	// Put batch success in current time.
+	var hashes [][sha256.Size]byte
+	count := 10
+	for i := 0; i < count; i++ {
+		hash := [sha256.Size]byte{}
+		hash[0] = byte(i)
+		hashes = append(hashes, hash)
+	}
+
+	_, me, err := tp.Put(hashes)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(me) != count {
+		t.Fatalf("expected %v multi error", count)
+	}
+
+	grs, err := tp.Get(hashes)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(grs) != count {
+		t.Fatalf("expected %v GetResult, got %v", count, len(grs))
+	}
+
+	for i, gr := range grs {
+		if !bytes.Equal(gr.Digest[:], hashes[i][:]) {
+			t.Fatalf("invalid digest got %x want %x",
+				gr.Digest[:], hashes[i][:])
+		}
+	}
+
+	// Get mixed success and failure.
+	for i := count; i < count*2; i++ {
+		hash := [sha256.Size]byte{}
+		hash[0] = byte(i)
+		hashes = append(hashes, hash)
+	}
+
+	grs, err = tp.Get(hashes)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(grs) != count*2 {
+		t.Fatalf("expected %v GetResult", count*2)
+	}
+
+	for i, gr := range grs {
+		if i < count && (!bytes.Equal(gr.Digest[:], hashes[i][:]) ||
+			gr.ErrorCode != backend.ErrorOK) {
+			t.Fatalf("invalid digest got %x want %x ErrorCode "+
+				"got %v want %v", gr.Digest[:], hashes[i][:],
+				gr.ErrorCode, backend.ErrorOK)
+		}
+		if i >= count && gr.ErrorCode != backend.ErrorNotFound {
+			t.Fatalf("invalid ErrorCode got %x want %x",
+				gr.ErrorCode, backend.ErrorNotFound)
+		}
+	}
+
+	// Flush and repeat mixed success and failure.
+
+	// Flush current container.
+	err = tp.flush(timestamp)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	grs, err = tp.Get(hashes)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(grs) != count*2 {
+		t.Fatalf("expected %v GetResult", count*2)
+	}
+
+	// Validate returned merkle root.
+	for i, gr := range grs {
+		if i < count && (!bytes.Equal(gr.Digest[:], hashes[i][:]) ||
+			bytes.Equal(gr.MerkleRoot[:], make([]byte, sha256.Size))) {
+			t.Fatalf("invalid digest got %x want %x Merkle %x", gr.Digest[:],
+				hashes[i][:], gr.MerkleRoot[:])
+		}
+		if i >= count && gr.ErrorCode != backend.ErrorNotFound {
+			t.Fatalf("invalid ErrorCode got %x want %x",
+				gr.ErrorCode, backend.ErrorNotFound)
+		}
+	}
+}
+
+func TestPut(t *testing.T) {
+	tp := New()
+
+	// Put batch success in current time.
+	var hashes [][sha256.Size]byte
+	count := 10
+	for i := 0; i < count; i++ {
+		hash := [sha256.Size]byte{}
+		hash[0] = byte(i)
+		hashes = append(hashes, hash)
+	}
+
+	_, me, err := tp.Put(hashes)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(me) != count {
+		t.Fatalf("expected %v multi error", count)
+	}
+
+	// Verify all return codes.
+	for _, m := range me {
+		if m.ErrorCode != backend.ErrorOK {
+			t.Fatalf("expected ErrorCode %v got %v",
+				backend.ErrorOK, m.ErrorCode)
+		}
+	}
+
+	// Try again, now we expect count ErrorExists.
+	_, me, err = tp.Put(hashes)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(me) != count {
+		t.Fatalf("expected %v multi error", count)
+	}
+
+	// Verify all return codes.
+	for _, m := range me {
+		if m.ErrorCode != backend.ErrorExists {
+			t.Fatalf("expected ErrorCode %v got %v",
+				backend.ErrorExists, m.ErrorCode)
+		}
+	}
+}
diff --git a/dcrtimed/backend/postgres/testpostgres/testpostgres.go b/dcrtimed/backend/postgres/testpostgres/testpostgres.go
new file mode 100644
index 0000000..9e4105b
--- /dev/null
+++ b/dcrtimed/backend/postgres/testpostgres/testpostgres.go
@@ -0,0 +1,250 @@
+// Copyright (c) 2020 The Decred developers
+// Use of this source code is governed by an ISC
+// license that can be found in the LICENSE file.
+
+package testpostgres
+
+import (
+	"crypto/sha256"
+	"encoding/hex"
+	"os"
+	"sync"
+	"time"
+
+	"github.com/decred/dcrtime/dcrtimed/backend"
+	"github.com/decred/dcrtime/dcrtimed/backend/postgres"
+	"github.com/decred/dcrtime/merkle"
+)
+
+var duration = time.Minute // Default for how often we combine digests
+
+// TestPostgres provides an implementation of the backend interface that
+// stores records in memory and that can be used for testing.
+type TestPostgres struct {
+	sync.RWMutex
+
+	myNow    func() time.Time // Override time.Now()
+	duration time.Duration    // How often we combine digests
+
+	enableCollections bool // Set to true to enable collection queries
+
+	// In-memory data.
+	records map[string]postgres.Record // [hash]Record
+	anchors map[string]postgres.Anchor // [merkle]Anchor
+}
+
+func (tp *TestPostgres) getRecordsByServerTs(ts int64) (bool, []postgres.Record) {
+	rs := []postgres.Record{}
+
+	for _, r := range tp.records {
+		if r.CollectionTimestamp == ts {
+			rs = append(rs, r)
+		}
+	}
+	return len(rs) > 0, rs
+}
+
+func (tp *TestPostgres) getRecordByDigest(hash []byte) (postgres.Record, bool) {
+	r, exists := tp.records[hex.EncodeToString(hash)]
+	return r, exists
+}
+
+func (tp *TestPostgres) insertAnchor(a postgres.Anchor) {
+	tp.anchors[hex.EncodeToString(a.Merkle)] = a
+}
+
+func (tp *TestPostgres) updateRecordsAnchor(ts int64, merkle []byte) {
+	// Ranging over a map yields copies of the values, so write the
+	// modified record back into the map.
+	for k, r := range tp.records {
+		if r.CollectionTimestamp == ts {
+			r.AnchorMerkle = merkle
+			tp.records[k] = r
+		}
+	}
+}
+
+// flush flushes all records associated with the given timestamp.
+// It returns nil iff the timestamp's records were flushed successfully.
+//
+// This function must be called with the WRITE lock held.
+func (tp *TestPostgres) flush(ts int64) error {
+	// Get timestamp's digests.
+	_, rs := tp.getRecordsByServerTs(ts)
+
+	if len(rs) == 0 {
+		// This really should not happen, but there is nothing to
+		// flush either way.
+		return nil
+	}
+
+	// Convert byte slices to sha256 arrays.
+	var digests []*[sha256.Size]byte
+	for _, r := range rs {
+		var digest [sha256.Size]byte
+		copy(digest[:], r.Digest)
+		digests = append(digests, &digest)
+	}
+
+	// Generate merkle tree.
+	mt := merkle.Tree(digests)
+	// Last element is root.
+	root := *mt[len(mt)-1]
+	a := postgres.Anchor{
+		Merkle:         root[:],
+		FlushTimestamp: time.Now().Unix(),
+	}
+
+	// Insert anchor data into the db.
+	tp.insertAnchor(a)
+
+	// Update the timestamp's records with the merkle root.
+	tp.updateRecordsAnchor(ts, a.Merkle)
+
+	return nil
+}
+
+// GetTimestamps retrieves the digests for a given timestamp.
+func (tp *TestPostgres) GetTimestamps(timestamps []int64) ([]backend.TimestampResult, error) {
+	gtmes := make([]backend.TimestampResult, 0, len(timestamps))
+
+	tp.RLock()
+	defer tp.RUnlock()
+
+	for _, ts := range timestamps {
+		gtme := backend.TimestampResult{
+			Timestamp: ts,
+		}
+		if tp.enableCollections {
+			exists, records := tp.getRecordsByServerTs(ts)
+			if !exists {
+				gtme.ErrorCode = backend.ErrorNotFound
+			} else {
+				gtme.ErrorCode = backend.ErrorOK
+				// Copy the timestamp's digests.
+				gtme.Digests = make([][sha256.Size]byte, 0, len(records))
+				for _, r := range records {
+					var d [sha256.Size]byte
+					copy(d[:], r.Digest)
+					gtme.Digests = append(gtme.Digests, d)
+					copy(gtme.MerkleRoot[:], r.AnchorMerkle)
+				}
+			}
+		} else {
+			gtme.ErrorCode = backend.ErrorNotAllowed
+		}
+		gtmes = append(gtmes, gtme)
+	}
+	return gtmes, nil
+}
+
+// Get returns timestamp information for the given digests.
+func (tp *TestPostgres) Get(digests [][sha256.Size]byte) ([]backend.GetResult, error) {
+	gdmes := make([]backend.GetResult, 0, len(digests))
+
+	tp.RLock()
+	defer tp.RUnlock()
+
+	for _, digest := range digests {
+		gdme := backend.GetResult{
+			Digest: digest,
+		}
+		r, exists := tp.getRecordByDigest(digest[:])
+
+		if !exists {
+			gdme.ErrorCode = backend.ErrorNotFound
+		} else {
+			gdme.ErrorCode = backend.ErrorOK
+			gdme.Timestamp = r.CollectionTimestamp
+			copy(gdme.MerkleRoot[:], r.AnchorMerkle)
+		}
+		gdmes = append(gdmes, gdme)
+	}
+
+	return gdmes, nil
+}
+
+// Put stores hashes and returns the timestamp and associated errors.
+func (tp *TestPostgres) Put(hashes [][sha256.Size]byte) (int64, []backend.PutResult, error) {
+	// Get current time rounded down.
+	ts := tp.now().Unix()
+	// Prep return.
+	me := make([]backend.PutResult, 0, len(hashes))
+
+	tp.Lock()
+	defer tp.Unlock()
+
+	for _, hash := range hashes {
+		d := hex.EncodeToString(hash[:])
+		// Check if the digest already exists.
+		_, exists := tp.getRecordByDigest(hash[:])
+		if exists {
+			me = append(me, backend.PutResult{
+				Digest:    hash,
+				ErrorCode: backend.ErrorExists,
+			})
+			continue
+		}
+
+		dslice, err := hex.DecodeString(d)
+		if err != nil {
+			return 0, nil, err
+		}
+		// Add the record to the map.
+		tp.records[d] = postgres.Record{
+			CollectionTimestamp: ts,
+			Digest:              dslice,
+		}
+
+		// Mark as successful.
+		me = append(me, backend.PutResult{
+			Digest:    hash,
+			ErrorCode: backend.ErrorOK,
+		})
+	}
+	return ts, me, nil
+}
+
+// now returns the current timestamp rounded down to the configured duration.
+// All timestamps are UTC.
+func (tp *TestPostgres) now() time.Time {
+	return tp.truncate(tp.myNow().UTC(), tp.duration)
+}
+
+// truncate rounds time down to the provided duration.
+func (tp *TestPostgres) truncate(t time.Time, d time.Duration) time.Time {
+	return t.Truncate(d)
+}
+
+// Close is a stub to satisfy the backend interface.
+func (tp *TestPostgres) Close() {}
+
+// Dump is a stub to satisfy the backend interface.
+func (tp *TestPostgres) Dump(f *os.File, verbose bool) error {
+	return nil
+}
+
+// Restore is a stub to satisfy the backend interface.
+func Restore(f *os.File, verbose bool, location string) error {
+	return nil
+}
+
+// Fsck is a stub to satisfy the backend interface.
+func Fsck(options *backend.FsckOptions) error {
+	return nil
+}
+
+// GetBalance is a stub to satisfy the backend interface.
+func GetBalance() (*backend.GetBalanceResult, error) {
+	return nil, nil
+}
+
+// LastAnchor is a stub to satisfy the backend interface.
+func LastAnchor() (*backend.LastAnchorResult, error) {
+	return nil, nil
+}
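Taken together, the in-memory backend mirrors the real backend's life cycle. A compressed usage sketch under the same API (exampleUsage is a hypothetical helper, fmt is assumed imported; the tests above exercise the same flow in full):

func exampleUsage() error {
	tp := New()
	hash := [sha256.Size]byte{0x01}
	ts, res, err := tp.Put([][sha256.Size]byte{hash})
	if err != nil {
		return err
	}
	if res[0].ErrorCode != backend.ErrorOK {
		return fmt.Errorf("unexpected put code: %v", res[0].ErrorCode)
	}
	grs, err := tp.Get([][sha256.Size]byte{hash})
	if err != nil {
		return err
	}
	// Until flush(ts) runs, the merkle root is zero; after a flush it
	// carries the anchor's root.
	if grs[0].Timestamp != ts {
		return fmt.Errorf("got ts %v want %v", grs[0].Timestamp, ts)
	}
	return nil
}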
+
+// New returns a new testpostgres context.
+func New() *TestPostgres {
+	return &TestPostgres{
+		records:  make(map[string]postgres.Record),
+		anchors:  make(map[string]postgres.Anchor),
+		duration: duration,
+		myNow:    time.Now,
+	}
+}
diff --git a/dcrtimed/config.go b/dcrtimed/config.go
index 528b81e..336692b 100644
--- a/dcrtimed/config.go
+++ b/dcrtimed/config.go
@@ -6,7 +6,11 @@ package main

 import (
+	"crypto/tls"
+	"crypto/x509"
+	"encoding/pem"
 	"fmt"
+	"io/ioutil"
 	"net"
 	"os"
 	"path/filepath"
@@ -40,6 +44,7 @@ var (
 	defaultHTTPSCertFile = filepath.Join(defaultHomeDir, "https.cert")
 	defaultLogDir        = filepath.Join(defaultHomeDir, defaultLogDirname)
 	defaultAPIVersions   = fmt.Sprintf("%v,%v", v1.APIVersion, v2.APIVersion)
+	defaultBackend       = "filesystem"
 )

 // runServiceCommand is only set to a real function on Windows. It is used
@@ -73,6 +78,11 @@ type config struct {
 	EnableCollections bool     `long:"enablecollections" description:"Allow clients to query collection timestamps."`
 	APITokens         []string `long:"apitoken" description:"Token used to grant access to privileged API resources"`
 	APIVersions       string   `long:"apiversions" description:"Enables API versions on the daemon"`
+	Backend           string   `long:"backend" description:"Sets the cache layer type 'filesystem'/'postgres'"`
+	PostgresHost      string   `long:"postgreshost" description:"Postgres ip:port"`
+	PostgresRootCert  string   `long:"postgresrootcert" description:"File containing the CA certificate for postgres"`
+	PostgresCert      string   `long:"postgrescert" description:"File containing the dcrtimed client certificate for postgres"`
+	PostgresKey       string   `long:"postgreskey" description:"File containing the dcrtimed client certificate key for postgres"`
 }

 // serviceOptions defines the configuration options for the daemon as a service
@@ -278,6 +288,7 @@ func loadConfig() (*config, []string, error) {
 		HTTPSCert:   defaultHTTPSCertFile,
 		Version:     version(),
 		APIVersions: defaultAPIVersions,
+		Backend:     defaultBackend,
 	}

 	// Service options which are only added on Windows.
@@ -543,6 +554,49 @@ func loadConfig() (*config, []string, error) {
 		cfg.APITokens = validTokens
 	}

+	if cfg.Backend == "postgres" {
+		switch {
+		case cfg.PostgresHost == "":
+			return nil, nil, fmt.Errorf("postgres backend cannot " +
+				"be used without the postgreshost param")
+		case cfg.PostgresRootCert == "":
+			return nil, nil, fmt.Errorf("postgres backend cannot " +
+				"be used without the postgresrootcert param")
+		case cfg.PostgresCert == "":
+			return nil, nil, fmt.Errorf("postgres backend cannot " +
+				"be used without the postgrescert param")
+		case cfg.PostgresKey == "":
+			return nil, nil, fmt.Errorf("postgres backend cannot " +
+				"be used without the postgreskey param")
+		}
+
+		cfg.PostgresRootCert = cleanAndExpandPath(cfg.PostgresRootCert)
+		cfg.PostgresCert = cleanAndExpandPath(cfg.PostgresCert)
+		cfg.PostgresKey = cleanAndExpandPath(cfg.PostgresKey)
+
+		// Validate the postgres root cert.
+		b, err := ioutil.ReadFile(cfg.PostgresRootCert)
+		if err != nil {
+			return nil, nil, fmt.Errorf("read postgresrootcert: %v", err)
+		}
+		block, _ := pem.Decode(b)
+		if block == nil {
+			return nil, nil, fmt.Errorf("%s is not a valid certificate",
+				cfg.PostgresRootCert)
+		}
+		_, err = x509.ParseCertificate(block.Bytes)
+		if err != nil {
+			return nil, nil, fmt.Errorf("parse postgresrootcert: %v", err)
+		}
+
+		// Validate the postgres client key pair.
+		_, err = tls.LoadX509KeyPair(cfg.PostgresCert, cfg.PostgresKey)
+		if err != nil {
+			return nil, nil, fmt.Errorf("load key pair postgrescert "+
+				"and postgreskey: %v", err)
+		}
+	}
+
 	// Warn about missing config file only after all other configuration is
 	// done. This prevents the warning on help messages and invalid
 	// options. Note this should go directly before the return.
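postgres.New itself is not shown in this diff; for context, with lib/pq (added to go.mod below) the validated host and TLS material typically end up in a connection string along these lines. A sketch only — buildDSN and the exact parameter mapping are assumptions, not this changeset's code; the <net>_dcrtime database naming follows scripts/postgressetup.sh:

// buildDSN maps the config's postgres options onto lib/pq connection
// parameters (sslmode, sslrootcert, sslcert, sslkey).
func buildDSN(host, net, rootCert, cert, key string) string {
	return fmt.Sprintf("postgresql://dcrtimed@%s/%s_dcrtime"+
		"?sslmode=verify-full&sslrootcert=%s&sslcert=%s&sslkey=%s",
		host, net, rootCert, cert, key)
}

// Usage: sql.Open("postgres", buildDSN(...)), with the driver imported for
// its side effects: _ "github.com/lib/pq".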
diff --git a/dcrtimed/dcrtimed.go b/dcrtimed/dcrtimed.go
index 1135fa9..d54cd47 100644
--- a/dcrtimed/dcrtimed.go
+++ b/dcrtimed/dcrtimed.go
@@ -26,6 +26,7 @@ import (
 	v2 "github.com/decred/dcrtime/api/v2"
 	"github.com/decred/dcrtime/dcrtimed/backend"
 	"github.com/decred/dcrtime/dcrtimed/backend/filesystem"
+	"github.com/decred/dcrtime/dcrtimed/backend/postgres"
 	"github.com/decred/dcrtime/util"
 	"github.com/gorilla/handlers"
 	"github.com/gorilla/mux"
@@ -1118,7 +1119,7 @@ func (d *DcrtimeStore) verifyV2(w http.ResponseWriter, r *http.Request) {
 		return
 	}

-	// Translate digest results.
+	// Translate digest result.
 	var dReply v2.VerifyDigest
 	if len(drs) != 0 {
 		dr := drs[len(drs)-1]
@@ -1364,15 +1365,40 @@ func _main() error {
 		}
 	} else {
 		// Setup backend.
-		filesystem.UseLogger(fsbeLog)
-		b, err := filesystem.New(loadedCfg.DataDir,
-			loadedCfg.WalletCert,
-			loadedCfg.WalletHost,
-			loadedCfg.EnableCollections,
-			[]byte(loadedCfg.WalletPassphrase))
-
-		if err != nil {
-			return err
+		var b backend.Backend
+		switch loadedCfg.Backend {
+		case "filesystem":
+			filesystem.UseLogger(fsbeLog)
+			b, err = filesystem.New(loadedCfg.DataDir,
+				loadedCfg.WalletCert,
+				loadedCfg.WalletHost,
+				loadedCfg.EnableCollections,
+				[]byte(loadedCfg.WalletPassphrase))
+			if err != nil {
+				return err
+			}
+		case "postgres":
+			postgres.UseLogger(pgbeLog)
+			var net string
+			switch loadedCfg.TestNet {
+			case true:
+				net = "testnet"
+			default:
+				net = "mainnet"
+			}
+			b, err = postgres.New(
+				loadedCfg.PostgresHost,
+				net,
+				loadedCfg.PostgresRootCert,
+				loadedCfg.PostgresCert,
+				loadedCfg.PostgresKey,
+				loadedCfg.WalletCert,
+				loadedCfg.WalletHost,
+				loadedCfg.EnableCollections,
+				[]byte(loadedCfg.WalletPassphrase))
+			if err != nil {
+				return err
+			}
+		}
 		d.backend = b
diff --git a/dcrtimed/log.go b/dcrtimed/log.go
index 115005b..cffe111 100644
--- a/dcrtimed/log.go
+++ b/dcrtimed/log.go
@@ -42,12 +42,14 @@ var (

 	log     = backendLog.Logger("DCRT")
 	fsbeLog = backendLog.Logger("FSBE")
+	pgbeLog = backendLog.Logger("PGBE")
 )

 // subsystemLoggers maps each subsystem identifier to its associated logger.
 var subsystemLoggers = map[string]slog.Logger{
 	"DCRT": log,
 	"FSBE": fsbeLog,
+	"PGBE": pgbeLog,
 }

 // initLogRotator initializes the logging rotater to write logs to logFile and
diff --git a/dcrtimed/sample-dcrtimed.conf b/dcrtimed/sample-dcrtimed.conf
index c55fd87..0bf652f 100644
--- a/dcrtimed/sample-dcrtimed.conf
+++ b/dcrtimed/sample-dcrtimed.conf
@@ -41,3 +41,18 @@
 ; API Versions is a comma-separated list of versions to enable support on the daemon.
 ;apiversions=1,2
+
+; Backend sets the type of cache; possible values: 'filesystem', 'postgres'
+;backend=filesystem
+
+; PostgreSQL connection information, required when working with the postgres
+; backend
+;
+; PostgreSQL host
+;postgreshost=localhost:5432
+; Postgres root cert
+;postgresrootcert="~/.postgresql/certs/clients/dcrtimed/root.crt"
+; Postgres client cert
+;postgrescert="~/.postgresql/certs/clients/dcrtimed/client.dcrtimed.crt"
+; Postgres client key
+;postgreskey="~/.postgresql/certs/clients/dcrtimed/client.dcrtimed.key"
diff --git a/go.mod b/go.mod
index ed66d1b..07c1e9c 100644
--- a/go.mod
+++ b/go.mod
@@ -19,6 +19,7 @@ require (
 	github.com/jessevdk/go-flags v1.4.0
 	github.com/jrick/logrotate v1.0.0
 	github.com/kr/pretty v0.1.0 // indirect
+	github.com/lib/pq v1.8.0
 	github.com/robfig/cron v1.2.0
 	github.com/syndtr/goleveldb v1.0.0
 	google.golang.org/grpc v1.27.1
diff --git a/go.sum b/go.sum
index c2d2ae3..17bf957 100644
--- a/go.sum
+++ b/go.sum
@@ -124,6 +124,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/lib/pq v1.8.0 h1:9xohqzkUwzR4Ga4ivdTcawVS89YSDVxXMa3xJX3cGzg=
+github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
 github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw=
 github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
diff --git a/scripts/postgrescerts.sh b/scripts/postgrescerts.sh
new file mode 100755
index 0000000..ffb8ab0
--- /dev/null
+++ b/scripts/postgrescerts.sh
@@ -0,0 +1,149 @@
+#!/usr/bin/env bash
+# This script creates the certificates required to run a PostgreSQL node
+# locally. This includes creating a CA certificate, a node certificate, one
+# root client certificate to connect via the cli, and one more client
+# certificate for dcrtimed.
+#
+# NOTE: this script creates the server files (server.crt, server.key &
+# root.crt) and copies them over to the postgres data dir. It uses the
+# $PGDATA environment variable to determine where to copy the files to, so
+# make sure it's exported before running the script.
+# When done creating & moving certs, this script restarts the postgres
+# server in order to load the created server certs.
+#
+# More information on PostgreSQL ssl connection usage can be found at:
+# https://www.postgresql.org/docs/9.5/ssl-tcp.html
+
+set -ex
+
+# Database usernames
+readonly USER_DCRTIMED="dcrtimed"
+
+# POSTGRES_DIR is where all of the certificates will be created.
+POSTGRES_DIR=$1
+if [ "${POSTGRES_DIR}" == "" ]; then
+  POSTGRES_DIR="${HOME}/.postgresql"
+fi
+
+# Create postgresdb clients directories.
+mkdir -p "${POSTGRES_DIR}/certs/clients/root" +mkdir -p "${POSTGRES_DIR}/certs/clients/${USER_DCRTIMED}" + +# Create a CA private key +echo "Generating root.key, please type a password:" +openssl genrsa -des3 -out root.key 4096 +# Remove passphrase +echo "Removing root.key password, please re-type it:" +openssl rsa -in root.key -out root.key -passout pass:123 + +# Create a root Certificate Authority (CA) +openssl \ + req -new -x509 \ + -days 365 \ + -subj "/CN=CA" \ + -key root.key \ + -out root.crt + +# Create server key +echo "Generating server.key, please type a password:" +openssl genrsa -des3 -out server.key 4096 -passout pass:123 +#Remove a passphrase +echo "Removing server.key password, please re-type it:" +openssl rsa -in server.key -out server.key -passout pass:123 + +# Create a root certificate signing request +openssl \ + req -new \ + -key server.key \ + -subj "/CN=localhost" \ + -text \ + -out server.csr + +# Create server certificate +openssl \ + x509 -req \ + -in server.csr \ + -text \ + -days 365 \ + -CA root.crt \ + -CAkey root.key \ + -CAcreateserial \ + -out server.crt + +# Copy server.key, server.crt & root.crt to postgres' data dir as discribed in +# PostgresSQL ssl connection documentation, it uses environment variable PGDATA +# as postgres' data dir +echo "Copying server.key server.crt root.crt to $PGDATA as postgres sys user" +sudo -u postgres cp server.key server.crt root.crt $PGDATA + +# Create root client key - used to connect via cli +openssl genrsa -out client.root.key 4096 +# Remove passphrase +openssl rsa -in client.root.key -out client.root.key + +chmod og-rwx client.root.key + +# Create client certificate signing request +# Note: CN should be equal to db username +openssl \ + req -new \ + -key client.root.key \ + -subj "/CN=postgres" \ + -out client.root.csr + +# Create client certificate +openssl \ + x509 -req \ + -in client.root.csr \ + -CA root.crt \ + -CAkey root.key \ + -CAcreateserial \ + -days 365 \ + -text \ + -out client.root.crt + +# Copy client to certs dir +cp client.root.key client.root.crt root.crt \ + ${POSTGRES_DIR}/certs/clients/root + +# Create client key for dcrtimed +openssl genrsa -out client.${USER_DCRTIMED}.key 4096 +# Remove passphrase +openssl rsa -in client.${USER_DCRTIMED}.key -out client.${USER_DCRTIMED}.key + +chmod og-rwx client.${USER_DCRTIMED}.key + +# Create client certificate signing request +# Note: CN should be equal to db username +openssl \ + req -new \ + -key client.${USER_DCRTIMED}.key \ + -subj "/CN=${USER_DCRTIMED}" \ + -out client.${USER_DCRTIMED}.csr + +# Create client certificate +openssl \ + x509 -req \ + -in client.${USER_DCRTIMED}.csr \ + -CA root.crt \ + -CAkey root.key \ + -CAcreateserial \ + -days 365 \ + -text \ + -out client.${USER_DCRTIMED}.crt + +# Copy client to certs dir +cp client.${USER_DCRTIMED}.key client.${USER_DCRTIMED}.crt root.crt \ + ${POSTGRES_DIR}/certs/clients/${USER_DCRTIMED} + +# "On Unix systems, the permissions on +# server.key must disallow any access to world or group" +# Source: PostgresSQL docs - link above +# +sudo chmod og-rwx $PGDATA/server.key +sudo chmod og-rwx $POSTGRES_DIR/certs/clients/${USER_DCRTIMED}/client.${USER_DCRTIMED}.key + +# Cleanup +rm *.crt *.key *.srl *.csr + +# Restart postgres to load server certs +sudo -u postgres pg_ctl -D $PGDATA restart diff --git a/scripts/postgressetup.sh b/scripts/postgressetup.sh new file mode 100755 index 0000000..849c070 --- /dev/null +++ b/scripts/postgressetup.sh @@ -0,0 +1,69 @@ +#!/usr/bin/env bash +# This script sets up the 
diff --git a/scripts/postgressetup.sh b/scripts/postgressetup.sh
new file mode 100755
index 0000000..849c070
--- /dev/null
+++ b/scripts/postgressetup.sh
@@ -0,0 +1,69 @@
+#!/usr/bin/env bash
+# This script sets up the PostgreSQL databases for the dcrtimed cache and
+# assigns user privileges.
+# This script requires that you have already created PostgreSQL certificates
+# using the postgrescerts.sh script and that you have a PostgreSQL instance
+# listening on the default port localhost:5432.
+
+set -ex
+
+# POSTGRES_DIR must be the same directory that was used with the
+# postgrescerts.sh script.
+POSTGRES_DIR=$1
+if [ "${POSTGRES_DIR}" == "" ]; then
+  POSTGRES_DIR="${HOME}/.postgresql"
+fi
+
+# ROOT_CERTS_DIR must contain client.root.crt, client.root.key, and root.crt.
+readonly ROOT_CERTS_DIR="${POSTGRES_DIR}/certs/clients/root"
+
+if [ ! -f "${ROOT_CERTS_DIR}/client.root.crt" ]; then
+  >&2 echo "error: file not found ${ROOT_CERTS_DIR}/client.root.crt"
+  exit 1
+elif [ ! -f "${ROOT_CERTS_DIR}/client.root.key" ]; then
+  >&2 echo "error: file not found ${ROOT_CERTS_DIR}/client.root.key"
+  exit 1
+elif [ ! -f "${ROOT_CERTS_DIR}/root.crt" ]; then
+  >&2 echo "error: file not found ${ROOT_CERTS_DIR}/root.crt"
+  exit 1
+fi
+
+# Database names.
+readonly DB_MAINNET="mainnet_dcrtime"
+readonly DB_TESTNET="testnet_dcrtime"
+
+# Database usernames.
+readonly USER_DCRTIMED="dcrtimed"
+
+# psql connection string.
+readonly CONNECTION_STRING="host=localhost \
+  sslmode=verify-full \
+  sslrootcert=${ROOT_CERTS_DIR}/root.crt \
+  sslcert=${ROOT_CERTS_DIR}/client.root.crt \
+  sslkey=${ROOT_CERTS_DIR}/client.root.key \
+  port=5432 \
+  user=postgres"
+
+# Create the mainnet and testnet databases for the dcrtimed records cache.
+psql "$CONNECTION_STRING" \
+  -c "CREATE DATABASE ${DB_MAINNET}"
+
+psql "$CONNECTION_STRING" \
+  -c "CREATE DATABASE ${DB_TESTNET}"
+
+# Create the dcrtimed user (if it does not exist) and assign privileges.
+psql "$CONNECTION_STRING" \
+  -c "DO \$\$
+BEGIN
+CREATE USER ${USER_DCRTIMED};
+EXCEPTION WHEN duplicate_object THEN RAISE NOTICE '%, skipping', SQLERRM USING ERRCODE = SQLSTATE;
+END
+\$\$;"
+
+psql "$CONNECTION_STRING" \
+  -c "GRANT CREATE \
+  ON DATABASE ${DB_MAINNET} TO ${USER_DCRTIMED}"
+
+psql "$CONNECTION_STRING" \
+  -c "GRANT CREATE \
+  ON DATABASE ${DB_TESTNET} TO ${USER_DCRTIMED}"
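As a final sanity check of both scripts, a small standalone program (hypothetical, not part of this changeset) can confirm that the dcrtimed user reaches the testnet database over verify-full TLS using the same lib/pq parameters the psql connection string above uses:

package main

import (
	"database/sql"
	"fmt"
	"log"
	"os"

	_ "github.com/lib/pq" // registers the "postgres" driver
)

func main() {
	home, err := os.UserHomeDir()
	if err != nil {
		log.Fatal(err)
	}
	certs := home + "/.postgresql/certs/clients/dcrtimed"
	dsn := fmt.Sprintf("postgresql://dcrtimed@localhost:5432/testnet_dcrtime"+
		"?sslmode=verify-full&sslrootcert=%s/root.crt"+
		"&sslcert=%s/client.dcrtimed.crt&sslkey=%s/client.dcrtimed.key",
		certs, certs, certs)
	db, err := sql.Open("postgres", dsn)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	// Ping forces a real connection, exercising the TLS handshake and
	// client certificate authentication.
	if err := db.Ping(); err != nil {
		log.Fatal(err)
	}
	fmt.Println("postgres TLS connection OK")
}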