-
Notifications
You must be signed in to change notification settings - Fork 34
[WIP] Sequenced message request acceptance and message sending #583
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -721,20 +721,24 @@ extension ConversationVC: | |
| ) | ||
|
|
||
| // If this was a message request then approve it | ||
| approveMessageRequestIfNeeded( | ||
| // For 1-1 messages if approval is required `messageRequestResponse` | ||
| // `MessageRequestResponse` object will be returned | ||
| let messageRequestResponse = approveMessageRequestIfNeeded( | ||
| for: self.viewModel.threadData.threadId, | ||
| threadVariant: self.viewModel.threadData.threadVariant, | ||
| displayName: self.viewModel.threadData.displayName, | ||
| isDraft: (self.viewModel.threadData.threadIsDraft == true), | ||
| timestampMs: (sentTimestampMs - 1) // Set 1ms earlier as this is used for sorting | ||
| ).sinkUntilComplete( | ||
| receiveCompletion: { [weak self] _ in | ||
| self?.sendMessage(optimisticData: optimisticData) | ||
| } | ||
| timestampMs: (sentTimestampMs - 1), // Set 1ms earlier as this is used for sorting | ||
| shouldSequenceMessageRequestResponse: true // Skips scheduling of `MessageRequestResponse` sending | ||
| ) | ||
|
|
||
| sendMessage( | ||
| optimisticData: optimisticData, | ||
| messageRequestResponse: messageRequestResponse | ||
| ) | ||
| } | ||
|
|
||
| private func sendMessage(optimisticData: ConversationViewModel.OptimisticMessageData) { | ||
| private func sendMessage(optimisticData: ConversationViewModel.OptimisticMessageData, messageRequestResponse: MessageRequestResponse? = nil) { | ||
| let threadId: String = self.viewModel.threadData.threadId | ||
| let threadVariant: SessionThread.Variant = self.viewModel.threadData.threadVariant | ||
|
|
||
|
|
@@ -831,6 +835,7 @@ extension ConversationVC: | |
| interaction: insertedInteraction, | ||
| threadId: threadId, | ||
| threadVariant: threadVariant, | ||
| messageResponse: messageRequestResponse, | ||
| using: dependencies | ||
| ) | ||
| } | ||
|
|
@@ -2948,13 +2953,15 @@ extension ConversationVC: UIDocumentInteractionControllerDelegate { | |
| // MARK: - Message Request Actions | ||
|
|
||
| extension ConversationVC { | ||
| @discardableResult | ||
| fileprivate func approveMessageRequestIfNeeded( | ||
| for threadId: String, | ||
| threadVariant: SessionThread.Variant, | ||
| displayName: String, | ||
| isDraft: Bool, | ||
| timestampMs: Int64 | ||
| ) -> AnyPublisher<Void, Never> { | ||
| timestampMs: Int64, | ||
| shouldSequenceMessageRequestResponse: Bool = false | ||
| ) -> MessageRequestResponse? { | ||
| let updateNavigationBackStack: () -> Void = { | ||
| // Remove the 'SessionTableViewController<MessageRequestsViewModel>' from the nav hierarchy if present | ||
| DispatchQueue.main.async { [weak self] in | ||
|
|
@@ -2982,9 +2989,14 @@ extension ConversationVC { | |
| Contact.fetchOrCreate(db, id: threadId, using: dependencies) | ||
| }), | ||
| !contact.isApproved | ||
| else { return Just(()).eraseToAnyPublisher() } | ||
| else { return nil } | ||
|
|
||
| return viewModel.dependencies[singleton: .storage] | ||
| let messageRequestResponse = MessageRequestResponse( | ||
| isApproved: true, | ||
| sentTimestampMs: UInt64(timestampMs) | ||
| ) | ||
|
|
||
| viewModel.dependencies[singleton: .storage] | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we are no longer returning a publisher, you should update both of these database queries so be synchronous ( This would mostly be to avoid weird situations where something else in the code validates one of these changes has occurred before sending messages (I don't think there is a case like that at the moment, but one could be added in the future) |
||
| .writePublisher { [dependencies = viewModel.dependencies] db in | ||
| /// If this isn't a draft thread (ie. sending a message request) then send a `messageRequestResponse` | ||
| /// back to the sender (this allows the sender to know that they have been approved and can now use this | ||
|
|
@@ -3002,17 +3014,16 @@ extension ConversationVC { | |
| using: dependencies | ||
| ).inserted(db) | ||
|
|
||
| try MessageSender.send( | ||
| db, | ||
| message: MessageRequestResponse( | ||
| isApproved: true, | ||
| sentTimestampMs: UInt64(timestampMs) | ||
| ), | ||
| interactionId: nil, | ||
| threadId: threadId, | ||
| threadVariant: threadVariant, | ||
| using: dependencies | ||
| ) | ||
| if !shouldSequenceMessageRequestResponse { | ||
| try MessageSender.send( | ||
| db, | ||
| message: messageRequestResponse, | ||
| interactionId: nil, | ||
| threadId: threadId, | ||
| threadVariant: threadVariant, | ||
| using: dependencies | ||
| ) | ||
| } | ||
| } | ||
|
|
||
| // Default 'didApproveMe' to true for the person approving the message request | ||
|
|
@@ -3038,7 +3049,9 @@ extension ConversationVC { | |
| updateNavigationBackStack() | ||
| } | ||
| ) | ||
| .eraseToAnyPublisher() | ||
| .sinkUntilComplete() | ||
|
|
||
| return !isDraft && shouldSequenceMessageRequestResponse ? messageRequestResponse : nil | ||
|
|
||
| case .group: | ||
| // If the group is not in the invited state then don't bother doing anything | ||
|
|
@@ -3047,9 +3060,9 @@ extension ConversationVC { | |
| try ClosedGroup.fetchOne(db, id: threadId) | ||
| }), | ||
| group.invited == true | ||
| else { return Just(()).eraseToAnyPublisher() } | ||
| else { return nil } | ||
|
|
||
| return viewModel.dependencies[singleton: .storage] | ||
| viewModel.dependencies[singleton: .storage] | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should probably make the same change for the |
||
| .writePublisher { [dependencies = viewModel.dependencies] db in | ||
| /// Remove any existing `infoGroupInfoInvited` interactions from the group (don't want to have a | ||
| /// duplicate one from inside the group history) | ||
|
|
@@ -3102,10 +3115,13 @@ extension ConversationVC { | |
| updateNavigationBackStack() | ||
| } | ||
| ) | ||
| .eraseToAnyPublisher() | ||
| .sinkUntilComplete() | ||
|
|
||
| default: return Just(()).eraseToAnyPublisher() | ||
| default: | ||
| return nil | ||
| } | ||
|
|
||
| return nil | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No need for this |
||
| } | ||
|
|
||
| func acceptMessageRequest() { | ||
|
|
@@ -3115,7 +3131,7 @@ extension ConversationVC { | |
| displayName: self.viewModel.threadData.displayName, | ||
| isDraft: (self.viewModel.threadData.threadIsDraft == true), | ||
| timestampMs: viewModel.dependencies[cache: .snodeAPI].currentOffsetTimestampMs() | ||
| ).sinkUntilComplete() | ||
| ) | ||
| } | ||
|
|
||
| func declineMessageRequest() { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -193,13 +193,36 @@ public enum MessageSendJob: JobExecutor { | |
| default: break | ||
| } | ||
|
|
||
| let userSessionId: SessionId = dependencies[cache: .general].sessionId | ||
|
|
||
| // Convert and prepare the data for sending | ||
| let swarmPublicKey: String = { | ||
| switch details.destination { | ||
| case .contact(let publicKey): return publicKey | ||
| case .syncMessage: return userSessionId.hexString | ||
| case .closedGroup(let groupPublicKey): return groupPublicKey | ||
| case .openGroup, .openGroupInbox: preconditionFailure() | ||
| } | ||
| }() | ||
|
Comment on lines
+196
to
+206
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is duplicating logic from the func swarmPublicKey(using dependencies Dependencies) throws -> String {
let userSessionId: SessionId = dependencies[cache: .general].sessionId
switch self {
case .contact(let publicKey): return publicKey
case .syncMessage: return userSessionId.hexString
case .closedGroup(let groupPublicKey): return groupPublicKey
case .openGroup, .openGroupInbox: throw MessageSenderError.invalidDestination
}
}Note: You will need to add a new |
||
|
|
||
| // Store the sentTimestamp from the message in case it fails due to a clockOutOfSync error | ||
| let originalSentTimestampMs: UInt64? = details.message.sentTimestampMs | ||
| let startTime: TimeInterval = dependencies.dateNow.timeIntervalSince1970 | ||
|
|
||
| // Update `messageType` value used for logging info | ||
| var updatedMessageType: String { | ||
| guard let messageRequestResponse = job.transientData as? MessageRequestResponse else { | ||
| return messageType | ||
| } | ||
| return "\(messageType) and \(type(of: messageRequestResponse))" | ||
| } | ||
|
|
||
| /// Perform the actual message sending - this will timeout if the entire process takes longer than `Network.defaultTimeout * 2` | ||
| /// which can occur if it needs to build a new onion path (which doesn't actually have any limits so can take forever in rare cases) | ||
| /// | ||
| /// Adds sending for `MessageRequestResponse` along with actual message if `job.transientData` is a valid | ||
| /// `MessageRequestResponse` type | ||
| /// | ||
| /// **Note:** No need to upload attachments as part of this process as the above logic splits that out into it's own job | ||
| /// so we shouldn't get here until attachments have already been uploaded | ||
| dependencies[singleton: .storage] | ||
|
|
@@ -216,15 +239,39 @@ public enum MessageSendJob: JobExecutor { | |
| using: dependencies | ||
| ) | ||
| }) | ||
| .tryFlatMap { authMethod in | ||
| try MessageSender.preparedSend( | ||
| message: details.message, | ||
| to: details.destination, | ||
| namespace: details.destination.defaultNamespace, | ||
| interactionId: job.interactionId, | ||
| attachments: messageAttachments, | ||
| authMethod: authMethod, | ||
| onEvent: MessageSender.standardEventHandling(using: dependencies), | ||
| .tryFlatMap { authMethod -> AnyPublisher<(ResponseInfoType, Network.BatchResponse), Error> in | ||
| return try Network.SnodeAPI.preparedSequence( | ||
| requests: [] | ||
| .appending(try { | ||
| guard let messageRequestResponse = job.transientData as? MessageRequestResponse else { return nil } | ||
|
|
||
| return try MessageSender.preparedSend( | ||
| message: messageRequestResponse, | ||
| to: details.destination, | ||
| namespace: details.destination.defaultNamespace, | ||
| interactionId: nil, | ||
| attachments: nil, | ||
| authMethod: authMethod, | ||
| onEvent: MessageSender.standardEventHandling(using: dependencies), | ||
| using: dependencies | ||
| ) | ||
| }()) | ||
| .appending(contentsOf: [ | ||
| MessageSender.preparedSend( | ||
| message: details.message, | ||
| to: details.destination, | ||
| namespace: details.destination.defaultNamespace, | ||
| interactionId: job.interactionId, | ||
| attachments: messageAttachments, | ||
| authMethod: authMethod, | ||
| onEvent: MessageSender.standardEventHandling(using: dependencies), | ||
| using: dependencies | ||
| ) | ||
| ]), | ||
| requireAllBatchResponses: true, | ||
| swarmPublicKey: swarmPublicKey, | ||
| snodeRetrievalRetryCount: 0, // This job has it's own retry mechanism | ||
| requestAndPathBuildTimeout: Network.defaultTimeout, | ||
| using: dependencies | ||
| ).send(using: dependencies) | ||
| } | ||
|
|
@@ -234,12 +281,12 @@ public enum MessageSendJob: JobExecutor { | |
| receiveCompletion: { result in | ||
| switch result { | ||
| case .finished: | ||
| Log.info(.cat, "Completed sending \(messageType) (\(job.id ?? -1)) after \(.seconds(dependencies.dateNow.timeIntervalSince1970 - startTime), unit: .s)\(previousDeferralsMessage).") | ||
| Log.info(.cat, "Completed sending \(updatedMessageType) (\(job.id ?? -1)) after \(.seconds(dependencies.dateNow.timeIntervalSince1970 - startTime), unit: .s)\(previousDeferralsMessage).") | ||
| dependencies.setAsync(.hasSentAMessage, true) | ||
| success(job, false) | ||
|
|
||
| case .failure(let error): | ||
| Log.info(.cat, "Failed to send \(messageType) (\(job.id ?? -1)) after \(.seconds(dependencies.dateNow.timeIntervalSince1970 - startTime), unit: .s)\(previousDeferralsMessage) due to error: \(error).") | ||
| Log.info(.cat, "Failed to send \(updatedMessageType) (\(job.id ?? -1)) after \(.seconds(dependencies.dateNow.timeIntervalSince1970 - startTime), unit: .s)\(previousDeferralsMessage) due to error: \(error).") | ||
|
|
||
| // Actual error handling | ||
| switch (error, details.message) { | ||
|
|
@@ -250,7 +297,7 @@ public enum MessageSendJob: JobExecutor { | |
| failure(job, error, true) | ||
|
|
||
| case (SnodeAPIError.clockOutOfSync, _): | ||
| Log.error(.cat, "\(originalSentTimestampMs != nil ? "Permanently Failing" : "Failing") to send \(messageType) (\(job.id ?? -1)) due to clock out of sync issue.") | ||
| Log.error(.cat, "\(originalSentTimestampMs != nil ? "Permanently Failing" : "Failing") to send \(updatedMessageType) (\(job.id ?? -1)) due to clock out of sync issue.") | ||
| failure(job, error, (originalSentTimestampMs != nil)) | ||
|
|
||
| // Don't bother retrying (it can just send a new one later but allowing retries | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,7 @@ extension MessageSender { | |
| threadId: String, | ||
| threadVariant: SessionThread.Variant, | ||
| isSyncMessage: Bool = false, | ||
| messageResponse: MessageRequestResponse? = nil, | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Based on the below comment this function would probably need to take Not ideal but would support the more generic "multi-sending" approach (you could also add a function which takes an array in |
||
| using dependencies: Dependencies | ||
| ) throws { | ||
| // Only 'VisibleMessage' types can be sent via this method | ||
|
|
@@ -32,6 +33,7 @@ extension MessageSender { | |
| interactionId: interactionId, | ||
| to: try Message.Destination.from(db, threadId: threadId, threadVariant: threadVariant), | ||
| isSyncMessage: isSyncMessage, | ||
| messageResponse: messageResponse, | ||
| using: dependencies | ||
| ) | ||
| } | ||
|
|
@@ -64,6 +66,7 @@ extension MessageSender { | |
| interactionId: Int64?, | ||
| to destination: Message.Destination, | ||
| isSyncMessage: Bool = false, | ||
| messageResponse: MessageRequestResponse? = nil, | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just thinking - maybe it would be cleaner to make this function take an array of |
||
| using dependencies: Dependencies | ||
| ) { | ||
| // If it's a sync message then we need to make some slight tweaks before sending so use the proper | ||
|
|
@@ -89,7 +92,8 @@ extension MessageSender { | |
| details: MessageSendJob.Details( | ||
| destination: destination, | ||
| message: message | ||
| ) | ||
| ), | ||
| transientData: messageResponse | ||
| ), | ||
| canStartJob: true | ||
| ) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just thinking maybe it would be better to always return the
MessageRequestResponse/GroupUpdateInviteResponseMessageand have the caller decide how to send it - ie. thesendMessagefunction would do the/sequencecall, and in theacceptMessageRequest()function we would just call:This would allow us to simplify the logic in
approveMessageRequestIfNeededbecause it would just always return aMessageif one needed to be sent