Event Listeners

Event Listeners allow nodes to listen to external systems and automatically vote on resolutions. This is primarily used for syncing data from external event-driven systems, such as Ethereum. It can also be used to poll external APIs for data, as long as the exact data can eventually be agreed upon by the requisite threshold for the resolution to pass.

Implementing an Event Listener

Event Listeners are long-running processes that are made to subscribe to or poll an external system for data. When the listener receives data, it can persist the data in the event store. The Kwil node will handle broadcasting the event to the network and voting on the resolution.

ListenFunc

The listener can be implemented using the ListenFunc type:

type ListenFunc func(ctx context.Context, service *common.Service, eventstore EventStore) error

Each registered ListenFunc will be run once when either:

  • A validator comes online and has synced with the network.
  • A non-validating node that is in-sync with the network is promoted to a validator.

The function is expected to return a nil error when its context.Context is canceled. The context.Context will be canceled when either:

  • A validator is demoted, and is no longer a validator.
  • A node shuts down.

If the function returns a non-nil error at any time, the node will log the error and shut down. It is therefore critically important to handle recoverable errors inside the listener, and to return an error only when the node should stop.

ListenFunc implementations should generally follow this structure:

package main

import (
	"context"
	"time"

	"github.com/kwilteam/kwil-db/common"
	"github.com/kwilteam/kwil-db/extensions/listeners"
)

func listenFunc(ctx context.Context, service *common.Service, eventstore listeners.EventStore) error {
	// do some initialization

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(5 * time.Second):
			// poll external system and broadcast to the event store
		}
	}
}

EventStore

The EventStore allows event listeners to persist seen events on their local node. Once persisted, the local node will broadcast the event to the network when appropriate. The EventStore also provides Set, Get, and Delete methods for storing key-value metadata. None of the data written with Set or removed with Delete is broadcast to the network. These methods exist only to help the listener keep track of local state across a crash or restart.

When broadcasting an event, the eventType should correspond to a registered resolution type. The data argument will be passed to the resolution's ResolveFunc if enough nodes vote on the resolution.

// EventStore is an interface that allows listeners to persist events,
// and track arbitrary metadata about its external source. It should
// be used to signal to the local Kwil node that the validator has
// seen an event, and that the event should be broadcast to the
// network. Events should be broadcast to the network using the
// Broadcast method. The KV store data is never forwarded to the
// network, and is simply for the convenience of the listener
// implementer to persist metadata about the data source.
type EventStore interface {
	// Broadcast broadcasts an event seen by the local node to the
	// network. The eventType is a string that identifies how the
	// network should interpret the data. The eventType should directly
	// correspond to a "resolution" type, implemented in the
	// resolutions package. The data argument will be passed to the
	// resolution's ResolveFunc if enough nodes vote on the resolution.
	Broadcast(ctx context.Context, eventType string, data []byte) error

	// Set sets a key-value pair in the KV store. The data is scoped
	// to the local node, and is not broadcast to the network.
	Set(ctx context.Context, key []byte, value []byte) error
	// Get gets a value from the local node's KV store.
	Get(ctx context.Context, key []byte) ([]byte, error)
	// Delete deletes a value from the local node's KV store. The
	// data is scoped to the local node, and is not broadcast to the
	// network.
	Delete(ctx context.Context, key []byte) error
}

Registering the Listener

Listeners can be registered with the Kwil node by calling RegisterListener. This should always be called in a Go init function, to ensure that the node can access all listeners when it starts:

package main

import (
	"context"

	"github.com/kwilteam/kwil-db/common"
	"github.com/kwilteam/kwil-db/extensions/listeners"
)

func init() {
	err := listeners.RegisterListener("delete", func(ctx context.Context, service *common.Service, eventstore listeners.EventStore) error {
		// implement your logic here
		return nil
	})
	if err != nil {
		panic(err)
	}
}

Configuration

Since each node will be running its own instance of a listener, each node will also need its own configuration for connecting to the external service. For example, if the listener is connecting to an Ethereum node, each node will need its own Ethereum RPC URL. Configurations can be set in the node's config.toml file, under [app.extensions]:

config.toml
# ...
#######################################################################
### Extension Configuration ###
#######################################################################
[app.extensions]

# custom listener config
[app.extensions.my_custom_listener]
key1 = "value1"
key2 = "value2"
# ...

Configurations can be accessed via the common.Service object:

func listenFunc(ctx context.Context, service *common.Service, eventstore listeners.EventStore) error {
	// get the custom listener config
	config, ok := service.ExtensionConfigs["my_custom_listener"]
	if !ok {
		return errors.New("my_custom_listener config not found")
	}
	// config["key1"] == "value1"
	// config["key2"] == "value2"
	_ = config // use the values to set up the listener

	return nil
}

Best Practices

Event listeners are relatively complex, and thus require careful consideration to implement correctly. Below are some best practices to follow when implementing an event listener.

Transient Error Retries

Foreign network calls and event subscriptions often return transient errors that are fixed by simply retrying a call. It is important to handle these errors gracefully and retry the call with exponential backoff:

package main

import (
	"context"
	"time"

	"github.com/kwilteam/kwil-db/common"
	"github.com/kwilteam/kwil-db/extensions/listeners"

	"github.com/jpillora/backoff"
)

func listenFunc(ctx context.Context, service *common.Service, eventstore listeners.EventStore) error {
	// do some initialization

	for {
		select {
		case <-ctx.Done():
			return nil
		default:
			retry(ctx, func() error {
				// poll external system and broadcast to the event store
				return nil
			})
		}
	}
}

// retry calls fn with exponential backoff until it succeeds
// or the context is canceled
func retry(ctx context.Context, fn func() error) {
	retrier := &backoff.Backoff{
		Min:    1 * time.Second,
		Max:    10 * time.Second,
		Factor: 2,
		Jitter: true,
	}

	for {
		err := fn()
		if err == nil {
			return
		}

		// wait before the next attempt, but stop waiting on shutdown
		select {
		case <-ctx.Done():
			return
		case <-time.After(retrier.Duration()):
		}
	}
}

External Metadata Tracking

When consuming from external systems, it may be important to track metadata about the data source. This can prevent the listener from re-consuming the same data after a crash or restart. The EventStore provides Set, Get, and Delete methods for storing key-value metadata.

For example, when consuming event logs from Ethereum, it may be desirable to track the last recorded block, to ensure that the node does not re-consume the entire Ethereum chain on startup:

func listenFunc(ctx context.Context, service *common.Service, eventstore listeners.EventStore) error {
	// get the last block number we consumed
	lastBlock, err := eventstore.Get(ctx, []byte("last_block"))
	if err != nil {
		return err
	}

	// decode the stored height; start from 0 if nothing is stored yet
	var lastHeight int64
	if len(lastBlock) == 8 {
		lastHeight = int64(binary.BigEndian.Uint64(lastBlock))
	}

	// do some initialization

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(5 * time.Second):
			newHeight := lastHeight
			// get the latest unprocessed block using `lastHeight`,
			// and update `newHeight` accordingly
			// ...

			var buf [8]byte
			binary.BigEndian.PutUint64(buf[:], uint64(newHeight))

			// persist the last block number
			err := eventstore.Set(ctx, []byte("last_block"), buf[:])
			if err != nil {
				return err
			}
			lastHeight = newHeight
		}
	}
}
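
The encoding and decoding of the stored block height can be factored into small helpers that are easy to test in isolation. The sketch below assumes the height is stored as 8 big-endian bytes, as in the example above, and that an empty value means nothing has been stored yet. That second point is an assumption about how a missing key is reported, so check the actual behavior of Get before relying on it. The names encodeHeight and decodeHeight are hypothetical.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// encodeHeight serializes a block height as 8 big-endian bytes.
func encodeHeight(h int64) []byte {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, uint64(h))
	return buf
}

// decodeHeight parses a value written by encodeHeight. An empty value is
// treated as "nothing stored yet" and decodes to 0 (an assumption).
func decodeHeight(b []byte) (int64, error) {
	switch len(b) {
	case 0:
		return 0, nil
	case 8:
		return int64(binary.BigEndian.Uint64(b)), nil
	default:
		return 0, errors.New("unexpected encoding for stored block height")
	}
}

func main() {
	// Round-trip a height through the encoding.
	h, err := decodeHeight(encodeHeight(19000000))
	fmt.Println(h, err)
}
```

Rejecting values of an unexpected length, rather than silently starting from 0, makes a corrupted or incompatible stored value visible instead of triggering a full re-sync.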

Optional Configuration

There are many cases where a node operator may not want to run a listener. Running a listener is not a hard requirement for a validator, so some listener implementations may choose to make it optional. In this case, the listener can return a nil error if its configuration is not found:

func init() {
	err := listeners.RegisterListener("delete", func(ctx context.Context, service *common.Service, eventstore listeners.EventStore) error {
		// get the custom listener config
		config, ok := service.ExtensionConfigs["my_custom_listener"]
		if !ok {
			// the node operator has not configured the listener,
			// but we can still allow them to run the node.
			// return nil so as not to shut down the node
			return nil
		}
		// ... use config to set up and run the listener
		_ = config
		return nil
	})
	if err != nil {
		panic(err)
	}
}

Failure On Misconfiguration

If a listener is improperly configured by a node operator, it may be helpful to shut down the node to alert the operator of the misconfiguration. Take this case where a node operator needs to configure a URL for the listener to connect to:

func listenFunc(ctx context.Context, service *common.Service, eventstore listeners.EventStore) error {
	// get the custom listener config
	config, ok := service.ExtensionConfigs["my_custom_listener"]
	if !ok {
		return errors.New("my_custom_listener config not found")
	}

	url, ok := config["url"]
	if !ok {
		return errors.New("config my_custom_listener.url not found")
	}

	// ... connect to the external system at url
	_ = url

	return nil
}
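
Checking that a key is present catches only one kind of misconfiguration. As a sketch of going one step further, the helper below also validates the value using the standard library's net/url package. The function name endpointFromConfig and the set of accepted schemes are assumptions made for illustration. The map[string]string parameter matches how config is indexed in the examples above.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
)

// endpointFromConfig validates the "url" key of a listener's configuration
// and returns the parsed endpoint. The accepted schemes are an assumption.
func endpointFromConfig(config map[string]string) (*url.URL, error) {
	raw, ok := config["url"]
	if !ok || raw == "" {
		return nil, errors.New("config my_custom_listener.url not found")
	}

	u, err := url.Parse(raw)
	if err != nil {
		return nil, fmt.Errorf("config my_custom_listener.url is invalid: %w", err)
	}

	switch u.Scheme {
	case "http", "https", "ws", "wss":
		// supported
	default:
		return nil, fmt.Errorf("config my_custom_listener.url has unsupported scheme %q", u.Scheme)
	}

	if u.Host == "" {
		return nil, errors.New("config my_custom_listener.url has no host")
	}
	return u, nil
}

func main() {
	u, err := endpointFromConfig(map[string]string{"url": "wss://rpc.example.com/v1"})
	fmt.Println(u, err)
}
```

Returning the error from the ListenFunc would then shut the node down with a message that names the offending key.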

Proper Resource Cleanup

Listeners are long-running processes, and thus need to be properly cleaned up when the node shuts down or when the node is demoted from its validator status. Failure to do so may result in resource leaks, unnecessary disk writes, or over-consumption of system resources. Here is an example of how to properly clean up resources:

func listenFunc(ctx context.Context, service *common.Service, eventstore listeners.EventStore) error {
	// do some initialization

	for {
		select {
		case <-ctx.Done():
			// clean up resources
			return nil
		case <-time.After(5 * time.Second):
			// poll external system and broadcast to the event store
		}
	}
}