Skip to content

Commit 8642bc3

Browse files
authored
Merge pull request #7 from corvansteijn/FixIdempotency
Fix idempotency
2 parents f74d646 + ced9402 commit 8642bc3

File tree

2 files changed

+180
-19
lines changed

2 files changed

+180
-19
lines changed

Src/LiquidProjections.NHibernate/NHibernateProjector.cs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,13 @@ public async Task Handle(IReadOnlyList<Transaction> transactions, SubscriptionIn
119119
{
120120
throw new ArgumentNullException(nameof(transactions));
121121
}
122+
123+
long? lastCheckpoint = GetLastCheckpoint();
124+
IEnumerable<IList<Transaction>> transactionBatches = transactions
125+
.Where(t => (!lastCheckpoint.HasValue) || (t.Checkpoint > lastCheckpoint))
126+
.InBatchesOf(BatchSize);
122127

123-
foreach (IList<Transaction> batch in transactions.InBatchesOf(BatchSize))
128+
foreach (IList<Transaction> batch in transactionBatches)
124129
{
125130
await ExecuteWithRetry(() => ProjectTransactionBatch(batch)).ConfigureAwait(false);
126131
}
@@ -150,16 +155,15 @@ private async Task ProjectTransactionBatch(IList<Transaction> batch)
150155
try
151156
{
152157
using (ISession session = sessionFactory())
158+
using (var tx = session.BeginTransaction())
153159
{
154-
session.BeginTransaction();
155-
156160
foreach (Transaction transaction in batch)
157161
{
158162
await ProjectTransaction(transaction, session).ConfigureAwait(false);
159163
}
160164

161165
StoreLastCheckpoint(session, batch.Last());
162-
session.Transaction.Commit();
166+
tx.Commit();
163167
}
164168
}
165169
catch (ProjectionException projectionException)

Tests/LiquidProjections.NHibernate.Specs/NHibernateProjectorSpecs.cs

Lines changed: 172 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Collections.ObjectModel;
34
using System.Linq;
45
using System.Threading.Tasks;
56
using Chill;
@@ -20,7 +21,7 @@ public class Given_a_sqlite_projector_with_an_in_memory_event_source :
2021
{
2122
protected EventMapBuilder<ProductCatalogEntry, string, NHibernateProjectionContext> Events;
2223
protected LruProjectionCache<ProductCatalogEntry, string> Cache;
23-
protected Exception ProjectionException = null;
24+
protected Exception ProjectionException;
2425
private readonly List<INHibernateChildProjector> children = new List<INHibernateChildProjector>();
2526

2627
public Given_a_sqlite_projector_with_an_in_memory_event_source()
@@ -68,11 +69,13 @@ protected void StartProjecting()
6869
{
6970
ProjectionException = e;
7071
}
71-
}
72+
}
7273
}, "");
7374
}
7475
}
7576

77+
#region Mapping Rules
78+
7679
public class When_a_create_was_requested_but_the_database_already_contained_that_projection :
7780
Given_a_sqlite_projector_with_an_in_memory_event_source
7881
{
@@ -151,7 +154,6 @@ public void Then_it_should_throw()
151154
{
152155
ProjectionException.Should()
153156
.BeOfType<ProjectionException>();
154-
155157
}
156158
}
157159

@@ -422,7 +424,7 @@ public When_a_create_or_update_of_an_existing_cached_projection_is_requested()
422424

423425
session.Flush();
424426
}
425-
427+
426428
Cache.Add(new ProductCatalogEntry
427429
{
428430
Id = "c350E",
@@ -1108,6 +1110,10 @@ public void Then_it_should_not_do_anything()
11081110
}
11091111
}
11101112

