Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 44 additions & 28 deletions Session/Conversations/ConversationVC+Interaction.swift
Original file line number Diff line number Diff line change
Expand Up @@ -721,20 +721,24 @@ extension ConversationVC:
)

// If this was a message request then approve it
approveMessageRequestIfNeeded(
// For 1-1 messages if approval is required `messageRequestResponse`
// `MessageRequestResponse` object will be returned
let messageRequestResponse = approveMessageRequestIfNeeded(
for: self.viewModel.threadData.threadId,
threadVariant: self.viewModel.threadData.threadVariant,
displayName: self.viewModel.threadData.displayName,
isDraft: (self.viewModel.threadData.threadIsDraft == true),
timestampMs: (sentTimestampMs - 1) // Set 1ms earlier as this is used for sorting
).sinkUntilComplete(
receiveCompletion: { [weak self] _ in
self?.sendMessage(optimisticData: optimisticData)
}
timestampMs: (sentTimestampMs - 1), // Set 1ms earlier as this is used for sorting
shouldSequenceMessageRequestResponse: true // Skips scheduling of `MessageRequestResponse` sending
)

sendMessage(
optimisticData: optimisticData,
messageRequestResponse: messageRequestResponse
)
}

private func sendMessage(optimisticData: ConversationViewModel.OptimisticMessageData) {
private func sendMessage(optimisticData: ConversationViewModel.OptimisticMessageData, messageRequestResponse: MessageRequestResponse? = nil) {
let threadId: String = self.viewModel.threadData.threadId
let threadVariant: SessionThread.Variant = self.viewModel.threadData.threadVariant

Expand Down Expand Up @@ -831,6 +835,7 @@ extension ConversationVC:
interaction: insertedInteraction,
threadId: threadId,
threadVariant: threadVariant,
messageResponse: messageRequestResponse,
using: dependencies
)
}
Expand Down Expand Up @@ -2948,13 +2953,15 @@ extension ConversationVC: UIDocumentInteractionControllerDelegate {
// MARK: - Message Request Actions

extension ConversationVC {
@discardableResult
fileprivate func approveMessageRequestIfNeeded(
for threadId: String,
threadVariant: SessionThread.Variant,
displayName: String,
isDraft: Bool,
timestampMs: Int64
) -> AnyPublisher<Void, Never> {
timestampMs: Int64,
shouldSequenceMessageRequestResponse: Bool = false
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking maybe it would be better to always return the MessageRequestResponse/GroupUpdateInviteResponseMessage and have the caller decide how to send it - ie. the sendMessage function would do the /sequence call, and in the acceptMessageRequest() function we would just call:

viewModel.dependencies[singleton: .storage].writeAsync { [dependencies = viewModel.dependencies] db in
            try MessageSender.send(
                db,
                message: message,
                interactionId: nil,
                threadId: self.viewModel.threadData.threadId,
                threadVariant: self.viewModel.threadData.threadVariant,
                using: dependencies
            )
        }

This would allow us to simplify the logic in approveMessageRequestIfNeeded because it would just always return a Message if one needed to be sent

) -> MessageRequestResponse? {
let updateNavigationBackStack: () -> Void = {
// Remove the 'SessionTableViewController<MessageRequestsViewModel>' from the nav hierarchy if present
DispatchQueue.main.async { [weak self] in
Expand Down Expand Up @@ -2982,9 +2989,14 @@ extension ConversationVC {
Contact.fetchOrCreate(db, id: threadId, using: dependencies)
}),
!contact.isApproved
else { return Just(()).eraseToAnyPublisher() }
else { return nil }

return viewModel.dependencies[singleton: .storage]
let messageRequestResponse = MessageRequestResponse(
isApproved: true,
sentTimestampMs: UInt64(timestampMs)
)

viewModel.dependencies[singleton: .storage]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are no longer returning a publisher, you should update both of these database queries so be synchronous (viewModel.dependencies[singleton: .storage].write { [dependencies = viewModel.dependencies] db in)

This would mostly be to avoid weird situations where something else in the code validates one of these changes has occurred before sending messages (I don't think there is a case like that at the moment, but one could be added in the future)

.writePublisher { [dependencies = viewModel.dependencies] db in
/// If this isn't a draft thread (ie. sending a message request) then send a `messageRequestResponse`
/// back to the sender (this allows the sender to know that they have been approved and can now use this
Expand All @@ -3002,17 +3014,16 @@ extension ConversationVC {
using: dependencies
).inserted(db)

try MessageSender.send(
db,
message: MessageRequestResponse(
isApproved: true,
sentTimestampMs: UInt64(timestampMs)
),
interactionId: nil,
threadId: threadId,
threadVariant: threadVariant,
using: dependencies
)
if !shouldSequenceMessageRequestResponse {
try MessageSender.send(
db,
message: messageRequestResponse,
interactionId: nil,
threadId: threadId,
threadVariant: threadVariant,
using: dependencies
)
}
}

// Default 'didApproveMe' to true for the person approving the message request
Expand All @@ -3038,7 +3049,9 @@ extension ConversationVC {
updateNavigationBackStack()
}
)
.eraseToAnyPublisher()
.sinkUntilComplete()

return !isDraft && shouldSequenceMessageRequestResponse ? messageRequestResponse : nil

case .group:
// If the group is not in the invited state then don't bother doing anything
Expand All @@ -3047,9 +3060,9 @@ extension ConversationVC {
try ClosedGroup.fetchOne(db, id: threadId)
}),
group.invited == true
else { return Just(()).eraseToAnyPublisher() }
else { return nil }

return viewModel.dependencies[singleton: .storage]
viewModel.dependencies[singleton: .storage]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably make the same change for the GroupUpdateInviteResponseMessage for consistency (I don't believe it has the same bug, but sending it as part of a /sequence is slightly more efficient and you already have the logic for it so it's not much more work) - will just need to update the function to return a generic Message instead of the MessageRequestResponse

.writePublisher { [dependencies = viewModel.dependencies] db in
/// Remove any existing `infoGroupInfoInvited` interactions from the group (don't want to have a
/// duplicate one from inside the group history)
Expand Down Expand Up @@ -3102,10 +3115,13 @@ extension ConversationVC {
updateNavigationBackStack()
}
)
.eraseToAnyPublisher()
.sinkUntilComplete()

default: return Just(()).eraseToAnyPublisher()
default:
return nil
}

return nil
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for this return nil as the code shouldn't be reachable due to the returns from the switch statement

}

func acceptMessageRequest() {
Expand All @@ -3115,7 +3131,7 @@ extension ConversationVC {
displayName: self.viewModel.threadData.displayName,
isDraft: (self.viewModel.threadData.threadIsDraft == true),
timestampMs: viewModel.dependencies[cache: .snodeAPI].currentOffsetTimestampMs()
).sinkUntilComplete()
)
}

func declineMessageRequest() {
Expand Down
71 changes: 59 additions & 12 deletions SessionMessagingKit/Jobs/MessageSendJob.swift
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,36 @@ public enum MessageSendJob: JobExecutor {
default: break
}

let userSessionId: SessionId = dependencies[cache: .general].sessionId

// Convert and prepare the data for sending
let swarmPublicKey: String = {
switch details.destination {
case .contact(let publicKey): return publicKey
case .syncMessage: return userSessionId.hexString
case .closedGroup(let groupPublicKey): return groupPublicKey
case .openGroup, .openGroupInbox: preconditionFailure()
}
}()
Comment on lines +196 to +206
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is duplicating logic from the MessageSender, would be better to consolidate this to avoid the duplication (we have something similar in Authentication) you could add the following to Message.Destination and then just use try details.destination.swarmPublicKey(using: dependencies) both here and in MessageSender:

func swarmPublicKey(using dependencies Dependencies) throws -> String {
    let userSessionId: SessionId = dependencies[cache: .general].sessionId

    switch self {
        case .contact(let publicKey): return publicKey
        case .syncMessage: return userSessionId.hexString
        case .closedGroup(let groupPublicKey): return groupPublicKey
        case .openGroup, .openGroupInbox: throw MessageSenderError.invalidDestination
    }
}

Note: You will need to add a new MessageSenderError.invalidDestination error


// Store the sentTimestamp from the message in case it fails due to a clockOutOfSync error
let originalSentTimestampMs: UInt64? = details.message.sentTimestampMs
let startTime: TimeInterval = dependencies.dateNow.timeIntervalSince1970

// Update `messageType` value used for logging info
var updatedMessageType: String {
guard let messageRequestResponse = job.transientData as? MessageRequestResponse else {
return messageType
}
return "\(messageType) and \(type(of: messageRequestResponse))"
}

/// Perform the actual message sending - this will timeout if the entire process takes longer than `Network.defaultTimeout * 2`
/// which can occur if it needs to build a new onion path (which doesn't actually have any limits so can take forever in rare cases)
///
/// Adds sending for `MessageRequestResponse` along with actual message if `job.transientData` is a valid
/// `MessageRequestResponse` type
///
/// **Note:** No need to upload attachments as part of this process as the above logic splits that out into it's own job
/// so we shouldn't get here until attachments have already been uploaded
dependencies[singleton: .storage]
Expand All @@ -216,15 +239,39 @@ public enum MessageSendJob: JobExecutor {
using: dependencies
)
})
.tryFlatMap { authMethod in
try MessageSender.preparedSend(
message: details.message,
to: details.destination,
namespace: details.destination.defaultNamespace,
interactionId: job.interactionId,
attachments: messageAttachments,
authMethod: authMethod,
onEvent: MessageSender.standardEventHandling(using: dependencies),
.tryFlatMap { authMethod -> AnyPublisher<(ResponseInfoType, Network.BatchResponse), Error> in
return try Network.SnodeAPI.preparedSequence(
requests: []
.appending(try {
guard let messageRequestResponse = job.transientData as? MessageRequestResponse else { return nil }

return try MessageSender.preparedSend(
message: messageRequestResponse,
to: details.destination,
namespace: details.destination.defaultNamespace,
interactionId: nil,
attachments: nil,
authMethod: authMethod,
onEvent: MessageSender.standardEventHandling(using: dependencies),
using: dependencies
)
}())
.appending(contentsOf: [
MessageSender.preparedSend(
message: details.message,
to: details.destination,
namespace: details.destination.defaultNamespace,
interactionId: job.interactionId,
attachments: messageAttachments,
authMethod: authMethod,
onEvent: MessageSender.standardEventHandling(using: dependencies),
using: dependencies
)
]),
requireAllBatchResponses: true,
swarmPublicKey: swarmPublicKey,
snodeRetrievalRetryCount: 0, // This job has it's own retry mechanism
requestAndPathBuildTimeout: Network.defaultTimeout,
using: dependencies
).send(using: dependencies)
}
Expand All @@ -234,12 +281,12 @@ public enum MessageSendJob: JobExecutor {
receiveCompletion: { result in
switch result {
case .finished:
Log.info(.cat, "Completed sending \(messageType) (\(job.id ?? -1)) after \(.seconds(dependencies.dateNow.timeIntervalSince1970 - startTime), unit: .s)\(previousDeferralsMessage).")
Log.info(.cat, "Completed sending \(updatedMessageType) (\(job.id ?? -1)) after \(.seconds(dependencies.dateNow.timeIntervalSince1970 - startTime), unit: .s)\(previousDeferralsMessage).")
dependencies.setAsync(.hasSentAMessage, true)
success(job, false)

case .failure(let error):
Log.info(.cat, "Failed to send \(messageType) (\(job.id ?? -1)) after \(.seconds(dependencies.dateNow.timeIntervalSince1970 - startTime), unit: .s)\(previousDeferralsMessage) due to error: \(error).")
Log.info(.cat, "Failed to send \(updatedMessageType) (\(job.id ?? -1)) after \(.seconds(dependencies.dateNow.timeIntervalSince1970 - startTime), unit: .s)\(previousDeferralsMessage) due to error: \(error).")

// Actual error handling
switch (error, details.message) {
Expand All @@ -250,7 +297,7 @@ public enum MessageSendJob: JobExecutor {
failure(job, error, true)

case (SnodeAPIError.clockOutOfSync, _):
Log.error(.cat, "\(originalSentTimestampMs != nil ? "Permanently Failing" : "Failing") to send \(messageType) (\(job.id ?? -1)) due to clock out of sync issue.")
Log.error(.cat, "\(originalSentTimestampMs != nil ? "Permanently Failing" : "Failing") to send \(updatedMessageType) (\(job.id ?? -1)) due to clock out of sync issue.")
failure(job, error, (originalSentTimestampMs != nil))

// Don't bother retrying (it can just send a new one later but allowing retries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ extension MessageSender {
threadId: String,
threadVariant: SessionThread.Variant,
isSyncMessage: Bool = false,
messageResponse: MessageRequestResponse? = nil,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the below comment this function would probably need to take additionalMessages: [(message: Message, interactionId: Int64?)]

Not ideal but would support the more generic "multi-sending" approach (you could also add a function which takes an array in Interaction but would still need the additionalMessages variant due to the MessageRequestResponse case 😞)

using dependencies: Dependencies
) throws {
// Only 'VisibleMessage' types can be sent via this method
Expand All @@ -32,6 +33,7 @@ extension MessageSender {
interactionId: interactionId,
to: try Message.Destination.from(db, threadId: threadId, threadVariant: threadVariant),
isSyncMessage: isSyncMessage,
messageResponse: messageResponse,
using: dependencies
)
}
Expand Down Expand Up @@ -64,6 +66,7 @@ extension MessageSender {
interactionId: Int64?,
to destination: Message.Destination,
isSyncMessage: Bool = false,
messageResponse: MessageRequestResponse? = nil,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking - maybe it would be cleaner to make this function take an array of [(message: Message, interactionId: Int64?)] instead of an explicit messageResponse parameter (would make it more reusable for sending multiple messages at once)?

using dependencies: Dependencies
) {
// If it's a sync message then we need to make some slight tweaks before sending so use the proper
Expand All @@ -89,7 +92,8 @@ extension MessageSender {
details: MessageSendJob.Details(
destination: destination,
message: message
)
),
transientData: messageResponse
),
canStartJob: true
)
Expand Down