Skip to content

Commit

Permalink
Add instrumentation events with Pulsar (#441)
Browse files Browse the repository at this point in the history
  • Loading branch information
paulcsmith authored Aug 22, 2020
1 parent 4178573 commit 5dd2676
Show file tree
Hide file tree
Showing 19 changed files with 292 additions and 123 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
check_format:
runs-on: ubuntu-latest
container:
image: crystallang/crystal:0.35.0
image: crystallang/crystal:0.35.1
steps:
- uses: actions/checkout@v1
- name: Install shards
Expand All @@ -20,7 +20,7 @@ jobs:
ameba:
runs-on: ubuntu-latest
container:
image: crystallang/crystal:0.35.0
image: crystallang/crystal:0.35.1
steps:
- uses: actions/checkout@v1
- name: Install shards
Expand All @@ -30,7 +30,7 @@ jobs:
specs:
runs-on: ubuntu-latest
container:
image: crystallang/crystal:0.35.0
image: crystallang/crystal:0.35.1
services:
postgres:
image: postgres:10-alpine
Expand Down Expand Up @@ -66,4 +66,4 @@ jobs:
run: crystal spec
env:
BACKUP_DATABASE_URL: postgres://lucky:developer@postgres:5432/sample_backup
DATABASE_URL: postgres://lucky:developer@postgres:5432/avram_dev
DATABASE_URL: postgres://lucky:developer@postgres:5432/avram_dev
3 changes: 3 additions & 0 deletions shard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ dependencies:
shell-table:
github: luckyframework/shell-table.cr
branch: refactor/setter
pulsar:
github: luckyframework/pulsar
version: ~> 0.2.0

development_dependencies:
ameba:
Expand Down
52 changes: 52 additions & 0 deletions spec/instrumentation_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
require "./spec_helper"

describe "Instrumentation" do
it "publishes the query and args" do
TestDatabase.query "SELECT * FROM users"

event = Avram::Events::QueryEvent.logged_events.last
event.query.should eq("SELECT * FROM users")
event.queryable.should be_nil
end

it "labels the query if coming from a Queryable" do
UserQuery.new.name("Bob").first?

event = Avram::Events::QueryEvent.logged_events.last
event.query.should contain("WHERE users.name = $1")
event.args.to_s.should contain("Bob")
event.queryable.should eq("User")
end

it "labels the scalar if coming from a Queryable" do
UserQuery.new.name("Bob").select_count

event = Avram::Events::QueryEvent.logged_events.last
event.query.should contain("WHERE users.name = $1")
event.args.to_s.should contain("Bob")
event.queryable.should eq("User")
end

it "publishes failed queries" do
expect_raises PQ::PQError do
TestDatabase.scalar "NOT VALID SORRY"
end

event = Avram::Events::FailedQueryEvent.logged_events.last
event.query.should contain("NOT VALID SORRY")
end

it "publishes failed operations" do
Task::SaveOperation.create do |_op, _task|
event = Avram::Events::SaveFailedEvent.logged_events.last
event.operation_class.should eq("Task::SaveOperation")
end
end

it "publishes successful operations" do
Employee::SaveOperation.create!(name: "Someone Special")

event = Avram::Events::SaveSuccessEvent.logged_events.last
event.operation_class.should eq("Employee::SaveOperation")
end
end
24 changes: 22 additions & 2 deletions spec/query_logging_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,29 @@ require "./spec_helper"
describe "Query logging" do
it "logs the statement and args" do
Avram::QueryLog.dexter.temp_config do |log_io|
UserQuery.new.name("Bob").select("*").first?
UserQuery.new.name("Bob").first?
log_io.to_s.should contain %(WHERE users.name = $1)
log_io.to_s.should contain %(["Bob"])
log_io.to_s.should contain %(Bob)
log_io.to_s.should contain %(duration)
end
end

it "does not log truncate statements" do
Avram::QueryLog.dexter.temp_config do |log_io|
TestDatabase.truncate
log_io.to_s.should eq("")
end
end

it "logs failed queries" do
Avram::FailedQueryLog.dexter.temp_config do |log_io|
expect_raises(PQ::PQError) do
TestDatabase.scalar "NOT VALID SORRY"
end
log_io.to_s.should contain("syntax error at or near")
log_io.to_s.should contain("NOT VALID SORRY")
# Filter args so failed queries can be safely logged in production
log_io.to_s.should contain("[FILTERED]")
end
end
end
4 changes: 3 additions & 1 deletion spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ require "./support/base_model"
require "./support/**"
require "../config/*"

Pulsar.enable_test_mode!

backend = Log::IOBackend.new(STDERR)
backend.formatter = Dexter::JSONLogFormatter.proc
Log.builder.bind("avram.*", :error, Log::IOBackend.new(STDERR))
Log.builder.bind("avram.*", :error, backend)

Db::Create.new(quiet: true).call
Db::Migrate.new(quiet: true).call
Expand Down
51 changes: 50 additions & 1 deletion src/avram.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ require "lucky_cli"
require "wordsmith"
require "habitat"
require "blank"
require "pulsar"
require "./avram/object_extensions"
require "./avram/criteria"
require "./avram/type"
Expand All @@ -14,7 +15,6 @@ require "./avram/tasks/**"
require "./avram/**"
require "db"
require "pg"
require "./avram/pool_statement_logging"
require "uuid"

module Avram
Expand All @@ -28,4 +28,53 @@ module Avram
QueryLog = Log.for("query")
FailedQueryLog = Log.for("failed_query")
SaveFailedLog = Log.for("save_failed")

def self.initialize_logging
Avram::Events::QueryEvent.subscribe do |event, duration|
next if event.query.starts_with?("TRUNCATE")

Avram::QueryLog.dexter.info do
queryable = event.queryable
log_data = {
query: event.query,
args: event.args,
duration: Pulsar.elapsed_text(duration),
}

if queryable
{model: queryable}.merge(log_data)
else
log_data
end
end
end

Avram::Events::FailedQueryEvent.subscribe do |event|
Avram::FailedQueryLog.dexter.error do
queryable = event.queryable
log_data = {
error_message: event.error_message,
query: event.query,
args: "[FILTERED]",
}

if queryable
{model: queryable}.merge(log_data)
else
log_data
end
end
end

Avram::Events::SaveFailedEvent.subscribe do |event|
Avram::SaveFailedLog.dexter.warn do
{
failed_to_save: event.operation_class,
validation_errors: event.error_messages_as_string,
}
end
end
end
end

Avram.initialize_logging
10 changes: 4 additions & 6 deletions src/avram/base_query_template.cr
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,10 @@ class Avram::BaseQueryTemplate
end
{% end %}

database.run do |db|
db.exec(
query.statement_for_update(_changes, return_columns: false),
args: query.args_for_update(_changes)
).rows_affected
end
database.exec(
query.statement_for_update(_changes, return_columns: false),
args: query.args_for_update(_changes)
).rows_affected
end

{% for column in columns %}
Expand Down
67 changes: 58 additions & 9 deletions src/avram/database.cr
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,57 @@ abstract class Avram::Database
end
end

def self.run
new.run do |*yield_args|
yield *yield_args
# Methods without a block
{% for crystal_db_alias in [:exec, :scalar, :query, :query_all, :query_one, :query_each] %}
# Same as crystal-db's `DB::QueryMethods#{{ crystal_db_alias.id }}` but with instrumentation
def {{ crystal_db_alias.id }}(query, *args_, args : Array? = nil, queryable : String? = nil, **named_args)
publish_query_event(query, args_, args, queryable) do
run do |db|
db.{{ crystal_db_alias.id }}(query, *args_, args: args)
end
end
end

# Same as crystal-db's `DB::QueryMethods#{{ crystal_db_alias.id }}` but with instrumentation
def self.{{ crystal_db_alias.id }}(query, *args_, args : Array? = nil, queryable : String? = nil, **named_args)
new.{{ crystal_db_alias.id }}(query, *args_, args: args, queryable: queryable)
end
{% end %}

# Methods with a block
{% for crystal_db_alias in [:query, :query_all, :query_one, :query_each] %}
# Same as crystal-db's `DB::QueryMethods#{{ crystal_db_alias }}` but with instrumentation
def {{ crystal_db_alias.id }}(query, *args_, args : Array? = nil, queryable : String? = nil, **named_args)
publish_query_event(query, args_, args, queryable) do
run do |db|
db.{{ crystal_db_alias.id }}(query, *args_, args: args) do |*yield_args|
yield *yield_args
end
end
end
end

# Same as crystal-db's `DB::QueryMethods#{{ crystal_db_alias }}` but with instrumentation
def self.{{ crystal_db_alias.id }}(query, *args_, args : Array? = nil, queryable : String? = nil, **named_args)
new.{{ crystal_db_alias.id }}(query, *args_, args: args, queryable: queryable) do |*yield_args|
yield *yield_args
end
end
{% end %}

def publish_query_event(query, args_, args, queryable)
logging_args = DB::EnumerableConcat.build(args_, args).to_s
Avram::Events::QueryEvent.publish(query: query, args: logging_args, queryable: queryable) do
yield
end
rescue e : PQ::PQError
Avram::Events::FailedQueryEvent.publish(
error_message: e.message.to_s,
query: query,
queryable: queryable,
args: logging_args
)
raise e
end

def self.credentials
Expand All @@ -66,6 +113,12 @@ abstract class Avram::Database
settings.credentials.url
end

def self.run
new.run do |*yield_args|
yield *yield_args
end
end

# :nodoc:
def run
yield current_transaction.try(&.connection) || db
Expand Down Expand Up @@ -174,19 +227,15 @@ abstract class Avram::Database
table_names = database.table_names
return if table_names.empty?
statement = ("TRUNCATE TABLE #{table_names.map { |name| name }.join(", ")} RESTART IDENTITY CASCADE;")
database.run do |db|
db.exec statement
end
database.exec statement
end

def delete
table_names = database.table_names
return if table_names.empty?
table_names.each do |t|
statement = ("DELETE FROM #{t}")
database.run do |db|
db.exec statement
end
database.exec statement
end
end
end
Expand Down
6 changes: 6 additions & 0 deletions src/avram/events/failed_query_event.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
class Avram::Events::FailedQueryEvent < Pulsar::Event
getter :error_message, :query, :args, :queryable

def initialize(@error_message : String, @query : String, @args : String?, @queryable : String? = nil)
end
end
6 changes: 6 additions & 0 deletions src/avram/events/query_event.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
class Avram::Events::QueryEvent < Pulsar::TimedEvent
getter :query, :args, :queryable

def initialize(@query : String, @args : String?, @queryable : String? = nil)
end
end
19 changes: 19 additions & 0 deletions src/avram/events/save_failed_event.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
class Avram::Events::SaveFailedEvent < Pulsar::Event
getter :operation_class, :attributes

def initialize(
@operation_class : String,
@attributes : Array(Avram::GenericAttribute)
)
end

def invalid_attributes
attributes.reject(&.valid?)
end

def error_messages_as_string
invalid_attributes.map do |attribute|
"#{attribute.name} #{attribute.errors.join(", ")}"
end.join(". ")
end
end
9 changes: 9 additions & 0 deletions src/avram/events/save_success_event.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
class Avram::Events::SaveSuccessEvent < Pulsar::Event
getter :operation_class, :attributes

def initialize(
@operation_class : String,
@attributes : Array(Avram::GenericAttribute)
)
end
end
23 changes: 23 additions & 0 deletions src/avram/generic_attribute.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# A generic version of `Avram::Attribute` that is used for reporting and metrics.
#
# This is a data only version of an `Avram::Atribute`. It is purely for
# retrieving and reporting on data. For example, `Avram::GenericAttribute` is
# used by `Avram::Events::SaveFailedEvent` so that subscribers can
# get information about attributes that failed to save.
class Avram::GenericAttribute
getter :name, :param, :original_value, :value, :param_key, :errors

def initialize(
@name : Symbol,
@param : String?,
@original_value : String?,
@value : String?,
@param_key : String,
@errors : Array(String)
)
end

def valid?
errors.empty?
end
end
Loading

0 comments on commit 5dd2676

Please sign in to comment.