Event Listeners
Event Listeners allow nodes to listen to external systems and automatically vote on resolutions. This is primarily used for syncing data from external event-driven systems, such as Ethereum. It can also be used to poll external APIs for data, as long as the exact data can eventually be agreed upon by the requisite threshold for the resolution to pass.
Implementing an Event Listener
Event Listeners are long-running processes that are made to subscribe to or poll an external system for data. When the listener receives data, it can persist the data in the event store. The Kwil node will handle broadcasting the event to the network and voting on the resolution.
ListenFunc
The listener can be implemented using the ListenFunc
type:
type ListenFunc func(ctx context.Context, service *common.Service, eventstore EventStore) error
Each registered ListenFunc
functions will be run once when either:
- A validator comes online and has synced with the network.
- A non-validating node that is in-sync with the network is promoted to a validator.
The function is expected to return with a nil error when the context.Context
is cancelled.
The context.Context
will be canceled when either:
- A validator is demoted, and is no longer a validator.
- A node shuts down.
If the function returns with a non-nil error at any time, the node will log the error and shut down. It is therefore critically important to handle all errors.
ListenFunc
functions should generally follow the following structure:
package main
import (
"context"
"time"
"github.com/kwilteam/kwil-db/common"
"github.com/kwilteam/kwil-db/extensions/listeners"
)
func listenFunc(ctx context.Context, service *common.Service, eventstore listeners.EventStore) error {
// do some initialization
for {
select {
case <-ctx.Done():
return nil
case <-time.After(5 * time.Second):
// poll external system and broadcast to the event store
}
}
}
EventStore
The EventStore
allows event listeners to persist seen events on their local node. Once persisted, the local node will broadcast the event to the network when appropriate.
The EventStore
also provides Set
, Get
, and Delete
methods for storing key-value metadata. None of the data stored in Set
or Delete
will be broadcast to the network.
The methods merely exist to aid the listener in keeping track of local state in case of a crash or restart.
When broadcasting an event, the eventType
should correspond to a registered resolution type. The data
argument will be passed to the resolution's ResolveFunc
if enough nodes vote on the resolution.
// EventStore is an interface that allows listeners to persist events,
// and track arbitrary metadata about its external source. It should
// be used to signal to the local Kwil node that the validator has
// seen an event, and that the event should be broadcast to the
// network. Events should be broadcast to the network using the
// Broadcast method. The KV store data is never forwarded to the
// network, and is simply for the convenience of the listener
// implementer to persist metadata about the data source.
type EventStore interface {
// Broadcast broadcasts an event seen by the local node to the
// network. The eventType is a string that identifies the network
// should interpret the data. The eventType should directly
// correspond to a "resolution" type, implemented in the
// resolutions package. The data argument will be passed to the
// resolution's ResolveFunc if enough nodes vote on the resolution.
Broadcast(ctx context.Context, eventType string, data []byte) error
// Set sets a key-value pair in the KV store. The data is scoped
// to the local node, and is not broadcast to the network.
Set(ctx context.Context, key []byte, value []byte) error
// Get gets a value from the local node's KV store.
Get(ctx context.Context, key []byte) ([]byte, error)
// Delete deletes a value from the local node's KV store. The
// data is scoped to the local node, and is not broadcast to the
// network.
Delete(ctx context.Context, key []byte) error
}
Registering the Listener
Listeners can be registered with the Kwil node by calling the RegisterListener
. This should always be called in a Golang init
function, to ensure
that the node can access all listeners when it starts:
package main
import (
"context"
"github.com/kwilteam/kwil-db/common"
"github.com/kwilteam/kwil-db/extensions/listeners"
)
func init() {
err := listeners.RegisterListener("delete", func(ctx context.Context, service *common.Service, eventstore listeners.EventStore) error {
// implement your logic here
})
if err != nil {
panic(err)
}
}
Configuration
Since each node will be running its own instance of a listener, each node will also need its own configuration for connecting to the external service. For example,
if the listener is connecting to an Ethereum node, each node will need its own Ethereum RPC URL. Configurations can be set in the node's config.toml
File, under
[app.extensions]
:
# ...
#######################################################################
### Extension Configuration ###
#######################################################################
[app.extensions]
# custom listener config
[app.extensions.my_custom_listener]
key1 = "value1"
key2 = "value2"
# ...
Configurations can be accessed via the common.Service
object:
func listenFunc(ctx context.Context, service *common.Service, eventstore listeners.EventStore) error {
// get the custom listener config
config, ok := service.ExtensionConfigs["my_custom_listener"]
if !ok {
return errors.New("my_custom_listener config not found")
}
// config["key1"] == "value1"
// config["key2"] == "value2"
}
Best Practices
Event listeners are relatively complex, and thus require careful consideration to implement correctly. Below are some best practices to follow when implementing an event listener.
Transient Error Retries
Foreign network calls and event subscriptions often return transient errors that are fixed by simply retrying a call. It is important to handle these errors gracefully and retry the call with exponential backoff:
package main
import (
"context"
"time"
"github.com/kwilteam/kwil-db/common"
"github.com/kwilteam/kwil-db/extensions/listeners"
"github.com/jpillora/backoff"
)
func listenFunc(ctx context.Context, service *common.Service, eventstore listeners.EventStore) error {
// do some initialization
b := &backoff.Backoff{
Min: 100 * time.Millisecond,
Max: 10 * time.Second,
Factor: 2,
Jitter: true,
}
for {
select {
case <-ctx.Done():
return nil
default:
retry(func() error {
// poll external system and broadcast to the event store
})
}
}
}
// retry will retry the function until it is successful
func retry(fn func() error) {
retrier := &backoff.Backoff{
Min: 1 * time.Second,
Max: 10 * time.Second,
Factor: 2,
Jitter: true,
}
for {
err := fn()
if err == nil {
return nil
}
time.Sleep(retrier.Duration())
}
}
External Metadata Tracking
When consuming from external systems, it may be important to track external metadata about the data source. This can prevent the listener from re-consuming
the same data after a crash or restart. The EventStore
provides a Set
, Get
, and Delete
method for storing key-value metadata.
For example, when consuming event logs from Ethereum, it may be desireable to track your last recorded block, to ensure that the node does not re-consume the entire Ethereum chain on startup:
func listenFunc(ctx context.Context, service *common.Service, eventstore listeners.EventStore) error {
// get the last block number we consumed
lastBlock, err := eventstore.Get(ctx, []byte("last_block"))
if err != nil {
return err
}
// do some initialization
for {
select {
case <-ctx.Done():
return nil
case <-time.After(5 * time.Second):
var newHeight int64
// get the latest unprocessed block using `lastBlock`
// ...
var buf [8]byte
binary.BigEndian.PutUint64(buf[:], uint64(h))
// persist the last block number
err := eventstore.Set(ctx, []byte("last_block"), buf)
if err != nil {
return err
}
}
}
}
Optional Configuration
There are many cases where a node may not want to run a listener. It is not a hard requirement for a validator to run a listener, so some listener implementations may leave them to be optional. In this case, implementers can return a nil error if configuration is not found:
func init() {
err := listeners.RegisterListener("delete", func(ctx context.Context, service *common.Service, eventstore listeners.EventStore) error {
// get the custom listener config
config, ok := service.ExtensionConfigs["my_custom_listener"]
if !ok {
// the node operator has not configured the listener,
// but we can still allow them to run the node.
// return nil as to not shut down the node
return nil
}
// ...
})
if err != nil {
panic(err)
}
}
Failure On Misconfiguration
If a listener is improperly configured by a node operator, it may be helpful to shut down the node to alert the operator of the misconfiguration. Take this case where a node operator needs to configure a URL for the listener to connect to:
func listenFunc(ctx context.Context, service *common.Service, eventstore listeners.EventStore) error {
// get the custom listener config
config, ok := service.ExtensionConfigs["my_custom_listener"]
if !ok {
return errors.New("my_custom_listener config not found")
}
url, ok := config["url"]
if !ok {
return errors.New("config my_custom_listener.url not found")
}
}
Proper Resource Cleanup
Listeners are long-running processes, and thus need to be properly cleaned up when the node shuts down or when the node is demoted from its validator status. Failure to do so may result in resource leaks, unnecessary disk writes, or over-consumption of system resources. Here is an example of how to properly clean up resources:
func listenFunc(ctx context.Context, service *common.Service, eventstore listeners.EventStore) error {
// do some initialization
for {
select {
case <-ctx.Done():
// clean up resources
return nil
case <-time.After(5 * time.Second):
// poll external system and broadcast to the event store
}
}
}