Example Pt. 2: Ethereum Event Listener
In this tutorial, we will build a simple event listener that listens to EVM events, and credits an account when a log is heard.
The listener will be configured to listen to a specific contract address, and will listen for the EVM event signature Credit(address,uint256).
This tutorial uses concepts and code from the resolution extension introduced in part 1: Example Kwil Credits.
This tutorial is for example purposes only, and should not be used in production. Production implementations should account for transient network failures, subscription disconnections, Ethereum reorgs, and other edge cases. For a production-ready implementation of this event listener, please refer to the Kwil EVM Deposits Listener that is included in all Kwil nodes by default.
Prerequisites
This tutorial assumes that the user is familiar with the following concepts:
- The account credit resolution tutorial
- Building a custom Kwil node with extensions
- Running a Kwil node locally
- EVM event logs and contract ABIs
Step 1. Querying Ethereum
To get started, we will first need a way to query the Ethereum blockchain for new events. We will use the popular go-ethereum library, which is the official Go implementation of the Ethereum protocol. We will use this library to connect to an Ethereum node, subscribe to new blocks, and query for logs.
The code below relies on the AccountCreditResolution
struct and the CreditAccountEventType
constant from part 1.
package main
import (
"context"
"math/big"
"strings"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/accounts/abi"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/kwilteam/kwil-db/extensions/listeners"
"github.com/kwilteam/kwil-db/extensions/resolutions/credit"
)
// contractABIStr is the ABI of the smart contract the listener listens to.
// It follows the Ethereum ABI JSON format, and matches the `Credit(address,uint256)` event signature.
const contractABIStr = `[{"anonymous":false,"inputs":[{"indexed":false,"internalType":"address","name":"_from","type":"address"},{"indexed":false,"internalType":"uint256","name":"_amount","type":"uint256"}],"name":"Credit","type":"event"}]`
// creditEventSignature is the EVM event signature the listener listens to.
var creditEventSignature ethcommon.Hash = crypto.Keccak256Hash([]byte("Credit(address,uint256)"))
// processBlock fetches every Credit event emitted by the given contract in a
// single block and broadcasts one account-credit resolution per event to the
// Kwil network.
//
// blockNumber is used as both the lower and the upper bound of the log filter,
// so exactly one block is scanned per call. The first error encountered (log
// query, ABI parsing, log decoding, serialization, or broadcast) is returned
// immediately, and any remaining logs of the block are not processed.
func processBlock(ctx context.Context, eventstore listeners.EventStore, client *ethclient.Client, blockNumber int64, contract ethcommon.Address) error {
	// query the logs for the credit event, restricted to this block and contract
	logs, err := client.FilterLogs(ctx, ethereum.FilterQuery{
		FromBlock: big.NewInt(blockNumber),
		ToBlock:   big.NewInt(blockNumber),
		Addresses: []ethcommon.Address{contract},
		Topics:    [][]ethcommon.Hash{{creditEventSignature}},
	})
	if err != nil {
		return err
	}

	// get the abi, so that we can decode the logs
	eventABI, err := abi.JSON(strings.NewReader(contractABIStr))
	if err != nil {
		return err
	}

	for _, entry := range logs {
		// both event parameters are non-indexed, so they are both decoded from
		// the log's data section, in declaration order
		data, err := eventABI.Unpack("Credit", entry.Data)
		if err != nil {
			return err
		}

		// data[0] is the address of the account to credit
		// data[1] is the amount to credit the account
		account := data[0].(ethcommon.Address)
		amount := data[1].(*big.Int)

		// build the resolution defined by the credit resolution extension;
		// the names are qualified so that the imported credit package is used
		resolution := credit.AccountCreditResolution{
			Account: account.Bytes(),
			Amount:  amount,
			TxHash:  entry.TxHash.Bytes(),
		}

		bts, err := resolution.MarshalBinary()
		if err != nil {
			return err
		}

		// broadcast the resolution to the network
		err = eventstore.Broadcast(ctx, credit.CreditAccountEventType, bts)
		if err != nil {
			return err
		}
	}

	return nil
}
Step 2. Listening to Ethereum
In order to listen to Ethereum, we will need to read in our local node's configuration, create a client, and listen for new blocks. If the configuration is not set, or if the configuration is invalid, we will immediately shut down the listener. We will also stop the listener if the context is cancelled.
package main
import (
	"context"
	"fmt"

	ethcommon "github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
	"github.com/kwilteam/kwil-db/common"
	"github.com/kwilteam/kwil-db/extensions/listeners"
)
// EthListener is an event listener that listens to ethereum events and broadcasts corresponding
// account credits to the Kwil network.
//
// It reads the "eth_listener" section of the node's extension configs, which
// must contain "rpc_provider" and "contract_address". If the section is absent
// the listener logs a message and returns nil without doing anything. If a
// required key is missing, the contract address is not a valid hex address,
// the connection or subscription fails, the subscription reports an error, or
// a block cannot be processed, an error is returned. It returns nil when ctx
// is cancelled.
func EthListener(ctx context.Context, service *common.Service, eventstore listeners.EventStore) error {
	// we will first get the configs that the local node has for the eth_listener extension
	config, ok := service.ExtensionConfigs["eth_listener"]
	if !ok {
		service.Logger.Info("eth_listener: no config found. skipping...")
		return nil
	}

	rpcProvider, ok := config["rpc_provider"]
	if !ok {
		return fmt.Errorf("eth_listener: rpc_provider not found in config")
	}

	contractAddressStr, ok := config["contract_address"]
	if !ok {
		return fmt.Errorf("eth_listener: contract_address not found in config")
	}

	// HexToAddress never reports an error, so a malformed value would silently
	// turn into a wrong address; reject it up front instead
	if !ethcommon.IsHexAddress(contractAddressStr) {
		return fmt.Errorf("eth_listener: contract_address %q is not a valid hex address", contractAddressStr)
	}
	contractAddress := ethcommon.HexToAddress(contractAddressStr)

	// we will now use go-ethereum to subscribe to new blocks
	client, err := ethclient.Dial(rpcProvider)
	if err != nil {
		return fmt.Errorf("eth_listener: failed to connect to ethereum client: %w", err)
	}
	defer client.Close()

	headers := make(chan *types.Header)
	sub, err := client.SubscribeNewHead(ctx, headers)
	if err != nil {
		return fmt.Errorf("eth_listener: failed to subscribe to new block headers: %w", err)
	}
	defer sub.Unsubscribe()

	// we will now listen for new blocks and process them
	// if the context is cancelled, we will stop listening
	for {
		select {
		case <-ctx.Done():
			return nil
		case err := <-sub.Err():
			return fmt.Errorf("eth_listener: subscription error: %w", err)
		case header := <-headers:
			err := processBlock(ctx, eventstore, client, header.Number.Int64(), contractAddress)
			if err != nil {
				return fmt.Errorf("eth_listener: failed to process block: %w", err)
			}
		}
	}
}
Step 3. Configuration
In order to run the listener, each Kwil node will need to configure an Ethereum RPC provider and smart contract address to listen to.
This can be done in each node's local config.toml
file:
# ...
[app.extensions.eth_listener]
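# the listener subscribes to new block headers, which requires a WebSocket (ws:// or wss://) or IPC endpoint
rpc_provider = "wss://mainnet.infura.io/ws/v3/YOUR_INFURA_API_KEY"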
contract_address = "0xYOUR_CONTRACT_ADDRESS"
# ...
The listener will read the values set in config.toml and use them to connect to the Ethereum network and listen to the specified contract address.
Step 4. Register the Event Listener
Register the event listener with the Kwil node. This is done by calling the RegisterListener
function in the package's init
function.
package main
import (
"github.com/kwilteam/kwil-db/extensions/listeners"
)
// init registers EthListener under the name "eth_listener".
// It panics if the registration returns an error.
func init() {
	if err := listeners.RegisterListener("eth_listener", EthListener); err != nil {
		panic(err)
	}
}
Full Example
The full example of the event listener extension implemented in this tutorial is shown below:
credit_listener.go
package main
import (
"context"
"fmt"
"math/big"
"strings"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/kwilteam/kwil-db/common"
"github.com/kwilteam/kwil-db/extensions/listeners"
"github.com/kwilteam/kwil-db/extensions/resolutions/credit"
)
// EthListener is an event listener that listens to ethereum events and broadcasts corresponding
// account credits to the Kwil network.
//
// It reads the "eth_listener" section of the node's extension configs, which
// must contain "rpc_provider" and "contract_address". If the section is absent
// the listener logs a message and returns nil without doing anything. If a
// required key is missing, the contract address is not a valid hex address,
// the connection or subscription fails, the subscription reports an error, or
// a block cannot be processed, an error is returned. It returns nil when ctx
// is cancelled.
func EthListener(ctx context.Context, service *common.Service, eventstore listeners.EventStore) error {
	// we will first get the configs that the local node has for the eth_listener extension
	config, ok := service.ExtensionConfigs["eth_listener"]
	if !ok {
		service.Logger.Info("eth_listener: no config found. skipping...")
		return nil
	}

	rpcProvider, ok := config["rpc_provider"]
	if !ok {
		return fmt.Errorf("eth_listener: rpc_provider not found in config")
	}

	contractAddressStr, ok := config["contract_address"]
	if !ok {
		return fmt.Errorf("eth_listener: contract_address not found in config")
	}

	// HexToAddress never reports an error, so a malformed value would silently
	// turn into a wrong address; reject it up front instead
	if !ethcommon.IsHexAddress(contractAddressStr) {
		return fmt.Errorf("eth_listener: contract_address %q is not a valid hex address", contractAddressStr)
	}
	contractAddress := ethcommon.HexToAddress(contractAddressStr)

	// we will now use go-ethereum to subscribe to new blocks
	client, err := ethclient.Dial(rpcProvider)
	if err != nil {
		return fmt.Errorf("eth_listener: failed to connect to ethereum client: %w", err)
	}
	defer client.Close()

	headers := make(chan *types.Header)
	sub, err := client.SubscribeNewHead(ctx, headers)
	if err != nil {
		return fmt.Errorf("eth_listener: failed to subscribe to new block headers: %w", err)
	}
	defer sub.Unsubscribe()

	// we will now listen for new blocks and process them
	// if the context is cancelled, we will stop listening
	for {
		select {
		case <-ctx.Done():
			return nil
		case err := <-sub.Err():
			return fmt.Errorf("eth_listener: subscription error: %w", err)
		case header := <-headers:
			err := processBlock(ctx, eventstore, client, header.Number.Int64(), contractAddress)
			if err != nil {
				return fmt.Errorf("eth_listener: failed to process block: %w", err)
			}
		}
	}
}
// contractABIStr is the ABI of the smart contract the listener listens to.
// It follows the Ethereum ABI JSON format, and matches the `Credit(address,uint256)` event signature.
const contractABIStr = `[{"anonymous":false,"inputs":[{"indexed":false,"internalType":"address","name":"_from","type":"address"},{"indexed":false,"internalType":"uint256","name":"_amount","type":"uint256"}],"name":"Credit","type":"event"}]`
// creditEventSignature is the EVM event signature the listener listens to.
var creditEventSignature ethcommon.Hash = crypto.Keccak256Hash([]byte("Credit(address,uint256)"))
// processBlock fetches every Credit event emitted by the given contract in a
// single block and broadcasts one account-credit resolution per event to the
// Kwil network.
//
// blockNumber is used as both the lower and the upper bound of the log filter,
// so exactly one block is scanned per call. The first error encountered (log
// query, ABI parsing, log decoding, serialization, or broadcast) is returned
// immediately, and any remaining logs of the block are not processed.
func processBlock(ctx context.Context, eventstore listeners.EventStore, client *ethclient.Client, blockNumber int64, contract ethcommon.Address) error {
	// query the logs for the credit event, restricted to this block and contract
	logs, err := client.FilterLogs(ctx, ethereum.FilterQuery{
		FromBlock: big.NewInt(blockNumber),
		ToBlock:   big.NewInt(blockNumber),
		Addresses: []ethcommon.Address{contract},
		Topics:    [][]ethcommon.Hash{{creditEventSignature}},
	})
	if err != nil {
		return err
	}

	// get the abi, so that we can decode the logs
	eventABI, err := abi.JSON(strings.NewReader(contractABIStr))
	if err != nil {
		return err
	}

	for _, entry := range logs {
		// both event parameters are non-indexed, so they are both decoded from
		// the log's data section, in declaration order
		data, err := eventABI.Unpack("Credit", entry.Data)
		if err != nil {
			return err
		}

		// data[0] is the address of the account to credit
		// data[1] is the amount to credit the account
		account := data[0].(ethcommon.Address)
		amount := data[1].(*big.Int)

		// build the resolution defined by the credit resolution extension;
		// the names are qualified so that the imported credit package is used
		resolution := credit.AccountCreditResolution{
			Account: account.Bytes(),
			Amount:  amount,
			TxHash:  entry.TxHash.Bytes(),
		}

		bts, err := resolution.MarshalBinary()
		if err != nil {
			return err
		}

		// broadcast the resolution to the network
		err = eventstore.Broadcast(ctx, credit.CreditAccountEventType, bts)
		if err != nil {
			return err
		}
	}

	return nil
}
// init registers EthListener under the name "eth_listener".
// It panics if the registration returns an error.
func init() {
	if err := listeners.RegisterListener("eth_listener", EthListener); err != nil {
		panic(err)
	}
}
To build and run the event listener extension example, follow the standard kwild
build and run process.