Skip to content

Commit

Permalink
Add support for ReQL changefeeds, fixes #180
Browse files Browse the repository at this point in the history
  • Loading branch information
mfenniak committed Oct 31, 2014
1 parent fd41c23 commit 74331f3
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 14 deletions.
76 changes: 64 additions & 12 deletions Examples/RethinkDb.Examples.ConsoleApp/Program.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using RethinkDb;
using RethinkDb.Configuration;

Expand All @@ -8,25 +10,75 @@ namespace RethinkDb.Examples.ConsoleApp
public static class Program
{
private static IConnectionFactory connectionFactory = ConfigurationAssembler.CreateConnectionFactory("example");
private static CancellationTokenSource stopMonitor = new CancellationTokenSource();

public static void Main(string[] args)
{
var conn = connectionFactory.Get();
Thread changefeedThread;
using (var conn = connectionFactory.Get())
{
// Create DB if needed
if (!conn.Run(Query.DbList()).Contains("test"))
conn.Run(Query.DbCreate("test"));

// Create DB if needed
if (!conn.Run(Query.DbList()).Contains("test"))
conn.Run(Query.DbCreate("test"));
// Create table if needed
if (!conn.Run(Person.Db.TableList()).Contains("people"))
conn.Run(Person.Db.TableCreate("people"));

// Create table if needed
if (!conn.Run(Person.Db.TableList()).Contains("people"))
conn.Run(Person.Db.TableCreate("people"));
// Begin monitoring for database changes.
changefeedThread = new Thread(ChangeFeedMonitor);
changefeedThread.Start();

// Read all the contents of the table
foreach (var person in conn.Run(Person.Table))
Console.WriteLine("Id: {0}, Name: {1}", person.Id, person.Name);
// Read all the contents of the table
foreach (var person in conn.Run(Person.Table))
Console.WriteLine("Id: {0}, Name: {1}", person.Id, person.Name);

// Insert a new record
conn.Run(Person.Table.Insert(new Person() { Name = "Jack Black" }));
// Insert a new record
conn.Run(Person.Table.Insert(new Person() { Name = "Jack Black" }));
}

Console.WriteLine("Press any key to cancel monitoring.");
Console.ReadKey();
stopMonitor.Cancel();

changefeedThread.Join();
}

private static void ChangeFeedMonitor()
{
try
{
using (var conn = connectionFactory.Get())
{
foreach (var change in conn.Run(Person.Table.Changes(), cancellationToken: stopMonitor.Token))
{
string type;
Guid id;
if (change.NewValue == null)
{
type = "DELETE";
id = change.OldValue.Id;
}
else if (change.OldValue == null)
{
type = "INSERT";
id = change.NewValue.Id;
}
else
{
type = "UPDATE";
id = change.NewValue.Id;
}

Console.WriteLine("{0}: Monitored change to Person table, {1} of id {2}", DateTime.Now, type, id);
}
}
}
catch (AggregateException ex)
{
if (!(ex.InnerException is TaskCanceledException))
throw;
}
}
}
}
7 changes: 7 additions & 0 deletions RELEASE-NOTES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# rethinkdb-net Release Notes

## Next Release

### Features

* Added support for RethinkDB changefeeds via ```table.Changes()```. This allows a client application to monitor and receive changes to a RethinkDB table as they happen. [Issue #180](https://github.com/mfenniak/rethinkdb-net/issues/180)


## 0.9.1.0 (2014-10-31)

### Features
Expand Down
31 changes: 31 additions & 0 deletions rethinkdb-net-test/Integration/TableTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -465,5 +465,36 @@ public void DateTimeOffsetYMDHMSmsTimeSpanConstructor()
})).Single();
Assert.That(result.DTO, Is.EqualTo(new DateTimeOffset(2014, 1, 2, 3, 4, 5, 9, TimeSpan.FromHours(-2.5))));
}

[Test]
public void Changes()
{
DoChanges().Wait();
}

private async Task DoChanges()
{
var enumerator = connection.RunAsync(testTable.Changes());
try
{
var moveNext = enumerator.MoveNext();
using (var secondConnection = ConnectionFactory.Get())
{
await secondConnection.RunAsync(testTable.Insert(new TestObject() { Name = "Jim Brown" }));
}

Assert.That(await moveNext, Is.True);
var change = enumerator.Current;
Assert.That(change.OldValue, Is.Null);
Assert.That(change.NewValue, Is.Not.Null);
Assert.That(change.NewValue.Name, Is.EqualTo("Jim Brown"));
}
finally
{
Console.WriteLine("Disposing iterator...");
enumerator.Dispose().Wait();
Console.WriteLine("Disposed!");
}
}
}
}
4 changes: 3 additions & 1 deletion rethinkdb-net/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ private async Task ReissueQuery(CancellationToken cancellationToken)
{
case Response.ResponseType.SUCCESS_SEQUENCE:
case Response.ResponseType.SUCCESS_PARTIAL:
case Response.ResponseType.SUCCESS_FEED:
break;
case Response.ResponseType.SUCCESS_ATOM:
if (lastResponse.response[0].type != Datum.DatumType.R_ARRAY)
Expand Down Expand Up @@ -524,7 +525,8 @@ public async Task<bool> MoveNext(CancellationToken cancellationToken)
{
return false;
}
else if (lastResponse.type == Response.ResponseType.SUCCESS_PARTIAL)
else if (lastResponse.type == Response.ResponseType.SUCCESS_PARTIAL ||
lastResponse.type == Response.ResponseType.SUCCESS_FEED)
{
query.type = RethinkDb.Spec.Query.QueryType.CONTINUE;
query.query = null;
Expand Down
1 change: 0 additions & 1 deletion rethinkdb-net/DmlResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,3 @@ public class DmlResponseChange<T>
public T NewValue;
}
}

5 changes: 5 additions & 0 deletions rethinkdb-net/Query.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ public static IWriteQuery<DmlResponse> Insert<T>(this ITableQuery<T> target, IEn
return new InsertQuery<T>(target, @objects, conflict);
}

public static ISequenceQuery<DmlResponseChange<TRecord>> Changes<TRecord>(this ITableQuery<TRecord> target)
{
return new ChangesQuery<TRecord>(target);
}

#endregion
#region Query Operations

Expand Down
26 changes: 26 additions & 0 deletions rethinkdb-net/QueryTerm/ChangesQuery.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using RethinkDb.Spec;
using System;
using System.Linq.Expressions;

namespace RethinkDb.QueryTerm
{
public class ChangesQuery<TRecord> : ISequenceQuery<DmlResponseChange<TRecord>>
{
private readonly ITableQuery<TRecord> tableQuery;

public ChangesQuery(ITableQuery<TRecord> tableQuery)
{
this.tableQuery = tableQuery;
}

public Term GenerateTerm(IQueryConverter queryConverter)
{
var term = new Term()
{
type = Term.TermType.CHANGES,
};
term.args.Add(tableQuery.GenerateTerm(queryConverter));
return term;
}
}
}
1 change: 1 addition & 0 deletions rethinkdb-net/rethinkdb-net.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@
<Compile Include="Expressions\GuidExpressionConverters.cs" />
<Compile Include="DatumConverters\BinaryDatumConverterFactory.cs" />
<Compile Include="DatumConverters\BoundEnumDatumConverterFactory.cs" />
<Compile Include="QueryTerm\ChangesQuery.cs" />
</ItemGroup>
<ItemGroup>
<None Include="packages.config" />
Expand Down

0 comments on commit 74331f3

Please sign in to comment.