1113+
#endregion
1114+
1115+
#region General
1116+
11111117
public class When_a_custom_state_key_is_set : Given_a_sqlite_projector_with_an_in_memory_event_source
11121118
{
11131119
public When_a_custom_state_key_is_set()
@@ -1167,9 +1173,9 @@ public When_the_projector_state_is_enriched()
11671173
StreamId = "Product1",
11681174
Events = new[]
11691175
{
1170-
new EventEnvelope
1171-
{
1172-
Body = new ProductAddedToCatalogEvent
1176+
new EventEnvelope
1177+
{
1178+
Body = new ProductAddedToCatalogEvent
11731179
{
11741180
ProductKey = "c350E",
11751181
Category = "Hybrid"
@@ -1283,10 +1289,13 @@ public void Then_it_should_use_the_header()
12831289
}
12841290
}
12851291

1292+
#endregion
1293+
1294+
#region Child Projectors
1295+
12861296
public class When_there_is_a_child_projector :
12871297
Given_a_sqlite_projector_with_an_in_memory_event_source
12881298
{
1289-
private Transaction transaction2;
12901299
private readonly List<ChildProjectionState> childProjectionStates = new List<ChildProjectionState>();
12911300

12921301
public When_there_is_a_child_projector()
@@ -1338,7 +1347,7 @@ public When_there_is_a_child_projector()
13381347
}
13391348
};
13401349

1341-
transaction2 = new Transaction
1350+
var transaction2 = new Transaction
13421351
{
13431352
Events = new[]
13441353
{
@@ -1409,6 +1418,7 @@ private class ChildProjectionState
14091418
public bool Entry2Exists { get; set; }
14101419
}
14111420
}
1421+
14121422
public class When_a_child_projector_has_its_own_cache :
14131423
Given_a_sqlite_projector_with_an_in_memory_event_source
14141424
{
@@ -1455,6 +1465,58 @@ public async Task It_should_add_the_item_to_the_cache()
14551465
}
14561466
}
14571467

1468+
#endregion
1469+
1470+
#region Exception Handling
1471+
1472+
public class When_any_exception_is_thrown :
1473+
Given_a_sqlite_projector_with_an_in_memory_event_source
1474+
{
1475+
public When_any_exception_is_thrown()
1476+
{
1477+
Given(() =>
1478+
{
1479+
Events.Map<ProductAddedToCatalogEvent>()
1480+
.AsCreateOf(@event => @event.ProductKey)
1481+
.Using((p, @event) =>
1482+
{
1483+
p.Category = @event.Category;
1484+
p.Name = @event.Name;
1485+
});
1486+
1487+
Events.Map<CategoryDiscontinuedEvent>().As((@event, context) => throw new InvalidOperationException());
1488+
1489+
StartProjecting();
1490+
});
1491+
1492+
When(async () =>
1493+
{
1494+
await The<MemoryEventSource>()
1495+
.Write(new TransactionBuilder()
1496+
.WithEvent(new ProductAddedToCatalogEvent
1497+
{
1498+
Category = "some category",
1499+
Name = "some product",
1500+
ProductKey = "some key",
1501+
})
1502+
.WithEvent(new CategoryDiscontinuedEvent
1503+
{
1504+
Category = "some category"
1505+
})
1506+
.Build());
1507+
});
1508+
}
1509+
1510+
[Fact]
1511+
public void Then_it_should_rollback_the_transaction()
1512+
{
1513+
using (var session = The<ISessionFactory>().OpenSession())
1514+
{
1515+
session.QueryOver<ProductCatalogEntry>().List().Should().BeEmpty();
1516+
}
1517+
}
1518+
}
1519+
14581520
public class When_event_handling_fails :
14591521
Given_a_sqlite_projector_with_an_in_memory_event_source
14601522
{
@@ -1581,6 +1643,60 @@ public void Then_it_should_succeed()
15811643
succeeded.Should().BeTrue();
15821644
}
15831645
}
1646+
1647+
public class When_retrying_a_transaction_that_was_already_handled :
1648+
Given_a_sqlite_projector_with_an_in_memory_event_source
1649+
{
1650+
private ReadOnlyCollection<Transaction> transactions;
1651+
private SubscriptionInfo info;
1652+
1653+
public When_retrying_a_transaction_that_was_already_handled()
1654+
{
1655+
Given(() =>
1656+
{
1657+
Events.Map<ProductAddedToCatalogEvent>()
1658+
.AsCreateOf(@event => @event.ProductKey)
1659+
.Using((p, @event) =>
1660+
{
1661+
p.Category = @event.Category;
1662+
p.Name = @event.Name;
1663+
});
1664+
1665+
1666+
Transaction transaction = new TransactionBuilder()
1667+
.WithCheckpointNumber(5)
1668+
.WithEvent(new ProductAddedToCatalogEvent
1669+
{
1670+
Category = "some category",
1671+
Name = $"some product",
1672+
ProductKey = $"some key",
1673+
})
1674+
.Build();
1675+
1676+
transactions = new List<Transaction>
1677+
{
1678+
transaction
1679+
}
1680+
.AsReadOnly();
1681+
info = new SubscriptionInfo();
1682+
1683+
StartProjecting();
1684+
});
1685+
1686+
When(async () =>
1687+
{
1688+
await Subject.Handle(transactions, info);
1689+
await Subject.Handle(transactions, info);
1690+
}, DeferredExecution = true);
1691+
}
1692+
1693+
[Fact]
1694+
public void Then_it_should_not_handle_the_transactions()
1695+
{
1696+
WhenAction.ShouldNotThrow();
1697+
}
1698+
}
1699+
15841700
public class When_a_projection_exception_occurs :
15851701
Given_a_sqlite_projector_with_an_in_memory_event_source
15861702
{
@@ -1594,12 +1710,12 @@ public When_a_projection_exception_occurs()
15941710
{
15951711
throw The<InvalidOperationException>();
15961712
});
1597-
1713+
15981714
Cache.Add(new ProductCatalogEntry
15991715
{
16001716
Id = "c350E"
16011717
});
1602-
1718+
16031719
StartProjecting();
16041720
});
16051721

@@ -1625,6 +1741,7 @@ public void Then_it_should_completely_clear_the_cache()
16251741
Cache.CurrentCount.Should().Be(0);
16261742
}
16271743
}
1744+
16281745
public class When_a_database_exception_occurs : Given_a_sqlite_projector_with_an_in_memory_event_source
16291746
{
16301747
public When_a_database_exception_occurs()
@@ -1634,7 +1751,7 @@ public When_a_database_exception_occurs()
16341751
Events.Map<ProductAddedToCatalogEvent>()
16351752
.AsCreateOf(e => e.ProductKey)
16361753
.Using((p, e) => p.Name = e.Name);
1637-
1754+
16381755
using (ISession session = The<ISessionFactory>().OpenSession())
16391756
{
16401757
var entry = new ProductCatalogEntry
@@ -1647,12 +1764,12 @@ public When_a_database_exception_occurs()
16471764
session.Save(entry);
16481765
session.Flush();
16491766
}
1650-
1767+
16511768
Cache.Add(new ProductCatalogEntry
16521769
{
16531770
Id = "SomeOtherCachedId"
16541771
});
1655-
1772+
16561773
StartProjecting();
16571774
});
16581775

@@ -1666,7 +1783,6 @@ await The<MemoryEventSource>().Write(new ProductAddedToCatalogEvent
16661783
Category = "Hybrids",
16671784
Name = "DuplicateNameThatWillThrowUniqueConstraintException"
16681785
});
1669-
16701786
}
16711787
catch
16721788
{
@@ -1681,6 +1797,8 @@ public void Then_it_should_completely_clear_the_cache()
16811797
Cache.CurrentCount.Should().Be(0);
16821798
}
16831799
}
1800+
1801+
#endregion
16841802
}
16851803

16861804
#region Supporting Types
@@ -1743,4 +1861,43 @@ public class CategoryDiscontinuedEvent
17431861
}
17441862

17451863
#endregion
1864+
}
1865+
1866+
namespace LiquidProjections.NHibernate.Specs.NHibernateProjectorSpecs
1867+
{
1868+
public class TransactionBuilder
1869+
{
1870+
private readonly List<object> events = new List<object>();
1871+
private long? checkpointNumber;
1872+
1873+
public TransactionBuilder WithEvent(object @event)
1874+
{
1875+
events.Add(@event);
1876+
return this;
1877+
}
1878+
1879+
public Transaction Build()
1880+
{
1881+
Transaction transaction = new Transaction
1882+
{
1883+
Events = events.Select(e => new EventEnvelope
1884+
{
1885+
Body = e
1886+
}).ToArray()
1887+
};
1888+
1889+
if (checkpointNumber.HasValue)
1890+
{
1891+
transaction.Checkpoint = checkpointNumber.Value;
1892+
}
1893+
1894+
return transaction;
1895+
}
1896+
1897+
public TransactionBuilder WithCheckpointNumber(long checkpointNumber)
1898+
{
1899+
this.checkpointNumber = checkpointNumber;
1900+
return this;
1901+
}
1902+
}
17461903
}

0 commit comments

Comments
 (0)